[FLINK-5808] Move max keygroup constants to ExecutionConfig We need to have them there if we want to properly test the arguments of setMaxParallelism() in the ExecutionConfig itself.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3b275f4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3b275f4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3b275f4 Branch: refs/heads/release-1.2 Commit: d3b275f4b7d49b67013e26d1f29a065d3131c664 Parents: b563f0a Author: Aljoscha Krettek <[email protected]> Authored: Fri Mar 10 14:37:26 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Sat Mar 18 07:43:42 2017 +0100 ---------------------------------------------------------------------- .../apache/flink/api/common/ExecutionConfig.java | 9 +++++++++ .../runtime/executiongraph/ExecutionJobVertex.java | 11 ++++++++--- .../runtime/executiongraph/ExecutionVertex.java | 4 ++-- .../runtime/state/KeyGroupRangeAssignment.java | 16 ++++------------ .../api/environment/StreamExecutionEnvironment.java | 5 ++--- .../streaming/api/graph/StreamGraphGenerator.java | 6 +++--- 6 files changed, 28 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d3b275f4/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 32ea0a3..14245ed 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -83,6 +83,15 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut */ public static final int PARALLELISM_UNKNOWN = -2; + /** + * The default lower bound for max parallelism if nothing was configured by the user. We have this so allow users + * some degree of scale-up in case they forgot to configure maximum parallelism explicitly. + */ + public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7; + + /** The (inclusive) upper bound for max parallelism */ + public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15; + private static final long DEFAULT_RESTART_DELAY = 10000L; // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d3b275f4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index e8664f7..59f9986 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.Archiveable; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; @@ -221,10 +222,14 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable } private void setMaxParallelismInternal(int maxParallelism) { + if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) { + maxParallelism = ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM; + } + Preconditions.checkArgument(maxParallelism > 0 - && maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, - "Overriding max parallelism is not in valid bounds (1.." + - KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + "), found:" + maxParallelism); + && maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, + "Overriding max parallelism is not in valid bounds (1..%s), found: %s", + ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, maxParallelism); this.maxParallelism = maxParallelism; } http://git-wip-us.apache.org/repos/asf/flink/blob/d3b275f4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 09497e3..cde1f6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.Archiveable; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.JobException; @@ -40,7 +41,6 @@ import org.apache.flink.runtime.jobmanager.JobManagerOptions; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.EvictingBoundedList; @@ -609,7 +609,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi //TODO this case only exists for test, currently there has to be exactly one consumer in real jobs! producedPartitions.add(ResultPartitionDeploymentDescriptor.from( partition, - KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, + ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, lazyScheduling)); } else { Preconditions.checkState(1 == consumers.size(), http://git-wip-us.apache.org/repos/asf/flink/blob/d3b275f4/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java index 62bf3f6..bf0611b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java @@ -18,20 +18,12 @@ package org.apache.flink.runtime.state; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.util.MathUtils; import org.apache.flink.util.Preconditions; public final class KeyGroupRangeAssignment { - /** - * The default lower bound for max parallelism if nothing was configured by the user. We have this so allow users - * some degree of scale-up in case they forgot to configure maximum parallelism explicitly. - */ - public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7; - - /** The (inclusive) upper bound for max parallelism */ - public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15; - private KeyGroupRangeAssignment() { throw new AssertionError(); } @@ -130,13 +122,13 @@ public final class KeyGroupRangeAssignment { return Math.min( Math.max( MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)), - DEFAULT_LOWER_BOUND_MAX_PARALLELISM), - UPPER_BOUND_MAX_PARALLELISM); + ExecutionConfig.DEFAULT_LOWER_BOUND_MAX_PARALLELISM), + ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM); } public static void checkParallelismPreconditions(int parallelism) { Preconditions.checkArgument(parallelism > 0 - && parallelism <= UPPER_BOUND_MAX_PARALLELISM, + && parallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, "Operator parallelism not within bounds: " + parallelism); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d3b275f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 6ac3622..640915c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -48,7 +48,6 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; @@ -185,9 +184,9 @@ public abstract class StreamExecutionEnvironment { */ public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) { Preconditions.checkArgument(maxParallelism > 0 && - maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, + maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, "maxParallelism is out of bounds 0 < maxParallelism <= " + - KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism); + ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism); config.setMaxParallelism(maxParallelism); return this; http://git-wip-us.apache.org/repos/asf/flink/blob/d3b275f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 333e4f9..e796629 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation; @@ -78,8 +78,8 @@ public class StreamGraphGenerator { private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class); - public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM; - public static final int UPPER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; + public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = ExecutionConfig.DEFAULT_LOWER_BOUND_MAX_PARALLELISM; + public static final int UPPER_BOUND_MAX_PARALLELISM = ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM; // The StreamGraph that is being built, this is initialized at the beginning. private StreamGraph streamGraph;
