Revert "[FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()"

This reverts commit f31a55e08ddb7b4bc9e47577a187bac31ad42f4b.

The fixes around FLINK-5808 introduced follow-up issues.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b4dd411
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b4dd411
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b4dd411

Branch: refs/heads/master
Commit: 5b4dd4117d111514590dfb1bd0a1f4bba3db2b9e
Parents: 04aae30
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Apr 4 14:02:37 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue Apr 18 17:42:10 2017 +0200

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       | 35 ++++---------
 .../environment/StreamExecutionEnvironment.java |  8 +++
 .../StreamExecutionEnvironmentTest.java         | 54 --------------------
 .../api/graph/StreamGraphGeneratorTest.java     |  3 --
 ...tractEventTimeWindowCheckpointingITCase.java |  2 +-
 5 files changed, 18 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b4dd411/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index c18db52..9af9cff 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -84,9 +84,8 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
        public static final int PARALLELISM_UNKNOWN = -2;
 
        /**
-        * The default lower bound for max parallelism if nothing was 
configured by the user. We have
-        * this to allow users some degree of scale-up in case they forgot to 
configure maximum
-        * parallelism explicitly.
+        * The default lower bound for max parallelism if nothing was 
configured by the user. We have this so allow users
+        * some degree of scale-up in case they forgot to configure maximum 
parallelism explicitly.
         */
        public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
 
