Revert "[FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()"
This reverts commit f31a55e08ddb7b4bc9e47577a187bac31ad42f4b. The fixes around FLINK-5808 introduced follow-up issues. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b4dd411 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b4dd411 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b4dd411 Branch: refs/heads/master Commit: 5b4dd4117d111514590dfb1bd0a1f4bba3db2b9e Parents: 04aae30 Author: Aljoscha Krettek <[email protected]> Authored: Tue Apr 4 14:02:37 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Apr 18 17:42:10 2017 +0200 ---------------------------------------------------------------------- .../flink/api/common/ExecutionConfig.java | 35 ++++--------- .../environment/StreamExecutionEnvironment.java | 8 +++ .../StreamExecutionEnvironmentTest.java | 54 -------------------- .../api/graph/StreamGraphGeneratorTest.java | 3 -- ...tractEventTimeWindowCheckpointingITCase.java | 2 +- 5 files changed, 18 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5b4dd411/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index c18db52..9af9cff 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -84,9 +84,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut public static final int PARALLELISM_UNKNOWN = -2; /** - * The default lower bound for max parallelism if nothing was configured by the user. We have - * this to allow users some degree of scale-up in case they forgot to configure maximum - * parallelism explicitly. + * The default lower bound for max parallelism if nothing was configured by the user. We have this so allow users + * some degree of scale-up in case they forgot to configure maximum parallelism explicitly. */ public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7; @@ -293,18 +292,13 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut * @param parallelism The parallelism to use */ public ExecutionConfig setParallelism(int parallelism) { - checkArgument(parallelism != PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM."); - checkArgument( - parallelism >= 1 || parallelism == PARALLELISM_DEFAULT, - "Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT " + - "(use system default)."); - checkArgument( - maxParallelism == -1 || parallelism <= maxParallelism, - "The specified parallelism must be smaller or equal to the maximum parallelism."); - checkArgument( - maxParallelism == -1 || parallelism != PARALLELISM_DEFAULT, - "Default parallelism cannot be specified when maximum parallelism is specified"); - this.parallelism = parallelism; + if (parallelism != PARALLELISM_UNKNOWN) { + if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) { + throw new IllegalArgumentException( + "Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default)."); + } + this.parallelism = parallelism; + } return this; } @@ -331,18 +325,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut */ @PublicEvolving public void setMaxParallelism(int maxParallelism) { - checkArgument( - parallelism != PARALLELISM_DEFAULT, - "A maximum parallelism can only be specified with an explicitly specified " + - "parallelism."); checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0."); - checkArgument( - maxParallelism >= parallelism, - "The maximum parallelism must be larger than the parallelism."); - checkArgument( - maxParallelism > 0 && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM, - "maxParallelism is out of bounds 0 < maxParallelism <= " + - UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism); this.maxParallelism = maxParallelism; } http://git-wip-us.apache.org/repos/asf/flink/blob/5b4dd411/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 88db04e..1801315 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -168,6 +168,9 @@ public abstract class StreamExecutionEnvironment { * @param parallelism The parallelism */ public StreamExecutionEnvironment setParallelism(int parallelism) { + if (parallelism < 1) { + throw new IllegalArgumentException("parallelism must be at least one."); + } config.setParallelism(parallelism); return this; } @@ -181,6 +184,11 @@ public abstract class StreamExecutionEnvironment { * @param maxParallelism Maximum degree of parallelism to be used for the program., with 0 < maxParallelism <= 2^15 - 1 */ public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) { + Preconditions.checkArgument(maxParallelism > 0 && + maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, + "maxParallelism is out of bounds 0 < maxParallelism <= " + + ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism); + config.setMaxParallelism(maxParallelism); return this; } http://git-wip-us.apache.org/repos/asf/flink/blob/5b4dd411/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java index fd27179..d29c833 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java @@ -36,9 +36,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.util.Collector; import org.apache.flink.util.SplittableIterator; import org.junit.Assert; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.net.URL; import java.util.Arrays; @@ -54,10 +52,6 @@ import static org.mockito.Mockito.mock; public class StreamExecutionEnvironmentTest { - @Rule - public final ExpectedException exception = ExpectedException.none(); - - @Test public void fromElementsWithBaseTypeTest1() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -162,53 +156,6 @@ public class StreamExecutionEnvironmentTest { } @Test - public void testMaxParallelismMustBeBiggerEqualParallelism() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - env.setParallelism(10); - - exception.expect(IllegalArgumentException.class); - env.setMaxParallelism(5); - } - - @Test - public void testParallelismMustBeSmallerEqualMaxParallelism() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - env.setParallelism(10); - env.setMaxParallelism(20); - - exception.expect(IllegalArgumentException.class); - env.setParallelism(30); - } - - @Test - public void testSetDefaultParallelismNotAllowedWhenMaxParallelismSpecified() { - final int defaultParallelism = 20; - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism); - - env.setParallelism(10); - env.setMaxParallelism(15); - - exception.expect(IllegalArgumentException.class); - env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT); - } - - @Test - public void testSetMaxParallelismNotAllowedWithDefaultParallelism() { - final int defaultParallelism = 20; - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism); - - env.setParallelism(10); - env.setMaxParallelism(15); - - exception.expect(IllegalArgumentException.class); - env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT); - } - - @Test public void testParallelismBounds() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -259,7 +206,6 @@ public class StreamExecutionEnvironmentTest { Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); // configured value after generating - env.setParallelism(21); env.setMaxParallelism(42); env.getStreamGraph().getJobGraph(); Assert.assertEquals(42, operator.getTransformation().getMaxParallelism()); http://git-wip-us.apache.org/repos/asf/flink/blob/5b4dd411/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index fbbb5d2..5fdacd4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -247,7 +247,6 @@ public class StreamGraphGeneratorTest { public void testSetupOfKeyGroupPartitioner() { int maxParallelism = 42; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().setParallelism(12); env.getConfig().setMaxParallelism(maxParallelism); DataStream<Integer> source = env.fromElements(1, 2, 3); @@ -279,7 +278,6 @@ public class StreamGraphGeneratorTest { int keyedResult2MaxParallelism = 17; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().setParallelism(12); env.getConfig().setMaxParallelism(globalMaxParallelism); DataStream<Integer> source = env.fromElements(1, 2, 3); @@ -386,7 +384,6 @@ public class StreamGraphGeneratorTest { DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128); DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129); - env.setParallelism(12); env.getConfig().setMaxParallelism(maxParallelism); DataStream<Integer> keyedResult = input1.connect(input2).keyBy( http://git-wip-us.apache.org/repos/asf/flink/blob/5b4dd411/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java index 462d3a4..e8ceeba 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java @@ -299,8 +299,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( "localhost", cluster.getLeaderRPCPort()); - env.setParallelism(PARALLELISM); env.setMaxParallelism(2 * PARALLELISM); + env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
