aljoscha closed pull request #3616: [FLINK-6188] Correctly handle 
PARALLELISM_DEFAULT in stream operator
URL: https://github.com/apache/flink/pull/3616
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index c18db529d79..ed9eb2cc0c4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -76,13 +76,6 @@
         */
        public static final int PARALLELISM_DEFAULT = -1;
 
-       /**
-        * The flag value indicating an unknown or unset parallelism. This 
value is
-        * not a valid parallelism and indicates that the parallelism should 
remain
-        * unchanged.
-        */
-       public static final int PARALLELISM_UNKNOWN = -2;
-
        /**
         * The default lower bound for max parallelism if nothing was 
configured by the user. We have
         * this to allow users some degree of scale-up in case they forgot to 
configure maximum
@@ -293,7 +286,6 @@ public int getParallelism() {
         * @param parallelism The parallelism to use
         */
        public ExecutionConfig setParallelism(int parallelism) {
-               checkArgument(parallelism != PARALLELISM_UNKNOWN, "Cannot 
specify UNKNOWN_PARALLELISM.");
                checkArgument(
                                parallelism >= 1 || parallelism == 
PARALLELISM_DEFAULT,
                                "Parallelism must be at least one, or 
ExecutionConfig.PARALLELISM_DEFAULT " +
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 9c5d9f072b8..705e3946aef 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -134,9 +134,6 @@ public String getName() {
         * @return The operator with set parallelism.
         */
        public SingleOutputStreamOperator<T> setParallelism(int parallelism) {
-               Preconditions.checkArgument(parallelism > 0,
-                               "The parallelism of an operator must be at 
least 1.");
-
                Preconditions.checkArgument(canBeParallel() || parallelism == 1,
                                "The parallelism of non parallel operator must 
be 1.");
 
@@ -156,9 +153,6 @@ public String getName() {
         */
        @PublicEvolving
        public SingleOutputStreamOperator<T> setMaxParallelism(int 
maxParallelism) {
-               Preconditions.checkArgument(maxParallelism > 0,
-                               "The maximum parallelism must be greater than 
0.");
-
                Preconditions.checkArgument(canBeParallel() || maxParallelism 
== 1,
                                "The maximum parallelism of non parallel 
operator must be 1.");
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index a99efb1c64f..645cc4e2e42 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -657,7 +657,8 @@ public JobGraph getJobGraph() {
                                                        + "\nThe user can force 
enable state checkpoints with the reduced guarantees by calling: 
env.enableCheckpointing(interval,true)");
                }
 
-               StreamingJobGraphGenerator jobgraphGenerator = new 
StreamingJobGraphGenerator(this, defaultParallelism);
+               StreamingJobGraphGenerator jobgraphGenerator =
+                               new StreamingJobGraphGenerator(this, 
defaultParallelism, executionConfig.getMaxParallelism());
 
                return jobgraphGenerator.createJobGraph();
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index df10ae4476f..48dbf0980e9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -151,16 +151,6 @@ private StreamGraph 
generateInternal(List<StreamTransformation<?>> transformatio
 
                LOG.debug("Transforming " + transform);
 
-               if (transform.getMaxParallelism() <= 0) {
-
-                       // if the max parallelism hasn't been set, then first 
use the job wide max parallelism
-                       // from theExecutionConfig.
-                       int globalMaxParallelismFromConfig = 
env.getConfig().getMaxParallelism();
-                       if (globalMaxParallelismFromConfig > 0) {
-                               
transform.setMaxParallelism(globalMaxParallelismFromConfig);
-                       }
-               }
-
                // call at least once to trigger exceptions about 
MissingTypeInfo
                transform.getOutputType();
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 0896eb75c18..92ce314d9ef 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -94,12 +94,14 @@
        private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
        private final int defaultParallelism;
+       private final int defaultMaxParallelism;
 
-       public StreamingJobGraphGenerator(StreamGraph streamGraph, int 
defaultParallelism) {
+       public StreamingJobGraphGenerator(StreamGraph streamGraph, int 
defaultParallelism, int defaultMaxParallelism) {
                this.streamGraph = streamGraph;
                this.defaultStreamGraphHasher = new StreamGraphHasherV2();
                this.legacyStreamGraphHashers = Arrays.asList(new 
StreamGraphHasherV1(), new StreamGraphUserHashHasher());
                this.defaultParallelism = defaultParallelism;
+               this.defaultMaxParallelism = defaultMaxParallelism;
        }
 
        private void init() {
@@ -341,14 +343,15 @@ private StreamConfig createJobVertex(
                jobVertex.setInvokableClass(streamNode.getJobVertexClass());
 
                int parallelism = streamNode.getParallelism();
-
+               int maxParallelism = streamNode.getMaxParallelism();
                if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
                        parallelism = defaultParallelism;
                }
-
+               if (maxParallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
+                       maxParallelism = defaultMaxParallelism;
+               }
                jobVertex.setParallelism(parallelism);
-
-               jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
+               jobVertex.setMaxParallelism(maxParallelism);
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Parallelism set: {} for {}", parallelism, 
streamNodeId);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index e86b3e8f04f..0da77357e4f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -19,17 +19,18 @@
 
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -126,7 +127,7 @@ public static int getNewNodeId() {
         * The maximum parallelism for this stream transformation. It defines 
the upper limit for
         * dynamic scaling and the number of key groups used for partitioned 
state.
         */
-       private int maxParallelism = -1;
+       private int maxParallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
        /**
         *  The minimum resources for this stream transformation. It defines 
the lower limit for
@@ -202,7 +203,16 @@ public int getParallelism() {
         * @param parallelism The new parallelism to set on this {@code 
StreamTransformation}
         */
        public void setParallelism(int parallelism) {
-               Preconditions.checkArgument(parallelism > 0, "Parallelism must 
be bigger than zero.");
+               checkArgument(
+                               parallelism >= 1 || parallelism == 
ExecutionConfig.PARALLELISM_DEFAULT,
+                               "Parallelism must be at least one, or 
ExecutionConfig.PARALLELISM_DEFAULT " +
+                                               "(use system default).");
+               checkArgument(
+                               maxParallelism == -1 || parallelism <= 
maxParallelism,
+                               "The specified parallelism must be smaller or 
equal to the maximum parallelism.");
+               checkArgument(
+                               maxParallelism == -1 || parallelism != 
ExecutionConfig.PARALLELISM_DEFAULT,
+                               "Default parallelism cannot be specified when 
maximum parallelism is specified");
                this.parallelism = parallelism;
        }
 
@@ -221,10 +231,19 @@ public int getMaxParallelism() {
         * @param maxParallelism Maximum parallelism for this stream 
transformation.
         */
        public void setMaxParallelism(int maxParallelism) {
-               Preconditions.checkArgument(maxParallelism > 0
-                                               && maxParallelism <= 
StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM,
-                               "Maximum parallelism must be between 1 and " + 
StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
-                                               + ". Found: " + maxParallelism);
+               checkArgument(
+                               parallelism != 
ExecutionConfig.PARALLELISM_DEFAULT,
+                               "A maximum parallelism can only be specified 
with an explicitly specified " +
+                                               "parallelism.");
+               checkArgument(maxParallelism > 0, "The maximum parallelism must 
be greater than 0.");
+               checkArgument(
+                               maxParallelism >= parallelism,
+                               "The maximum parallelism must be larger than 
the parallelism. (parallelism = " +
+                                               parallelism + " max-parallelism 
= " + maxParallelism + ")");
+               checkArgument(
+                               maxParallelism > 0 && maxParallelism <= 
ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
+                               "maxParallelism is out of bounds 0 < 
maxParallelism <= " +
+                                               
ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
                this.maxParallelism = maxParallelism;
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TimestampAssignerTranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TimestampAssignerTranslationTest.java
new file mode 100644
index 00000000000..dc053d3110d
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TimestampAssignerTranslationTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator;
+import 
org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * These tests verify that the api calls on {@link DataStream} correctly 
instantiate
+ * timestamp/watermark assignment operators.
+ *
+ * <p>We also create a test harness and push one element into the operator to 
verify
+ * that we get some output.
+ */
+@SuppressWarnings("serial")
+public class TimestampAssignerTranslationTest {
+
+       /**
+        * When the upstream operator has the default parallelism it has 
parallelism {@code -1}. This
+        * test makes sure that code API code can deal with that.
+        */
+       @Test
+       public void testPunctuatedAssignerWorksWithDefaultParallelism() throws 
Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               DataStream<Tuple2<String, Integer>> source =
+                               env.fromElements(Tuple2.of("hello", 1), 
Tuple2.of("hello", 2));
+
+               SingleOutputStreamOperator<Tuple2<String, Integer>> assigner = 
source
+                               .map(new IdentityMap())
+                               
.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
+                               .assignTimestampsAndWatermarks(new 
DummyPunctuatedAssigner());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) assigner.getTransformation();
+
+               assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, 
transform.getParallelism());
+
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               assertTrue(operator instanceof 
TimestampsAndPunctuatedWatermarksOperator);
+               TimestampsAndPunctuatedWatermarksOperator<Tuple2<String, 
Integer>> assignerOperator =
+                               
(TimestampsAndPunctuatedWatermarksOperator<Tuple2<String, Integer>>) operator;
+
+               processElementAndEnsureOutput(assignerOperator, new 
Tuple2<>("hello", 1));
+       }
+
+       /**
+        * When the upstream operator has the default parallelism it has 
parallelism {@code -1}. This
+        * test makes sure that code API code can deal with that.
+        */
+       @Test
+       public void testPeriodicAssignerWorksWithDefaultParallelism() throws 
Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               DataStream<Tuple2<String, Integer>> source =
+                               env.fromElements(Tuple2.of("hello", 1), 
Tuple2.of("hello", 2));
+
+               SingleOutputStreamOperator<Tuple2<String, Integer>> assigner = 
source
+                               .map(new IdentityMap())
+                               
.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
+                               .assignTimestampsAndWatermarks(new 
DummyPeriodicAssigner());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) assigner.getTransformation();
+
+               assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, 
transform.getParallelism());
+
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               assertTrue(operator instanceof 
TimestampsAndPeriodicWatermarksOperator);
+               TimestampsAndPeriodicWatermarksOperator<Tuple2<String, 
Integer>> assignerOperator =
+                               
(TimestampsAndPeriodicWatermarksOperator<Tuple2<String, Integer>>) operator;
+
+               processElementAndEnsureOutput(assignerOperator, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       public void testPunctuatedAssignerPicksUpUpstreamParallelism() throws 
Exception {
+               final int parallelism = 13;
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               DataStream<Tuple2<String, Integer>> source =
+                               env.fromElements(Tuple2.of("hello", 1), 
Tuple2.of("hello", 2));
+
+               SingleOutputStreamOperator<Tuple2<String, Integer>> assigner = 
source
+                               .map(new IdentityMap())
+                               .setParallelism(parallelism)
+                               .assignTimestampsAndWatermarks(new 
DummyPunctuatedAssigner());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) assigner.getTransformation();
+
+               assertEquals(parallelism, transform.getParallelism());
+
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof 
TimestampsAndPunctuatedWatermarksOperator);
+               TimestampsAndPunctuatedWatermarksOperator<Tuple2<String, 
Integer>> assignerOperator =
+                               
(TimestampsAndPunctuatedWatermarksOperator<Tuple2<String, Integer>>) operator;
+
+               processElementAndEnsureOutput(assignerOperator, new 
Tuple2<>("hello", 1));
+       }
+
+       @Test
+       public void testPeriodicAssignerPicksUpUpstreamParallelism() throws 
Exception {
+               final int parallelism = 13;
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               DataStream<Tuple2<String, Integer>> source =
+                               env.fromElements(Tuple2.of("hello", 1), 
Tuple2.of("hello", 2));
+
+               SingleOutputStreamOperator<Tuple2<String, Integer>> assigner = 
source
+                               .map(new IdentityMap())
+                               .setParallelism(parallelism)
+                               .assignTimestampsAndWatermarks(new 
DummyPeriodicAssigner());
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
+                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) assigner.getTransformation();
+
+               assertEquals(parallelism, transform.getParallelism());
+
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
+               Assert.assertTrue(operator instanceof 
TimestampsAndPeriodicWatermarksOperator);
+               TimestampsAndPeriodicWatermarksOperator<Tuple2<String, 
Integer>> assignerOperator =
+                               
(TimestampsAndPeriodicWatermarksOperator<Tuple2<String, Integer>>) operator;
+
+               processElementAndEnsureOutput(assignerOperator, new 
Tuple2<>("hello", 1));
+       }
+
+       /**
+        * Ensure that we get some output from the given operator when pushing 
in an element and
+        * setting watermark and processing time to {@code Long.MAX_VALUE}.
+        */
+       private static <IN, OUT> void processElementAndEnsureOutput(
+                       OneInputStreamOperator<IN, OUT> operator,
+                       IN element) throws Exception {
+
+               OneInputStreamOperatorTestHarness<IN, OUT> testHarness =
+                               new 
OneInputStreamOperatorTestHarness<>(operator);
+
+               if (operator instanceof OutputTypeConfigurable) {
+                       // use a dummy type since window functions just need 
the ExecutionConfig
+                       // this is also only needed for Fold, which we're 
getting rid off soon.
+                       ((OutputTypeConfigurable) 
operator).setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
+               }
+
+               testHarness.open();
+
+               testHarness.setProcessingTime(0);
+               testHarness.processWatermark(Long.MIN_VALUE);
+
+               testHarness.processElement(new StreamRecord<>(element, 0));
+
+               // provoke any processing-time/event-time triggers
+               testHarness.setProcessingTime(Long.MAX_VALUE);
+               testHarness.processWatermark(Long.MAX_VALUE);
+
+               // we at least get the record and the passed-through 
Long.MAX_VALUE watermark
+               assertTrue(testHarness.getOutput().size() >= 2);
+
+               testHarness.close();
+       }
+
+
+       private static class IdentityMap implements 
MapFunction<Tuple2<String,Integer>, Tuple2<String, Integer>> {
+               @Override
+               public Tuple2<String, Integer> map(Tuple2<String, Integer> 
value) throws Exception {
+                       return value;
+               }
+       }
+
+       private static class DummyPunctuatedAssigner implements 
AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {
+               @Nullable
+               @Override
+               public Watermark checkAndGetNextWatermark(
+                               Tuple2<String, Integer> lastElement, long 
extractedTimestamp) {
+                       return null;
+               }
+
+               @Override
+               public long extractTimestamp(
+                               Tuple2<String, Integer> element,
+                               long previousElementTimestamp) {
+                       return 0;
+               }
+       }
+
+       private static class DummyPeriodicAssigner implements 
AssignerWithPeriodicWatermarks<Tuple2<String, Integer>> {
+               @Override
+               public long extractTimestamp(Tuple2<String, Integer> element,
+                               long previousElementTimestamp) {
+                       return 0;
+               }
+
+               @Nullable
+               @Override
+               public Watermark getCurrentWatermark() {
+                       return null;
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
index fd271791711..fa07709bfc9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -258,12 +258,6 @@ public void flatMap(Integer value, Collector<Object> out) 
throws Exception {
                env.getStreamGraph().getJobGraph();
                Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
 
-               // configured value after generating
-               env.setParallelism(21);
-               env.setMaxParallelism(42);
-               env.getStreamGraph().getJobGraph();
-               Assert.assertEquals(42, 
operator.getTransformation().getMaxParallelism());
-
                // bounds configured parallelism 1
                try {
                        env.setMaxParallelism(0);
@@ -293,6 +287,7 @@ public void flatMap(Integer value, Collector<Object> out) 
throws Exception {
                }
 
                // bounds for max parallelism 3
+               operator.setParallelism(1);
                operator.setMaxParallelism(1);
                Assert.assertEquals(1, 
operator.getTransformation().getMaxParallelism());
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index fbbb5d23146..544f69ac89b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -270,49 +270,6 @@ public Integer getKey(Integer value) throws Exception {
                StreamPartitioner<?> streamPartitioner = 
keyedResultNode.getInEdges().get(0).getPartitioner();
        }
 
-       /**
-        * Tests that the global and operator-wide max parallelism setting is 
respected
-        */
-       @Test
-       public void testMaxParallelismForwarding() {
-               int globalMaxParallelism = 42;
-               int keyedResult2MaxParallelism = 17;
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().setParallelism(12);
-               env.getConfig().setMaxParallelism(globalMaxParallelism);
-
-               DataStream<Integer> source = env.fromElements(1, 2, 3);
-
-               DataStream<Integer> keyedResult1 = source.keyBy(new 
KeySelector<Integer, Integer>() {
-                       private static final long serialVersionUID = 
9205556348021992189L;
-
-                       @Override
-                       public Integer getKey(Integer value) throws Exception {
-                               return value;
-                       }
-               }).map(new NoOpIntMap());
-
-               DataStream<Integer> keyedResult2 = keyedResult1.keyBy(new 
KeySelector<Integer, Integer>() {
-                       private static final long serialVersionUID = 
1250168178707154838L;
-
-                       @Override
-                       public Integer getKey(Integer value) throws Exception {
-                               return value;
-                       }
-               }).map(new 
NoOpIntMap()).setMaxParallelism(keyedResult2MaxParallelism);
-
-               keyedResult2.addSink(new DiscardingSink<Integer>());
-
-               StreamGraph graph = env.getStreamGraph();
-
-               StreamNode keyedResult1Node = 
graph.getStreamNode(keyedResult1.getId());
-               StreamNode keyedResult2Node = 
graph.getStreamNode(keyedResult2.getId());
-
-               assertEquals(globalMaxParallelism, 
keyedResult1Node.getMaxParallelism());
-               assertEquals(keyedResult2MaxParallelism, 
keyedResult2Node.getMaxParallelism());
-       }
-
        /**
         * Tests that the max parallelism is automatically set to the 
parallelism if it has not been
         * specified.
@@ -321,7 +278,7 @@ public Integer getKey(Integer value) throws Exception {
        public void testAutoMaxParallelism() {
                int globalParallelism = 42;
                int mapParallelism = 17;
-               int maxParallelism = 21;
+               int maxParallelism = 84;
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(globalParallelism);
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index abf51abef80..d8e1c046779 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.graph;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -37,6 +38,7 @@
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.lang.reflect.Method;
@@ -50,6 +52,83 @@
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest extends TestLogger {
 
+       /**
+        * Verify that the default parallelism and max parallelism are 
manifested in
+        * the generated job graph when no parallelism is set on operator or 
execution environment.
+        */
+       @Test
+       public void testDefaultParallelismManifestation() {
+               final int customParallelism = 5;
+               final int customMaxParallelism = 10;
+               final int defaultParallelism = 20;
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
+               env.disableOperatorChaining();
+
+               assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, 
env.getParallelism());
+
+               env
+                               .fromElements("a")
+                               .map(new 
IdentityMap()).name("map1").setParallelism(customParallelism).setMaxParallelism(customMaxParallelism)
+                               .map(new IdentityMap()).name("map2");
+
+               StreamGraph streamGraph = env.getStreamGraph();
+               streamGraph.setJobName("test job");
+               JobGraph jobGraph = streamGraph.getJobGraph();
+
+               assertEquals(3, 
jobGraph.getVerticesSortedTopologicallyFromSources().size());
+
+               JobVertex map1Vertex = getOnlyVertex(jobGraph, "map1");
+               JobVertex map2Vertex = getOnlyVertex(jobGraph, "map2");
+
+               assertEquals(customParallelism, map1Vertex.getParallelism());
+               assertEquals(defaultParallelism, map2Vertex.getParallelism());
+
+               assertEquals(customMaxParallelism, 
map1Vertex.getMaxParallelism());
+               assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, 
map2Vertex.getMaxParallelism());
+       }
+
+       /**
+        * Verify that the execution environment parallelism and max 
parallelism are manifested in
+        * the generated job graph for operators that do not set their own 
parallelism.
+        */
+       @Test
+       public void testEnvironmentParallelismManifestation() {
+               final int customParallelism = 5;
+               final int customMaxParallelism = 10;
+               final int defaultParallelism = 20;
+               final int envParallelism = 25;
+               final int envMaxParallelism = 30;
+
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
+               env.disableOperatorChaining();
+               env.setParallelism(envParallelism);
+               env.setMaxParallelism(envMaxParallelism);
+
+               env
+                               .fromElements("a")
+                               .map(new 
IdentityMap()).name("map1").setParallelism(customParallelism).setMaxParallelism(customMaxParallelism)
+                               .map(new IdentityMap()).name("map2");
+
+               // generate the job graph with the environment-level settings applied
+               StreamGraph streamGraph = env.getStreamGraph();
+               streamGraph.setJobName("test job");
+               JobGraph jobGraph = streamGraph.getJobGraph();
+
+               assertEquals(3, 
jobGraph.getVerticesSortedTopologicallyFromSources().size());
+
+               JobVertex map1Vertex = getOnlyVertex(jobGraph, "map1");
+               JobVertex map2Vertex = getOnlyVertex(jobGraph, "map2");
+
+               assertEquals(customParallelism, map1Vertex.getParallelism());
+               assertEquals(envParallelism, map2Vertex.getParallelism());
+
+               assertEquals(customMaxParallelism, 
map1Vertex.getMaxParallelism());
+               assertEquals(envMaxParallelism, map2Vertex.getMaxParallelism());
+       }
+
+
        @Test
        public void testParallelismOneNotChained() {
 
@@ -115,7 +194,8 @@ public void testDisabledCheckpointing() throws Exception {
                StreamGraph streamGraph = new StreamGraph(env, 1 /* default 
parallelism */);
                assertFalse("Checkpointing enabled", 
streamGraph.getCheckpointConfig().isCheckpointingEnabled());
 
-               StreamingJobGraphGenerator jobGraphGenerator = new 
StreamingJobGraphGenerator(streamGraph, 1 /* default parallelism */);
+               StreamingJobGraphGenerator jobGraphGenerator =
+                               new StreamingJobGraphGenerator(streamGraph, 1 
/* default parallelism */, 1 /* max parallelism */);
                JobGraph jobGraph = jobGraphGenerator.createJobGraph();
 
                JobSnapshottingSettings snapshottingSettings = 
jobGraph.getSnapshotSettings();
@@ -137,7 +217,9 @@ public Integer map(Integer value) throws Exception {
                                }
                        })
                        .print();
-               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism 
*/).createJobGraph();
+
+               JobGraph jobGraph = new StreamingJobGraphGenerator(
+                               env.getStreamGraph(), 1 /* default parallelism 
*/, -1 /* default max parallelism */).createJobGraph();
 
                List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
                JobVertex sourceVertex = verticesSorted.get(0);
@@ -224,7 +306,8 @@ public void invoke(Tuple2<Integer, Integer> value) throws 
Exception {
                });
                sinkMethod.invoke(sink, resource5);
 
-               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism 
*/).createJobGraph();
+               JobGraph jobGraph = new StreamingJobGraphGenerator(
+                               env.getStreamGraph(), 1 /* default parallelism 
*/, -1 /* default max parallelism */).createJobGraph();
 
                JobVertex sourceMapFilterVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
                JobVertex reduceSinkVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
@@ -291,7 +374,8 @@ public void invoke(Integer value) throws Exception {
                }).disableChaining().name("test_sink");
                sinkMethod.invoke(sink, resource5);
 
-               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism 
*/).createJobGraph();
+               JobGraph jobGraph = new StreamingJobGraphGenerator(
+                               env.getStreamGraph(), 1 /* default parallelism 
*/, -1 /* default max parallelism */).createJobGraph();
 
                for (JobVertex jobVertex : jobGraph.getVertices()) {
                        if (jobVertex.getName().contains("test_source")) {
@@ -307,4 +391,33 @@ public void invoke(Integer value) throws Exception {
                        }
                }
        }
+
+       /**
+        * Returns the only vertex whose name equals the given name. Throws 
an {@link AssertionError}
+        * if no vertex or more than one vertex matches.
+        */
+       private static JobVertex getOnlyVertex(JobGraph jobGraph, String name) {
+               JobVertex result = null;
+               for (JobVertex v : jobGraph.getVertices()) {
+                       if (v.getName().equals(name)) {
+                               if (result != null) {
+                                       Assert.fail("More than one vertex 
matches the name.");
+                               }
+                               result = v;
+                       }
+               }
+               if (result == null) {
+                       Assert.fail("No vertex matches the name.");
+               }
+               return result;
+       }
+
+       private static class IdentityMap implements MapFunction<String, String> 
{
+               private static final long serialVersionUID = 
471891682418382583L;
+
+               @Override
+               public String map(String value) {
+                       return value;
+               }
+       }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to