This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a55ae1c6b42 [FLINK-30808][runtime] Properly handle ForwardForConsecutiveHashPartitioner when creating edges in StreamGraph
a55ae1c6b42 is described below
commit a55ae1c6b426acdf499fc4df766fc43daa2dcce4
Author: JunRuiLee <[email protected]>
AuthorDate: Sat Jan 28 12:11:21 2023 +0800
[FLINK-30808][runtime] Properly handle ForwardForConsecutiveHashPartitioner when creating edges in StreamGraph
This closes #21769.
---
.../flink/streaming/api/graph/StreamGraph.java | 29 ++++++++++++++--------
1 file changed, 18 insertions(+), 11 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 636c4893178..5b85b9e7773 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -51,6 +51,7 @@ import org.apache.flink.streaming.api.operators.OutputFormatOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
+import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardForUnspecifiedPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
@@ -697,17 +698,23 @@ public class StreamGraph implements Pipeline {
if (partitioner instanceof ForwardPartitioner) {
if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
- throw new UnsupportedOperationException(
- "Forward partitioning does not allow "
- + "change of parallelism. Upstream operation: "
- + upstreamNode
- + " parallelism: "
- + upstreamNode.getParallelism()
- + ", downstream operation: "
- + downstreamNode
- + " parallelism: "
- + downstreamNode.getParallelism()
- + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
+ if (partitioner instanceof ForwardForConsecutiveHashPartitioner) {
+ partitioner =
+ ((ForwardForConsecutiveHashPartitioner<?>) partitioner)
+ .getHashPartitioner();
+ } else {
+ throw new UnsupportedOperationException(
+ "Forward partitioning does not allow "
+ + "change of parallelism. Upstream operation: "
+ + upstreamNode
+ + " parallelism: "
+ + upstreamNode.getParallelism()
+ + ", downstream operation: "
+ + downstreamNode
+ + " parallelism: "
+ + downstreamNode.getParallelism()
+ + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
+ }
}
}