[FLINK-5473] Limit max parallelism to 1 for non-parallel operators

[FLINK-5473] Better default behaviours for unspecified maximum parallelism

This closes #3182.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/993a2e2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/993a2e2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/993a2e2f

Branch: refs/heads/release-1.2
Commit: 993a2e2fa0ceecff0979a267ace7cd7b8e05d359
Parents: 908376b
Author: Stefan Richter <[email protected]>
Authored: Mon Jan 16 14:31:22 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Jan 24 09:49:52 2017 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |   4 +-
 .../checkpoint/StateAssignmentOperation.java    | 285 ++++++++++---------
 .../checkpoint/savepoint/SavepointLoader.java   |   8 +-
 .../ResultPartitionDeploymentDescriptor.java    |  18 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +-
 .../executiongraph/ExecutionJobVertex.java      | 115 ++++++--
 .../runtime/executiongraph/ExecutionVertex.java |  31 +-
 .../api/writer/ResultPartitionWriter.java       |   4 +
 .../io/network/partition/ResultPartition.java   |   8 +
 .../flink/runtime/jobgraph/JobVertex.java       |   7 +-
 .../runtime/state/KeyGroupRangeAssignment.java  |  42 ++-
 .../apache/flink/runtime/taskmanager/Task.java  |   1 +
 .../checkpoint/CheckpointCoordinatorTest.java   |   1 +
 .../savepoint/SavepointLoaderTest.java          |   1 +
 ...ResultPartitionDeploymentDescriptorTest.java |   1 +
 .../executiongraph/ExecutionJobVertexTest.java  | 140 +++++++++
 .../ExecutionVertexDeploymentTest.java          |  27 +-
 .../network/partition/ResultPartitionTest.java  |   1 +
 .../consumer/LocalInputChannelTest.java         |   1 +
 .../runtime/jobmanager/JobManagerTest.java      |   2 +
 .../runtime/taskmanager/TaskManagerTest.java    |   5 +-
 .../streaming/api/datastream/KeyedStream.java   |  13 +-
 .../datastream/SingleOutputStreamOperator.java  |  29 +-
 .../environment/StreamExecutionEnvironment.java |  12 +-
 .../flink/streaming/api/graph/StreamGraph.java  |  37 +--
 .../api/graph/StreamGraphGenerator.java         |  26 +-
 .../flink/streaming/api/graph/StreamNode.java   |   2 -
 .../api/graph/StreamingJobGraphGenerator.java   |  13 +-
 .../transformations/StreamTransformation.java   |   5 +
 .../partitioner/KeyGroupStreamPartitioner.java  |   1 +
 .../streaming/runtime/tasks/OperatorChain.java  |  10 +
 .../api/StreamExecutionEnvironmentTest.java     | 119 +++++++-
 .../api/graph/StreamGraphGeneratorTest.java     |   9 +-
 .../test/checkpointing/RescalingITCase.java     |  25 +-
 34 files changed, 723 insertions(+), 288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9132897..78cad91 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -885,8 +885,10 @@ public class CheckpointCoordinator {
 
                        LOG.info("Restoring from latest valid checkpoint: {}.", 
latest);
 
+                       final Map<JobVertexID, TaskState> taskStates = 
latest.getTaskStates();
+
                        StateAssignmentOperation stateAssignmentOperation =
-                                       new StateAssignmentOperation(LOG, 
tasks, latest, allowNonRestoredState);
+                                       new StateAssignmentOperation(LOG, 
tasks, taskStates, allowNonRestoredState);
 
                        stateAssignmentOperation.assignStates();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 6c23f02..6d075db 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -45,29 +45,34 @@ public class StateAssignmentOperation {
 
        private final Logger logger;
        private final Map<JobVertexID, ExecutionJobVertex> tasks;
-       private final CompletedCheckpoint latest;
+       private final Map<JobVertexID, TaskState> taskStates;
        private final boolean allowNonRestoredState;
 
        public StateAssignmentOperation(
                        Logger logger,
                        Map<JobVertexID, ExecutionJobVertex> tasks,
-                       CompletedCheckpoint latest,
+                       Map<JobVertexID, TaskState> taskStates,
                        boolean allowNonRestoredState) {
 
-               this.logger = logger;
-               this.tasks = tasks;
-               this.latest = latest;
+               this.logger = Preconditions.checkNotNull(logger);
+               this.tasks = Preconditions.checkNotNull(tasks);
+               this.taskStates = Preconditions.checkNotNull(taskStates);
                this.allowNonRestoredState = allowNonRestoredState;
        }
 
        public boolean assignStates() throws Exception {
 
+               // this tracks if we find missing node hash ids and already use 
secondary mappings
                boolean expandedToLegacyIds = false;
+
                Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
 
-               for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : 
latest.getTaskStates().entrySet()) {
+               for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : 
taskStates.entrySet()) {
 
                        TaskState taskState = taskGroupStateEntry.getValue();
+
+                       //----------------------------------------find vertex 
for state---------------------------------------------
+
                        ExecutionJobVertex executionJobVertex = 
localTasks.get(taskGroupStateEntry.getKey());
 
                        // on the first time we can not find the execution job 
vertex for an id, we also consider alternative ids,
@@ -89,8 +94,31 @@ public class StateAssignmentOperation {
                                }
                        }
 
-                       // check that the number of key groups have not changed
-                       if (taskState.getMaxParallelism() != 
executionJobVertex.getMaxParallelism()) {
+                       checkParallelismPreconditions(taskState, 
executionJobVertex);
+
+                       assignTaskStatesToOperatorInstances(taskState, 
executionJobVertex);
+               }
+
+               return true;
+       }
+
+       private void checkParallelismPreconditions(TaskState taskState, 
ExecutionJobVertex executionJobVertex) {
+               //----------------------------------------max parallelism 
preconditions-------------------------------------
+
+               // check that the number of key groups have not changed or if 
we need to override it to satisfy the restored state
+               if (taskState.getMaxParallelism() != 
executionJobVertex.getMaxParallelism()) {
+
+                       if (!executionJobVertex.isMaxParallelismConfigured()) {
+                               // if the max parallelism was not explicitly 
specified by the user, we derive it from the state
+
+                               if (logger.isDebugEnabled()) {
+                                       logger.debug("Overriding maximum 
parallelism for JobVertex " + executionJobVertex.getJobVertexId()
+                                                       + " from " + 
executionJobVertex.getMaxParallelism() + " to " + 
taskState.getMaxParallelism());
+                               }
+
+                               
executionJobVertex.setMaxParallelism(taskState.getMaxParallelism());
+                       } else {
+                               // if the max parallelism was explicitly 
specified, we complain on mismatch
                                throw new IllegalStateException("The maximum 
parallelism (" +
                                                taskState.getMaxParallelism() + 
") with which the latest " +
                                                "checkpoint of the execution 
job vertex " + executionJobVertex +
@@ -98,159 +126,162 @@ public class StateAssignmentOperation {
                                                
executionJobVertex.getMaxParallelism() + ") changed. This " +
                                                "is currently not supported.");
                        }
+               }
 
-                       
//-------------------------------------------------------------------
+               //----------------------------------------parallelism 
preconditions-----------------------------------------
 
-                       final int oldParallelism = taskState.getParallelism();
-                       final int newParallelism = 
executionJobVertex.getParallelism();
-                       final boolean parallelismChanged = oldParallelism != 
newParallelism;
-                       final boolean hasNonPartitionedState = 
taskState.hasNonPartitionedState();
+               final int oldParallelism = taskState.getParallelism();
+               final int newParallelism = executionJobVertex.getParallelism();
 
-                       if (hasNonPartitionedState && parallelismChanged) {
-                               throw new IllegalStateException("Cannot restore 
the latest checkpoint because " +
-                                               "the operator " + 
executionJobVertex.getJobVertexId() + " has non-partitioned " +
-                                               "state and its parallelism 
changed. The operator " + executionJobVertex.getJobVertexId() +
-                                               " has parallelism " + 
newParallelism + " whereas the corresponding " +
-                                               "state object has a parallelism 
of " + oldParallelism);
-                       }
+               if (taskState.hasNonPartitionedState() && (oldParallelism != 
newParallelism)) {
+                       throw new IllegalStateException("Cannot restore the 
latest checkpoint because " +
+                                       "the operator " + 
executionJobVertex.getJobVertexId() + " has non-partitioned " +
+                                       "state and its parallelism changed. The 
operator " + executionJobVertex.getJobVertexId() +
+                                       " has parallelism " + newParallelism + 
" whereas the corresponding " +
+                                       "state object has a parallelism of " + 
oldParallelism);
+               }
+       }
 
-                       List<KeyGroupRange> keyGroupPartitions = 
createKeyGroupPartitions(
-                                       executionJobVertex.getMaxParallelism(),
-                                       newParallelism);
+       private static void assignTaskStatesToOperatorInstances(
+                       TaskState taskState, ExecutionJobVertex 
executionJobVertex) {
 
-                       final int chainLength = taskState.getChainLength();
+               final int oldParallelism = taskState.getParallelism();
+               final int newParallelism = executionJobVertex.getParallelism();
 
-                       // operator chain idx -> list of the stored op states 
from all parallel instances for this chain idx
-                       @SuppressWarnings("unchecked")
-                       List<OperatorStateHandle>[] parallelOpStatesBackend = 
new List[chainLength];
-                       @SuppressWarnings("unchecked")
-                       List<OperatorStateHandle>[] parallelOpStatesStream = 
new List[chainLength];
+               List<KeyGroupRange> keyGroupPartitions = 
createKeyGroupPartitions(
+                               executionJobVertex.getMaxParallelism(),
+                               newParallelism);
 
-                       List<KeyGroupsStateHandle> parallelKeyedStatesBackend = 
new ArrayList<>(oldParallelism);
-                       List<KeyGroupsStateHandle> parallelKeyedStateStream = 
new ArrayList<>(oldParallelism);
+               final int chainLength = taskState.getChainLength();
 
-                       for (int p = 0; p < oldParallelism; ++p) {
-                               SubtaskState subtaskState = 
taskState.getState(p);
+               // operator chain idx -> list of the stored op states from all 
parallel instances for this chain idx
+               @SuppressWarnings("unchecked")
+               List<OperatorStateHandle>[] parallelOpStatesBackend = new 
List[chainLength];
+               @SuppressWarnings("unchecked")
+               List<OperatorStateHandle>[] parallelOpStatesStream = new 
List[chainLength];
 
-                               if (null != subtaskState) {
-                                       collectParallelStatesByChainOperator(
-                                                       
parallelOpStatesBackend, subtaskState.getManagedOperatorState());
+               List<KeyGroupsStateHandle> parallelKeyedStatesBackend = new 
ArrayList<>(oldParallelism);
+               List<KeyGroupsStateHandle> parallelKeyedStateStream = new 
ArrayList<>(oldParallelism);
 
-                                       collectParallelStatesByChainOperator(
-                                                       parallelOpStatesStream, 
subtaskState.getRawOperatorState());
+               for (int p = 0; p < oldParallelism; ++p) {
+                       SubtaskState subtaskState = taskState.getState(p);
 
-                                       KeyGroupsStateHandle keyedStateBackend 
= subtaskState.getManagedKeyedState();
-                                       if (null != keyedStateBackend) {
-                                               
parallelKeyedStatesBackend.add(keyedStateBackend);
-                                       }
+                       if (null != subtaskState) {
+                               collectParallelStatesByChainOperator(
+                                               parallelOpStatesBackend, 
subtaskState.getManagedOperatorState());
 
-                                       KeyGroupsStateHandle keyedStateStream = 
subtaskState.getRawKeyedState();
-                                       if (null != keyedStateStream) {
-                                               
parallelKeyedStateStream.add(keyedStateStream);
-                                       }
+                               collectParallelStatesByChainOperator(
+                                               parallelOpStatesStream, 
subtaskState.getRawOperatorState());
+
+                               KeyGroupsStateHandle keyedStateBackend = 
subtaskState.getManagedKeyedState();
+                               if (null != keyedStateBackend) {
+                                       
parallelKeyedStatesBackend.add(keyedStateBackend);
+                               }
+
+                               KeyGroupsStateHandle keyedStateStream = 
subtaskState.getRawKeyedState();
+                               if (null != keyedStateStream) {
+                                       
parallelKeyedStateStream.add(keyedStateStream);
                                }
                        }
+               }
 
-                       // operator chain index -> lists with collected states 
(one collection for each parallel subtasks)
-                       @SuppressWarnings("unchecked")
-                       List<Collection<OperatorStateHandle>>[] 
partitionedParallelStatesBackend = new List[chainLength];
+               // operator chain index -> lists with collected states (one 
collection for each parallel subtasks)
+               @SuppressWarnings("unchecked")
+               List<Collection<OperatorStateHandle>>[] 
partitionedParallelStatesBackend = new List[chainLength];
 
-                       @SuppressWarnings("unchecked")
-                       List<Collection<OperatorStateHandle>>[] 
partitionedParallelStatesStream = new List[chainLength];
+               @SuppressWarnings("unchecked")
+               List<Collection<OperatorStateHandle>>[] 
partitionedParallelStatesStream = new List[chainLength];
 
-                       //TODO here we can employ different redistribution 
strategies for state, e.g. union state.
-                       // For now we only offer round robin as the default.
-                       OperatorStateRepartitioner opStateRepartitioner = 
RoundRobinOperatorStateRepartitioner.INSTANCE;
+               //TODO here we can employ different redistribution strategies 
for state, e.g. union state.
+               // For now we only offer round robin as the default.
+               OperatorStateRepartitioner opStateRepartitioner = 
RoundRobinOperatorStateRepartitioner.INSTANCE;
 
-                       for (int chainIdx = 0; chainIdx < chainLength; 
++chainIdx) {
+               for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
 
-                               List<OperatorStateHandle> 
chainOpParallelStatesBackend = parallelOpStatesBackend[chainIdx];
-                               List<OperatorStateHandle> 
chainOpParallelStatesStream = parallelOpStatesStream[chainIdx];
+                       List<OperatorStateHandle> chainOpParallelStatesBackend 
= parallelOpStatesBackend[chainIdx];
+                       List<OperatorStateHandle> chainOpParallelStatesStream = 
parallelOpStatesStream[chainIdx];
 
-                               partitionedParallelStatesBackend[chainIdx] = 
applyRepartitioner(
-                                               opStateRepartitioner,
-                                               chainOpParallelStatesBackend,
-                                               oldParallelism,
-                                               newParallelism);
+                       partitionedParallelStatesBackend[chainIdx] = 
applyRepartitioner(
+                                       opStateRepartitioner,
+                                       chainOpParallelStatesBackend,
+                                       oldParallelism,
+                                       newParallelism);
 
-                               partitionedParallelStatesStream[chainIdx] = 
applyRepartitioner(
-                                               opStateRepartitioner,
-                                               chainOpParallelStatesStream,
-                                               oldParallelism,
-                                               newParallelism);
-                       }
+                       partitionedParallelStatesStream[chainIdx] = 
applyRepartitioner(
+                                       opStateRepartitioner,
+                                       chainOpParallelStatesStream,
+                                       oldParallelism,
+                                       newParallelism);
+               }
 
-                       for (int subTaskIdx = 0; subTaskIdx < newParallelism; 
++subTaskIdx) {
-                               // non-partitioned state
-                               ChainedStateHandle<StreamStateHandle> 
nonPartitionableState = null;
+               for (int subTaskIdx = 0; subTaskIdx < newParallelism; 
++subTaskIdx) {
+                       // non-partitioned state
+                       ChainedStateHandle<StreamStateHandle> 
nonPartitionableState = null;
 
-                               if (!parallelismChanged) {
-                                       if (taskState.getState(subTaskIdx) != 
null) {
-                                               nonPartitionableState = 
taskState.getState(subTaskIdx).getLegacyOperatorState();
-                                       }
+                       if (oldParallelism == newParallelism) {
+                               if (taskState.getState(subTaskIdx) != null) {
+                                       nonPartitionableState = 
taskState.getState(subTaskIdx).getLegacyOperatorState();
                                }
+                       }
 
-                               // partitionable state
-                               @SuppressWarnings("unchecked")
-                               Collection<OperatorStateHandle>[] iab = new 
Collection[chainLength];
-                               @SuppressWarnings("unchecked")
-                               Collection<OperatorStateHandle>[] ias = new 
Collection[chainLength];
-                               List<Collection<OperatorStateHandle>> 
operatorStateFromBackend = Arrays.asList(iab);
-                               List<Collection<OperatorStateHandle>> 
operatorStateFromStream = Arrays.asList(ias);
-
-                               for (int chainIdx = 0; chainIdx < 
partitionedParallelStatesBackend.length; ++chainIdx) {
-                                       List<Collection<OperatorStateHandle>> 
redistributedOpStateBackend =
-                                                       
partitionedParallelStatesBackend[chainIdx];
+                       // partitionable state
+                       @SuppressWarnings("unchecked")
+                       Collection<OperatorStateHandle>[] iab = new 
Collection[chainLength];
+                       @SuppressWarnings("unchecked")
+                       Collection<OperatorStateHandle>[] ias = new 
Collection[chainLength];
+                       List<Collection<OperatorStateHandle>> 
operatorStateFromBackend = Arrays.asList(iab);
+                       List<Collection<OperatorStateHandle>> 
operatorStateFromStream = Arrays.asList(ias);
 
-                                       List<Collection<OperatorStateHandle>> 
redistributedOpStateStream =
-                                                       
partitionedParallelStatesStream[chainIdx];
+                       for (int chainIdx = 0; chainIdx < 
partitionedParallelStatesBackend.length; ++chainIdx) {
+                               List<Collection<OperatorStateHandle>> 
redistributedOpStateBackend =
+                                               
partitionedParallelStatesBackend[chainIdx];
 
-                                       if (redistributedOpStateBackend != 
null) {
-                                               
operatorStateFromBackend.set(chainIdx, 
redistributedOpStateBackend.get(subTaskIdx));
-                                       }
+                               List<Collection<OperatorStateHandle>> 
redistributedOpStateStream =
+                                               
partitionedParallelStatesStream[chainIdx];
 
-                                       if (redistributedOpStateStream != null) 
{
-                                               
operatorStateFromStream.set(chainIdx, 
redistributedOpStateStream.get(subTaskIdx));
-                                       }
+                               if (redistributedOpStateBackend != null) {
+                                       operatorStateFromBackend.set(chainIdx, 
redistributedOpStateBackend.get(subTaskIdx));
                                }
 
-                               Execution currentExecutionAttempt = 
executionJobVertex
-                                               .getTaskVertices()[subTaskIdx]
-                                               .getCurrentExecutionAttempt();
+                               if (redistributedOpStateStream != null) {
+                                       operatorStateFromStream.set(chainIdx, 
redistributedOpStateStream.get(subTaskIdx));
+                               }
+                       }
 
-                               List<KeyGroupsStateHandle> 
newKeyedStatesBackend;
-                               List<KeyGroupsStateHandle> newKeyedStateStream;
-                               if (parallelismChanged) {
-                                       KeyGroupRange subtaskKeyGroupIds = 
keyGroupPartitions.get(subTaskIdx);
-                                       newKeyedStatesBackend = 
getKeyGroupsStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds);
-                                       newKeyedStateStream = 
getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds);
+                       Execution currentExecutionAttempt = executionJobVertex
+                                       .getTaskVertices()[subTaskIdx]
+                                       .getCurrentExecutionAttempt();
+
+                       List<KeyGroupsStateHandle> newKeyedStatesBackend;
+                       List<KeyGroupsStateHandle> newKeyedStateStream;
+                       if (oldParallelism == newParallelism) {
+                               SubtaskState subtaskState = 
taskState.getState(subTaskIdx);
+                               if (subtaskState != null) {
+                                       KeyGroupsStateHandle 
oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
+                                       KeyGroupsStateHandle 
oldKeyedStatesStream = subtaskState.getRawKeyedState();
+                                       newKeyedStatesBackend = 
oldKeyedStatesBackend != null ? Collections.singletonList(
+                                                       oldKeyedStatesBackend) 
: null;
+                                       newKeyedStateStream = 
oldKeyedStatesStream != null ? Collections.singletonList(
+                                                       oldKeyedStatesStream) : 
null;
                                } else {
-                                       SubtaskState subtaskState = 
taskState.getState(subTaskIdx);
-                                       if (subtaskState != null) {
-                                               KeyGroupsStateHandle 
oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
-                                               KeyGroupsStateHandle 
oldKeyedStatesStream = subtaskState.getRawKeyedState();
-                                               newKeyedStatesBackend = 
oldKeyedStatesBackend != null ? Collections.singletonList(
-                                                               
oldKeyedStatesBackend) : null;
-                                               newKeyedStateStream = 
oldKeyedStatesStream != null ? Collections.singletonList(
-                                                               
oldKeyedStatesStream) : null;
-                                       } else {
-                                               newKeyedStatesBackend = null;
-                                               newKeyedStateStream = null;
-                                       }
+                                       newKeyedStatesBackend = null;
+                                       newKeyedStateStream = null;
                                }
+                       } else {
+                               KeyGroupRange subtaskKeyGroupIds = 
keyGroupPartitions.get(subTaskIdx);
+                               newKeyedStatesBackend = 
getKeyGroupsStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds);
+                               newKeyedStateStream = 
getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds);
+                       }
 
-                               TaskStateHandles taskStateHandles = new 
TaskStateHandles(
-                                               nonPartitionableState,
-                                               operatorStateFromBackend,
-                                               operatorStateFromStream,
-                                               newKeyedStatesBackend,
-                                               newKeyedStateStream);
+                       TaskStateHandles taskStateHandles = new 
TaskStateHandles(
+                                       nonPartitionableState,
+                                       operatorStateFromBackend,
+                                       operatorStateFromStream,
+                                       newKeyedStatesBackend,
+                                       newKeyedStateStream);
 
-                               
currentExecutionAttempt.setInitialState(taskStateHandles);
-                       }
+                       
currentExecutionAttempt.setInitialState(taskStateHandles);
                }
-
-               return true;
        }
 
        /**
@@ -298,7 +329,7 @@ public class StateAssignmentOperation {
 
        /**
         * @param chainParallelOpStates array = chain ops, array[idx] = 
parallel states for this chain op.
-        * @param chainOpState
+        * @param chainOpState the operator chain
         */
        private static void collectParallelStatesByChainOperator(
                        List<OperatorStateHandle>[] chainParallelOpStates, 
ChainedStateHandle<OperatorStateHandle> chainOpState) {
@@ -359,4 +390,4 @@ public class StateAssignmentOperation {
                        return repackStream;
                }
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index d6be482..950a9a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -81,10 +81,11 @@ public class SavepointLoader {
                        }
 
                        if (executionJobVertex != null) {
-                               if (executionJobVertex.getMaxParallelism() == 
taskState.getMaxParallelism()) {
+
+                               if (executionJobVertex.getMaxParallelism() == 
taskState.getMaxParallelism()
+                                               || 
!executionJobVertex.isMaxParallelismConfigured()) {
                                        
taskStates.put(taskState.getJobVertexID(), taskState);
-                               }
-                               else {
+                               } else {
                                        String msg = String.format("Failed to 
rollback to savepoint %s. " +
                                                                        "Max 
parallelism mismatch between savepoint state and new program. " +
                                                                        "Cannot 
map operator %s with max parallelism %d to new program with " +
@@ -106,6 +107,7 @@ public class SavepointLoader {
                                                                "you want to 
allow to skip this, you can set the --allowNonRestoredState " +
                                                                "option on the 
CLI.",
                                                savepointPath, 
taskState.getJobVertexID());
+
                                throw new IllegalStateException(msg);
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 14c7d2a..061f925 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 
 import java.io.Serializable;
 
@@ -36,6 +37,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ResultPartitionDeploymentDescriptor implements Serializable {
 
+       private static final long serialVersionUID = 6343547936086963705L;
+
        /** The ID of the result this partition belongs to. */
        private final IntermediateDataSetID resultId;
 
@@ -47,6 +50,9 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
 
        /** The number of subpartitions. */
        private final int numberOfSubpartitions;
+
+       /** The maximum parallelism */
+       private final int maxParallelism;
        
        /** Flag whether the result partition should send 
scheduleOrUpdateConsumer messages. */
        private final boolean sendScheduleOrUpdateConsumersMessage;
@@ -56,14 +62,17 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
                        IntermediateResultPartitionID partitionId,
                        ResultPartitionType partitionType,
                        int numberOfSubpartitions,
+                       int maxParallelism,
                        boolean lazyScheduling) {
 
                this.resultId = checkNotNull(resultId);
                this.partitionId = checkNotNull(partitionId);
                this.partitionType = checkNotNull(partitionType);
 
+               
KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
                checkArgument(numberOfSubpartitions >= 1);
                this.numberOfSubpartitions = numberOfSubpartitions;
+               this.maxParallelism = maxParallelism;
                this.sendScheduleOrUpdateConsumersMessage = lazyScheduling;
        }
 
@@ -83,6 +92,10 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
                return numberOfSubpartitions;
        }
 
+       public int getMaxParallelism() {
+               return maxParallelism;
+       }
+
        public boolean sendScheduleOrUpdateConsumersMessage() {
                return sendScheduleOrUpdateConsumersMessage;
        }
@@ -96,7 +109,8 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
 
        // 
------------------------------------------------------------------------
 
-       public static ResultPartitionDeploymentDescriptor 
from(IntermediateResultPartition partition, boolean lazyScheduling) {
+       public static ResultPartitionDeploymentDescriptor from(
+                       IntermediateResultPartition partition, int 
maxParallelism, boolean lazyScheduling) {
 
                final IntermediateDataSetID resultId = 
partition.getIntermediateResult().getId();
                final IntermediateResultPartitionID partitionId = 
partition.getPartitionId();
@@ -118,6 +132,6 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
                }
 
                return new ResultPartitionDeploymentDescriptor(
-                               resultId, partitionId, partitionType, 
numberOfSubpartitions, lazyScheduling);
+                               resultId, partitionId, partitionType, 
numberOfSubpartitions, maxParallelism, lazyScheduling);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f31eada..b9e6c84 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -663,12 +663,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        }
 
                        // create the execution job vertex and attach it to the 
graph
-                       ExecutionJobVertex ejv = null;
-                       try {
-                               ejv = new ExecutionJobVertex(this, jobVertex, 
1, timeout, createTimestamp);
-                       } catch (IOException e) {
-                               throw new JobException("Could not create a 
execution job vertex for " + jobVertex.getID() + '.', e);
-                       }
+                       ExecutionJobVertex ejv =
+                                       new ExecutionJobVertex(this, jobVertex, 
1, timeout, createTimestamp);
                        ejv.connectToPredecessors(this.intermediateResults);
 
                        ExecutionJobVertex previousTask = 
this.tasks.putIfAbsent(jobVertex.getID(), ejv);

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index fbab572..e8664f7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
@@ -57,8 +57,10 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
 
        /** Use the same log for all ExecutionGraph classes */
        private static final Logger LOG = ExecutionGraph.LOG;
-       
-       private final SerializableObject stateMonitor = new 
SerializableObject();
+
+       public static final int VALUE_NOT_SET = -1;
+
+       private final Object stateMonitor = new Object();
        
        private final ExecutionGraph graph;
        
@@ -66,30 +68,32 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
        
        private final ExecutionVertex[] taskVertices;
 
-       private IntermediateResult[] producedDataSets;
+       private final IntermediateResult[] producedDataSets;
        
        private final List<IntermediateResult> inputs;
        
        private final int parallelism;
 
-       private final int maxParallelism;
-       
        private final boolean[] finishedSubtasks;
-                       
-       private volatile int numSubtasksInFinalState;
-       
+
        private final SlotSharingGroup slotSharingGroup;
-       
+
        private final CoLocationGroup coLocationGroup;
-       
+
        private final InputSplit[] inputSplits;
 
+       private final boolean maxParallelismConfigured;
+
+       private int maxParallelism;
+
+       private volatile int numSubtasksInFinalState;
+
        /**
         * Serialized task information which is for all sub tasks the same. 
Thus, it avoids to
         * serialize the same information multiple times in order to create the
         * TaskDeploymentDescriptors.
         */
-       private final SerializedValue<TaskInformation> 
serializedTaskInformation;
+       private SerializedValue<TaskInformation> serializedTaskInformation;
 
        private InputSplitAssigner splitAssigner;
        
@@ -97,7 +101,7 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
                ExecutionGraph graph,
                JobVertex jobVertex,
                int defaultParallelism,
-               Time timeout) throws JobException, IOException {
+               Time timeout) throws JobException {
 
                this(graph, jobVertex, defaultParallelism, timeout, 
System.currentTimeMillis());
        }
@@ -107,7 +111,7 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
                JobVertex jobVertex,
                int defaultParallelism,
                Time timeout,
-               long createTimestamp) throws JobException, IOException {
+               long createTimestamp) throws JobException {
 
                if (graph == null || jobVertex == null) {
                        throw new NullPointerException();
@@ -121,24 +125,19 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
 
                this.parallelism = numTaskVertices;
 
-               int maxP = jobVertex.getMaxParallelism();
+               final int configuredMaxParallelism = 
jobVertex.getMaxParallelism();
+
+               this.maxParallelismConfigured = (VALUE_NOT_SET != 
configuredMaxParallelism);
 
-               Preconditions.checkArgument(maxP >= parallelism, "The maximum 
parallelism (" +
-                       maxP + ") must be greater or equal than the parallelism 
(" + parallelism +
-                       ").");
-               this.maxParallelism = maxP;
+               // if no max parallelism was configured by the user, we 
calculate and set a default
+               setMaxParallelismInternal(maxParallelismConfigured ?
+                               configuredMaxParallelism : 
KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism));
 
-               this.serializedTaskInformation = new SerializedValue<>(new 
TaskInformation(
-                       jobVertex.getID(),
-                       jobVertex.getName(),
-                       parallelism,
-                       maxParallelism,
-                       jobVertex.getInvokableClassName(),
-                       jobVertex.getConfiguration()));
+               this.serializedTaskInformation = null;
 
                this.taskVertices = new ExecutionVertex[numTaskVertices];
                
-               this.inputs = new 
ArrayList<IntermediateResult>(jobVertex.getInputs().size());
+               this.inputs = new ArrayList<>(jobVertex.getInputs().size());
                
                // take the sharing group
                this.slotSharingGroup = jobVertex.getSlotSharingGroup();
@@ -212,6 +211,24 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
                finishedSubtasks = new boolean[parallelism];
        }
 
+       public void setMaxParallelism(int maxParallelismDerived) {
+
+               Preconditions.checkState(!maxParallelismConfigured,
+                               "Attempt to override a configured max 
parallelism. Configured: " + this.maxParallelism
+                                               + ", argument: " + 
maxParallelismDerived);
+
+               setMaxParallelismInternal(maxParallelismDerived);
+       }
+
+       private void setMaxParallelismInternal(int maxParallelism) {
+               Preconditions.checkArgument(maxParallelism > 0
+                                               && maxParallelism <= 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+                               "Overriding max parallelism is not in valid 
bounds (1.." +
+                                               
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + "), found:" + 
maxParallelism);
+
+               this.maxParallelism = maxParallelism;
+       }
+
        public ExecutionGraph getGraph() {
                return graph;
        }
@@ -235,6 +252,10 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
                return maxParallelism;
        }
 
+       public boolean isMaxParallelismConfigured() {
+               return maxParallelismConfigured;
+       }
+
        public JobID getJobId() {
                return graph.getJobID();
        }
@@ -269,24 +290,56 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
                return inputs;
        }
 
-       public SerializedValue<TaskInformation> getSerializedTaskInformation() {
+       public SerializedValue<TaskInformation> getSerializedTaskInformation() 
throws IOException {
+
+               if (null == serializedTaskInformation) {
+
+                       int parallelism = getParallelism();
+                       int maxParallelism = getMaxParallelism();
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Creating task information for " + 
generateDebugString());
+                       }
+
+                       serializedTaskInformation = new SerializedValue<>(
+                                       new TaskInformation(
+                                                       jobVertex.getID(),
+                                                       jobVertex.getName(),
+                                                       parallelism,
+                                                       maxParallelism,
+                                                       
jobVertex.getInvokableClassName(),
+                                                       
jobVertex.getConfiguration()));
+               }
+
                return serializedTaskInformation;
        }
-       
+
        public boolean isInFinalState() {
                return numSubtasksInFinalState == parallelism;
        }
-       
+
        @Override
        public ExecutionState getAggregateState() {
                int[] num = new int[ExecutionState.values().length];
                for (ExecutionVertex vertex : this.taskVertices) {
                        num[vertex.getExecutionState().ordinal()]++;
                }
-               
+
                return getAggregateJobVertexState(num, parallelism);
        }
 
+       private String generateDebugString() {
+
+               return "ExecutionJobVertex" +
+                               "(" + jobVertex.getName() + " | " + 
jobVertex.getID() + ")" +
+                               "{" +
+                               "parallelism=" + parallelism +
+                               ", maxParallelism=" + getMaxParallelism() +
+                               ", maxParallelismConfigured=" + 
maxParallelismConfigured +
+                               '}';
+       }
+
+
        
//---------------------------------------------------------------------------------------------
        
        public void connectToPredecessors(Map<IntermediateDataSetID, 
IntermediateResult> intermediateDataSets) throws JobException {

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 5cbd1c1..09497e3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -40,13 +40,16 @@ import 
org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -599,7 +602,24 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                boolean lazyScheduling = 
getExecutionGraph().getScheduleMode().allowLazyDeployment();
 
                for (IntermediateResultPartition partition : 
resultPartitions.values()) {
-                       
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, 
lazyScheduling));
+
+                       List<List<ExecutionEdge>> consumers = 
partition.getConsumers();
+
+                       if (consumers.isEmpty()) {
+                               //TODO this case only exists for test, 
currently there has to be exactly one consumer in real jobs!
+                               
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(
+                                               partition,
+                                               
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+                                               lazyScheduling));
+                       } else {
+                               Preconditions.checkState(1 == consumers.size(),
+                                               "Only one consumer supported in 
the current implementation! Found: " + consumers.size());
+
+                               List<ExecutionEdge> consumer = consumers.get(0);
+                               ExecutionJobVertex vertex = 
consumer.get(0).getTarget().getJobVertex();
+                               int maxParallelism = vertex.getMaxParallelism();
+                               
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, 
maxParallelism, lazyScheduling));
+                       }
                }
                
                
@@ -620,7 +640,14 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                }
 
                SerializedValue<JobInformation> serializedJobInformation = 
getExecutionGraph().getSerializedJobInformation();
-               SerializedValue<TaskInformation> serializedJobVertexInformation 
= jobVertex.getSerializedTaskInformation();
+               SerializedValue<TaskInformation> serializedJobVertexInformation 
= null;
+
+               try {
+                       serializedJobVertexInformation = 
jobVertex.getSerializedTaskInformation();
+               } catch (IOException e) {
+                       throw new ExecutionGraphException(
+                                       "Could not create a serialized 
JobVertexInformation for " + jobVertex.getJobVertexId(), e);
+               }
 
                return new TaskDeploymentDescriptor(
                        serializedJobInformation,

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index cfab34d..2a1bed8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -63,6 +63,10 @@ public class ResultPartitionWriter implements 
EventListener<TaskEvent> {
                return partition.getNumberOfSubpartitions();
        }
 
+       public int getNumTargetKeyGroups() {
+               return partition.getNumTargetKeyGroups();
+       }
+
        // 
------------------------------------------------------------------------
        // Data processing
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 474c25c..3d92584 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -95,6 +95,8 @@ public class ResultPartition implements BufferPoolOwner {
 
        private final ResultPartitionConsumableNotifier 
partitionConsumableNotifier;
 
+       public final int numTargetKeyGroups;
+
        private final boolean sendScheduleOrUpdateConsumersMessage;
 
        // - Runtime state 
--------------------------------------------------------
@@ -131,6 +133,7 @@ public class ResultPartition implements BufferPoolOwner {
                ResultPartitionID partitionId,
                ResultPartitionType partitionType,
                int numberOfSubpartitions,
+               int numTargetKeyGroups,
                ResultPartitionManager partitionManager,
                ResultPartitionConsumableNotifier partitionConsumableNotifier,
                IOManager ioManager,
@@ -142,6 +145,7 @@ public class ResultPartition implements BufferPoolOwner {
                this.partitionId = checkNotNull(partitionId);
                this.partitionType = checkNotNull(partitionType);
                this.subpartitions = new 
ResultSubpartition[numberOfSubpartitions];
+               this.numTargetKeyGroups = numTargetKeyGroups;
                this.partitionManager = checkNotNull(partitionManager);
                this.partitionConsumableNotifier = 
checkNotNull(partitionConsumableNotifier);
                this.sendScheduleOrUpdateConsumersMessage = 
sendScheduleOrUpdateConsumersMessage;
@@ -356,6 +360,10 @@ public class ResultPartition implements BufferPoolOwner {
                return cause;
        }
 
+       public int getNumTargetKeyGroups() {
+               return numTargetKeyGroups;
+       }
+
        /**
         * Releases buffers held by this result partition.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index d24100e..9dcaeeb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -40,7 +40,6 @@ public class JobVertex implements java.io.Serializable {
 
        private static final String DEFAULT_NAME = "(unnamed vertex)";
 
-
        // 
--------------------------------------------------------------------------------------------
        // Members that define the structure / topology of the graph
        // 
--------------------------------------------------------------------------------------------
@@ -60,7 +59,7 @@ public class JobVertex implements java.io.Serializable {
        private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
        /** Maximum number of subtasks to split this taks into a runtime. */
-       private int maxParallelism = Short.MAX_VALUE;
+       private int maxParallelism = -1;
 
        /** Custom configuration passed to the assigned task at runtime. */
        private Configuration configuration;
@@ -276,10 +275,6 @@ public class JobVertex implements java.io.Serializable {
         * @param maxParallelism The maximum parallelism to be set. must be 
between 1 and Short.MAX_VALUE.
         */
        public void setMaxParallelism(int maxParallelism) {
-               org.apache.flink.util.Preconditions.checkArgument(
-                               maxParallelism > 0 && maxParallelism <= (1 << 
15),
-                               "The max parallelism must be at least 1 and 
smaller than 2^15.");
-
                this.maxParallelism = maxParallelism;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
index 894f721..62bf3f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
@@ -23,7 +23,14 @@ import org.apache.flink.util.Preconditions;
 
 public final class KeyGroupRangeAssignment {
 
-       public static final int DEFAULT_MAX_PARALLELISM = 128;
+       /**
+        * The default lower bound for max parallelism if nothing was 
configured by the user. We have this to allow users
+        * some degree of scale-up in case they forgot to configure maximum 
parallelism explicitly.
+        */
+       public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
+
+       /** The (inclusive) upper bound for max parallelism */
+       public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;
 
        private KeyGroupRangeAssignment() {
                throw new AssertionError();
@@ -79,9 +86,12 @@ public final class KeyGroupRangeAssignment {
                        int maxParallelism,
                        int parallelism,
                        int operatorIndex) {
-               Preconditions.checkArgument(parallelism > 0, "Parallelism must 
not be smaller than zero.");
-               Preconditions.checkArgument(maxParallelism >= parallelism, 
"Maximum parallelism must not be smaller than parallelism.");
-               Preconditions.checkArgument(maxParallelism <= (1 << 15), 
"Maximum parallelism must be smaller than 2^15.");
+
+               checkParallelismPreconditions(parallelism);
+               checkParallelismPreconditions(maxParallelism);
+
+               Preconditions.checkArgument(maxParallelism >= parallelism,
+                               "Maximum parallelism must not be smaller than 
parallelism.");
 
                int start = operatorIndex == 0 ? 0 : ((operatorIndex * 
maxParallelism - 1) / parallelism) + 1;
                int end = ((operatorIndex + 1) * maxParallelism - 1) / 
parallelism;
@@ -105,4 +115,28 @@ public final class KeyGroupRangeAssignment {
        public static int computeOperatorIndexForKeyGroup(int maxParallelism, 
int parallelism, int keyGroupId) {
                return keyGroupId * parallelism / maxParallelism;
        }
+
+       /**
+        * Computes a default maximum parallelism from the operator 
parallelism. This is used in case the user has not
+        * explicitly configured a maximum parallelism to still allow a certain 
degree of scale-up.
+        *
+        * @param operatorParallelism the operator parallelism as basis for 
computation.
+        * @return the computed default maximum parallelism.
+        */
+       public static int computeDefaultMaxParallelism(int operatorParallelism) 
{
+
+               checkParallelismPreconditions(operatorParallelism);
+
+               return Math.min(
+                               Math.max(
+                                               
MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
+                                               
DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
+                               UPPER_BOUND_MAX_PARALLELISM);
+       }
+
+       public static void checkParallelismPreconditions(int parallelism) {
+               Preconditions.checkArgument(parallelism > 0
+                                               && parallelism <= 
UPPER_BOUND_MAX_PARALLELISM,
+                               "Operator parallelism not within bounds: " + 
parallelism);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e945b93..3c57e3f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -343,6 +343,7 @@ public class Task implements Runnable, TaskActions {
                                partitionId,
                                desc.getPartitionType(),
                                desc.getNumberOfSubpartitions(),
+                               desc.getMaxParallelism(),
                                networkEnvironment.getResultPartitionManager(),
                                resultPartitionConsumableNotifier,
                                ioManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index ca9dbc2..6ba557b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2607,6 +2607,7 @@ public class CheckpointCoordinatorTest {
                
when(executionJobVertex.getTaskVertices()).thenReturn(executionVertices);
                
when(executionJobVertex.getParallelism()).thenReturn(parallelism);
                
when(executionJobVertex.getMaxParallelism()).thenReturn(maxParallelism);
+               
when(executionJobVertex.isMaxParallelismConfigured()).thenReturn(true);
 
                return executionJobVertex;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 67575d6..6471d6f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -87,6 +87,7 @@ public class SavepointLoaderTest {
 
                // 2) Load and validate: max parallelism mismatch
                when(vertex.getMaxParallelism()).thenReturn(222);
+               when(vertex.isMaxParallelismConfigured()).thenReturn(true);
 
                try {
                        SavepointLoader.loadAndValidateSavepoint(jobId, tasks, 
path, ucl, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 3ed8236..aac2e13 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -46,6 +46,7 @@ public class ResultPartitionDeploymentDescriptorTest {
                                                partitionId,
                                                partitionType,
                                                numberOfSubpartitions,
+                                               numberOfSubpartitions,
                                                true);
 
                ResultPartitionDeploymentDescriptor copy =

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
new file mode 100644
index 0000000..f0f6248
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ExecutionJobVertexTest {
+
+       private static final int NOT_CONFIGURED = -1;
+
+       @Test
+       public void testMaxParallelismDefaulting() throws Exception {
+
+               // default minimum
+               ExecutionJobVertex executionJobVertex = 
createExecutionJobVertex(1, NOT_CONFIGURED);
+               Assert.assertEquals(128, 
executionJobVertex.getMaxParallelism());
+
+               // test round up part 1
+               executionJobVertex = createExecutionJobVertex(171, 
NOT_CONFIGURED);
+               Assert.assertEquals(256, 
executionJobVertex.getMaxParallelism());
+
+               // test round up part 2
+               executionJobVertex = createExecutionJobVertex(172, 
NOT_CONFIGURED);
+               Assert.assertEquals(512, 
executionJobVertex.getMaxParallelism());
+
+               // test round up limit
+               executionJobVertex = createExecutionJobVertex(1 << 15, 
NOT_CONFIGURED);
+               Assert.assertEquals(1 << 15, 
executionJobVertex.getMaxParallelism());
+
+               // test upper bound
+               try {
+                       executionJobVertex = createExecutionJobVertex(1 + (1 << 
15), NOT_CONFIGURED);
+                       executionJobVertex.getMaxParallelism();
+                       Assert.fail();
+               } catch (IllegalArgumentException ignore) {
+               }
+
+               // test configured / trumps computed default
+               executionJobVertex = createExecutionJobVertex(172, 4);
+               Assert.assertEquals(4, executionJobVertex.getMaxParallelism());
+
+
+               // test configured value at the upper bound (1 << 15) is accepted
+               executionJobVertex = createExecutionJobVertex(4, 1 << 15);
+               Assert.assertEquals(1 << 15, 
executionJobVertex.getMaxParallelism());
+
+               // test upper bound configured
+               try {
+                       executionJobVertex = createExecutionJobVertex(4, 1 + (1 
<< 15));
+                       
Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+               } catch (IllegalArgumentException ignore) {
+               }
+
+               // test lower bound configured
+               try {
+                       executionJobVertex = createExecutionJobVertex(4, 0);
+                       
Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+               } catch (IllegalArgumentException ignore) {
+               }
+
+               // test explicitly set value is used when no max parallelism was configured
+               executionJobVertex = createExecutionJobVertex(4, 
NOT_CONFIGURED);
+               executionJobVertex.setMaxParallelism(7);
+               Assert.assertEquals(7, executionJobVertex.getMaxParallelism());
+
+               // test lower bound with derived value
+               executionJobVertex = createExecutionJobVertex(4, 
NOT_CONFIGURED);
+               try {
+                       executionJobVertex.setMaxParallelism(0);
+                       
Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+               } catch (IllegalArgumentException ignore) {
+               }
+
+               // test upper bound with derived value
+               executionJobVertex = createExecutionJobVertex(4, 
NOT_CONFIGURED);
+               try {
+                       executionJobVertex.setMaxParallelism(1 + (1 << 15));
+                       
Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+               } catch (IllegalArgumentException ignore) {
+               }
+
+               // test that setting a derived value is rejected in the presence of a 
configured value
+               executionJobVertex = createExecutionJobVertex(4, 16);
+               try {
+                       executionJobVertex.setMaxParallelism(7);
+                       
Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+               } catch (IllegalStateException ignore) {
+               }
+
+       }
+
+       
//------------------------------------------------------------------------------------------------------
+
+       private static ExecutionJobVertex createExecutionJobVertex(
+                       int parallelism,
+                       int preconfiguredMaxParallelism) throws JobException, 
IOException {
+
+               JobVertex jobVertex = new JobVertex("testVertex");
+               jobVertex.setInvokableClass(AbstractInvokable.class);
+               jobVertex.setParallelism(parallelism);
+
+               if (NOT_CONFIGURED != preconfiguredMaxParallelism) {
+                       
jobVertex.setMaxParallelism(preconfiguredMaxParallelism);
+               }
+
+               ExecutionGraph executionGraphMock = mock(ExecutionGraph.class);
+               
when(executionGraphMock.getFutureExecutor()).thenReturn(Executors.directExecutor());
+               ExecutionJobVertex executionJobVertex =
+                               new ExecutionJobVertex(executionGraphMock, 
jobVertex, 1, Time.seconds(10));
+
+               return executionJobVertex;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 8bc39a7..8becd7f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -354,8 +354,17 @@ public class ExecutionVertexDeploymentTest {
        public void testTddProducedPartitionsLazyScheduling() throws Exception {
                TestingUtils.QueuedActionExecutionContext context = 
TestingUtils.queuedActionExecutionContext();
                ExecutionJobVertex jobVertex = getExecutionVertex(new 
JobVertexID(), context);
-               IntermediateResult result = new IntermediateResult(new 
IntermediateDataSetID(), jobVertex, 4, ResultPartitionType.PIPELINED);
-               ExecutionVertex vertex = new ExecutionVertex(jobVertex, 0, new 
IntermediateResult[]{result}, Time.minutes(1));
+
+               IntermediateResult result =
+                               new IntermediateResult(new 
IntermediateDataSetID(), jobVertex, 1, ResultPartitionType.PIPELINED);
+
+               ExecutionVertex vertex =
+                               new ExecutionVertex(jobVertex, 0, new 
IntermediateResult[]{result}, Time.minutes(1));
+
+               ExecutionEdge mockEdge = createMockExecutionEdge(1);
+
+               result.getPartitions()[0].addConsumerGroup();
+               result.getPartitions()[0].addConsumer(mockEdge, 0);
 
                Slot root = mock(Slot.class);
                when(root.getSlotNumber()).thenReturn(1);
@@ -374,4 +383,18 @@ public class ExecutionVertexDeploymentTest {
                        assertEquals(mode.allowLazyDeployment(), 
desc.sendScheduleOrUpdateConsumersMessage());
                }
        }
+
+
+
+       private ExecutionEdge createMockExecutionEdge(int maxParallelism) {
+               ExecutionVertex targetVertex = mock(ExecutionVertex.class);
+               ExecutionJobVertex targetJobVertex = 
mock(ExecutionJobVertex.class);
+
+               when(targetVertex.getJobVertex()).thenReturn(targetJobVertex);
+               
when(targetJobVertex.getMaxParallelism()).thenReturn(maxParallelism);
+
+               ExecutionEdge edge = mock(ExecutionEdge.class);
+               when(edge.getTarget()).thenReturn(targetVertex);
+               return edge;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 4eb4fd1..f6562a1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -83,6 +83,7 @@ public class ResultPartitionTest {
                        new ResultPartitionID(),
                        type,
                        1,
+                       1,
                        mock(ResultPartitionManager.class),
                        notifier,
                        mock(IOManager.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 37ec751..e05fb56 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -120,6 +120,7 @@ public class LocalInputChannelTest {
                                partitionIds[i],
                                ResultPartitionType.PIPELINED,
                                parallelism,
+                               parallelism,
                                partitionManager,
                                partitionConsumableNotifier,
                                ioManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 1fea6f6..2fd2b98 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -624,10 +624,12 @@ public class JobManagerTest {
                JobGraph jobGraph = new JobGraph("croissant");
                JobVertex jobVertex1 = new JobVertex("cappuccino");
                jobVertex1.setParallelism(4);
+               jobVertex1.setMaxParallelism(16);
                jobVertex1.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
 
                JobVertex jobVertex2 = new JobVertex("americano");
                jobVertex2.setParallelism(4);
+               jobVertex2.setMaxParallelism(16);
                jobVertex2.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
 
                jobGraph.addVertex(jobVertex1);

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index e24410e..2fb5fa8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -631,7 +631,7 @@ public class TaskManagerTest extends TestLogger {
                                IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
 
                                List<ResultPartitionDeploymentDescriptor> irpdd 
= new ArrayList<ResultPartitionDeploymentDescriptor>();
-                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1, true));
+                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1, 1, true));
 
                                InputGateDeploymentDescriptor ircdd =
                                                new 
InputGateDeploymentDescriptor(
@@ -776,7 +776,7 @@ public class TaskManagerTest extends TestLogger {
                                IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
 
                                List<ResultPartitionDeploymentDescriptor> irpdd 
= new ArrayList<ResultPartitionDeploymentDescriptor>();
-                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1, true));
+                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1, 1, true));
 
                                InputGateDeploymentDescriptor ircdd =
                                                new 
InputGateDeploymentDescriptor(
@@ -1486,6 +1486,7 @@ public class TaskManagerTest extends TestLogger {
                                new IntermediateResultPartitionID(),
                                ResultPartitionType.PIPELINED,
                                1,
+                               1,
                                true);
 
                        final TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 3e3afd3..7f33275 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.RichProcessFunction;
@@ -40,17 +39,18 @@ import 
org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import 
org.apache.flink.streaming.api.functions.query.QueryableAppendingStateOperator;
 import 
org.apache.flink.streaming.api.functions.query.QueryableValueStateOperator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
-import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -113,8 +113,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
                        dataStream.getExecutionEnvironment(),
                        new PartitionTransformation<>(
                                dataStream.getTransformation(),
-                               new KeyGroupStreamPartitioner<>(
-                                       keySelector, 
KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM)));
+                               new KeyGroupStreamPartitioner<>(keySelector, 
StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
                this.keySelector = keySelector;
                this.keyType = keyType;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 3fe21fb..9dd60b7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -115,18 +115,18 @@ public class SingleOutputStreamOperator<T> extends 
DataStream<T> {
 
        /**
         * Sets the parallelism for this operator. The degree must be 1 or more.
-        * 
+        *
         * @param parallelism
         *            The parallelism for this operator.
         * @return The operator with set parallelism.
         */
        public SingleOutputStreamOperator<T> setParallelism(int parallelism) {
-               if (parallelism < 1) {
-                       throw new IllegalArgumentException("The parallelism of 
an operator must be at least 1.");
-               }
-               if (nonParallel && parallelism > 1) {
-                       throw new IllegalArgumentException("The parallelism of 
non parallel operator must be 1.");
-               }
+               Preconditions.checkArgument(parallelism > 0,
+                               "The parallelism of an operator must be at 
least 1.");
+
+               Preconditions.checkArgument(canBeParallel() || parallelism == 1,
+                               "The parallelism of non parallel operator must 
be 1.");
+
                transformation.setParallelism(parallelism);
 
                return this;
@@ -143,15 +143,23 @@ public class SingleOutputStreamOperator<T> extends 
DataStream<T> {
         */
        @PublicEvolving
        public SingleOutputStreamOperator<T> setMaxParallelism(int 
maxParallelism) {
-               Preconditions.checkArgument(maxParallelism > 0, "The maximum 
parallelism must be greater than 0.");
+               Preconditions.checkArgument(maxParallelism > 0,
+                               "The maximum parallelism must be greater than 
0.");
+
+               Preconditions.checkArgument(canBeParallel() || maxParallelism 
== 1,
+                               "The maximum parallelism of non parallel 
operator must be 1.");
 
                transformation.setMaxParallelism(maxParallelism);
 
                return this;
        }
 
+       private boolean canBeParallel() {
+               return !nonParallel;
+       }
+
        /**
-        * Sets the parallelism of this operator to one.
+        * Sets the parallelism and maximum parallelism of this operator to one.
         * And mark this operator cannot set a non-1 degree of parallelism.
         *
         * @return The operator with only one parallelism.
@@ -159,6 +167,7 @@ public class SingleOutputStreamOperator<T> extends 
DataStream<T> {
        @PublicEvolving
        public SingleOutputStreamOperator<T> forceNonParallel() {
                transformation.setParallelism(1);
+               transformation.setMaxParallelism(1);
                nonParallel = true;
                return this;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 5b4b901..dab0a06 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -48,6 +48,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -166,16 +167,19 @@ public abstract class StreamExecutionEnvironment {
        }
 
        /**
-        * Sets the maximum degree of parallelism defined for the program.
+        * Sets the maximum degree of parallelism defined for the program. The 
upper limit (inclusive) is Short.MAX_VALUE.
         *
         * The maximum degree of parallelism specifies the upper limit for 
dynamic scaling. It also
         * defines the number of key groups used for partitioned state.
         *
-        * @param maxParallelism Maximum degree of parallelism to be used for 
the program., with 0 < maxParallelism <= 2^15
+        * @param maxParallelism Maximum degree of parallelism to be used for 
the program, with 0 < maxParallelism <= 2^15 - 1
         */
        public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) 
{
-               Preconditions.checkArgument(maxParallelism > 0 && 
maxParallelism <= (1 << 15),
-                               "maxParallelism is out of bounds 0 < 
maxParallelism <= 2^15. Found: " + maxParallelism);
+               Preconditions.checkArgument(maxParallelism > 0 &&
+                                               maxParallelism <= 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+                               "maxParallelism is out of bounds 0 < 
maxParallelism <= " +
+                                               
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + 
maxParallelism);
+
                config.setMaxParallelism(maxParallelism);
                return this;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index a4a8dc7..2f80764 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -17,6 +17,18 @@
 
 package org.apache.flink.streaming.api.graph;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
@@ -38,7 +50,6 @@ import 
org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -51,18 +62,6 @@ import 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Class representing the streaming topology. It contains all the information
  * necessary to build the jobgraph for the execution.
@@ -358,18 +357,6 @@ public class StreamGraph extends StreamingPlan {
                        if (partitioner == null) {
                                partitioner = 
virtuaPartitionNodes.get(virtualId).f1;
                        }
-
-                       if (partitioner instanceof 
ConfigurableStreamPartitioner) {
-                               StreamNode downstreamNode = 
getStreamNode(downStreamVertexID);
-
-                               ConfigurableStreamPartitioner 
configurableStreamPartitioner = (ConfigurableStreamPartitioner) partitioner;
-
-                               // Configure the partitioner with the max 
parallelism. This is necessary if the
-                               // partitioner has been created before the 
maximum parallelism has been set. The
-                               // maximum parallelism is necessary for the key 
group mapping.
-                               
configurableStreamPartitioner.configure(downstreamNode.getMaxParallelism());
-                       }
-
                        addEdgeInternal(upStreamVertexID, downStreamVertexID, 
typeNumber, partitioner, outputNames);
                } else {
                        StreamNode upstreamNode = 
getStreamNode(upStreamVertexID);

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 7ab7494..ddd0515 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -34,7 +34,6 @@ import 
org.apache.flink.streaming.api.transformations.SplitTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
-import org.apache.flink.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,6 +78,9 @@ public class StreamGraphGenerator {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamGraphGenerator.class);
 
+       public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 
KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
+       public static final int UPPER_BOUND_MAX_PARALLELISM = 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+
        // The StreamGraph that is being built, this is initialized at the 
beginning.
        private StreamGraph streamGraph;
 
@@ -149,25 +151,11 @@ public class StreamGraphGenerator {
                if (transform.getMaxParallelism() <= 0) {
 
                        // if the max parallelism hasn't been set, then first 
use the job wide max parallelism
-                       // from theExecutionConfig. If this value has not been 
specified either, then use the
-                       // parallelism of the operator.
-                       int maxParallelism = 
env.getConfig().getMaxParallelism();
-
-                       if (maxParallelism <= 0) {
-
-                               int parallelism = transform.getParallelism();
-
-                               if(parallelism <= 0) {
-                                       parallelism = 1;
-                                       transform.setParallelism(parallelism);
-                               }
-
-                               maxParallelism = Math.max(
-                                               
MathUtils.roundUpToPowerOfTwo(parallelism + (parallelism / 2)),
-                                               
KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM);
+                       // from the ExecutionConfig.
+                       int globalMaxParallelismFromConfig = 
env.getConfig().getMaxParallelism();
+                       if (globalMaxParallelismFromConfig > 0) {
+                               
transform.setMaxParallelism(globalMaxParallelismFromConfig);
                        }
-
-                       transform.setMaxParallelism(maxParallelism);
                }
 
                // call at least once to trigger exceptions about 
MissingTypeInfo

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 0d58ed2..19a3699 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -163,7 +162,6 @@ public class StreamNode implements Serializable {
         * @param maxParallelism Maximum parallelism to be set
         */
        void setMaxParallelism(int maxParallelism) {
-               Preconditions.checkArgument(maxParallelism > 0, "The maximum 
parallelism must be at least 1.");
                this.maxParallelism = maxParallelism;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index e306f30..8877c80 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -310,18 +310,7 @@ public class StreamingJobGraphGenerator {
                        parallelism = jobVertex.getParallelism();
                }
 
-               int maxParallelism = streamNode.getMaxParallelism();
-
-               // the maximum parallelism specifies the upper bound for the 
parallelism
-               if (parallelism > maxParallelism) {
-                       // the parallelism should always be smaller or equal 
than the max parallelism
-                       throw new IllegalStateException("The maximum 
parallelism (" + maxParallelism + ") of " +
-                               "the stream node " + streamNode + " is smaller 
than the parallelism (" +
-                               parallelism + "). Increase the maximum 
parallelism or decrease the parallelism of " +
-                               "this operator.");
-               } else {
-                       
jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
-               }
+               jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Parallelism set: {} for {}", parallelism, 
streamNodeId);

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index f7aecdb..5e1b3e2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.util.Preconditions;
 
@@ -205,6 +206,10 @@ public abstract class StreamTransformation<T> {
         * @param maxParallelism Maximum parallelism for this stream 
transformation.
         */
        public void setMaxParallelism(int maxParallelism) {
+               Preconditions.checkArgument(maxParallelism > 0
+                                               && maxParallelism <= 
StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM,
+                               "Maximum parallelism must be between 1 and " + 
StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
+                                               + ". Found: " + maxParallelism);
                this.maxParallelism = maxParallelism;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
index 256fee1..ddbdaea 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
@@ -76,6 +76,7 @@ public class KeyGroupStreamPartitioner<T, K> extends 
StreamPartitioner<T> implem
 
        @Override
        public void configure(int maxParallelism) {
+               
KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
                this.maxParallelism = maxParallelism;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 7771064..6d01795 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
+import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -94,6 +95,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> {
                try {
                        for (int i = 0; i < outEdgesInOrder.size(); i++) {
                                StreamEdge outEdge = outEdgesInOrder.get(i);
+
                                RecordWriterOutput<?> streamOutput = 
createStreamOutput(
                                                outEdge, 
chainedConfigs.get(outEdge.getSourceId()), i,
                                                
containingTask.getEnvironment(), containingTask.getName());
@@ -330,6 +332,14 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> {
                
                ResultPartitionWriter bufferWriter = 
taskEnvironment.getWriter(outputIndex);
 
+               // we initialize the partitioner here with the number of key 
groups (aka max. parallelism)
+               if (outputPartitioner instanceof ConfigurableStreamPartitioner) 
{
+                       int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
+                       if (0 < numKeyGroups) {
+                               ((ConfigurableStreamPartitioner) 
outputPartitioner).configure(numKeyGroups);
+                       }
+               }
+
                StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> 
output = 
                                new StreamRecordWriter<>(bufferWriter, 
outputPartitioner, upStreamConfig.getBufferTimeout());
                
output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());

Reply via email to