This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new c43b3d23bde [FLINK-31337][python] Fix python keyed broadcast batch operator input not sorted
c43b3d23bde is described below
commit c43b3d23bde1047e675793bf3e64cfe5c514f088
Author: Juntao Hu <[email protected]>
AuthorDate: Mon Mar 6 20:48:56 2023 +0800
[FLINK-31337][python] Fix python keyed broadcast batch operator input not sorted
This closes #22107.
---
EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java | 13 +----------
ExternalPythonBatchKeyedCoBroadcastProcessOperator.java | 14 +----------
PythonKeyedBroadcastStateTransformationTranslator.java | 29 +++++++++++++++-------
.../runtime/translators/BatchExecutionUtils.java | 4 +--
4 files changed, 24 insertions(+), 36 deletions(-)
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java
index a5335780756..232f97a5e95 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
-import org.apache.flink.streaming.api.operators.InputSelectable;
-import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
@@ -38,7 +36,7 @@ import org.apache.flink.util.Preconditions;
@Internal
public class EmbeddedPythonBatchKeyedCoBroadcastProcessOperator<K, IN1, IN2,
OUT>
extends EmbeddedPythonKeyedCoProcessOperator<K, IN1, IN2, OUT>
- implements BoundedMultiInput, InputSelectable {
+ implements BoundedMultiInput {
private static final long serialVersionUID = 1L;
@@ -60,15 +58,6 @@ public class
EmbeddedPythonBatchKeyedCoBroadcastProcessOperator<K, IN1, IN2, OUT
}
}
- @Override
- public InputSelection nextSelection() {
- if (!isBroadcastSideDone) {
- return InputSelection.SECOND;
- } else {
- return InputSelection.FIRST;
- }
- }
-
@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
Preconditions.checkState(
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java
index 9184265daec..c75d1662529 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
-import org.apache.flink.streaming.api.operators.InputSelectable;
-import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
@@ -38,8 +36,7 @@ import org.apache.flink.util.Preconditions;
*/
@Internal
public class ExternalPythonBatchKeyedCoBroadcastProcessOperator<OUT>
- extends ExternalPythonKeyedCoProcessOperator<OUT>
- implements BoundedMultiInput, InputSelectable {
+ extends ExternalPythonKeyedCoProcessOperator<OUT> implements BoundedMultiInput {
private static final long serialVersionUID = 1L;
@@ -61,15 +58,6 @@ public class
ExternalPythonBatchKeyedCoBroadcastProcessOperator<OUT>
}
}
- @Override
- public InputSelection nextSelection() {
- if (!isBroadcastSideDone) {
- return InputSelection.SECOND;
- } else {
- return InputSelection.FIRST;
- }
- }
-
@Override
public void processElement1(StreamRecord<Row> element) throws Exception {
Preconditions.checkState(
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java
b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java
index 9fac56d246f..a17373add23 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.translators.python;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
import
org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonBatchKeyedCoBroadcastProcessOperator;
@@ -29,6 +30,7 @@ import
org.apache.flink.streaming.api.operators.python.process.ExternalPythonKey
import
org.apache.flink.streaming.api.transformations.python.DelegateOperatorTransformation;
import
org.apache.flink.streaming.api.transformations.python.PythonKeyedBroadcastStateTransformation;
import
org.apache.flink.streaming.runtime.translators.AbstractTwoInputTransformationTranslator;
+import org.apache.flink.streaming.runtime.translators.BatchExecutionUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
@@ -76,15 +78,24 @@ public class
PythonKeyedBroadcastStateTransformationTranslator<OUT>
DelegateOperatorTransformation.configureOperator(transformation,
operator);
- return translateInternal(
- transformation,
- transformation.getRegularInput(),
- transformation.getBroadcastInput(),
- SimpleOperatorFactory.of(operator),
- transformation.getStateKeyType(),
- transformation.getKeySelector(),
- null,
- context);
+ Collection<Integer> result =
+ translateInternal(
+ transformation,
+ transformation.getRegularInput(),
+ transformation.getBroadcastInput(),
+ SimpleOperatorFactory.of(operator),
+ transformation.getStateKeyType(),
+ transformation.getKeySelector(),
+ null,
+ context);
+
+ BatchExecutionUtils.applyBatchExecutionSettings(
+ transformation.getId(),
+ context,
+ StreamConfig.InputRequirement.SORTED,
+ StreamConfig.InputRequirement.PASS_THROUGH);
+
+ return result;
}
@Override
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java
index 0a922ae3088..0eaa0b65310 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java
@@ -39,10 +39,10 @@ import java.util.Map;
import static org.apache.flink.util.Preconditions.checkState;
/** A utility class for applying sorting inputs. */
-class BatchExecutionUtils {
+public class BatchExecutionUtils {
private static final Logger LOG =
LoggerFactory.getLogger(BatchExecutionUtils.class);
- static void applyBatchExecutionSettings(
+ public static void applyBatchExecutionSettings(
int transformationId,
TransformationTranslator.Context context,
StreamConfig.InputRequirement... inputRequirements) {