[FLINK-5040] [jobmanager] Set correct input channel types with eager scheduling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2742d5c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2742d5c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2742d5c1

Branch: refs/heads/master
Commit: 2742d5c1761ca02d871333e91a8ecbc6d0a52f6c
Parents: 0d2e8b2
Author: Ufuk Celebi <[email protected]>
Authored: Wed Nov 9 18:25:06 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Thu Nov 10 23:06:55 2016 +0100

----------------------------------------------------------------------
 .../ResultPartitionDeploymentDescriptor.java    |  17 +-
 .../runtime/executiongraph/ExecutionVertex.java |  21 +-
 .../runtime/io/network/PartitionState.java      |  18 +-
 .../io/network/partition/ResultPartition.java   |   8 +-
 .../flink/runtime/jobgraph/ScheduleMode.java    |  10 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  10 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../InputChannelDeploymentDescriptorTest.java   | 206 +++++++++++++++++++
 ...ResultPartitionDeploymentDescriptorTest.java |   6 +-
 .../ExecutionVertexDeploymentTest.java          | 106 ++++++----
 .../network/partition/ResultPartitionTest.java  |  92 +++++++++
 .../consumer/LocalInputChannelTest.java         |   3 +-
 .../runtime/jobgraph/ScheduleModeTest.java      |  36 ++++
 .../runtime/taskmanager/TaskManagerTest.java    |  19 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   5 +-
 15 files changed, 491 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
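In short: the schedule mode now decides whether deployment may be lazy, and that decision is threaded from ExecutionVertex#createDeploymentDescriptor into both the produced-partition and the input-channel deployment descriptors. With EAGER scheduling, input channels are therefore created with their final local or remote type right away, and result partitions no longer send scheduleOrUpdateConsumers messages. The stand-alone sketch below is illustrative only; it uses a stand-in ScheduleMode enum rather than the Flink class, and just shows how the flag is derived and passed on (the real call sites are in the ExecutionVertex and Task diffs further down).

// Stand-in enum mirroring the change to org.apache.flink.runtime.jobgraph.ScheduleMode;
// this is not the Flink class itself.
public class LazyDeploymentFlagSketch {

    enum ScheduleMode {
        LAZY_FROM_SOURCES, EAGER;

        // Mirrors the allowLazyDeployment() method added by this commit.
        boolean allowLazyDeployment() {
            return this == LAZY_FROM_SOURCES;
        }
    }

    public static void main(String[] args) {
        for (ScheduleMode mode : ScheduleMode.values()) {
            boolean lazyScheduling = mode.allowLazyDeployment();
            // ExecutionVertex#createDeploymentDescriptor passes this flag on to
            // ResultPartitionDeploymentDescriptor.from(partition, lazyScheduling) and
            // InputChannelDeploymentDescriptor.fromEdges(edges, targetSlot, lazyScheduling).
            System.out.println(mode + " -> lazyScheduling=" + lazyScheduling);
        }
    }
}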


http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 2881dde..2ecde80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -47,12 +47,16 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 
        /** The number of subpartitions. */
        private final int numberOfSubpartitions;
+       
+       /** Flag whether the result partition should send 
scheduleOrUpdateConsumer messages. */
+       private final boolean lazyScheduling;
 
        public ResultPartitionDeploymentDescriptor(
                        IntermediateDataSetID resultId,
                        IntermediateResultPartitionID partitionId,
                        ResultPartitionType partitionType,
-                       int numberOfSubpartitions) {
+                       int numberOfSubpartitions,
+                       boolean lazyScheduling) {
 
                this.resultId = checkNotNull(resultId);
                this.partitionId = checkNotNull(partitionId);
@@ -60,6 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 
                checkArgument(numberOfSubpartitions >= 1);
                this.numberOfSubpartitions = numberOfSubpartitions;
+               this.lazyScheduling = lazyScheduling;
        }
 
        public IntermediateDataSetID getResultId() {
@@ -78,6 +83,10 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
                return numberOfSubpartitions;
        }
 
+       public boolean allowLazyScheduling() {
+               return lazyScheduling;
+       }
+
        @Override
        public String toString() {
                return String.format("ResultPartitionDeploymentDescriptor 
[result id: %s, "
@@ -87,7 +96,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 
        // 
------------------------------------------------------------------------
 
-       public static ResultPartitionDeploymentDescriptor 
from(IntermediateResultPartition partition) {
+       public static ResultPartitionDeploymentDescriptor 
from(IntermediateResultPartition partition, boolean lazyScheduling) {
 
                final IntermediateDataSetID resultId = 
partition.getIntermediateResult().getId();
                final IntermediateResultPartitionID partitionId = 
partition.getPartitionId();
@@ -102,13 +111,13 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
                if (!partition.getConsumers().isEmpty() && 
!partition.getConsumers().get(0).isEmpty()) {
 
                        if (partition.getConsumers().size() > 1) {
-                               new IllegalStateException("Currently, only a 
single consumer group per partition is supported.");
+                               throw new IllegalStateException("Currently, 
only a single consumer group per partition is supported.");
                        }
 
                        numberOfSubpartitions = 
partition.getConsumers().get(0).size();
                }
 
                return new ResultPartitionDeploymentDescriptor(
-                               resultId, partitionId, partitionType, 
numberOfSubpartitions);
+                               resultId, partitionId, partitionType, 
numberOfSubpartitions, lazyScheduling);
        }
 }
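For illustration only (this snippet is not part of the commit; it simply mirrors what the updated TaskManagerTest below does), the descriptor is now constructed with the additional flag and exposes it via allowLazyScheduling():

import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;

public class DescriptorFlagExample {
    public static void main(String[] args) {
        // With eager scheduling the new flag is false, so the producing ResultPartition
        // will not send scheduleOrUpdateConsumers messages for this partition.
        ResultPartitionDeploymentDescriptor desc = new ResultPartitionDeploymentDescriptor(
                new IntermediateDataSetID(),
                new IntermediateResultPartitionID(),
                ResultPartitionType.PIPELINED,
                1,
                false);

        System.out.println(desc.allowLazyScheduling()); // prints false
    }
}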

http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e7f000c..01e8660 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -568,21 +568,24 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
                        ExecutionAttemptID executionId,
                        SimpleSlot targetSlot,
                        TaskStateHandles taskStateHandles,
-                       int attemptNumber) {
-
+                       int attemptNumber) throws ExecutionGraphException {
+               
                // Produced intermediate results
-               List<ResultPartitionDeploymentDescriptor> producedPartitions = 
new ArrayList<ResultPartitionDeploymentDescriptor>(resultPartitions.size());
+               List<ResultPartitionDeploymentDescriptor> producedPartitions = 
new ArrayList<>(resultPartitions.size());
+               
+               // Consumed intermediate results
+               List<InputGateDeploymentDescriptor> consumedPartitions = new 
ArrayList<>(inputEdges.length);
+               
+               boolean lazyScheduling = 
getExecutionGraph().getScheduleMode().allowLazyDeployment();
 
                for (IntermediateResultPartition partition : 
resultPartitions.values()) {
-                       
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition));
+                       
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, 
lazyScheduling));
                }
-
-               // Consumed intermediate results
-               List<InputGateDeploymentDescriptor> consumedPartitions = new 
ArrayList<InputGateDeploymentDescriptor>();
-
+               
+               
                for (ExecutionEdge[] edges : inputEdges) {
                        InputChannelDeploymentDescriptor[] partitions = 
InputChannelDeploymentDescriptor
-                                       .fromEdges(edges, targetSlot);
+                                       .fromEdges(edges, targetSlot, 
lazyScheduling);
 
                        // If the produced partition has multiple consumers 
registered, we
                        // need to request the one matching our sub task index.

http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
index 083412b..59357fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
@@ -23,18 +23,25 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+import java.io.Serializable;
+
 /**
  * Contains information about the state of a result partition.
  */
-public class PartitionState {
+public class PartitionState implements Serializable {
+
+       private static final long serialVersionUID = -4693651272083825031L;
+
        private final IntermediateDataSetID intermediateDataSetID;
        private final IntermediateResultPartitionID 
intermediateResultPartitionID;
        private final ExecutionState executionState;
 
        public PartitionState(
-                       IntermediateDataSetID intermediateDataSetID,
-                       IntermediateResultPartitionID 
intermediateResultPartitionID,
-                       ExecutionState executionState) {
+               IntermediateDataSetID intermediateDataSetID,
+               IntermediateResultPartitionID intermediateResultPartitionID,
+               @Nullable ExecutionState executionState) {
+
                this.intermediateDataSetID = 
Preconditions.checkNotNull(intermediateDataSetID);
                this.intermediateResultPartitionID = 
Preconditions.checkNotNull(intermediateResultPartitionID);
                this.executionState = executionState;
@@ -48,6 +55,9 @@ public class PartitionState {
                return intermediateResultPartitionID;
        }
 
+       /**
+        * Returns the execution state of the partition producer or 
<code>null</code> if it is not available.
+        */
        public ExecutionState getExecutionState() {
                return executionState;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 034b27a..834318c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -96,6 +96,8 @@ public class ResultPartition implements BufferPoolOwner {
 
        private final ResultPartitionConsumableNotifier 
partitionConsumableNotifier;
 
+       private final boolean sendScheduleOrUpdateConsumersMessage;
+
        // - Runtime state 
--------------------------------------------------------
 
        private final AtomicBoolean isReleased = new AtomicBoolean();
@@ -133,7 +135,8 @@ public class ResultPartition implements BufferPoolOwner {
                ResultPartitionManager partitionManager,
                ResultPartitionConsumableNotifier partitionConsumableNotifier,
                IOManager ioManager,
-               IOMode defaultIoMode) {
+               IOMode defaultIoMode,
+               boolean sendScheduleOrUpdateConsumersMessage) {
 
                this.owningTaskName = checkNotNull(owningTaskName);
                this.taskActions = checkNotNull(taskActions);
@@ -143,6 +146,7 @@ public class ResultPartition implements BufferPoolOwner {
                this.subpartitions = new 
ResultSubpartition[numberOfSubpartitions];
                this.partitionManager = checkNotNull(partitionManager);
                this.partitionConsumableNotifier = 
checkNotNull(partitionConsumableNotifier);
+               this.sendScheduleOrUpdateConsumersMessage = 
sendScheduleOrUpdateConsumersMessage;
 
                // Create the subpartitions.
                switch (partitionType) {
@@ -437,7 +441,7 @@ public class ResultPartition implements BufferPoolOwner {
         * Notifies pipelined consumers of this result partition once.
         */
        private void notifyPipelinedConsumers() {
-               if (partitionType.isPipelined() && 
!hasNotifiedPipelinedConsumers) {
+               if (sendScheduleOrUpdateConsumersMessage && 
!hasNotifiedPipelinedConsumers && partitionType.isPipelined()) {
                        
partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId, 
taskActions);
 
                        hasNotifiedPipelinedConsumers = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
index 9405067..6a98e46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
@@ -27,5 +27,13 @@ public enum ScheduleMode {
        LAZY_FROM_SOURCES,
 
        /** Schedules all tasks immediately. */
-       EAGER
+       EAGER;
+       
+       /**
+        * Returns whether we are allowed to deploy consumers lazily.
+        */
+       public boolean allowLazyDeployment() {
+               return this == LAZY_FROM_SOURCES;
+       }
+       
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 6907606..4f3dd54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -345,7 +345,8 @@ public class Task implements Runnable, TaskActions {
                                networkEnvironment.getResultPartitionManager(),
                                resultPartitionConsumableNotifier,
                                ioManager,
-                               networkEnvironment.getDefaultIOMode());
+                               networkEnvironment.getDefaultIOMode(),
+                               desc.allowLazyScheduling());
 
                        writers[counter] = new 
ResultPartitionWriter(producedPartitions[counter]);
 
@@ -568,6 +569,7 @@ public class Task implements Runnable, TaskActions {
                        // 
----------------------------------------------------------------
 
                        LOG.info("Registering task at network: " + this);
+
                        network.registerTask(this);
 
                        // next, kick off the background copying of files for 
the distributed cache
@@ -1135,7 +1137,11 @@ public class Task implements Runnable, TaskActions {
                        final SingleInputGate inputGate = 
inputGatesById.get(resultId);
 
                        if (inputGate != null) {
-                               if (partitionState == ExecutionState.RUNNING) {
+                               if (partitionState == ExecutionState.RUNNING ||
+                                       partitionState == 
ExecutionState.FINISHED ||
+                                       partitionState == 
ExecutionState.SCHEDULED ||
+                                       partitionState == 
ExecutionState.DEPLOYING) {
+
                                        // Retrigger the partition request
                                        
inputGate.retriggerPartitionRequest(partitionId);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9af5355..b2e1002 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -948,7 +948,7 @@ class JobManager(
           if (execution != null) execution.getState else null
         case None =>
           // Nothing to do. This is not an error, because the request is 
received when a sending
-          // task fails during a remote partition request.
+          // task fails or is not yet available during a remote partition 
request.
           log.debug(s"Cannot find execution graph for job $jobId.")
 
           null

http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
new file mode 100644
index 0000000..e9e8901
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.junit.Test;
+
+import java.net.InetAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class InputChannelDeploymentDescriptorTest {
+
+       /**
+        * Tests the deployment descriptors for local, remote, and unknown 
partition
+        * locations (with lazy deployment allowed and all execution states for 
the
+        * producers).
+        */
+       @Test
+       public void testMixedLocalRemoteUnknownDeployment() throws Exception {
+               boolean allowLazyDeployment = true;
+
+               ResourceID consumerResourceId = ResourceID.generate();
+               ExecutionVertex consumer = mock(ExecutionVertex.class);
+               SimpleSlot consumerSlot = mockSlot(consumerResourceId);
+
+               // Local and remote channel are only allowed for certain 
execution
+               // states.
+               for (ExecutionState state : ExecutionState.values()) {
+                       // Local partition
+                       ExecutionVertex localProducer = 
mockExecutionVertex(state, consumerResourceId);
+                       IntermediateResultPartition localPartition = 
mockPartition(localProducer);
+                       ResultPartitionID localPartitionId = new 
ResultPartitionID(localPartition.getPartitionId(), 
localProducer.getCurrentExecutionAttempt().getAttemptId());
+                       ExecutionEdge localEdge = new 
ExecutionEdge(localPartition, consumer, 0);
+
+                       // Remote partition
+                       ExecutionVertex remoteProducer = 
mockExecutionVertex(state, ResourceID.generate()); // new resource ID
+                       IntermediateResultPartition remotePartition = 
mockPartition(remoteProducer);
+                       ResultPartitionID remotePartitionId = new 
ResultPartitionID(remotePartition.getPartitionId(), 
remoteProducer.getCurrentExecutionAttempt().getAttemptId());
+                       ConnectionID remoteConnectionId = new 
ConnectionID(remoteProducer.getCurrentAssignedResource().getTaskManagerLocation(),
 0);
+                       ExecutionEdge remoteEdge = new 
ExecutionEdge(remotePartition, consumer, 1);
+
+                       // Unknown partition
+                       ExecutionVertex unknownProducer = 
mockExecutionVertex(state, null); // no assigned resource
+                       IntermediateResultPartition unknownPartition = 
mockPartition(unknownProducer);
+                       ResultPartitionID unknownPartitionId = new 
ResultPartitionID(unknownPartition.getPartitionId(), 
unknownProducer.getCurrentExecutionAttempt().getAttemptId());
+                       ExecutionEdge unknownEdge = new 
ExecutionEdge(unknownPartition, consumer, 2);
+
+                       InputChannelDeploymentDescriptor[] desc = 
InputChannelDeploymentDescriptor.fromEdges(
+                               new ExecutionEdge[]{localEdge, remoteEdge, 
unknownEdge},
+                               consumerSlot,
+                               allowLazyDeployment);
+
+                       assertEquals(3, desc.length);
+
+                       // These states are allowed
+                       if (state == ExecutionState.RUNNING || state == 
ExecutionState.FINISHED ||
+                               state == ExecutionState.SCHEDULED || state == 
ExecutionState.DEPLOYING) {
+
+                               // Create local or remote channels
+                               assertEquals(localPartitionId, 
desc[0].getConsumedPartitionId());
+                               
assertTrue(desc[0].getConsumedPartitionLocation().isLocal());
+                               
assertNull(desc[0].getConsumedPartitionLocation().getConnectionId());
+
+                               assertEquals(remotePartitionId, 
desc[1].getConsumedPartitionId());
+                               
assertTrue(desc[1].getConsumedPartitionLocation().isRemote());
+                               assertEquals(remoteConnectionId, 
desc[1].getConsumedPartitionLocation().getConnectionId());
+                       } else {
+                               // Unknown (lazy deployment allowed)
+                               assertEquals(localPartitionId, 
desc[0].getConsumedPartitionId());
+                               
assertTrue(desc[0].getConsumedPartitionLocation().isUnknown());
+                               
assertNull(desc[0].getConsumedPartitionLocation().getConnectionId());
+
+                               assertEquals(remotePartitionId, 
desc[1].getConsumedPartitionId());
+                               
assertTrue(desc[1].getConsumedPartitionLocation().isUnknown());
+                               
assertNull(desc[1].getConsumedPartitionLocation().getConnectionId());
+                       }
+
+                       assertEquals(unknownPartitionId, 
desc[2].getConsumedPartitionId());
+                       
assertTrue(desc[2].getConsumedPartitionLocation().isUnknown());
+                       
assertNull(desc[2].getConsumedPartitionLocation().getConnectionId());
+               }
+       }
+
+       @Test
+       public void testUnknownChannelWithoutLazyDeploymentThrows() throws 
Exception {
+               ResourceID consumerResourceId = ResourceID.generate();
+               ExecutionVertex consumer = mock(ExecutionVertex.class);
+               SimpleSlot consumerSlot = mockSlot(consumerResourceId);
+
+
+               // Unknown partition
+               ExecutionVertex unknownProducer = 
mockExecutionVertex(ExecutionState.CREATED, null); // no assigned resource
+               IntermediateResultPartition unknownPartition = 
mockPartition(unknownProducer);
+               ResultPartitionID unknownPartitionId = new 
ResultPartitionID(unknownPartition.getPartitionId(), 
unknownProducer.getCurrentExecutionAttempt().getAttemptId());
+               ExecutionEdge unknownEdge = new ExecutionEdge(unknownPartition, 
consumer, 2);
+
+               // This should work if lazy deployment is allowed
+               boolean allowLazyDeployment = true;
+
+               InputChannelDeploymentDescriptor[] desc = 
InputChannelDeploymentDescriptor.fromEdges(
+                       new ExecutionEdge[]{unknownEdge},
+                       consumerSlot,
+                       allowLazyDeployment);
+
+               assertEquals(1, desc.length);
+
+               assertEquals(unknownPartitionId, 
desc[0].getConsumedPartitionId());
+               assertTrue(desc[0].getConsumedPartitionLocation().isUnknown());
+               
assertNull(desc[0].getConsumedPartitionLocation().getConnectionId());
+
+
+               try {
+                       // Fail if lazy deployment is *not* allowed
+                       allowLazyDeployment = false;
+
+                       InputChannelDeploymentDescriptor.fromEdges(
+                               new ExecutionEdge[]{unknownEdge},
+                               consumerSlot,
+                               allowLazyDeployment);
+
+                       fail("Did not throw expected ExecutionGraphException");
+               } catch (ExecutionGraphException ignored) {
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static SimpleSlot mockSlot(ResourceID resourceId) {
+               SimpleSlot slot = mock(SimpleSlot.class);
+               when(slot.getTaskManagerLocation()).thenReturn(new 
TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 5000));
+               when(slot.getTaskManagerID()).thenReturn(resourceId);
+
+               return slot;
+       }
+
+       private static ExecutionVertex mockExecutionVertex(ExecutionState 
state, ResourceID resourceId) {
+               ExecutionVertex vertex = mock(ExecutionVertex.class);
+
+               Execution exec = mock(Execution.class);
+               when(exec.getState()).thenReturn(state);
+               when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
+
+               if (resourceId != null) {
+                       SimpleSlot slot = mockSlot(resourceId);
+                       when(exec.getAssignedResource()).thenReturn(slot);
+                       
when(vertex.getCurrentAssignedResource()).thenReturn(slot);
+               } else {
+                       when(exec.getAssignedResource()).thenReturn(null); // 
no resource
+                       
when(vertex.getCurrentAssignedResource()).thenReturn(null);
+               }
+
+               when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+
+               return vertex;
+       }
+
+       private static IntermediateResultPartition 
mockPartition(ExecutionVertex producer) {
+               IntermediateResultPartition partition = 
mock(IntermediateResultPartition.class);
+               when(partition.isConsumable()).thenReturn(true);
+
+               IntermediateResult result = mock(IntermediateResult.class);
+               when(result.getConnectionIndex()).thenReturn(0);
+
+               when(partition.getIntermediateResult()).thenReturn(result);
+               when(partition.getPartitionId()).thenReturn(new 
IntermediateResultPartitionID());
+
+               when(partition.getProducer()).thenReturn(producer);
+
+               return partition;
+       }
+}
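A simplified, self-contained model of the decision this new test exercises is sketched below. The enums and the exception are stand-ins, not Flink classes: the real fromEdges() inspects the producer's execution state and assigned slot, and throws an ExecutionGraphException when a channel would stay unknown although lazy deployment is not allowed.

import java.util.EnumSet;

// Stand-in types; not the Flink runtime classes used by the test above.
public class ChannelLocationSketch {

    enum ProducerState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

    enum ChannelType { LOCAL, REMOTE, UNKNOWN }

    // Producer states for which a concrete (local/remote) channel may be created.
    static final EnumSet<ProducerState> CONSUMABLE_STATES =
            EnumSet.of(ProducerState.SCHEDULED, ProducerState.DEPLOYING,
                       ProducerState.RUNNING, ProducerState.FINISHED);

    static ChannelType decide(ProducerState state, boolean sameTaskManager, boolean allowLazyDeployment) {
        if (CONSUMABLE_STATES.contains(state)) {
            // Producer location is known: the channel type is final at deployment time.
            return sameTaskManager ? ChannelType.LOCAL : ChannelType.REMOTE;
        } else if (allowLazyDeployment) {
            // Lazy scheduling: leave the channel unknown and update it later.
            return ChannelType.UNKNOWN;
        } else {
            // Eager scheduling must never see an unknown producer.
            throw new IllegalStateException("Unknown channel although lazy deployment is not allowed");
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(ProducerState.RUNNING, true, false));  // LOCAL
        System.out.println(decide(ProducerState.CREATED, false, true));  // UNKNOWN
    }
}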

http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 4b1e546..4223b49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class ResultPartitionDeploymentDescriptorTest {
 
@@ -38,14 +39,14 @@ public class ResultPartitionDeploymentDescriptorTest {
                IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
                ResultPartitionType partitionType = 
ResultPartitionType.PIPELINED;
                int numberOfSubpartitions = 24;
-               boolean eagerlyDeployConsumers = true;
 
                ResultPartitionDeploymentDescriptor orig =
                                new ResultPartitionDeploymentDescriptor(
                                                resultId,
                                                partitionId,
                                                partitionType,
-                                               numberOfSubpartitions);
+                                               numberOfSubpartitions,
+                                               true);
 
                ResultPartitionDeploymentDescriptor copy =
                                CommonTestUtils.createCopySerializable(orig);
@@ -54,5 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest {
                assertEquals(partitionId, copy.getPartitionId());
                assertEquals(partitionType, copy.getPartitionType());
                assertEquals(numberOfSubpartitions, 
copy.getNumberOfSubpartitions());
+               assertTrue(copy.allowLazyScheduling());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 54aeff9..8bc39a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -18,20 +18,37 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-
-import static org.junit.Assert.*;
-
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-
 import org.junit.Test;
 
+import java.util.Collection;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ERROR_MESSAGE;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleFailingActorGateway;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class ExecutionVertexDeploymentTest {
 
        @Test
@@ -48,7 +65,7 @@ public class ExecutionVertexDeploymentTest {
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.getDefaultTimeout());
+                               AkkaUtils.getDefaultTimeout());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        vertex.deployToSlot(slot);
@@ -58,8 +75,7 @@ public class ExecutionVertexDeploymentTest {
                        try {
                                vertex.deployToSlot(slot);
                                fail("Scheduled from wrong state");
-                       }
-                       catch (IllegalStateException e) {
+                       } catch (IllegalStateException e) {
                                // as expected
                        }
 
@@ -67,8 +83,7 @@ public class ExecutionVertexDeploymentTest {
 
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
-               }
-               catch (Exception e) {
+               } catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
@@ -82,12 +97,12 @@ public class ExecutionVertexDeploymentTest {
                        final ExecutionJobVertex ejv = getExecutionVertex(jid, 
TestingUtils.directExecutionContext());
 
                        final Instance instance = getInstance(
-                                       new ActorTaskManagerGateway(
-                                               new 
SimpleActorGateway(TestingUtils.directExecutionContext())));
+                               new ActorTaskManagerGateway(
+                                       new 
SimpleActorGateway(TestingUtils.directExecutionContext())));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.getDefaultTimeout());
+                               AkkaUtils.getDefaultTimeout());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
 
@@ -99,8 +114,7 @@ public class ExecutionVertexDeploymentTest {
                        try {
                                vertex.deployToSlot(slot);
                                fail("Scheduled from wrong state");
-                       }
-                       catch (IllegalStateException e) {
+                       } catch (IllegalStateException e) {
                                // as expected
                        }
 
@@ -109,8 +123,7 @@ public class ExecutionVertexDeploymentTest {
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0);
-               }
-               catch (Exception e) {
+               } catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
@@ -123,7 +136,7 @@ public class ExecutionVertexDeploymentTest {
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.getDefaultTimeout());
+                               AkkaUtils.getDefaultTimeout());
 
                        final Instance instance = getInstance(
                                new ActorTaskManagerGateway(
@@ -138,8 +151,7 @@ public class ExecutionVertexDeploymentTest {
                        try {
                                vertex.deployToSlot(slot);
                                fail("Scheduled from wrong state");
-                       }
-                       catch (IllegalStateException e) {
+                       } catch (IllegalStateException e) {
                                // as expected
                        }
 
@@ -149,16 +161,14 @@ public class ExecutionVertexDeploymentTest {
                        try {
                                vertex.deployToSlot(slot);
                                fail("Scheduled from wrong state");
-                       }
-                       catch (IllegalStateException e) {
+                       } catch (IllegalStateException e) {
                                // as expected
                        }
 
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0);
-               }
-               catch (Exception e) {
+               } catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
@@ -171,7 +181,7 @@ public class ExecutionVertexDeploymentTest {
                        final ExecutionJobVertex ejv = getExecutionVertex(jid, 
TestingUtils.directExecutionContext());
 
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.getDefaultTimeout());
+                               AkkaUtils.getDefaultTimeout());
 
                        final Instance instance = getInstance(
                                new ActorTaskManagerGateway(
@@ -189,8 +199,7 @@ public class ExecutionVertexDeploymentTest {
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
-               }
-               catch (Exception e) {
+               } catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
@@ -202,7 +211,7 @@ public class ExecutionVertexDeploymentTest {
                        final JobVertexID jid = new JobVertexID();
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.getDefaultTimeout());
+                               AkkaUtils.getDefaultTimeout());
 
                        final Instance instance = getInstance(
                                new ActorTaskManagerGateway(
@@ -229,8 +238,7 @@ public class ExecutionVertexDeploymentTest {
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
-               }
-               catch (Exception e) {
+               } catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
@@ -247,7 +255,7 @@ public class ExecutionVertexDeploymentTest {
                        final ExecutionJobVertex ejv = getExecutionVertex(jid, 
ec);
 
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.getDefaultTimeout());
+                               AkkaUtils.getDefaultTimeout());
 
                        final Instance instance = getInstance(
                                new ActorTaskManagerGateway(
@@ -270,8 +278,7 @@ public class ExecutionVertexDeploymentTest {
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
-               }
-               catch (Exception e) {
+               } catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
@@ -288,7 +295,7 @@ public class ExecutionVertexDeploymentTest {
 
                        final ExecutionJobVertex ejv = getExecutionVertex(jid, 
context);
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.getDefaultTimeout());
+                               AkkaUtils.getDefaultTimeout());
 
                        final ExecutionAttemptID eid = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
@@ -334,10 +341,37 @@ public class ExecutionVertexDeploymentTest {
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
 
                        assertTrue(queue.isEmpty());
-               }
-               catch (Exception e) {
+               } catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
        }
+
+       /**
+        * Tests that the lazy scheduling flag is correctly forwarded to the 
produced partition descriptors.
+        */
+       @Test
+       public void testTddProducedPartitionsLazyScheduling() throws Exception {
+               TestingUtils.QueuedActionExecutionContext context = 
TestingUtils.queuedActionExecutionContext();
+               ExecutionJobVertex jobVertex = getExecutionVertex(new 
JobVertexID(), context);
+               IntermediateResult result = new IntermediateResult(new 
IntermediateDataSetID(), jobVertex, 4, ResultPartitionType.PIPELINED);
+               ExecutionVertex vertex = new ExecutionVertex(jobVertex, 0, new 
IntermediateResult[]{result}, Time.minutes(1));
+
+               Slot root = mock(Slot.class);
+               when(root.getSlotNumber()).thenReturn(1);
+               SimpleSlot slot = mock(SimpleSlot.class);
+               when(slot.getRoot()).thenReturn(root);
+
+               for (ScheduleMode mode : ScheduleMode.values()) {
+                       vertex.getExecutionGraph().setScheduleMode(mode);
+
+                       TaskDeploymentDescriptor tdd = 
vertex.createDeploymentDescriptor(new ExecutionAttemptID(), slot, null, 1);
+
+                       Collection<ResultPartitionDeploymentDescriptor> 
producedPartitions = tdd.getProducedPartitions();
+
+                       assertEquals(1, producedPartitions.size());
+                       ResultPartitionDeploymentDescriptor desc = 
producedPartitions.iterator().next();
+                       assertEquals(mode.allowLazyDeployment(), 
desc.sendScheduleOrUpdateConsumersMessage());
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
new file mode 100644
index 0000000..f6fddfa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class ResultPartitionTest {
+
+       /**
+        * Tests the schedule or update consumers message sending behaviour 
depending on the relevant flags.
+        */
+       @Test
+       public void testSendScheduleOrUpdateConsumersMessage() throws Exception 
{
+               {
+                       // Pipelined, send message => notify
+                       ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
+                       ResultPartition partition = createPartition(notifier, 
ResultPartitionType.PIPELINED, true);
+                       partition.add(TestBufferFactory.createBuffer(), 0);
+                       verify(notifier, 
times(1)).notifyPartitionConsumable(any(JobID.class), 
any(ResultPartitionID.class), any(TaskActions.class));
+               }
+
+               {
+                       // Pipelined, don't send message => don't notify
+                       ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
+                       ResultPartition partition = createPartition(notifier, 
ResultPartitionType.PIPELINED, false);
+                       partition.add(TestBufferFactory.createBuffer(), 0);
+                       verify(notifier, 
never()).notifyPartitionConsumable(any(JobID.class), 
any(ResultPartitionID.class), any(TaskActions.class));
+               }
+
+               {
+                       // Blocking, send message => don't notify
+                       ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
+                       ResultPartition partition = createPartition(notifier, 
ResultPartitionType.BLOCKING, true);
+                       partition.add(TestBufferFactory.createBuffer(), 0);
+                       verify(notifier, 
never()).notifyPartitionConsumable(any(JobID.class), 
any(ResultPartitionID.class), any(TaskActions.class));
+               }
+
+               {
+                       // Blocking, don't send message => don't notify
+                       ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
+                       ResultPartition partition = createPartition(notifier, 
ResultPartitionType.BLOCKING, false);
+                       partition.add(TestBufferFactory.createBuffer(), 0);
+                       verify(notifier, 
never()).notifyPartitionConsumable(any(JobID.class), 
any(ResultPartitionID.class), any(TaskActions.class));
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static ResultPartition createPartition(
+               ResultPartitionConsumableNotifier notifier,
+               ResultPartitionType type,
+               boolean sendScheduleOrUpdateConsumersMessage) {
+               return new ResultPartition(
+                       "TestTask",
+                       mock(TaskActions.class),
+                       new JobID(),
+                       new ResultPartitionID(),
+                       type,
+                       1,
+                       mock(ResultPartitionManager.class),
+                       notifier,
+                       mock(IOManager.class),
+                       IOManager.IOMode.SYNC,
+                       sendScheduleOrUpdateConsumersMessage);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 2d3797d..4ca1d1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -124,7 +124,8 @@ public class LocalInputChannelTest {
                                partitionManager,
                                partitionConsumableNotifier,
                                ioManager,
-                               ASYNC);
+                               ASYNC,
+                               true);
 
                        // Create a buffer pool for this partition
                        partition.registerBufferPool(

http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java
new file mode 100644
index 0000000..144ef12
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ScheduleModeTest {
+
+       /**
+        * Test that schedule modes set the lazy deployment flag correctly.
+        */
+       @Test
+       public void testAllowLazyDeployment() throws Exception {
+               
assertTrue(ScheduleMode.LAZY_FROM_SOURCES.allowLazyDeployment());
+               assertFalse(ScheduleMode.EAGER.allowLazyDeployment());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 15947f9..22f0c60 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -18,7 +18,11 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import akka.actor.*;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.Props;
+import akka.actor.Status;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -31,7 +35,11 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.deployment.*;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
@@ -622,7 +630,7 @@ public class TaskManagerTest extends TestLogger {
                                IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
 
                                List<ResultPartitionDeploymentDescriptor> irpdd 
= new ArrayList<ResultPartitionDeploymentDescriptor>();
-                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1));
+                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1, true));
 
                                InputGateDeploymentDescriptor ircdd =
                                                new 
InputGateDeploymentDescriptor(
@@ -767,7 +775,7 @@ public class TaskManagerTest extends TestLogger {
                                IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
 
                                List<ResultPartitionDeploymentDescriptor> irpdd 
= new ArrayList<ResultPartitionDeploymentDescriptor>();
-                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1));
+                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1, true));
 
                                InputGateDeploymentDescriptor ircdd =
                                                new 
InputGateDeploymentDescriptor(
@@ -1419,7 +1427,8 @@ public class TaskManagerTest extends TestLogger {
                                new IntermediateDataSetID(),
                                new IntermediateResultPartitionID(),
                                ResultPartitionType.PIPELINED,
-                               1);
+                               1,
+                               true);
 
                        final TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
                                "TestTask", 1, 0, 1, 0, new Configuration(), 
new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 1eebe12..5d26050 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -553,6 +553,9 @@ public class TaskTest extends TestLogger {
                }
 
                expected.put(ExecutionState.RUNNING, ExecutionState.RUNNING);
+               expected.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING);
+               expected.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
+               expected.put(ExecutionState.FINISHED, ExecutionState.RUNNING);
 
                expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
                expected.put(ExecutionState.CANCELING, 
ExecutionState.CANCELING);
@@ -568,7 +571,7 @@ public class TaskTest extends TestLogger {
                        assertEquals(expected.get(state), newTaskState);
                }
 
-               verify(inputGate, 
times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
+               verify(inputGate, 
times(4)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
        }
 
        /**
