[FLINK-5473] Limit max parallelism to 1 for non-parallel operators
[FLINK-5473] Better default behaviours for unspecified maximum parallelism
This closes #3182. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/993a2e2f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/993a2e2f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/993a2e2f Branch: refs/heads/release-1.2 Commit: 993a2e2fa0ceecff0979a267ace7cd7b8e05d359 Parents: 908376b Author: Stefan Richter <[email protected]> Authored: Mon Jan 16 14:31:22 2017 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Jan 24 09:49:52 2017 +0100 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 4 +- .../checkpoint/StateAssignmentOperation.java | 285 ++++++++++--------- .../checkpoint/savepoint/SavepointLoader.java | 8 +- .../ResultPartitionDeploymentDescriptor.java | 18 +- .../runtime/executiongraph/ExecutionGraph.java | 8 +- .../executiongraph/ExecutionJobVertex.java | 115 ++++++-- .../runtime/executiongraph/ExecutionVertex.java | 31 +- .../api/writer/ResultPartitionWriter.java | 4 + .../io/network/partition/ResultPartition.java | 8 + .../flink/runtime/jobgraph/JobVertex.java | 7 +- .../runtime/state/KeyGroupRangeAssignment.java | 42 ++- .../apache/flink/runtime/taskmanager/Task.java | 1 + .../checkpoint/CheckpointCoordinatorTest.java | 1 + .../savepoint/SavepointLoaderTest.java | 1 + ...ResultPartitionDeploymentDescriptorTest.java | 1 + .../executiongraph/ExecutionJobVertexTest.java | 140 +++++++++ .../ExecutionVertexDeploymentTest.java | 27 +- .../network/partition/ResultPartitionTest.java | 1 + .../consumer/LocalInputChannelTest.java | 1 + .../runtime/jobmanager/JobManagerTest.java | 2 + .../runtime/taskmanager/TaskManagerTest.java | 5 +- .../streaming/api/datastream/KeyedStream.java | 13 +- .../datastream/SingleOutputStreamOperator.java | 29 +- .../environment/StreamExecutionEnvironment.java | 12 +- .../flink/streaming/api/graph/StreamGraph.java | 37 +-- .../api/graph/StreamGraphGenerator.java | 26 +- 
.../flink/streaming/api/graph/StreamNode.java | 2 - .../api/graph/StreamingJobGraphGenerator.java | 13 +- .../transformations/StreamTransformation.java | 5 + .../partitioner/KeyGroupStreamPartitioner.java | 1 + .../streaming/runtime/tasks/OperatorChain.java | 10 + .../api/StreamExecutionEnvironmentTest.java | 119 +++++++- .../api/graph/StreamGraphGeneratorTest.java | 9 +- .../test/checkpointing/RescalingITCase.java | 25 +- 34 files changed, 723 insertions(+), 288 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 9132897..78cad91 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -885,8 +885,10 @@ public class CheckpointCoordinator { LOG.info("Restoring from latest valid checkpoint: {}.", latest); + final Map<JobVertexID, TaskState> taskStates = latest.getTaskStates(); + StateAssignmentOperation stateAssignmentOperation = - new StateAssignmentOperation(LOG, tasks, latest, allowNonRestoredState); + new StateAssignmentOperation(LOG, tasks, taskStates, allowNonRestoredState); stateAssignmentOperation.assignStates(); http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java index 6c23f02..6d075db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java @@ -45,29 +45,34 @@ public class StateAssignmentOperation { private final Logger logger; private final Map<JobVertexID, ExecutionJobVertex> tasks; - private final CompletedCheckpoint latest; + private final Map<JobVertexID, TaskState> taskStates; private final boolean allowNonRestoredState; public StateAssignmentOperation( Logger logger, Map<JobVertexID, ExecutionJobVertex> tasks, - CompletedCheckpoint latest, + Map<JobVertexID, TaskState> taskStates, boolean allowNonRestoredState) { - this.logger = logger; - this.tasks = tasks; - this.latest = latest; + this.logger = Preconditions.checkNotNull(logger); + this.tasks = Preconditions.checkNotNull(tasks); + this.taskStates = Preconditions.checkNotNull(taskStates); this.allowNonRestoredState = allowNonRestoredState; } public boolean assignStates() throws Exception { + // this tracks if we find missing node hash ids and already use secondary mappings boolean expandedToLegacyIds = false; + Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks; - for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : latest.getTaskStates().entrySet()) { + for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) { TaskState taskState = taskGroupStateEntry.getValue(); + + //----------------------------------------find vertex for state--------------------------------------------- + ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey()); // on the first time we can not find the execution job vertex for an id, we also consider alternative ids, @@ -89,8 +94,31 @@ public class StateAssignmentOperation { } } - // check that the number of key groups have not 
changed - if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) { + checkParallelismPreconditions(taskState, executionJobVertex); + + assignTaskStatesToOperatorInstances(taskState, executionJobVertex); + } + + return true; + } + + private void checkParallelismPreconditions(TaskState taskState, ExecutionJobVertex executionJobVertex) { + //----------------------------------------max parallelism preconditions------------------------------------- + + // check that the number of key groups have not changed or if we need to override it to satisfy the restored state + if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) { + + if (!executionJobVertex.isMaxParallelismConfigured()) { + // if the max parallelism was not explicitly specified by the user, we derive it from the state + + if (logger.isDebugEnabled()) { + logger.debug("Overriding maximum parallelism for JobVertex " + executionJobVertex.getJobVertexId() + + " from " + executionJobVertex.getMaxParallelism() + " to " + taskState.getMaxParallelism()); + } + + executionJobVertex.setMaxParallelism(taskState.getMaxParallelism()); + } else { + // if the max parallelism was explicitly specified, we complain on mismatch throw new IllegalStateException("The maximum parallelism (" + taskState.getMaxParallelism() + ") with which the latest " + "checkpoint of the execution job vertex " + executionJobVertex + @@ -98,159 +126,162 @@ public class StateAssignmentOperation { executionJobVertex.getMaxParallelism() + ") changed. 
This " + "is currently not supported."); } + } - //------------------------------------------------------------------- + //----------------------------------------parallelism preconditions----------------------------------------- - final int oldParallelism = taskState.getParallelism(); - final int newParallelism = executionJobVertex.getParallelism(); - final boolean parallelismChanged = oldParallelism != newParallelism; - final boolean hasNonPartitionedState = taskState.hasNonPartitionedState(); + final int oldParallelism = taskState.getParallelism(); + final int newParallelism = executionJobVertex.getParallelism(); - if (hasNonPartitionedState && parallelismChanged) { - throw new IllegalStateException("Cannot restore the latest checkpoint because " + - "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " + - "state and its parallelism changed. The operator " + executionJobVertex.getJobVertexId() + - " has parallelism " + newParallelism + " whereas the corresponding " + - "state object has a parallelism of " + oldParallelism); - } + if (taskState.hasNonPartitionedState() && (oldParallelism != newParallelism)) { + throw new IllegalStateException("Cannot restore the latest checkpoint because " + + "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " + + "state and its parallelism changed. 
The operator " + executionJobVertex.getJobVertexId() + + " has parallelism " + newParallelism + " whereas the corresponding " + + "state object has a parallelism of " + oldParallelism); + } + } - List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions( - executionJobVertex.getMaxParallelism(), - newParallelism); + private static void assignTaskStatesToOperatorInstances( + TaskState taskState, ExecutionJobVertex executionJobVertex) { - final int chainLength = taskState.getChainLength(); + final int oldParallelism = taskState.getParallelism(); + final int newParallelism = executionJobVertex.getParallelism(); - // operator chain idx -> list of the stored op states from all parallel instances for this chain idx - @SuppressWarnings("unchecked") - List<OperatorStateHandle>[] parallelOpStatesBackend = new List[chainLength]; - @SuppressWarnings("unchecked") - List<OperatorStateHandle>[] parallelOpStatesStream = new List[chainLength]; + List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions( + executionJobVertex.getMaxParallelism(), + newParallelism); - List<KeyGroupsStateHandle> parallelKeyedStatesBackend = new ArrayList<>(oldParallelism); - List<KeyGroupsStateHandle> parallelKeyedStateStream = new ArrayList<>(oldParallelism); + final int chainLength = taskState.getChainLength(); - for (int p = 0; p < oldParallelism; ++p) { - SubtaskState subtaskState = taskState.getState(p); + // operator chain idx -> list of the stored op states from all parallel instances for this chain idx + @SuppressWarnings("unchecked") + List<OperatorStateHandle>[] parallelOpStatesBackend = new List[chainLength]; + @SuppressWarnings("unchecked") + List<OperatorStateHandle>[] parallelOpStatesStream = new List[chainLength]; - if (null != subtaskState) { - collectParallelStatesByChainOperator( - parallelOpStatesBackend, subtaskState.getManagedOperatorState()); + List<KeyGroupsStateHandle> parallelKeyedStatesBackend = new ArrayList<>(oldParallelism); + List<KeyGroupsStateHandle> 
parallelKeyedStateStream = new ArrayList<>(oldParallelism); - collectParallelStatesByChainOperator( - parallelOpStatesStream, subtaskState.getRawOperatorState()); + for (int p = 0; p < oldParallelism; ++p) { + SubtaskState subtaskState = taskState.getState(p); - KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState(); - if (null != keyedStateBackend) { - parallelKeyedStatesBackend.add(keyedStateBackend); - } + if (null != subtaskState) { + collectParallelStatesByChainOperator( + parallelOpStatesBackend, subtaskState.getManagedOperatorState()); - KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState(); - if (null != keyedStateStream) { - parallelKeyedStateStream.add(keyedStateStream); - } + collectParallelStatesByChainOperator( + parallelOpStatesStream, subtaskState.getRawOperatorState()); + + KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState(); + if (null != keyedStateBackend) { + parallelKeyedStatesBackend.add(keyedStateBackend); + } + + KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState(); + if (null != keyedStateStream) { + parallelKeyedStateStream.add(keyedStateStream); } } + } - // operator chain index -> lists with collected states (one collection for each parallel subtasks) - @SuppressWarnings("unchecked") - List<Collection<OperatorStateHandle>>[] partitionedParallelStatesBackend = new List[chainLength]; + // operator chain index -> lists with collected states (one collection for each parallel subtasks) + @SuppressWarnings("unchecked") + List<Collection<OperatorStateHandle>>[] partitionedParallelStatesBackend = new List[chainLength]; - @SuppressWarnings("unchecked") - List<Collection<OperatorStateHandle>>[] partitionedParallelStatesStream = new List[chainLength]; + @SuppressWarnings("unchecked") + List<Collection<OperatorStateHandle>>[] partitionedParallelStatesStream = new List[chainLength]; - //TODO here we can employ different redistribution strategies for state, e.g. 
union state. - // For now we only offer round robin as the default. - OperatorStateRepartitioner opStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE; + //TODO here we can employ different redistribution strategies for state, e.g. union state. + // For now we only offer round robin as the default. + OperatorStateRepartitioner opStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE; - for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) { + for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) { - List<OperatorStateHandle> chainOpParallelStatesBackend = parallelOpStatesBackend[chainIdx]; - List<OperatorStateHandle> chainOpParallelStatesStream = parallelOpStatesStream[chainIdx]; + List<OperatorStateHandle> chainOpParallelStatesBackend = parallelOpStatesBackend[chainIdx]; + List<OperatorStateHandle> chainOpParallelStatesStream = parallelOpStatesStream[chainIdx]; - partitionedParallelStatesBackend[chainIdx] = applyRepartitioner( - opStateRepartitioner, - chainOpParallelStatesBackend, - oldParallelism, - newParallelism); + partitionedParallelStatesBackend[chainIdx] = applyRepartitioner( + opStateRepartitioner, + chainOpParallelStatesBackend, + oldParallelism, + newParallelism); - partitionedParallelStatesStream[chainIdx] = applyRepartitioner( - opStateRepartitioner, - chainOpParallelStatesStream, - oldParallelism, - newParallelism); - } + partitionedParallelStatesStream[chainIdx] = applyRepartitioner( + opStateRepartitioner, + chainOpParallelStatesStream, + oldParallelism, + newParallelism); + } - for (int subTaskIdx = 0; subTaskIdx < newParallelism; ++subTaskIdx) { - // non-partitioned state - ChainedStateHandle<StreamStateHandle> nonPartitionableState = null; + for (int subTaskIdx = 0; subTaskIdx < newParallelism; ++subTaskIdx) { + // non-partitioned state + ChainedStateHandle<StreamStateHandle> nonPartitionableState = null; - if (!parallelismChanged) { - if (taskState.getState(subTaskIdx) != null) { - nonPartitionableState = 
taskState.getState(subTaskIdx).getLegacyOperatorState(); - } + if (oldParallelism == newParallelism) { + if (taskState.getState(subTaskIdx) != null) { + nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState(); } + } - // partitionable state - @SuppressWarnings("unchecked") - Collection<OperatorStateHandle>[] iab = new Collection[chainLength]; - @SuppressWarnings("unchecked") - Collection<OperatorStateHandle>[] ias = new Collection[chainLength]; - List<Collection<OperatorStateHandle>> operatorStateFromBackend = Arrays.asList(iab); - List<Collection<OperatorStateHandle>> operatorStateFromStream = Arrays.asList(ias); - - for (int chainIdx = 0; chainIdx < partitionedParallelStatesBackend.length; ++chainIdx) { - List<Collection<OperatorStateHandle>> redistributedOpStateBackend = - partitionedParallelStatesBackend[chainIdx]; + // partitionable state + @SuppressWarnings("unchecked") + Collection<OperatorStateHandle>[] iab = new Collection[chainLength]; + @SuppressWarnings("unchecked") + Collection<OperatorStateHandle>[] ias = new Collection[chainLength]; + List<Collection<OperatorStateHandle>> operatorStateFromBackend = Arrays.asList(iab); + List<Collection<OperatorStateHandle>> operatorStateFromStream = Arrays.asList(ias); - List<Collection<OperatorStateHandle>> redistributedOpStateStream = - partitionedParallelStatesStream[chainIdx]; + for (int chainIdx = 0; chainIdx < partitionedParallelStatesBackend.length; ++chainIdx) { + List<Collection<OperatorStateHandle>> redistributedOpStateBackend = + partitionedParallelStatesBackend[chainIdx]; - if (redistributedOpStateBackend != null) { - operatorStateFromBackend.set(chainIdx, redistributedOpStateBackend.get(subTaskIdx)); - } + List<Collection<OperatorStateHandle>> redistributedOpStateStream = + partitionedParallelStatesStream[chainIdx]; - if (redistributedOpStateStream != null) { - operatorStateFromStream.set(chainIdx, redistributedOpStateStream.get(subTaskIdx)); - } + if (redistributedOpStateBackend 
!= null) { + operatorStateFromBackend.set(chainIdx, redistributedOpStateBackend.get(subTaskIdx)); } - Execution currentExecutionAttempt = executionJobVertex - .getTaskVertices()[subTaskIdx] - .getCurrentExecutionAttempt(); + if (redistributedOpStateStream != null) { + operatorStateFromStream.set(chainIdx, redistributedOpStateStream.get(subTaskIdx)); + } + } - List<KeyGroupsStateHandle> newKeyedStatesBackend; - List<KeyGroupsStateHandle> newKeyedStateStream; - if (parallelismChanged) { - KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(subTaskIdx); - newKeyedStatesBackend = getKeyGroupsStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds); - newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds); + Execution currentExecutionAttempt = executionJobVertex + .getTaskVertices()[subTaskIdx] + .getCurrentExecutionAttempt(); + + List<KeyGroupsStateHandle> newKeyedStatesBackend; + List<KeyGroupsStateHandle> newKeyedStateStream; + if (oldParallelism == newParallelism) { + SubtaskState subtaskState = taskState.getState(subTaskIdx); + if (subtaskState != null) { + KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState(); + KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState(); + newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList( + oldKeyedStatesBackend) : null; + newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList( + oldKeyedStatesStream) : null; } else { - SubtaskState subtaskState = taskState.getState(subTaskIdx); - if (subtaskState != null) { - KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState(); - KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState(); - newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList( - oldKeyedStatesBackend) : null; - newKeyedStateStream = oldKeyedStatesStream != null ? 
Collections.singletonList( - oldKeyedStatesStream) : null; - } else { - newKeyedStatesBackend = null; - newKeyedStateStream = null; - } + newKeyedStatesBackend = null; + newKeyedStateStream = null; } + } else { + KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(subTaskIdx); + newKeyedStatesBackend = getKeyGroupsStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds); + newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds); + } - TaskStateHandles taskStateHandles = new TaskStateHandles( - nonPartitionableState, - operatorStateFromBackend, - operatorStateFromStream, - newKeyedStatesBackend, - newKeyedStateStream); + TaskStateHandles taskStateHandles = new TaskStateHandles( + nonPartitionableState, + operatorStateFromBackend, + operatorStateFromStream, + newKeyedStatesBackend, + newKeyedStateStream); - currentExecutionAttempt.setInitialState(taskStateHandles); - } + currentExecutionAttempt.setInitialState(taskStateHandles); } - - return true; } /** @@ -298,7 +329,7 @@ public class StateAssignmentOperation { /** * @param chainParallelOpStates array = chain ops, array[idx] = parallel states for this chain op. 
- * @param chainOpState + * @param chainOpState the operator chain */ private static void collectParallelStatesByChainOperator( List<OperatorStateHandle>[] chainParallelOpStates, ChainedStateHandle<OperatorStateHandle> chainOpState) { @@ -359,4 +390,4 @@ public class StateAssignmentOperation { return repackStream; } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java index d6be482..950a9a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java @@ -81,10 +81,11 @@ public class SavepointLoader { } if (executionJobVertex != null) { - if (executionJobVertex.getMaxParallelism() == taskState.getMaxParallelism()) { + + if (executionJobVertex.getMaxParallelism() == taskState.getMaxParallelism() + || !executionJobVertex.isMaxParallelismConfigured()) { taskStates.put(taskState.getJobVertexID(), taskState); - } - else { + } else { String msg = String.format("Failed to rollback to savepoint %s. " + "Max parallelism mismatch between savepoint state and new program. 
" + "Cannot map operator %s with max parallelism %d to new program with " + @@ -106,6 +107,7 @@ public class SavepointLoader { "you want to allow to skip this, you can set the --allowNonRestoredState " + "option on the CLI.", savepointPath, taskState.getJobVertexID()); + throw new IllegalStateException(msg); } } http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java index 14c7d2a..061f925 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import java.io.Serializable; @@ -36,6 +37,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class ResultPartitionDeploymentDescriptor implements Serializable { + private static final long serialVersionUID = 6343547936086963705L; + /** The ID of the result this partition belongs to. */ private final IntermediateDataSetID resultId; @@ -47,6 +50,9 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { /** The number of subpartitions. 
*/ private final int numberOfSubpartitions; + + /** The maximum parallelism */ + private final int maxParallelism; /** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */ private final boolean sendScheduleOrUpdateConsumersMessage; @@ -56,14 +62,17 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { IntermediateResultPartitionID partitionId, ResultPartitionType partitionType, int numberOfSubpartitions, + int maxParallelism, boolean lazyScheduling) { this.resultId = checkNotNull(resultId); this.partitionId = checkNotNull(partitionId); this.partitionType = checkNotNull(partitionType); + KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism); checkArgument(numberOfSubpartitions >= 1); this.numberOfSubpartitions = numberOfSubpartitions; + this.maxParallelism = maxParallelism; this.sendScheduleOrUpdateConsumersMessage = lazyScheduling; } @@ -83,6 +92,10 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { return numberOfSubpartitions; } + public int getMaxParallelism() { + return maxParallelism; + } + public boolean sendScheduleOrUpdateConsumersMessage() { return sendScheduleOrUpdateConsumersMessage; } @@ -96,7 +109,8 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { // ------------------------------------------------------------------------ - public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartition partition, boolean lazyScheduling) { + public static ResultPartitionDeploymentDescriptor from( + IntermediateResultPartition partition, int maxParallelism, boolean lazyScheduling) { final IntermediateDataSetID resultId = partition.getIntermediateResult().getId(); final IntermediateResultPartitionID partitionId = partition.getPartitionId(); @@ -118,6 +132,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { } return new ResultPartitionDeploymentDescriptor( - resultId, partitionId, partitionType, 
numberOfSubpartitions, lazyScheduling); + resultId, partitionId, partitionType, numberOfSubpartitions, maxParallelism, lazyScheduling); } } http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index f31eada..b9e6c84 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -663,12 +663,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } // create the execution job vertex and attach it to the graph - ExecutionJobVertex ejv = null; - try { - ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp); - } catch (IOException e) { - throw new JobException("Could not create a execution job vertex for " + jobVertex.getID() + '.', e); - } + ExecutionJobVertex ejv = + new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp); ejv.connectToPredecessors(this.intermediateResults); ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv); http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index fbab572..e8664f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -41,7 +41,7 @@ import org.apache.flink.runtime.jobmanager.JobManagerOptions; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.runtime.util.SerializableObject; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; @@ -57,8 +57,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable /** Use the same log for all ExecutionGraph classes */ private static final Logger LOG = ExecutionGraph.LOG; - - private final SerializableObject stateMonitor = new SerializableObject(); + + public static final int VALUE_NOT_SET = -1; + + private final Object stateMonitor = new Object(); private final ExecutionGraph graph; @@ -66,30 +68,32 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable private final ExecutionVertex[] taskVertices; - private IntermediateResult[] producedDataSets; + private final IntermediateResult[] producedDataSets; private final List<IntermediateResult> inputs; private final int parallelism; - private final int maxParallelism; - private final boolean[] finishedSubtasks; - - private volatile int numSubtasksInFinalState; - + private final SlotSharingGroup slotSharingGroup; - + private final CoLocationGroup coLocationGroup; - + private final InputSplit[] inputSplits; + private final boolean maxParallelismConfigured; + + private int maxParallelism; + + private volatile int numSubtasksInFinalState; + /** * Serialized task information which is for all sub tasks the same. Thus, it avoids to * serialize the same information multiple times in order to create the * TaskDeploymentDescriptors. 
*/ - private final SerializedValue<TaskInformation> serializedTaskInformation; + private SerializedValue<TaskInformation> serializedTaskInformation; private InputSplitAssigner splitAssigner; @@ -97,7 +101,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable ExecutionGraph graph, JobVertex jobVertex, int defaultParallelism, - Time timeout) throws JobException, IOException { + Time timeout) throws JobException { this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis()); } @@ -107,7 +111,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable JobVertex jobVertex, int defaultParallelism, Time timeout, - long createTimestamp) throws JobException, IOException { + long createTimestamp) throws JobException { if (graph == null || jobVertex == null) { throw new NullPointerException(); @@ -121,24 +125,19 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable this.parallelism = numTaskVertices; - int maxP = jobVertex.getMaxParallelism(); + final int configuredMaxParallelism = jobVertex.getMaxParallelism(); + + this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism); - Preconditions.checkArgument(maxP >= parallelism, "The maximum parallelism (" + - maxP + ") must be greater or equal than the parallelism (" + parallelism + - ")."); - this.maxParallelism = maxP; + // if no max parallelism was configured by the user, we calculate and set a default + setMaxParallelismInternal(maxParallelismConfigured ? 
+ configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism)); - this.serializedTaskInformation = new SerializedValue<>(new TaskInformation( - jobVertex.getID(), - jobVertex.getName(), - parallelism, - maxParallelism, - jobVertex.getInvokableClassName(), - jobVertex.getConfiguration())); + this.serializedTaskInformation = null; this.taskVertices = new ExecutionVertex[numTaskVertices]; - this.inputs = new ArrayList<IntermediateResult>(jobVertex.getInputs().size()); + this.inputs = new ArrayList<>(jobVertex.getInputs().size()); // take the sharing group this.slotSharingGroup = jobVertex.getSlotSharingGroup(); @@ -212,6 +211,24 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable finishedSubtasks = new boolean[parallelism]; } + public void setMaxParallelism(int maxParallelismDerived) { + + Preconditions.checkState(!maxParallelismConfigured, + "Attempt to override a configured max parallelism. Configured: " + this.maxParallelism + + ", argument: " + maxParallelismDerived); + + setMaxParallelismInternal(maxParallelismDerived); + } + + private void setMaxParallelismInternal(int maxParallelism) { + Preconditions.checkArgument(maxParallelism > 0 + && maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, + "Overriding max parallelism is not in valid bounds (1.." 
+ + KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + "), found:" + maxParallelism); + + this.maxParallelism = maxParallelism; + } + public ExecutionGraph getGraph() { return graph; } @@ -235,6 +252,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable return maxParallelism; } + public boolean isMaxParallelismConfigured() { + return maxParallelismConfigured; + } + public JobID getJobId() { return graph.getJobID(); } @@ -269,24 +290,56 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable return inputs; } - public SerializedValue<TaskInformation> getSerializedTaskInformation() { + public SerializedValue<TaskInformation> getSerializedTaskInformation() throws IOException { + + if (null == serializedTaskInformation) { + + int parallelism = getParallelism(); + int maxParallelism = getMaxParallelism(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Creating task information for " + generateDebugString()); + } + + serializedTaskInformation = new SerializedValue<>( + new TaskInformation( + jobVertex.getID(), + jobVertex.getName(), + parallelism, + maxParallelism, + jobVertex.getInvokableClassName(), + jobVertex.getConfiguration())); + } + return serializedTaskInformation; } - + public boolean isInFinalState() { return numSubtasksInFinalState == parallelism; } - + @Override public ExecutionState getAggregateState() { int[] num = new int[ExecutionState.values().length]; for (ExecutionVertex vertex : this.taskVertices) { num[vertex.getExecutionState().ordinal()]++; } - + return getAggregateJobVertexState(num, parallelism); } + private String generateDebugString() { + + return "ExecutionJobVertex" + + "(" + jobVertex.getName() + " | " + jobVertex.getID() + ")" + + "{" + + "parallelism=" + parallelism + + ", maxParallelism=" + getMaxParallelism() + + ", maxParallelismConfigured=" + maxParallelismConfigured + + '}'; + } + + 
//--------------------------------------------------------------------------------------------- public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException { http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 5cbd1c1..09497e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -40,13 +40,16 @@ import org.apache.flink.runtime.jobmanager.JobManagerOptions; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.EvictingBoundedList; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -599,7 +602,24 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment(); for (IntermediateResultPartition partition : resultPartitions.values()) { - 
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, lazyScheduling)); + + List<List<ExecutionEdge>> consumers = partition.getConsumers(); + + if (consumers.isEmpty()) { + //TODO this case only exists for test, currently there has to be exactly one consumer in real jobs! + producedPartitions.add(ResultPartitionDeploymentDescriptor.from( + partition, + KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, + lazyScheduling)); + } else { + Preconditions.checkState(1 == consumers.size(), + "Only one consumer supported in the current implementation! Found: " + consumers.size()); + + List<ExecutionEdge> consumer = consumers.get(0); + ExecutionJobVertex vertex = consumer.get(0).getTarget().getJobVertex(); + int maxParallelism = vertex.getMaxParallelism(); + producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, maxParallelism, lazyScheduling)); + } } @@ -620,7 +640,14 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi } SerializedValue<JobInformation> serializedJobInformation = getExecutionGraph().getSerializedJobInformation(); - SerializedValue<TaskInformation> serializedJobVertexInformation = jobVertex.getSerializedTaskInformation(); + SerializedValue<TaskInformation> serializedJobVertexInformation = null; + + try { + serializedJobVertexInformation = jobVertex.getSerializedTaskInformation(); + } catch (IOException e) { + throw new ExecutionGraphException( + "Could not create a serialized JobVertexInformation for " + jobVertex.getJobVertexId(), e); + } return new TaskDeploymentDescriptor( serializedJobInformation, http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java index cfab34d..2a1bed8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java @@ -63,6 +63,10 @@ public class ResultPartitionWriter implements EventListener<TaskEvent> { return partition.getNumberOfSubpartitions(); } + public int getNumTargetKeyGroups() { + return partition.getNumTargetKeyGroups(); + } + // ------------------------------------------------------------------------ // Data processing // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index 474c25c..3d92584 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -95,6 +95,8 @@ public class ResultPartition implements BufferPoolOwner { private final ResultPartitionConsumableNotifier partitionConsumableNotifier; + public final int numTargetKeyGroups; + private final boolean sendScheduleOrUpdateConsumersMessage; // - Runtime state -------------------------------------------------------- @@ -131,6 +133,7 @@ public class ResultPartition implements BufferPoolOwner { ResultPartitionID partitionId, ResultPartitionType partitionType, int numberOfSubpartitions, + int numTargetKeyGroups, ResultPartitionManager partitionManager, 
ResultPartitionConsumableNotifier partitionConsumableNotifier, IOManager ioManager, @@ -142,6 +145,7 @@ public class ResultPartition implements BufferPoolOwner { this.partitionId = checkNotNull(partitionId); this.partitionType = checkNotNull(partitionType); this.subpartitions = new ResultSubpartition[numberOfSubpartitions]; + this.numTargetKeyGroups = numTargetKeyGroups; this.partitionManager = checkNotNull(partitionManager); this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier); this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage; @@ -356,6 +360,10 @@ public class ResultPartition implements BufferPoolOwner { return cause; } + public int getNumTargetKeyGroups() { + return numTargetKeyGroups; + } + /** * Releases buffers held by this result partition. * http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index d24100e..9dcaeeb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -40,7 +40,6 @@ public class JobVertex implements java.io.Serializable { private static final String DEFAULT_NAME = "(unnamed vertex)"; - // -------------------------------------------------------------------------------------------- // Members that define the structure / topology of the graph // -------------------------------------------------------------------------------------------- @@ -60,7 +59,7 @@ public class JobVertex implements java.io.Serializable { private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; /** Maximum number of subtasks to split this taks into a runtime. 
*/ - private int maxParallelism = Short.MAX_VALUE; + private int maxParallelism = -1; /** Custom configuration passed to the assigned task at runtime. */ private Configuration configuration; @@ -276,10 +275,6 @@ public class JobVertex implements java.io.Serializable { * @param maxParallelism The maximum parallelism to be set. must be between 1 and Short.MAX_VALUE. */ public void setMaxParallelism(int maxParallelism) { - org.apache.flink.util.Preconditions.checkArgument( - maxParallelism > 0 && maxParallelism <= (1 << 15), - "The max parallelism must be at least 1 and smaller than 2^15."); - this.maxParallelism = maxParallelism; } http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java index 894f721..62bf3f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java @@ -23,7 +23,14 @@ import org.apache.flink.util.Preconditions; public final class KeyGroupRangeAssignment { - public static final int DEFAULT_MAX_PARALLELISM = 128; + /** + * The default lower bound for max parallelism if nothing was configured by the user. We have this to allow users + * some degree of scale-up in case they forgot to configure maximum parallelism explicitly. 
+ */ + public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7; + + /** The (inclusive) upper bound for max parallelism */ + public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15; private KeyGroupRangeAssignment() { throw new AssertionError(); @@ -79,9 +86,12 @@ public final class KeyGroupRangeAssignment { int maxParallelism, int parallelism, int operatorIndex) { - Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero."); - Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism."); - Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15."); + + checkParallelismPreconditions(parallelism); + checkParallelismPreconditions(maxParallelism); + + Preconditions.checkArgument(maxParallelism >= parallelism, + "Maximum parallelism must not be smaller than parallelism."); int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1; int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism; @@ -105,4 +115,28 @@ public final class KeyGroupRangeAssignment { public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) { return keyGroupId * parallelism / maxParallelism; } + + /** + * Computes a default maximum parallelism from the operator parallelism. This is used in case the user has not + * explicitly configured a maximum parallelism to still allow a certain degree of scale-up. + * + * @param operatorParallelism the operator parallelism as basis for computation. + * @return the computed default maximum parallelism. 
+ */ + public static int computeDefaultMaxParallelism(int operatorParallelism) { + + checkParallelismPreconditions(operatorParallelism); + + return Math.min( + Math.max( + MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)), + DEFAULT_LOWER_BOUND_MAX_PARALLELISM), + UPPER_BOUND_MAX_PARALLELISM); + } + + public static void checkParallelismPreconditions(int parallelism) { + Preconditions.checkArgument(parallelism > 0 + && parallelism <= UPPER_BOUND_MAX_PARALLELISM, + "Operator parallelism not within bounds: " + parallelism); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index e945b93..3c57e3f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -343,6 +343,7 @@ public class Task implements Runnable, TaskActions { partitionId, desc.getPartitionType(), desc.getNumberOfSubpartitions(), + desc.getMaxParallelism(), networkEnvironment.getResultPartitionManager(), resultPartitionConsumableNotifier, ioManager, http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index ca9dbc2..6ba557b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -2607,6 +2607,7 @@ public class CheckpointCoordinatorTest { when(executionJobVertex.getTaskVertices()).thenReturn(executionVertices); when(executionJobVertex.getParallelism()).thenReturn(parallelism); when(executionJobVertex.getMaxParallelism()).thenReturn(maxParallelism); + when(executionJobVertex.isMaxParallelismConfigured()).thenReturn(true); return executionJobVertex; } http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java index 67575d6..6471d6f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java @@ -87,6 +87,7 @@ public class SavepointLoaderTest { // 2) Load and validate: max parallelism mismatch when(vertex.getMaxParallelism()).thenReturn(222); + when(vertex.isMaxParallelismConfigured()).thenReturn(true); try { SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false); http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index 3ed8236..aac2e13 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -46,6 +46,7 @@ public class ResultPartitionDeploymentDescriptorTest { partitionId, partitionType, numberOfSubpartitions, + numberOfSubpartitions, true); ResultPartitionDeploymentDescriptor copy = http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java new file mode 100644 index 0000000..f0f6248 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ExecutionJobVertexTest { + + private static final int NOT_CONFIGURED = -1; + + @Test + public void testMaxParallelismDefaulting() throws Exception { + + // default minimum + ExecutionJobVertex executionJobVertex = createExecutionJobVertex(1, NOT_CONFIGURED); + Assert.assertEquals(128, executionJobVertex.getMaxParallelism()); + + // test round up part 1 + executionJobVertex = createExecutionJobVertex(171, NOT_CONFIGURED); + Assert.assertEquals(256, executionJobVertex.getMaxParallelism()); + + // test round up part 2 + executionJobVertex = createExecutionJobVertex(172, NOT_CONFIGURED); + Assert.assertEquals(512, executionJobVertex.getMaxParallelism()); + + // test round up limit + executionJobVertex = createExecutionJobVertex(1 << 15, NOT_CONFIGURED); + Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism()); + + // test upper bound + try { + executionJobVertex = createExecutionJobVertex(1 + (1 << 15), NOT_CONFIGURED); + executionJobVertex.getMaxParallelism(); + Assert.fail(); + } catch (IllegalArgumentException ignore) { + } + + // test configured / trumps computed default + executionJobVertex = createExecutionJobVertex(172, 4); + Assert.assertEquals(4, executionJobVertex.getMaxParallelism()); + + + // test configured / trumps computed default + executionJobVertex = createExecutionJobVertex(4, 1 << 15); + Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism()); + + // test upper bound configured + try { + executionJobVertex = 
createExecutionJobVertex(4, 1 + (1 << 15)); + Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism())); + } catch (IllegalArgumentException ignore) { + } + + // test lower bound configured + try { + executionJobVertex = createExecutionJobVertex(4, 0); + Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism())); + } catch (IllegalArgumentException ignore) { + } + + // test override trumps test configured 2 + executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED); + executionJobVertex.setMaxParallelism(7); + Assert.assertEquals(7, executionJobVertex.getMaxParallelism()); + + // test lower bound with derived value + executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED); + try { + executionJobVertex.setMaxParallelism(0); + Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism())); + } catch (IllegalArgumentException ignore) { + } + + // test upper bound with derived value + executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED); + try { + executionJobVertex.setMaxParallelism(1 + (1 << 15)); + Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism())); + } catch (IllegalArgumentException ignore) { + } + + // test complain on setting derived value in presence of a configured value + executionJobVertex = createExecutionJobVertex(4, 16); + try { + executionJobVertex.setMaxParallelism(7); + Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism())); + } catch (IllegalStateException ignore) { + } + + } + + //------------------------------------------------------------------------------------------------------ + + private static ExecutionJobVertex createExecutionJobVertex( + int parallelism, + int preconfiguredMaxParallelism) throws JobException, IOException { + + JobVertex jobVertex = new JobVertex("testVertex"); + jobVertex.setInvokableClass(AbstractInvokable.class); + jobVertex.setParallelism(parallelism); + + if (NOT_CONFIGURED != preconfiguredMaxParallelism) { + 
jobVertex.setMaxParallelism(preconfiguredMaxParallelism); + } + + ExecutionGraph executionGraphMock = mock(ExecutionGraph.class); + when(executionGraphMock.getFutureExecutor()).thenReturn(Executors.directExecutor()); + ExecutionJobVertex executionJobVertex = + new ExecutionJobVertex(executionGraphMock, jobVertex, 1, Time.seconds(10)); + + return executionJobVertex; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index 8bc39a7..8becd7f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -354,8 +354,17 @@ public class ExecutionVertexDeploymentTest { public void testTddProducedPartitionsLazyScheduling() throws Exception { TestingUtils.QueuedActionExecutionContext context = TestingUtils.queuedActionExecutionContext(); ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), context); - IntermediateResult result = new IntermediateResult(new IntermediateDataSetID(), jobVertex, 4, ResultPartitionType.PIPELINED); - ExecutionVertex vertex = new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1)); + + IntermediateResult result = + new IntermediateResult(new IntermediateDataSetID(), jobVertex, 1, ResultPartitionType.PIPELINED); + + ExecutionVertex vertex = + new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1)); + + ExecutionEdge mockEdge = createMockExecutionEdge(1); + + 
result.getPartitions()[0].addConsumerGroup(); + result.getPartitions()[0].addConsumer(mockEdge, 0); Slot root = mock(Slot.class); when(root.getSlotNumber()).thenReturn(1); @@ -374,4 +383,18 @@ public class ExecutionVertexDeploymentTest { assertEquals(mode.allowLazyDeployment(), desc.sendScheduleOrUpdateConsumersMessage()); } } + + + + private ExecutionEdge createMockExecutionEdge(int maxParallelism) { + ExecutionVertex targetVertex = mock(ExecutionVertex.class); + ExecutionJobVertex targetJobVertex = mock(ExecutionJobVertex.class); + + when(targetVertex.getJobVertex()).thenReturn(targetJobVertex); + when(targetJobVertex.getMaxParallelism()).thenReturn(maxParallelism); + + ExecutionEdge edge = mock(ExecutionEdge.class); + when(edge.getTarget()).thenReturn(targetVertex); + return edge; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index 4eb4fd1..f6562a1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -83,6 +83,7 @@ public class ResultPartitionTest { new ResultPartitionID(), type, 1, + 1, mock(ResultPartitionManager.class), notifier, mock(IOManager.class), http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 37ec751..e05fb56 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -120,6 +120,7 @@ public class LocalInputChannelTest { partitionIds[i], ResultPartitionType.PIPELINED, parallelism, + parallelism, partitionManager, partitionConsumableNotifier, ioManager, http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 1fea6f6..2fd2b98 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -624,10 +624,12 @@ public class JobManagerTest { JobGraph jobGraph = new JobGraph("croissant"); JobVertex jobVertex1 = new JobVertex("cappuccino"); jobVertex1.setParallelism(4); + jobVertex1.setMaxParallelism(16); jobVertex1.setInvokableClass(Tasks.BlockingNoOpInvokable.class); JobVertex jobVertex2 = new JobVertex("americano"); jobVertex2.setParallelism(4); + jobVertex2.setMaxParallelism(16); jobVertex2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); jobGraph.addVertex(jobVertex1); http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index e24410e..2fb5fa8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -631,7 +631,7 @@ public class TaskManagerTest extends TestLogger { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>(); - irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true)); + irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, 1, true)); InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor( @@ -776,7 +776,7 @@ public class TaskManagerTest extends TestLogger { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>(); - irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true)); + irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, 1, true)); InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor( @@ -1486,6 +1486,7 @@ public class TaskManagerTest extends TestLogger { new IntermediateResultPartitionID(), ResultPartitionType.PIPELINED, 1, + 1, true); final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig, 
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 3e3afd3..7f33275 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -17,9 +17,9 @@ package org.apache.flink.streaming.api.datastream; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; @@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.RichProcessFunction; @@ -40,17 +39,18 @@ import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; import org.apache.flink.streaming.api.functions.query.QueryableAppendingStateOperator; import org.apache.flink.streaming.api.functions.query.QueryableValueStateOperator; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.streaming.api.operators.StreamGroupedFold; import org.apache.flink.streaming.api.operators.StreamGroupedReduce; -import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; -import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; import org.apache.flink.streaming.api.windowing.time.Time; @@ -113,8 +113,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> { dataStream.getExecutionEnvironment(), new PartitionTransformation<>( dataStream.getTransformation(), - new KeyGroupStreamPartitioner<>( - keySelector, KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM))); + new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; this.keyType = keyType; } http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 3fe21fb..9dd60b7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -17,8 +17,8 @@ package org.apache.flink.streaming.api.datastream; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -115,18 +115,18 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> { /** * Sets the parallelism for this operator. The degree must be 1 or more. - * + * * @param parallelism * The parallelism for this operator. * @return The operator with set parallelism. 
*/ public SingleOutputStreamOperator<T> setParallelism(int parallelism) { - if (parallelism < 1) { - throw new IllegalArgumentException("The parallelism of an operator must be at least 1."); - } - if (nonParallel && parallelism > 1) { - throw new IllegalArgumentException("The parallelism of non parallel operator must be 1."); - } + Preconditions.checkArgument(parallelism > 0, + "The parallelism of an operator must be at least 1."); + + Preconditions.checkArgument(canBeParallel() || parallelism == 1, + "The parallelism of non parallel operator must be 1."); + transformation.setParallelism(parallelism); return this; @@ -143,15 +143,23 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> { */ @PublicEvolving public SingleOutputStreamOperator<T> setMaxParallelism(int maxParallelism) { - Preconditions.checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0."); + Preconditions.checkArgument(maxParallelism > 0, + "The maximum parallelism must be greater than 0."); + + Preconditions.checkArgument(canBeParallel() || maxParallelism == 1, + "The maximum parallelism of non parallel operator must be 1."); transformation.setMaxParallelism(maxParallelism); return this; } + private boolean canBeParallel() { + return !nonParallel; + } + /** - * Sets the parallelism of this operator to one. + * Sets the parallelism and maximum parallelism of this operator to one. * And mark this operator cannot set a non-1 degree of parallelism. * * @return The operator with only one parallelism. 
@@ -159,6 +167,7 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> { @PublicEvolving public SingleOutputStreamOperator<T> forceNonParallel() { transformation.setParallelism(1); + transformation.setMaxParallelism(1); nonParallel = true; return this; } http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 5b4b901..dab0a06 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -48,6 +48,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; @@ -166,16 +167,19 @@ public abstract class StreamExecutionEnvironment { } /** - * Sets the maximum degree of parallelism defined for the program. + * Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive) is Short.MAX_VALUE. * * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also * defines the number of key groups used for partitioned state. 
* - * @param maxParallelism Maximum degree of parallelism to be used for the program., with 0 < maxParallelism <= 2^15 + * @param maxParallelism Maximum degree of parallelism to be used for the program., with 0 < maxParallelism <= 2^15 - 1 */ public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) { - Preconditions.checkArgument(maxParallelism > 0 && maxParallelism <= (1 << 15), - "maxParallelism is out of bounds 0 < maxParallelism <= 2^15. Found: " + maxParallelism); + Preconditions.checkArgument(maxParallelism > 0 && + maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, + "maxParallelism is out of bounds 0 < maxParallelism <= " + + KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism); + config.setMaxParallelism(maxParallelism); return this; } http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index a4a8dc7..2f80764 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -17,6 +17,18 @@ package org.apache.flink.streaming.api.graph; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; @@ -38,7 +50,6 @@ import 
org.apache.flink.streaming.api.operators.StoppableStreamSource; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; @@ -51,18 +62,6 @@ import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - /** * Class representing the streaming topology. It contains all the information * necessary to build the jobgraph for the execution. @@ -358,18 +357,6 @@ public class StreamGraph extends StreamingPlan { if (partitioner == null) { partitioner = virtuaPartitionNodes.get(virtualId).f1; } - - if (partitioner instanceof ConfigurableStreamPartitioner) { - StreamNode downstreamNode = getStreamNode(downStreamVertexID); - - ConfigurableStreamPartitioner configurableStreamPartitioner = (ConfigurableStreamPartitioner) partitioner; - - // Configure the partitioner with the max parallelism. This is necessary if the - // partitioner has been created before the maximum parallelism has been set. The - // maximum parallelism is necessary for the key group mapping. 
- configurableStreamPartitioner.configure(downstreamNode.getMaxParallelism()); - } - addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames); } else { StreamNode upstreamNode = getStreamNode(upStreamVertexID); http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 7ab7494..ddd0515 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -34,7 +34,6 @@ import org.apache.flink.streaming.api.transformations.SplitTransformation; import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.streaming.api.transformations.UnionTransformation; -import org.apache.flink.util.MathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,6 +78,9 @@ public class StreamGraphGenerator { private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class); + public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM; + public static final int UPPER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; + // The StreamGraph that is being built, this is initialized at the beginning. 
private StreamGraph streamGraph; @@ -149,25 +151,11 @@ public class StreamGraphGenerator { if (transform.getMaxParallelism() <= 0) { // if the max parallelism hasn't been set, then first use the job wide max parallelism - // from theExecutionConfig. If this value has not been specified either, then use the - // parallelism of the operator. - int maxParallelism = env.getConfig().getMaxParallelism(); - - if (maxParallelism <= 0) { - - int parallelism = transform.getParallelism(); - - if(parallelism <= 0) { - parallelism = 1; - transform.setParallelism(parallelism); - } - - maxParallelism = Math.max( - MathUtils.roundUpToPowerOfTwo(parallelism + (parallelism / 2)), - KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM); + // from theExecutionConfig. + int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism(); + if (globalMaxParallelismFromConfig > 0) { + transform.setMaxParallelism(globalMaxParallelismFromConfig); } - - transform.setMaxParallelism(maxParallelism); } // call at least once to trigger exceptions about MissingTypeInfo http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 0d58ed2..19a3699 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.StreamOperator; -import 
org.apache.flink.util.Preconditions; import java.io.Serializable; import java.util.ArrayList; @@ -163,7 +162,6 @@ public class StreamNode implements Serializable { * @param maxParallelism Maximum parallelism to be set */ void setMaxParallelism(int maxParallelism) { - Preconditions.checkArgument(maxParallelism > 0, "The maximum parallelism must be at least 1."); this.maxParallelism = maxParallelism; } http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index e306f30..8877c80 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -310,18 +310,7 @@ public class StreamingJobGraphGenerator { parallelism = jobVertex.getParallelism(); } - int maxParallelism = streamNode.getMaxParallelism(); - - // the maximum parallelism specifies the upper bound for the parallelism - if (parallelism > maxParallelism) { - // the parallelism should always be smaller or equal than the max parallelism - throw new IllegalStateException("The maximum parallelism (" + maxParallelism + ") of " + - "the stream node " + streamNode + " is smaller than the parallelism (" + - parallelism + "). 
Increase the maximum parallelism or decrease the parallelism of " + - "this operator."); - } else { - jobVertex.setMaxParallelism(streamNode.getMaxParallelism()); - } + jobVertex.setMaxParallelism(streamNode.getMaxParallelism()); if (LOG.isDebugEnabled()) { LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId); http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java index f7aecdb..5e1b3e2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.MissingTypeInfo; import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.util.Preconditions; @@ -205,6 +206,10 @@ public abstract class StreamTransformation<T> { * @param maxParallelism Maximum parallelism for this stream transformation. */ public void setMaxParallelism(int maxParallelism) { + Preconditions.checkArgument(maxParallelism > 0 + && maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM, + "Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM + + ". 
Found: " + maxParallelism); this.maxParallelism = maxParallelism; } http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java index 256fee1..ddbdaea 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java @@ -76,6 +76,7 @@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem @Override public void configure(int maxParallelism) { + KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism); this.maxParallelism = maxParallelism; } } http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 7771064..6d01795 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import 
org.apache.flink.streaming.runtime.io.StreamRecordWriter; +import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -94,6 +95,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> { try { for (int i = 0; i < outEdgesInOrder.size(); i++) { StreamEdge outEdge = outEdgesInOrder.get(i); + RecordWriterOutput<?> streamOutput = createStreamOutput( outEdge, chainedConfigs.get(outEdge.getSourceId()), i, containingTask.getEnvironment(), containingTask.getName()); @@ -330,6 +332,14 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> { ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex); + // we initialize the partitioner here with the number of key groups (aka max. parallelism) + if (outputPartitioner instanceof ConfigurableStreamPartitioner) { + int numKeyGroups = bufferWriter.getNumTargetKeyGroups(); + if (0 < numKeyGroups) { + ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups); + } + } + StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output = new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout()); output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
