afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370832543


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -1188,14 +1191,14 @@ void testChainingOfOperatorsWithDifferentMaxParallelism(
         configuration.set(
                 
PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM,
                 chainingOfOperatorsWithDifferentMaxParallelismEnabled);
-        configuration.set(PipelineOptions.MAX_PARALLELISM, 10);
+        configuration.set(PipelineOptions.MAX_PARALLELISM, 1);
         try (StreamExecutionEnvironment chainEnv =
                 StreamExecutionEnvironment.createLocalEnvironment(1, 
configuration)) {
             chainEnv.fromElements(1)
                     .map(x -> x)
                     // should automatically break chain here
                     .map(x -> x)
-                    .setMaxParallelism(1)
+                    .setMaxParallelism(10)

Review Comment:
   The verifies that the chain gets broken. The legacy source was not enforcing 
max parallelism set to 1, something that we do now by propagating the call to 
super 
(https://github.com/afedulov/flink/blob/cf1a29d47a5bb4fb92e98a36934e525d74bae17b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L208).
 Notice that the   default max parallelism in the config also got changed above 
from 10 to 1. So now we start with the source with max parallelism of 1 and 
break the chain because the second map has parallelism of 10. Previously it was 
doing the same, but in "reverse".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to