[FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism() Before, there where some checks in StreamExecutionEnvironment.set(Max)Parallelism() but a user would circumvent these if using the ExecutionConfig directly. Now, all checks are moved to the ExecutionConfig.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99fb80be Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99fb80be Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99fb80be Branch: refs/heads/release-1.2 Commit: 99fb80be773499907d379553010dd999214f64fb Parents: d3b275f Author: Aljoscha Krettek <[email protected]> Authored: Fri Mar 10 14:35:37 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Sat Mar 18 07:43:42 2017 +0100 ---------------------------------------------------------------------- .../flink/api/common/ExecutionConfig.java | 35 +++++++++---- .../environment/StreamExecutionEnvironment.java | 8 --- .../StreamExecutionEnvironmentTest.java | 54 ++++++++++++++++++++ .../api/graph/StreamGraphGeneratorTest.java | 3 ++ ...tractEventTimeWindowCheckpointingITCase.java | 2 +- 5 files changed, 84 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 14245ed..8d5fc90 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -84,8 +84,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut public static final int PARALLELISM_UNKNOWN = -2; /** - * The default lower bound for max parallelism if nothing was configured by the user. We have this so allow users - * some degree of scale-up in case they forgot to configure maximum parallelism explicitly. + * The default lower bound for max parallelism if nothing was configured by the user. We have + * this to allow users some degree of scale-up in case they forgot to configure maximum + * parallelism explicitly. */ public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7; @@ -289,13 +290,18 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut * @param parallelism The parallelism to use */ public ExecutionConfig setParallelism(int parallelism) { - if (parallelism != PARALLELISM_UNKNOWN) { - if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) { - throw new IllegalArgumentException( - "Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default)."); - } - this.parallelism = parallelism; - } + checkArgument(parallelism != PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM."); + checkArgument( + parallelism >= 1 || parallelism == PARALLELISM_DEFAULT, + "Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT " + + "(use system default)."); + checkArgument( + maxParallelism == -1 || parallelism <= maxParallelism, + "The specified parallelism must be smaller or equal to the maximum parallelism."); + checkArgument( + maxParallelism == -1 || parallelism != PARALLELISM_DEFAULT, + "Default parallelism cannot be specified when maximum parallelism is specified"); + this.parallelism = parallelism; return this; } @@ -322,7 +328,18 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut */ @PublicEvolving public void setMaxParallelism(int maxParallelism) { + checkArgument( + parallelism != PARALLELISM_DEFAULT, + "A maximum parallelism can only be specified with an explicitly specified " + + "parallelism."); checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0."); + checkArgument( + maxParallelism >= parallelism, + "The maximum parallelism must be larger than the parallelism."); + checkArgument( + maxParallelism > 0 && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM, + "maxParallelism is out of bounds 0 < maxParallelism <= " + + UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism); this.maxParallelism = maxParallelism; } http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 640915c..b16298c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -167,9 +167,6 @@ public abstract class StreamExecutionEnvironment { * @param parallelism The parallelism */ public StreamExecutionEnvironment setParallelism(int parallelism) { - if (parallelism < 1) { - throw new IllegalArgumentException("parallelism must be at least one."); - } config.setParallelism(parallelism); return this; } @@ -183,11 +180,6 @@ public abstract class StreamExecutionEnvironment { * @param maxParallelism Maximum degree of parallelism to be used for the program., with 0 < maxParallelism <= 2^15 - 1 */ public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) { - Preconditions.checkArgument(maxParallelism > 0 && - maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, - "maxParallelism is out of bounds 0 < maxParallelism <= " + - ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism); - config.setMaxParallelism(maxParallelism); return this; } http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java index d29c833..fd27179 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java @@ -36,7 +36,9 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.util.Collector; import org.apache.flink.util.SplittableIterator; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.net.URL; import java.util.Arrays; @@ -52,6 +54,10 @@ import static org.mockito.Mockito.mock; public class StreamExecutionEnvironmentTest { + @Rule + public final ExpectedException exception = ExpectedException.none(); + + @Test public void fromElementsWithBaseTypeTest1() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -156,6 +162,53 @@ public class StreamExecutionEnvironmentTest { } @Test + public void testMaxParallelismMustBeBiggerEqualParallelism() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.setParallelism(10); + + exception.expect(IllegalArgumentException.class); + env.setMaxParallelism(5); + } + + @Test + public void testParallelismMustBeSmallerEqualMaxParallelism() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.setParallelism(10); + env.setMaxParallelism(20); + + exception.expect(IllegalArgumentException.class); + env.setParallelism(30); + } + + @Test + public void testSetDefaultParallelismNotAllowedWhenMaxParallelismSpecified() { + final int defaultParallelism = 20; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism); + + env.setParallelism(10); + env.setMaxParallelism(15); + + exception.expect(IllegalArgumentException.class); + env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT); + } + + @Test + public void testSetMaxParallelismNotAllowedWithDefaultParallelism() { + final int defaultParallelism = 20; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism); + + env.setParallelism(10); + env.setMaxParallelism(15); + + exception.expect(IllegalArgumentException.class); + env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT); + } + + @Test public void testParallelismBounds() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -206,6 +259,7 @@ public class StreamExecutionEnvironmentTest { Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); // configured value after generating + env.setParallelism(21); env.setMaxParallelism(42); env.getStreamGraph().getJobGraph(); Assert.assertEquals(42, operator.getTransformation().getMaxParallelism()); http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 5fdacd4..fbbb5d2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -247,6 +247,7 @@ public class StreamGraphGeneratorTest { public void testSetupOfKeyGroupPartitioner() { int maxParallelism = 42; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().setParallelism(12); env.getConfig().setMaxParallelism(maxParallelism); DataStream<Integer> source = env.fromElements(1, 2, 3); @@ -278,6 +279,7 @@ public class StreamGraphGeneratorTest { int keyedResult2MaxParallelism = 17; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().setParallelism(12); env.getConfig().setMaxParallelism(globalMaxParallelism); DataStream<Integer> source = env.fromElements(1, 2, 3); @@ -384,6 +386,7 @@ public class StreamGraphGeneratorTest { DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128); DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129); + env.setParallelism(12); env.getConfig().setMaxParallelism(maxParallelism); DataStream<Integer> keyedResult = input1.connect(input2).keyBy( http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java index 1911f44..ee417ac 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java @@ -290,8 +290,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( "localhost", cluster.getLeaderRPCPort()); - env.setMaxParallelism(2 * PARALLELISM); env.setParallelism(PARALLELISM); + env.setMaxParallelism(2 * PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
