Vancior commented on code in PR #19453:
URL: https://github.com/apache/flink/pull/19453#discussion_r854718202


##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java:
##########
@@ -102,85 +106,144 @@ public BeamDataStreamPythonFunctionRunner(
 
     @Override
     protected void buildTransforms(RunnerApi.Components.Builder 
componentsBuilder) {
-        for (int i = 0; i < userDefinedDataStreamFunctions.size() + 1; i++) {
-            String functionUrn;
-            if (i == 0) {
-                functionUrn = headOperatorFunctionUrn;
-            } else {
-                functionUrn = STATELESS_FUNCTION_URN;
-            }
+        for (int i = 0; i < userDefinedDataStreamFunctions.size(); i++) {
 
-            FlinkFnApi.UserDefinedDataStreamFunction functionProto;
-            if (i < userDefinedDataStreamFunctions.size()) {
-                functionProto = userDefinedDataStreamFunctions.get(i);
-            } else {
-                // the last function in the operation tree is used to prune 
the watermark column
-                functionProto = createReviseOutputDataStreamFunctionProto();
-            }
+            final Map<String, String> outputCollectionMap = new HashMap<>();
 
-            // Use ParDoPayload as a wrapper of the actual payload as timer is 
only supported in
-            // ParDo
-            final RunnerApi.ParDoPayload.Builder payloadBuilder =
-                    RunnerApi.ParDoPayload.newBuilder()
-                            .setDoFn(
-                                    RunnerApi.FunctionSpec.newBuilder()
-                                            .setUrn(functionUrn)
-                                            .setPayload(
-                                                    
org.apache.beam.vendor.grpc.v1p26p0.com.google
-                                                            
.protobuf.ByteString.copyFrom(
-                                                            
functionProto.toByteArray()))
-                                            .build());
-
-            // Timer is only available in the head operator
-            if (i == 0 && timerCoderDescriptor != null) {
-                payloadBuilder.putTimerFamilySpecs(
-                        TIMER_ID,
-                        RunnerApi.TimerFamilySpec.newBuilder()
-                                // this field is not used, always set it as 
event time
-                                
.setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME)
-                                .setTimerFamilyCoderId(WRAPPER_TIMER_CODER_ID)
-                                .build());
+            // Prepare side outputs
+            if (i == userDefinedDataStreamFunctions.size() - 1) {

Review Comment:
   This add outputs to local variable `outputCollectionMap` in the loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to