[FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()

Before, there where some checks in
StreamExecutionEnvironment.set(Max)Parallelism() but a user would
circumvent these if using the ExecutionConfig directly. Now, all checks
are moved to the ExecutionConfig.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99fb80be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99fb80be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99fb80be

Branch: refs/heads/release-1.2
Commit: 99fb80be773499907d379553010dd999214f64fb
Parents: d3b275f
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Mar 10 14:35:37 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Sat Mar 18 07:43:42 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       | 35 +++++++++----
 .../environment/StreamExecutionEnvironment.java |  8 ---
 .../StreamExecutionEnvironmentTest.java         | 54 ++++++++++++++++++++
 .../api/graph/StreamGraphGeneratorTest.java     |  3 ++
 ...tractEventTimeWindowCheckpointingITCase.java |  2 +-
 5 files changed, 84 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 14245ed..8d5fc90 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -84,8 +84,9 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
        public static final int PARALLELISM_UNKNOWN = -2;
 
        /**
-        * The default lower bound for max parallelism if nothing was 
configured by the user. We have this so allow users
-        * some degree of scale-up in case they forgot to configure maximum 
parallelism explicitly.
+        * The default lower bound for max parallelism if nothing was 
configured by the user. We have
+        * this to allow users some degree of scale-up in case they forgot to 
configure maximum
+        * parallelism explicitly.
         */
        public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
 
@@ -289,13 +290,18 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
         * @param parallelism The parallelism to use
         */
        public ExecutionConfig setParallelism(int parallelism) {
-               if (parallelism != PARALLELISM_UNKNOWN) {
-                       if (parallelism < 1 && parallelism != 
PARALLELISM_DEFAULT) {
-                               throw new IllegalArgumentException(
-                                       "Parallelism must be at least one, or 
ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
-                       }
-                       this.parallelism = parallelism;
-               }
+               checkArgument(parallelism != PARALLELISM_UNKNOWN, "Cannot 
specify UNKNOWN_PARALLELISM.");
+               checkArgument(
+                               parallelism >= 1 || parallelism == 
PARALLELISM_DEFAULT,
+                               "Parallelism must be at least one, or 
ExecutionConfig.PARALLELISM_DEFAULT " +
+                                               "(use system default).");
+               checkArgument(
+                               maxParallelism == -1 || parallelism <= 
maxParallelism,
+                               "The specified parallelism must be smaller or 
equal to the maximum parallelism.");
+               checkArgument(
+                               maxParallelism == -1 || parallelism != 
PARALLELISM_DEFAULT,
+                               "Default parallelism cannot be specified when 
maximum parallelism is specified");
+               this.parallelism = parallelism;
                return this;
        }
 
@@ -322,7 +328,18 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
         */
        @PublicEvolving
        public void setMaxParallelism(int maxParallelism) {
+               checkArgument(
+                               parallelism != PARALLELISM_DEFAULT,
+                               "A maximum parallelism can only be specified 
with an explicitly specified " +
+                                               "parallelism.");
                checkArgument(maxParallelism > 0, "The maximum parallelism must 
be greater than 0.");
+               checkArgument(
+                               maxParallelism >= parallelism,
+                               "The maximum parallelism must be larger than 
the parallelism.");
+               checkArgument(
+                               maxParallelism > 0 && maxParallelism <= 
UPPER_BOUND_MAX_PARALLELISM,
+                               "maxParallelism is out of bounds 0 < 
maxParallelism <= " +
+                                               UPPER_BOUND_MAX_PARALLELISM + 
". Found: " + maxParallelism);
                this.maxParallelism = maxParallelism;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 640915c..b16298c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -167,9 +167,6 @@ public abstract class StreamExecutionEnvironment {
         * @param parallelism The parallelism
         */
        public StreamExecutionEnvironment setParallelism(int parallelism) {
-               if (parallelism < 1) {
-                       throw new IllegalArgumentException("parallelism must be 
at least one.");
-               }
                config.setParallelism(parallelism);
                return this;
        }
@@ -183,11 +180,6 @@ public abstract class StreamExecutionEnvironment {
         * @param maxParallelism Maximum degree of parallelism to be used for 
the program., with 0 < maxParallelism <= 2^15 - 1
         */
        public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) 
{
-               Preconditions.checkArgument(maxParallelism > 0 &&
-                                               maxParallelism <= 
ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
-                               "maxParallelism is out of bounds 0 < 
maxParallelism <= " +
-                                               
ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
-
                config.setMaxParallelism(maxParallelism);
                return this;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
index d29c833..fd27179 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -36,7 +36,9 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.SplittableIterator;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.net.URL;
 import java.util.Arrays;
@@ -52,6 +54,10 @@ import static org.mockito.Mockito.mock;
 
 public class StreamExecutionEnvironmentTest {
 
+       @Rule
+       public final ExpectedException exception = ExpectedException.none();
+
+
        @Test
        public void fromElementsWithBaseTypeTest1() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -156,6 +162,53 @@ public class StreamExecutionEnvironmentTest {
        }
 
        @Test
+       public void testMaxParallelismMustBeBiggerEqualParallelism() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               env.setParallelism(10);
+
+               exception.expect(IllegalArgumentException.class);
+               env.setMaxParallelism(5);
+       }
+
+       @Test
+       public void testParallelismMustBeSmallerEqualMaxParallelism() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               env.setParallelism(10);
+               env.setMaxParallelism(20);
+
+               exception.expect(IllegalArgumentException.class);
+               env.setParallelism(30);
+       }
+
+       @Test
+       public void 
testSetDefaultParallelismNotAllowedWhenMaxParallelismSpecified() {
+               final int defaultParallelism = 20;
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
+
+               env.setParallelism(10);
+               env.setMaxParallelism(15);
+
+               exception.expect(IllegalArgumentException.class);
+               env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
+       }
+
+       @Test
+       public void testSetMaxParallelismNotAllowedWithDefaultParallelism() {
+               final int defaultParallelism = 20;
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
+
+               env.setParallelism(10);
+               env.setMaxParallelism(15);
+
+               exception.expect(IllegalArgumentException.class);
+               env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
+       }
+
+       @Test
        public void testParallelismBounds() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -206,6 +259,7 @@ public class StreamExecutionEnvironmentTest {
                Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
 
                // configured value after generating
+               env.setParallelism(21);
                env.setMaxParallelism(42);
                env.getStreamGraph().getJobGraph();
                Assert.assertEquals(42, 
operator.getTransformation().getMaxParallelism());

http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 5fdacd4..fbbb5d2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -247,6 +247,7 @@ public class StreamGraphGeneratorTest {
        public void testSetupOfKeyGroupPartitioner() {
                int maxParallelism = 42;
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().setParallelism(12);
                env.getConfig().setMaxParallelism(maxParallelism);
 
                DataStream<Integer> source = env.fromElements(1, 2, 3);
@@ -278,6 +279,7 @@ public class StreamGraphGeneratorTest {
                int keyedResult2MaxParallelism = 17;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().setParallelism(12);
                env.getConfig().setMaxParallelism(globalMaxParallelism);
 
                DataStream<Integer> source = env.fromElements(1, 2, 3);
@@ -384,6 +386,7 @@ public class StreamGraphGeneratorTest {
                DataStream<Integer> input1 = env.fromElements(1, 2, 3, 
4).setMaxParallelism(128);
                DataStream<Integer> input2 = env.fromElements(1, 2, 3, 
4).setMaxParallelism(129);
 
+               env.setParallelism(12);
                env.getConfig().setMaxParallelism(maxParallelism);
 
                DataStream<Integer> keyedResult = input1.connect(input2).keyBy(

http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 1911f44..ee417ac 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -290,8 +290,8 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
                                        "localhost", 
cluster.getLeaderRPCPort());
 
-                       env.setMaxParallelism(2 * PARALLELISM);
                        env.setParallelism(PARALLELISM);
+                       env.setMaxParallelism(2 * PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.enableCheckpointing(100);
                        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));

Reply via email to