dianfu commented on code in PR #19453:
URL: https://github.com/apache/flink/pull/19453#discussion_r852939230


##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java:
##########
@@ -190,7 +194,8 @@ public BeamPythonFunctionRunner(
             MemoryManager memoryManager,
             double managedMemoryFraction,
             FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor,
-            FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor) {
+            FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor,
+            @Nullable Map<String, FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors) {

Review Comment:
   Remove the @Nullable annotation and use Preconditions.checkNotNull to make sure that the given value is not null.
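   A minimal sketch of what this suggestion could look like, using simplified stand-ins rather than the real Flink classes. `CoderInfoDescriptor` and `Runner` below are hypothetical reductions of `FlinkFnApi.CoderInfoDescriptor` and `BeamPythonFunctionRunner`. The JDK's `Objects.requireNonNull` is swapped in for Flink's `Preconditions.checkNotNull` so the example runs without Flink on the classpath; both return the argument or throw `NullPointerException`. The sketch assumes that callers without side outputs pass an empty map instead of `null`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class NonNullSideOutputs {

    /** Hypothetical stand-in for FlinkFnApi.CoderInfoDescriptor. */
    static final class CoderInfoDescriptor {
        final String coderId;

        CoderInfoDescriptor(String coderId) {
            this.coderId = coderId;
        }
    }

    /** Hypothetical, heavily reduced stand-in for BeamPythonFunctionRunner. */
    static final class Runner {
        private final Map<String, CoderInfoDescriptor> sideOutputCoderDescriptors;

        Runner(Map<String, CoderInfoDescriptor> sideOutputCoderDescriptors) {
            // JDK equivalent of Flink's Preconditions.checkNotNull: returns the
            // argument unchanged or throws NullPointerException with this message.
            this.sideOutputCoderDescriptors =
                    Objects.requireNonNull(
                            sideOutputCoderDescriptors,
                            "sideOutputCoderDescriptors must not be null");
        }

        boolean hasSideOutputs() {
            // No null branch needed: the constructor guarantees a non-null map.
            return !sideOutputCoderDescriptors.isEmpty();
        }
    }

    public static void main(String[] args) {
        // Operators without side outputs pass an empty map rather than null.
        Runner plain = new Runner(Collections.emptyMap());

        Map<String, CoderInfoDescriptor> descriptors = new HashMap<>();
        descriptors.put("late-data", new CoderInfoDescriptor("row-coder"));
        Runner withLateData = new Runner(descriptors);

        System.out.println(plain.hasSideOutputs() + " " + withLateData.hasSideOutputs());

        try {
            new Runner(null);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   With the annotation gone, code that reads the field no longer needs null checks, and the empty map becomes the single representation of "no side outputs".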



##########
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java:
##########
@@ -102,85 +106,144 @@ public BeamDataStreamPythonFunctionRunner(
 
     @Override
     protected void buildTransforms(RunnerApi.Components.Builder componentsBuilder) {
-        for (int i = 0; i < userDefinedDataStreamFunctions.size() + 1; i++) {
-            String functionUrn;
-            if (i == 0) {
-                functionUrn = headOperatorFunctionUrn;
-            } else {
-                functionUrn = STATELESS_FUNCTION_URN;
-            }
+        for (int i = 0; i < userDefinedDataStreamFunctions.size(); i++) {
 
-            FlinkFnApi.UserDefinedDataStreamFunction functionProto;
-            if (i < userDefinedDataStreamFunctions.size()) {
-                functionProto = userDefinedDataStreamFunctions.get(i);
-            } else {
-                // the last function in the operation tree is used to prune the watermark column
-                functionProto = createReviseOutputDataStreamFunctionProto();
-            }
+            final Map<String, String> outputCollectionMap = new HashMap<>();
 
-            // Use ParDoPayload as a wrapper of the actual payload as timer is only supported in
-            // ParDo
-            final RunnerApi.ParDoPayload.Builder payloadBuilder =
-                    RunnerApi.ParDoPayload.newBuilder()
-                            .setDoFn(
-                                    RunnerApi.FunctionSpec.newBuilder()
-                                            .setUrn(functionUrn)
-                                            .setPayload(
-                                                    org.apache.beam.vendor.grpc.v1p26p0.com.google
-                                                            .protobuf.ByteString.copyFrom(
-                                                                    functionProto.toByteArray()))
-                                            .build());
-
-            // Timer is only available in the head operator
-            if (i == 0 && timerCoderDescriptor != null) {
-                payloadBuilder.putTimerFamilySpecs(
-                        TIMER_ID,
-                        RunnerApi.TimerFamilySpec.newBuilder()
-                                // this field is not used, always set it as event time
-                                .setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME)
-                                .setTimerFamilyCoderId(WRAPPER_TIMER_CODER_ID)
-                                .build());
+            // Prepare side outputs
+            if (i == userDefinedDataStreamFunctions.size() - 1) {

Review Comment:
   Does it make sense to move this out of the for loop?
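   One possible shape for this, sketched with plain strings instead of the Beam `RunnerApi` builders: anything that depends only on the side-output descriptors is computed once before the loop, and the loop body merely merges the precomputed entries into the last function's `outputCollectionMap`. The method name `buildOutputCollections`, the `"main"` key and the `tag:coder` id scheme are hypothetical and only serve to make the example runnable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HoistSideOutputs {

    /**
     * Hypothetical sketch: returns one output-collection map per chained function.
     * Side-output collections are prepared once and attached only to the last function.
     */
    static List<Map<String, String>> buildOutputCollections(
            List<String> functionNames, Map<String, String> sideOutputCoderIds) {
        // Side-output preparation happens once, outside the per-function loop.
        Map<String, String> sideOutputCollections = new HashMap<>();
        for (Map.Entry<String, String> entry : sideOutputCoderIds.entrySet()) {
            // Hypothetical id scheme: "<tag>:<coder id>".
            sideOutputCollections.put(entry.getKey(), entry.getKey() + ":" + entry.getValue());
        }

        List<Map<String, String>> result = new ArrayList<>();
        int last = functionNames.size() - 1;
        for (int i = 0; i < functionNames.size(); i++) {
            Map<String, String> outputCollectionMap = new HashMap<>();
            outputCollectionMap.put("main", functionNames.get(i) + ":output");
            if (i == last) {
                // The loop only merges the precomputed side outputs.
                outputCollectionMap.putAll(sideOutputCollections);
            }
            result.add(Collections.unmodifiableMap(outputCollectionMap));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> sideOutputs = new HashMap<>();
        sideOutputs.put("late-data", "row-coder");
        List<Map<String, String>> maps =
                buildOutputCollections(Arrays.asList("f0", "f1"), sideOutputs);
        System.out.println(maps);
    }
}
```

   An alternative is to iterate only up to `size() - 1` and build the last transform separately after the loop; which variant reads better probably depends on how much per-transform setup the two cases share.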



##########
flink-python/pyflink/datastream/tests/test_window.py:
##########
@@ -346,6 +349,40 @@ def test_session_window_late_merge(self):
         expected = ['(hi,3)']
         self.assert_equals_sorted(expected, results)
 
+    def test_side_output_late_data(self):
+        self.env.set_parallelism(1)
+        config = Configuration(
+            j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment)
+        )
+        config.set_integer('python.fn-execution.bundle.size', 1)
+        jvm = get_gateway().jvm
+        watermark_strategy = WatermarkStrategy(
+            jvm.org.apache.flink.api.common.eventtime.WatermarkStrategy.forGenerator(
+                jvm.org.apache.flink.streaming.api.functions.python.eventtime.
+                PerElementWatermarkGenerator.getSupplier()
+            )
+        ).with_timestamp_assigner(SecondColumnTimestampAssigner())
+
+        tag = OutputTag('late-data', type_info=Types.ROW([Types.STRING(), Types.INT()]))
+        ds1 = self.env.from_collection([('a', 0), ('a', 8), ('a', 4), ('a', 6)],
+                                       type_info=Types.ROW([Types.STRING(), Types.INT()]))
+        ds2 = ds1.assign_timestamps_and_watermarks(watermark_strategy) \
+            .key_by(lambda e: e[0]) \
+            .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
+            .allowed_lateness(0) \
+            .side_output_late_data(tag) \
+            .process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))

Review Comment:
   There are some changes to CountWindowProcessFunction in the latest master. The PR needs to be rebased and this test adjusted accordingly.



##########
flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java:
##########
@@ -98,12 +100,15 @@ public class PythonOperatorChainingOptimizer {
      */
     @SuppressWarnings("unchecked")
     public static void apply(StreamExecutionEnvironment env) throws Exception {
+        final Field transformationsField =
+                StreamExecutionEnvironment.class.getDeclaredField("transformations");
+        transformationsField.setAccessible(true);
+        final List<Transformation<?>> transformations =
+                (List<Transformation<?>>) transformationsField.get(env);
+
+        preprocessSideOutput(transformations);

Review Comment:
   What about moving this to PythonConfigUtils?
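   A rough sketch of what a reusable helper could look like if the reflective access moved into a utility class such as `PythonConfigUtils`. `getDeclaredFieldValue` and `FakeEnvironment` are hypothetical; the latter only mimics a `StreamExecutionEnvironment` with a private `transformations` list. Wrapping checked reflection exceptions in `IllegalStateException` is a choice made for this sketch, not something the PR does.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class ReflectiveFieldAccess {

    /**
     * Hypothetical helper of the kind that could live in a utility class such as
     * PythonConfigUtils: returns the value of a (possibly private) declared field.
     */
    @SuppressWarnings("unchecked")
    static <T> T getDeclaredFieldValue(Object target, Class<?> declaringClass, String fieldName) {
        try {
            Field field = declaringClass.getDeclaredField(fieldName);
            field.setAccessible(true);
            return (T) field.get(target);
        } catch (ReflectiveOperationException e) {
            // Sketch-specific choice: surface reflection failures as unchecked errors.
            throw new IllegalStateException(
                    "Cannot read field " + fieldName + " of " + declaringClass.getName(), e);
        }
    }

    /** Stand-in for StreamExecutionEnvironment with a private "transformations" list. */
    static final class FakeEnvironment {
        private final List<String> transformations = new ArrayList<>();

        void addOperator(String name) {
            transformations.add(name);
        }
    }

    public static void main(String[] args) {
        FakeEnvironment env = new FakeEnvironment();
        env.addOperator("map");
        env.addOperator("window");

        // The calling code no longer contains any reflection boilerplate.
        List<String> transformations =
                getDeclaredFieldValue(env, FakeEnvironment.class, "transformations");
        System.out.println(transformations);
    }
}
```

   `Field.get` returns the field's actual value rather than a copy, so in-place changes made through the returned list are visible to the environment.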


