This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new c43b3d23bde [FLINK-31337][python] Fix python keyed broadcast batch operator input not sorted
c43b3d23bde is described below

commit c43b3d23bde1047e675793bf3e64cfe5c514f088
Author: Juntao Hu <[email protected]>
AuthorDate: Mon Mar 6 20:48:56 2023 +0800

    [FLINK-31337][python] Fix python keyed broadcast batch operator input not sorted
    
    This closes #22107.
---
 ...PythonBatchKeyedCoBroadcastProcessOperator.java | 13 +---------
 ...PythonBatchKeyedCoBroadcastProcessOperator.java | 14 +----------
 ...eyedBroadcastStateTransformationTranslator.java | 29 +++++++++++++++-------
 .../runtime/translators/BatchExecutionUtils.java   |  4 +--
 4 files changed, 24 insertions(+), 36 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java
index a5335780756..232f97a5e95 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
 import org.apache.flink.streaming.api.operators.BoundedMultiInput;
-import org.apache.flink.streaming.api.operators.InputSelectable;
-import org.apache.flink.streaming.api.operators.InputSelection;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
@@ -38,7 +36,7 @@ import org.apache.flink.util.Preconditions;
 @Internal
 public class EmbeddedPythonBatchKeyedCoBroadcastProcessOperator<K, IN1, IN2, 
OUT>
         extends EmbeddedPythonKeyedCoProcessOperator<K, IN1, IN2, OUT>
-        implements BoundedMultiInput, InputSelectable {
+        implements BoundedMultiInput {
 
     private static final long serialVersionUID = 1L;
 
@@ -60,15 +58,6 @@ public class 
EmbeddedPythonBatchKeyedCoBroadcastProcessOperator<K, IN1, IN2, OUT
         }
     }
 
-    @Override
-    public InputSelection nextSelection() {
-        if (!isBroadcastSideDone) {
-            return InputSelection.SECOND;
-        } else {
-            return InputSelection.FIRST;
-        }
-    }
-
     @Override
     public void processElement1(StreamRecord<IN1> element) throws Exception {
         Preconditions.checkState(
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java
index 9184265daec..c75d1662529 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
 import org.apache.flink.streaming.api.operators.BoundedMultiInput;
-import org.apache.flink.streaming.api.operators.InputSelectable;
-import org.apache.flink.streaming.api.operators.InputSelection;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
@@ -38,8 +36,7 @@ import org.apache.flink.util.Preconditions;
  */
 @Internal
 public class ExternalPythonBatchKeyedCoBroadcastProcessOperator<OUT>
-        extends ExternalPythonKeyedCoProcessOperator<OUT>
-        implements BoundedMultiInput, InputSelectable {
+        extends ExternalPythonKeyedCoProcessOperator<OUT> implements 
BoundedMultiInput {
 
     private static final long serialVersionUID = 1L;
 
@@ -61,15 +58,6 @@ public class 
ExternalPythonBatchKeyedCoBroadcastProcessOperator<OUT>
         }
     }
 
-    @Override
-    public InputSelection nextSelection() {
-        if (!isBroadcastSideDone) {
-            return InputSelection.SECOND;
-        } else {
-            return InputSelection.FIRST;
-        }
-    }
-
     @Override
     public void processElement1(StreamRecord<Row> element) throws Exception {
         Preconditions.checkState(
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java
index 9fac56d246f..a17373add23 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.translators.python;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
 import 
org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonBatchKeyedCoBroadcastProcessOperator;
@@ -29,6 +30,7 @@ import 
org.apache.flink.streaming.api.operators.python.process.ExternalPythonKey
 import 
org.apache.flink.streaming.api.transformations.python.DelegateOperatorTransformation;
 import 
org.apache.flink.streaming.api.transformations.python.PythonKeyedBroadcastStateTransformation;
 import 
org.apache.flink.streaming.runtime.translators.AbstractTwoInputTransformationTranslator;
+import org.apache.flink.streaming.runtime.translators.BatchExecutionUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
@@ -76,15 +78,24 @@ public class 
PythonKeyedBroadcastStateTransformationTranslator<OUT>
 
         DelegateOperatorTransformation.configureOperator(transformation, 
operator);
 
-        return translateInternal(
-                transformation,
-                transformation.getRegularInput(),
-                transformation.getBroadcastInput(),
-                SimpleOperatorFactory.of(operator),
-                transformation.getStateKeyType(),
-                transformation.getKeySelector(),
-                null,
-                context);
+        Collection<Integer> result =
+                translateInternal(
+                        transformation,
+                        transformation.getRegularInput(),
+                        transformation.getBroadcastInput(),
+                        SimpleOperatorFactory.of(operator),
+                        transformation.getStateKeyType(),
+                        transformation.getKeySelector(),
+                        null,
+                        context);
+
+        BatchExecutionUtils.applyBatchExecutionSettings(
+                transformation.getId(),
+                context,
+                StreamConfig.InputRequirement.SORTED,
+                StreamConfig.InputRequirement.PASS_THROUGH);
+
+        return result;
     }
 
     @Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java
index 0a922ae3088..0eaa0b65310 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java
@@ -39,10 +39,10 @@ import java.util.Map;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** A utility class for applying sorting inputs. */
-class BatchExecutionUtils {
+public class BatchExecutionUtils {
     private static final Logger LOG = 
LoggerFactory.getLogger(BatchExecutionUtils.class);
 
-    static void applyBatchExecutionSettings(
+    public static void applyBatchExecutionSettings(
             int transformationId,
             TransformationTranslator.Context context,
             StreamConfig.InputRequirement... inputRequirements) {

Reply via email to