[FLINK-5808] Move max keygroup constants to ExecutionConfig

We need to have them there if we want to properly test the arguments of
setMaxParallelism() in the ExecutionConfig itself.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3b275f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3b275f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3b275f4

Branch: refs/heads/release-1.2
Commit: d3b275f4b7d49b67013e26d1f29a065d3131c664
Parents: b563f0a
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Mar 10 14:37:26 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Sat Mar 18 07:43:42 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/api/common/ExecutionConfig.java    |  9 +++++++++
 .../runtime/executiongraph/ExecutionJobVertex.java  | 11 ++++++++---
 .../runtime/executiongraph/ExecutionVertex.java     |  4 ++--
 .../runtime/state/KeyGroupRangeAssignment.java      | 16 ++++------------
 .../api/environment/StreamExecutionEnvironment.java |  5 ++---
 .../streaming/api/graph/StreamGraphGenerator.java   |  6 +++---
 6 files changed, 28 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d3b275f4/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 32ea0a3..14245ed 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -83,6 +83,15 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
         */
        public static final int PARALLELISM_UNKNOWN = -2;
 
+       /**
+        * The default lower bound for max parallelism if nothing was 
configured by the user. We have this so allow users
+        * some degree of scale-up in case they forgot to configure maximum 
parallelism explicitly.
+        */
+       public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
+
+       /** The (inclusive) upper bound for max parallelism */
+       public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;
+
        private static final long DEFAULT_RESTART_DELAY = 10000L;
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b275f4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index e8664f7..59f9986 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -221,10 +222,14 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
        }
 
        private void setMaxParallelismInternal(int maxParallelism) {
+               if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+                       maxParallelism = 
ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM;
+               }
+
                Preconditions.checkArgument(maxParallelism > 0
-                                               && maxParallelism <= 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
-                               "Overriding max parallelism is not in valid 
bounds (1.." +
-                                               
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + "), found:" + 
maxParallelism);
+                                               && maxParallelism <= 
ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
+                               "Overriding max parallelism is not in valid 
bounds (1..%s), found: %s",
+                               ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, 
maxParallelism);
 
                this.maxParallelism = maxParallelism;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b275f4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 09497e3..cde1f6c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
@@ -40,7 +41,6 @@ import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
@@ -609,7 +609,7 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                                //TODO this case only exists for test, 
currently there has to be exactly one consumer in real jobs!
                                
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(
                                                partition,
-                                               
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+                                               
ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
                                                lazyScheduling));
                        } else {
                                Preconditions.checkState(1 == consumers.size(),

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b275f4/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
index 62bf3f6..bf0611b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
@@ -18,20 +18,12 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 
 public final class KeyGroupRangeAssignment {
 
-       /**
-        * The default lower bound for max parallelism if nothing was 
configured by the user. We have this so allow users
-        * some degree of scale-up in case they forgot to configure maximum 
parallelism explicitly.
-        */
-       public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
-
-       /** The (inclusive) upper bound for max parallelism */
-       public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;
-
        private KeyGroupRangeAssignment() {
                throw new AssertionError();
        }
@@ -130,13 +122,13 @@ public final class KeyGroupRangeAssignment {
                return Math.min(
                                Math.max(
                                                
MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
-                                               
DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
-                               UPPER_BOUND_MAX_PARALLELISM);
+                                               
ExecutionConfig.DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
+                               ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM);
        }
 
        public static void checkParallelismPreconditions(int parallelism) {
                Preconditions.checkArgument(parallelism > 0
-                                               && parallelism <= 
UPPER_BOUND_MAX_PARALLELISM,
+                                               && parallelism <= 
ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
                                "Operator parallelism not within bounds: " + 
parallelism);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b275f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 6ac3622..640915c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -48,7 +48,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -185,9 +184,9 @@ public abstract class StreamExecutionEnvironment {
         */
        public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) 
{
                Preconditions.checkArgument(maxParallelism > 0 &&
-                                               maxParallelism <= 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+                                               maxParallelism <= 
ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
                                "maxParallelism is out of bounds 0 < 
maxParallelism <= " +
-                                               
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + 
maxParallelism);
+                                               
ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
 
                config.setMaxParallelism(maxParallelism);
                return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b275f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 333e4f9..e796629 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
@@ -78,8 +78,8 @@ public class StreamGraphGenerator {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamGraphGenerator.class);
 
-       public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 
KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
-       public static final int UPPER_BOUND_MAX_PARALLELISM = 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+       public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 
ExecutionConfig.DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
+       public static final int UPPER_BOUND_MAX_PARALLELISM = 
ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM;
 
        // The StreamGraph that is being built, this is initialized at the 
beginning.
        private StreamGraph streamGraph;

Reply via email to