@@ -293,18 +292,13 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
         * @param parallelism The parallelism to use
         */
        public ExecutionConfig setParallelism(int parallelism) {
-               checkArgument(parallelism != PARALLELISM_UNKNOWN, "Cannot 
specify UNKNOWN_PARALLELISM.");
-               checkArgument(
-                               parallelism >= 1 || parallelism == 
PARALLELISM_DEFAULT,
-                               "Parallelism must be at least one, or 
ExecutionConfig.PARALLELISM_DEFAULT " +
-                                               "(use system default).");
-               checkArgument(
-                               maxParallelism == -1 || parallelism <= 
maxParallelism,
-                               "The specified parallelism must be smaller or 
equal to the maximum parallelism.");
-               checkArgument(
-                               maxParallelism == -1 || parallelism != 
PARALLELISM_DEFAULT,
-                               "Default parallelism cannot be specified when 
maximum parallelism is specified");
-               this.parallelism = parallelism;
+               if (parallelism != PARALLELISM_UNKNOWN) {
+                       if (parallelism < 1 && parallelism != 
PARALLELISM_DEFAULT) {
+                               throw new IllegalArgumentException(
+                                       "Parallelism must be at least one, or 
ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
+                       }
+                       this.parallelism = parallelism;
+               }
                return this;
        }
 
@@ -331,18 +325,7 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
         */
        @PublicEvolving
        public void setMaxParallelism(int maxParallelism) {
-               checkArgument(
-                               parallelism != PARALLELISM_DEFAULT,
-                               "A maximum parallelism can only be specified 
with an explicitly specified " +
-                                               "parallelism.");
                checkArgument(maxParallelism > 0, "The maximum parallelism must 
be greater than 0.");
-               checkArgument(
-                               maxParallelism >= parallelism,
-                               "The maximum parallelism must be larger than 
the parallelism.");
-               checkArgument(
-                               maxParallelism > 0 && maxParallelism <= 
UPPER_BOUND_MAX_PARALLELISM,
-                               "maxParallelism is out of bounds 0 < 
maxParallelism <= " +
-                                               UPPER_BOUND_MAX_PARALLELISM + 
". Found: " + maxParallelism);
                this.maxParallelism = maxParallelism;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b4dd411/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 88db04e..1801315 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -168,6 +168,9 @@ public abstract class StreamExecutionEnvironment {
         * @param parallelism The parallelism
         */
        public StreamExecutionEnvironment setParallelism(int parallelism) {
+               if (parallelism < 1) {
+                       throw new IllegalArgumentException("parallelism must be 
at least one.");
+               }
                config.setParallelism(parallelism);
                return this;
        }
@@ -181,6 +184,11 @@ public abstract class StreamExecutionEnvironment {
         * @param maxParallelism Maximum degree of parallelism to be used for 
the program., with 0 < maxParallelism <= 2^15 - 1
         */
        public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) 
{
+               Preconditions.checkArgument(maxParallelism > 0 &&
+                                               maxParallelism <= 
ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
+                               "maxParallelism is out of bounds 0 < 
maxParallelism <= " +
+                                               
ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
+
                config.setMaxParallelism(maxParallelism);
                return this;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b4dd411/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
index fd27179..d29c833 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -36,9 +36,7 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.SplittableIterator;
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.net.URL;
 import java.util.Arrays;
@@ -54,10 +52,6 @@ import static org.mockito.Mockito.mock;
 
 public class StreamExecutionEnvironmentTest {
 
-       @Rule
-       public final ExpectedException exception = ExpectedException.none();
-
-
        @Test
        public void fromElementsWithBaseTypeTest1() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -162,53 +156,6 @@ public class StreamExecutionEnvironmentTest {
        }
 
        @Test
-       public void testMaxParallelismMustBeBiggerEqualParallelism() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               env.setParallelism(10);
-
-               exception.expect(IllegalArgumentException.class);
-               env.setMaxParallelism(5);
-       }
-
-       @Test
-       public void testParallelismMustBeSmallerEqualMaxParallelism() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               env.setParallelism(10);
-               env.setMaxParallelism(20);
-
-               exception.expect(IllegalArgumentException.class);
-               env.setParallelism(30);
-       }
-
-       @Test
-       public void 
testSetDefaultParallelismNotAllowedWhenMaxParallelismSpecified() {
-               final int defaultParallelism = 20;
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
-
-               env.setParallelism(10);
-               env.setMaxParallelism(15);
-
-               exception.expect(IllegalArgumentException.class);
-               env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
-       }
-
-       @Test
-       public void testSetMaxParallelismNotAllowedWithDefaultParallelism() {
-               final int defaultParallelism = 20;
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
-
-               env.setParallelism(10);
-               env.setMaxParallelism(15);
-
-               exception.expect(IllegalArgumentException.class);
-               env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
-       }
-
-       @Test
        public void testParallelismBounds() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -259,7 +206,6 @@ public class StreamExecutionEnvironmentTest {
                Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
 
                // configured value after generating
-               env.setParallelism(21);
                env.setMaxParallelism(42);
                env.getStreamGraph().getJobGraph();
                Assert.assertEquals(42, 
operator.getTransformation().getMaxParallelism());

http://git-wip-us.apache.org/repos/asf/flink/blob/5b4dd411/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index fbbb5d2..5fdacd4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -247,7 +247,6 @@ public class StreamGraphGeneratorTest {
        public void testSetupOfKeyGroupPartitioner() {
                int maxParallelism = 42;
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().setParallelism(12);
                env.getConfig().setMaxParallelism(maxParallelism);
 
                DataStream<Integer> source = env.fromElements(1, 2, 3);
@@ -279,7 +278,6 @@ public class StreamGraphGeneratorTest {
                int keyedResult2MaxParallelism = 17;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().setParallelism(12);
                env.getConfig().setMaxParallelism(globalMaxParallelism);
 
                DataStream<Integer> source = env.fromElements(1, 2, 3);
@@ -386,7 +384,6 @@ public class StreamGraphGeneratorTest {
                DataStream<Integer> input1 = env.fromElements(1, 2, 3, 
4).setMaxParallelism(128);
                DataStream<Integer> input2 = env.fromElements(1, 2, 3, 
4).setMaxParallelism(129);
 
-               env.setParallelism(12);
                env.getConfig().setMaxParallelism(maxParallelism);
 
                DataStream<Integer> keyedResult = input1.connect(input2).keyBy(

http://git-wip-us.apache.org/repos/asf/flink/blob/5b4dd411/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 462d3a4..e8ceeba 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -299,8 +299,8 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
                                        "localhost", 
cluster.getLeaderRPCPort());
 
-                       env.setParallelism(PARALLELISM);
                        env.setMaxParallelism(2 * PARALLELISM);
+                       env.setParallelism(PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.enableCheckpointing(100);
                        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));

Reply via email to