Revert "[FLINK-5808] Move max keygroup constants to ExecutionConfig"
This reverts commit d3b275f4b7d49b67013e26d1f29a065d3131c664. This fix was causing more problems than it was solving. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd98e8b8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd98e8b8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd98e8b8 Branch: refs/heads/release-1.2 Commit: fd98e8b8c059b54f82d80d251b72b80459f8fee5 Parents: 04a1d6b Author: Aljoscha Krettek <[email protected]> Authored: Mon Apr 3 18:40:05 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Apr 4 13:42:17 2017 +0200 ---------------------------------------------------------------------- .../apache/flink/api/common/ExecutionConfig.java | 9 --------- .../runtime/executiongraph/ExecutionJobVertex.java | 11 +++-------- .../runtime/executiongraph/ExecutionVertex.java | 4 ++-- .../runtime/state/KeyGroupRangeAssignment.java | 16 ++++++++++++---- .../api/environment/StreamExecutionEnvironment.java | 5 +++-- .../streaming/api/graph/StreamGraphGenerator.java | 6 +++--- 6 files changed, 23 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fd98e8b8/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 14245ed..32ea0a3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -83,15 +83,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut */ public static final int PARALLELISM_UNKNOWN = -2; - /** - * The default lower bound for max parallelism if nothing was configured by the user. We have this so allow users - * some degree of scale-up in case they forgot to configure maximum parallelism explicitly. - */ - public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7; - - /** The (inclusive) upper bound for max parallelism */ - public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15; - private static final long DEFAULT_RESTART_DELAY = 10000L; // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fd98e8b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 59f9986..e8664f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.Archiveable; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; @@ -222,14 +221,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable } private void setMaxParallelismInternal(int maxParallelism) { - if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) { - maxParallelism = ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM; - } - Preconditions.checkArgument(maxParallelism > 0 - && maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, - "Overriding max parallelism is not in valid bounds (1..%s), found: %s", - ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, maxParallelism); + && maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, + "Overriding max parallelism is not in valid bounds (1.." + + KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + "), found:" + maxParallelism); this.maxParallelism = maxParallelism; } http://git-wip-us.apache.org/repos/asf/flink/blob/fd98e8b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index cde1f6c..09497e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.Archiveable; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.JobException; @@ -41,6 +40,7 @@ import org.apache.flink.runtime.jobmanager.JobManagerOptions; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.EvictingBoundedList; @@ -609,7 +609,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi //TODO this case only exists for test, currently there has to be exactly one consumer in real jobs! producedPartitions.add(ResultPartitionDeploymentDescriptor.from( partition, - ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, + KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, lazyScheduling)); } else { Preconditions.checkState(1 == consumers.size(), http://git-wip-us.apache.org/repos/asf/flink/blob/fd98e8b8/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java index bf0611b..62bf3f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java @@ -18,12 +18,20 @@ package org.apache.flink.runtime.state; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.util.MathUtils; import org.apache.flink.util.Preconditions; public final class KeyGroupRangeAssignment { + /** + * The default lower bound for max parallelism if nothing was configured by the user. We have this so allow users + * some degree of scale-up in case they forgot to configure maximum parallelism explicitly. + */ + public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7; + + /** The (inclusive) upper bound for max parallelism */ + public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15; + private KeyGroupRangeAssignment() { throw new AssertionError(); } @@ -122,13 +130,13 @@ public final class KeyGroupRangeAssignment { return Math.min( Math.max( MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)), - ExecutionConfig.DEFAULT_LOWER_BOUND_MAX_PARALLELISM), - ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM); + DEFAULT_LOWER_BOUND_MAX_PARALLELISM), + UPPER_BOUND_MAX_PARALLELISM); } public static void checkParallelismPreconditions(int parallelism) { Preconditions.checkArgument(parallelism > 0 - && parallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, + && parallelism <= UPPER_BOUND_MAX_PARALLELISM, "Operator parallelism not within bounds: " + parallelism); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd98e8b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 640915c..6ac3622 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -48,6 +48,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; @@ -184,9 +185,9 @@ public abstract class StreamExecutionEnvironment { */ public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) { Preconditions.checkArgument(maxParallelism > 0 && - maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, + maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, "maxParallelism is out of bounds 0 < maxParallelism <= " + - ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism); + KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism); config.setMaxParallelism(maxParallelism); return this; http://git-wip-us.apache.org/repos/asf/flink/blob/fd98e8b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index e796629..333e4f9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation; @@ -78,8 +78,8 @@ public class StreamGraphGenerator { private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class); - public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = ExecutionConfig.DEFAULT_LOWER_BOUND_MAX_PARALLELISM; - public static final int UPPER_BOUND_MAX_PARALLELISM = ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM; + public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM; + public static final int UPPER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; // The StreamGraph that is being built, this is initialized at the beginning. private StreamGraph streamGraph;
