This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a55ae1c6b42 [FLINK-30808][runtime] Properly handle 
ForwardForConsecutiveHashPartitioner when creating edges in StreamGraph
a55ae1c6b42 is described below

commit a55ae1c6b426acdf499fc4df766fc43daa2dcce4
Author: JunRuiLee <[email protected]>
AuthorDate: Sat Jan 28 12:11:21 2023 +0800

    [FLINK-30808][runtime] Properly handle ForwardForConsecutiveHashPartitioner 
when creating edges in StreamGraph
    
    This closes #21769.
---
 .../flink/streaming/api/graph/StreamGraph.java     | 29 ++++++++++++++--------
 1 file changed, 18 insertions(+), 11 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 636c4893178..5b85b9e7773 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -51,6 +51,7 @@ import 
org.apache.flink.streaming.api.operators.OutputFormatOperatorFactory;
 import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
+import 
org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner;
 import 
org.apache.flink.streaming.runtime.partitioner.ForwardForUnspecifiedPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
@@ -697,17 +698,23 @@ public class StreamGraph implements Pipeline {
 
         if (partitioner instanceof ForwardPartitioner) {
             if (upstreamNode.getParallelism() != 
downstreamNode.getParallelism()) {
-                throw new UnsupportedOperationException(
-                        "Forward partitioning does not allow "
-                                + "change of parallelism. Upstream operation: "
-                                + upstreamNode
-                                + " parallelism: "
-                                + upstreamNode.getParallelism()
-                                + ", downstream operation: "
-                                + downstreamNode
-                                + " parallelism: "
-                                + downstreamNode.getParallelism()
-                                + " You must use another partitioning 
strategy, such as broadcast, rebalance, shuffle or global.");
+                if (partitioner instanceof 
ForwardForConsecutiveHashPartitioner) {
+                    partitioner =
+                            ((ForwardForConsecutiveHashPartitioner<?>) 
partitioner)
+                                    .getHashPartitioner();
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Forward partitioning does not allow "
+                                    + "change of parallelism. Upstream 
operation: "
+                                    + upstreamNode
+                                    + " parallelism: "
+                                    + upstreamNode.getParallelism()
+                                    + ", downstream operation: "
+                                    + downstreamNode
+                                    + " parallelism: "
+                                    + downstreamNode.getParallelism()
+                                    + " You must use another partitioning 
strategy, such as broadcast, rebalance, shuffle or global.");
+                }
             }
         }
 

Reply via email to