Repository: flink
Updated Branches:
  refs/heads/release-1.1 9a19ca115 -> 0bd8e0279


[FLINK-5040] [jobmanager] Set correct input channel types with eager scheduling

This closes #2784.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55c506f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55c506f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55c506f2

Branch: refs/heads/release-1.1
Commit: 55c506f2ee58f70d9220d507256146df2a434381
Parents: b5a4cb6
Author: Ufuk Celebi <[email protected]>
Authored: Wed Nov 9 18:25:06 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Thu Nov 10 21:53:30 2016 +0100

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java       |  12 +-
 .../ResultPartitionDeploymentDescriptor.java    |  24 ++-
 .../runtime/executiongraph/ExecutionVertex.java |  14 +-
 .../io/network/partition/ResultPartition.java   |  10 +-
 .../flink/runtime/jobgraph/ScheduleMode.java    |  10 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  10 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../InputChannelDeploymentDescriptorTest.java   | 202 +++++++++++++++++++
 ...ResultPartitionDeploymentDescriptorTest.java |   6 +-
 .../ExecutionVertexDeploymentTest.java          |  60 +++++-
 .../network/partition/ResultPartitionTest.java  |  90 +++++++++
 .../consumer/LocalInputChannelTest.java         |   5 +-
 .../runtime/jobgraph/ScheduleModeTest.java      |  37 ++++
 .../runtime/taskmanager/TaskManagerTest.java    |   9 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   5 +-
 15 files changed, 454 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index e00a480..6b87e69 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -85,7 +85,7 @@ public class InputChannelDeploymentDescriptor implements 
Serializable {
         * Creates an input channel deployment descriptor for each partition.
         */
        public static InputChannelDeploymentDescriptor[] fromEdges(
-                       ExecutionEdge[] edges, SimpleSlot consumerSlot) {
+                       ExecutionEdge[] edges, SimpleSlot consumerSlot, boolean 
allowLazyDeployment) {
 
                final InputChannelDeploymentDescriptor[] icdd = new 
InputChannelDeploymentDescriptor[edges.length];
 
@@ -101,8 +101,10 @@ public class InputChannelDeploymentDescriptor implements 
Serializable {
 
                        // The producing task needs to be RUNNING or already 
FINISHED
                        if (consumedPartition.isConsumable() && producerSlot != 
null &&
-                                       (producerState == ExecutionState.RUNNING
-                                                       || producerState == 
ExecutionState.FINISHED)) {
+                                       (producerState == 
ExecutionState.RUNNING ||
+                                               producerState == 
ExecutionState.FINISHED ||
+                                               producerState == 
ExecutionState.SCHEDULED ||
+                                               producerState == 
ExecutionState.DEPLOYING)) {
 
                                final Instance partitionInstance = 
producerSlot.getInstance();
 
@@ -119,9 +121,11 @@ public class InputChannelDeploymentDescriptor implements 
Serializable {
                                        partitionLocation = 
ResultPartitionLocation.createRemote(connectionId);
                                }
                        }
-                       else {
+                       else if (allowLazyDeployment) {
                                // The producing task might not have registered 
the partition yet
                                partitionLocation = 
ResultPartitionLocation.createUnknown();
+                       } else {
+                               throw new IllegalStateException("Trying to 
eagerly schedule a task whose inputs are not ready.");
                        }
 
                        final ResultPartitionID consumedPartitionId = new 
ResultPartitionID(

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index ecdacbb..2ecde80 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -47,12 +47,16 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
 
        /** The number of subpartitions. */
        private final int numberOfSubpartitions;
+       
+       /** Flag whether the result partition should send 
scheduleOrUpdateConsumer messages. */
+       private final boolean lazyScheduling;
 
        public ResultPartitionDeploymentDescriptor(
-               IntermediateDataSetID resultId,
-               IntermediateResultPartitionID partitionId,
-               ResultPartitionType partitionType,
-               int numberOfSubpartitions) {
+                       IntermediateDataSetID resultId,
+                       IntermediateResultPartitionID partitionId,
+                       ResultPartitionType partitionType,
+                       int numberOfSubpartitions,
+                       boolean lazyScheduling) {
 
                this.resultId = checkNotNull(resultId);
                this.partitionId = checkNotNull(partitionId);
@@ -60,6 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
 
                checkArgument(numberOfSubpartitions >= 1);
                this.numberOfSubpartitions = numberOfSubpartitions;
+               this.lazyScheduling = lazyScheduling;
        }
 
        public IntermediateDataSetID getResultId() {
@@ -78,6 +83,10 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
                return numberOfSubpartitions;
        }
 
+       public boolean allowLazyScheduling() {
+               return lazyScheduling;
+       }
+
        @Override
        public String toString() {
                return String.format("ResultPartitionDeploymentDescriptor 
[result id: %s, "
@@ -87,7 +96,7 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
 
        // 
------------------------------------------------------------------------
 
-       public static ResultPartitionDeploymentDescriptor 
from(IntermediateResultPartition partition) {
+       public static ResultPartitionDeploymentDescriptor 
from(IntermediateResultPartition partition, boolean lazyScheduling) {
 
                final IntermediateDataSetID resultId = 
partition.getIntermediateResult().getId();
                final IntermediateResultPartitionID partitionId = 
partition.getPartitionId();
@@ -102,14 +111,13 @@ public class ResultPartitionDeploymentDescriptor 
implements Serializable {
                if (!partition.getConsumers().isEmpty() && 
!partition.getConsumers().get(0).isEmpty()) {
 
                        if (partition.getConsumers().size() > 1) {
-                               new IllegalStateException("Currently, only a 
single consumer group per partition is supported.");
+                               throw new IllegalStateException("Currently, 
only a single consumer group per partition is supported.");
                        }
 
                        numberOfSubpartitions = 
partition.getConsumers().get(0).size();
                }
 
                return new ResultPartitionDeploymentDescriptor(
-                               resultId, partitionId, partitionType, 
numberOfSubpartitions
-               );
+                               resultId, partitionId, partitionType, 
numberOfSubpartitions, lazyScheduling);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 309548d..c101548 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -638,18 +638,20 @@ public class ExecutionVertex implements Serializable {
                        int attemptNumber) {
 
                // Produced intermediate results
-               List<ResultPartitionDeploymentDescriptor> producedPartitions = 
new ArrayList<ResultPartitionDeploymentDescriptor>(resultPartitions.size());
+               List<ResultPartitionDeploymentDescriptor> producedPartitions = 
new ArrayList<>(resultPartitions.size());
+               
+               // Consumed intermediate results
+               List<InputGateDeploymentDescriptor> consumedPartitions = new 
ArrayList<>(inputEdges.length);
+               
+               boolean lazyScheduling = 
getExecutionGraph().getScheduleMode().allowLazyDeployment();
 
                for (IntermediateResultPartition partition : 
resultPartitions.values()) {
-                       
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition));
+                       
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, 
lazyScheduling));
                }
 
-               // Consumed intermediate results
-               List<InputGateDeploymentDescriptor> consumedPartitions = new 
ArrayList<InputGateDeploymentDescriptor>();
-
                for (ExecutionEdge[] edges : inputEdges) {
                        InputChannelDeploymentDescriptor[] partitions = 
InputChannelDeploymentDescriptor
-                                       .fromEdges(edges, targetSlot);
+                                       .fromEdges(edges, targetSlot, 
lazyScheduling);
 
                        // If the produced partition has multiple consumers 
registered, we
                        // need to request the one matching our sub task index.

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index a60f95d..c30f333 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -93,6 +93,8 @@ public class ResultPartition implements BufferPoolOwner {
 
        private final ResultPartitionConsumableNotifier 
partitionConsumableNotifier;
 
+       private final boolean sendScheduleOrUpdateConsumersMessage;
+
        // - Runtime state 
--------------------------------------------------------
 
        private final AtomicBoolean isReleased = new AtomicBoolean();
@@ -129,7 +131,8 @@ public class ResultPartition implements BufferPoolOwner {
                ResultPartitionManager partitionManager,
                ResultPartitionConsumableNotifier partitionConsumableNotifier,
                IOManager ioManager,
-               IOMode defaultIoMode) {
+               IOMode defaultIoMode,
+               boolean sendScheduleOrUpdateConsumersMessage) {
 
                this.owningTaskName = checkNotNull(owningTaskName);
                this.jobId = checkNotNull(jobId);
@@ -138,6 +141,7 @@ public class ResultPartition implements BufferPoolOwner {
                this.subpartitions = new 
ResultSubpartition[numberOfSubpartitions];
                this.partitionManager = checkNotNull(partitionManager);
                this.partitionConsumableNotifier = 
checkNotNull(partitionConsumableNotifier);
+               this.sendScheduleOrUpdateConsumersMessage = 
sendScheduleOrUpdateConsumersMessage;
 
                // Create the subpartitions.
                switch (partitionType) {
@@ -417,8 +421,8 @@ public class ResultPartition implements BufferPoolOwner {
        /**
         * Notifies pipelined consumers of this result partition once.
         */
-       private void notifyPipelinedConsumers() throws IOException {
-               if (partitionType.isPipelined() && 
!hasNotifiedPipelinedConsumers) {
+       private void notifyPipelinedConsumers() {
+               if (sendScheduleOrUpdateConsumersMessage && 
!hasNotifiedPipelinedConsumers && partitionType.isPipelined()) {
                        
partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId);
 
                        hasNotifiedPipelinedConsumers = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
index 330519d..78b7b45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
@@ -30,6 +30,14 @@ public enum ScheduleMode {
        /**
         * Schedule tasks all at once instead of lazy deployment of receiving 
tasks.
         */
-       ALL
+       ALL;
+
+       /**
+        * Returns whether we are allowed to deploy consumers lazily.
+        */
+       public boolean allowLazyDeployment() {
+               return this != ALL;
+       }
 
 }
+

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index dd14aaf..2179fc1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -336,7 +336,8 @@ public class Task implements Runnable {
                                        
networkEnvironment.getPartitionManager(),
                                        
networkEnvironment.getPartitionConsumableNotifier(),
                                        ioManager,
-                                       networkEnvironment.getDefaultIOMode());
+                                       networkEnvironment.getDefaultIOMode(),
+                                       desc.allowLazyScheduling());
 
                        writers[counter] = new 
ResultPartitionWriter(producedPartitions[counter]);
 
@@ -1088,7 +1089,11 @@ public class Task implements Runnable {
                        final SingleInputGate inputGate = 
inputGatesById.get(resultId);
 
                        if (inputGate != null) {
-                               if (partitionState == ExecutionState.RUNNING) {
+                               if (partitionState == ExecutionState.RUNNING ||
+                                       partitionState == 
ExecutionState.FINISHED ||
+                                       partitionState == 
ExecutionState.SCHEDULED ||
+                                       partitionState == 
ExecutionState.DEPLOYING) {
+
                                        // Retrigger the partition request
                                        
inputGate.retriggerPartitionRequest(partitionId);
                                }
@@ -1245,7 +1250,6 @@ public class Task implements Runnable {
                        try {
                                if (watchDogThread != null) {
                                        watchDogThread.start();
-                                       logger.info("Started cancellation watch 
dog");
                                }
 
                                // the user-defined cancel method may throw 
errors.

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 106ffb6..41218c9 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -855,7 +855,7 @@ class JobManager(
           if (execution != null) execution.getState else null
         case None =>
           // Nothing to do. This is not an error, because the request is 
received when a sending
-          // task fails during a remote partition request.
+          // task fails or is not yet available during a remote partition 
request.
           log.debug(s"Cannot find execution graph for job $jobId.")
 
           null

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
new file mode 100644
index 0000000..cda0f4d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.junit.Test;
+
+import java.net.InetAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class InputChannelDeploymentDescriptorTest {
+
+       /**
+        * Tests the deployment descriptors for local, remote, and unknown 
partition
+        * locations (with lazy deployment allowed and all execution states for 
the
+        * producers).
+        */
+       @Test
+       public void testMixedLocalRemoteUnknownDeployment() throws Exception {
+               boolean allowLazyDeployment = true;
+
+               ExecutionVertex consumer = mock(ExecutionVertex.class);
+               SimpleSlot consumerSlot = mockSlot(createConnInfo(5000));
+
+               // Local and remote channel are only allowed for certain 
execution
+               // states.
+               for (ExecutionState state : ExecutionState.values()) {
+                       // Local partition
+                       ExecutionVertex localProducer = 
mockExecutionVertex(state, consumerSlot);
+                       IntermediateResultPartition localPartition = 
mockPartition(localProducer);
+                       ResultPartitionID localPartitionId = new 
ResultPartitionID(localPartition.getPartitionId(), 
localProducer.getCurrentExecutionAttempt().getAttemptId());
+                       ExecutionEdge localEdge = new 
ExecutionEdge(localPartition, consumer, 0);
+
+                       // Remote partition
+                       InstanceConnectionInfo connInfo = createConnInfo(6000);
+                       ExecutionVertex remoteProducer = 
mockExecutionVertex(state, mockSlot(connInfo)); // new slot
+                       IntermediateResultPartition remotePartition = 
mockPartition(remoteProducer);
+                       ResultPartitionID remotePartitionId = new 
ResultPartitionID(remotePartition.getPartitionId(), 
remoteProducer.getCurrentExecutionAttempt().getAttemptId());
+                       ConnectionID remoteConnectionId = new 
ConnectionID(connInfo, 0);
+                       ExecutionEdge remoteEdge = new 
ExecutionEdge(remotePartition, consumer, 1);
+
+                       // Unknown partition
+                       ExecutionVertex unknownProducer = 
mockExecutionVertex(state, null); // no assigned resource
+                       IntermediateResultPartition unknownPartition = 
mockPartition(unknownProducer);
+                       ResultPartitionID unknownPartitionId = new 
ResultPartitionID(unknownPartition.getPartitionId(), 
unknownProducer.getCurrentExecutionAttempt().getAttemptId());
+                       ExecutionEdge unknownEdge = new 
ExecutionEdge(unknownPartition, consumer, 2);
+
+                       InputChannelDeploymentDescriptor[] desc = 
InputChannelDeploymentDescriptor.fromEdges(
+                               new ExecutionEdge[]{localEdge, remoteEdge, 
unknownEdge},
+                               consumerSlot,
+                               allowLazyDeployment);
+
+                       assertEquals(3, desc.length);
+
+                       // These states are allowed
+                       if (state == ExecutionState.RUNNING || state == 
ExecutionState.FINISHED ||
+                               state == ExecutionState.SCHEDULED || state == 
ExecutionState.DEPLOYING) {
+
+                               // Create local or remote channels
+                               assertEquals(localPartitionId, 
desc[0].getConsumedPartitionId());
+                               
assertTrue(desc[0].getConsumedPartitionLocation().isLocal());
+                               
assertNull(desc[0].getConsumedPartitionLocation().getConnectionId());
+
+                               assertEquals(remotePartitionId, 
desc[1].getConsumedPartitionId());
+                               
assertTrue(desc[1].getConsumedPartitionLocation().isRemote());
+                               assertEquals(remoteConnectionId, 
desc[1].getConsumedPartitionLocation().getConnectionId());
+                       } else {
+                               // Unknown (lazy deployment allowed)
+                               assertEquals(localPartitionId, 
desc[0].getConsumedPartitionId());
+                               
assertTrue(desc[0].getConsumedPartitionLocation().isUnknown());
+                               
assertNull(desc[0].getConsumedPartitionLocation().getConnectionId());
+
+                               assertEquals(remotePartitionId, 
desc[1].getConsumedPartitionId());
+                               
assertTrue(desc[1].getConsumedPartitionLocation().isUnknown());
+                               
assertNull(desc[1].getConsumedPartitionLocation().getConnectionId());
+                       }
+
+                       assertEquals(unknownPartitionId, 
desc[2].getConsumedPartitionId());
+                       
assertTrue(desc[2].getConsumedPartitionLocation().isUnknown());
+                       
assertNull(desc[2].getConsumedPartitionLocation().getConnectionId());
+               }
+       }
+
+       @Test
+       public void testUnknownChannelWithoutLazyDeploymentThrows() throws 
Exception {
+               ExecutionVertex consumer = mock(ExecutionVertex.class);
+               SimpleSlot consumerSlot = mock(SimpleSlot.class);
+
+               // Unknown partition
+               ExecutionVertex unknownProducer = 
mockExecutionVertex(ExecutionState.CREATED, null); // no assigned resource
+               IntermediateResultPartition unknownPartition = 
mockPartition(unknownProducer);
+               ResultPartitionID unknownPartitionId = new 
ResultPartitionID(unknownPartition.getPartitionId(), 
unknownProducer.getCurrentExecutionAttempt().getAttemptId());
+               ExecutionEdge unknownEdge = new ExecutionEdge(unknownPartition, 
consumer, 2);
+
+               // This should work if lazy deployment is allowed
+               boolean allowLazyDeployment = true;
+
+               InputChannelDeploymentDescriptor[] desc = 
InputChannelDeploymentDescriptor.fromEdges(
+                       new ExecutionEdge[]{unknownEdge},
+                       consumerSlot,
+                       allowLazyDeployment);
+
+               assertEquals(1, desc.length);
+
+               assertEquals(unknownPartitionId, 
desc[0].getConsumedPartitionId());
+               assertTrue(desc[0].getConsumedPartitionLocation().isUnknown());
+               
assertNull(desc[0].getConsumedPartitionLocation().getConnectionId());
+
+
+               try {
+                       // Fail if lazy deployment is *not* allowed
+                       allowLazyDeployment = false;
+
+                       InputChannelDeploymentDescriptor.fromEdges(
+                               new ExecutionEdge[]{unknownEdge},
+                               consumerSlot,
+                               allowLazyDeployment);
+
+                       fail("Did not throw expected IllegalStateException");
+               } catch (IllegalStateException ignored) {
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static InstanceConnectionInfo createConnInfo(int port) {
+               return new 
InstanceConnectionInfo(InetAddress.getLoopbackAddress(), port);
+       }
+
+       private static SimpleSlot mockSlot(InstanceConnectionInfo connInfo) {
+               SimpleSlot slot = mock(SimpleSlot.class);
+               Instance instance = mock(Instance.class);
+               when(slot.getInstance()).thenReturn(instance);
+               when(instance.getInstanceConnectionInfo()).thenReturn(connInfo);
+
+               return slot;
+       }
+
+       private static ExecutionVertex mockExecutionVertex(ExecutionState 
state, SimpleSlot slot) {
+               ExecutionVertex vertex = mock(ExecutionVertex.class);
+
+               Execution exec = mock(Execution.class);
+               when(exec.getState()).thenReturn(state);
+               when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
+
+               when(exec.getAssignedResource()).thenReturn(slot);
+               when(vertex.getCurrentAssignedResource()).thenReturn(slot);
+
+               when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+
+               return vertex;
+       }
+
+       private static IntermediateResultPartition 
mockPartition(ExecutionVertex producer) {
+               IntermediateResultPartition partition = 
mock(IntermediateResultPartition.class);
+               when(partition.isConsumable()).thenReturn(true);
+
+               IntermediateResult result = mock(IntermediateResult.class);
+               when(result.getConnectionIndex()).thenReturn(0);
+
+               when(partition.getIntermediateResult()).thenReturn(result);
+               when(partition.getPartitionId()).thenReturn(new 
IntermediateResultPartitionID());
+
+               when(partition.getProducer()).thenReturn(producer);
+
+               return partition;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 1986eae..4223b49 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class ResultPartitionDeploymentDescriptorTest {
 
@@ -44,8 +45,8 @@ public class ResultPartitionDeploymentDescriptorTest {
                                                resultId,
                                                partitionId,
                                                partitionType,
-                                               numberOfSubpartitions
-                               );
+                                               numberOfSubpartitions,
+                                               true);
 
                ResultPartitionDeploymentDescriptor copy =
                                CommonTestUtils.createCopySerializable(orig);
@@ -54,5 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest {
                assertEquals(partitionId, copy.getPartitionId());
                assertEquals(partitionType, copy.getPartitionType());
                assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions());
+               assertTrue(copy.allowLazyScheduling());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 81ec6c9..1f5c915 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -18,19 +18,37 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-
-import static org.junit.Assert.*;
-
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-
 import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ERROR_MESSAGE;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleFailingActorGateway;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ExecutionVertexDeploymentTest {
 
@@ -331,4 +349,34 @@ public class ExecutionVertexDeploymentTest {
                        fail(e.getMessage());
                }
        }
-}
\ No newline at end of file
+
+       /**
+        * Tests that the lazy scheduling flag is correctly forwarded to the produced partition descriptors.
+        */
+       @Test
+       public void testTddProducedPartitionsLazyScheduling() throws Exception {
+               TestingUtils.QueuedActionExecutionContext context = TestingUtils.queuedActionExecutionContext();
+               ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), context);
+               IntermediateResult result = new IntermediateResult(new IntermediateDataSetID(), jobVertex, 4, ResultPartitionType.PIPELINED);
+               ExecutionVertex vertex = new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, new FiniteDuration(1, TimeUnit.MINUTES));
+
+               Slot root = mock(Slot.class);
+               when(root.getSlotNumber()).thenReturn(1);
+               SimpleSlot slot = mock(SimpleSlot.class);
+               when(slot.getRoot()).thenReturn(root);
+
+               for (ScheduleMode mode : ScheduleMode.values()) {
+                       vertex.getExecutionGraph().setScheduleMode(mode);
+
+                       TaskDeploymentDescriptor tdd = vertex.createDeploymentDescriptor(new ExecutionAttemptID(), slot, null, null, 1);
+
+                       List<ResultPartitionDeploymentDescriptor> producedPartitions = tdd.getProducedPartitions();
+
+                       assertEquals(1, producedPartitions.size());
+                       ResultPartitionDeploymentDescriptor desc = producedPartitions.get(0);
+                       assertEquals(mode.allowLazyDeployment(), desc.allowLazyScheduling());
+               }
+       }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
new file mode 100644
index 0000000..302b667
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class ResultPartitionTest {
+
+       /**
+        * Tests the schedule or update consumers message sending behaviour depending on the relevant flags.
+        */
+       @Test
+       public void testSendScheduleOrUpdateConsumersMessage() throws Exception {
+               {
+                       // Pipelined, send message => notify
+                       ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+                       ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, true);
+                       partition.add(TestBufferFactory.createBuffer(), 0);
+                       verify(notifier, times(1)).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class));
+               }
+
+               {
+                       // Pipelined, don't send message => don't notify
+                       ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+                       ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, false);
+                       partition.add(TestBufferFactory.createBuffer(), 0);
+                       verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class));
+               }
+
+               {
+                       // Blocking, send message => don't notify
+                       ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+                       ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, true);
+                       partition.add(TestBufferFactory.createBuffer(), 0);
+                       verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class));
+               }
+
+               {
+                       // Blocking, don't send message => don't notify
+                       ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+                       ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, false);
+                       partition.add(TestBufferFactory.createBuffer(), 0);
+                       verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class));
+               }
+       }
+
+       // ------------------------------------------------------------------------
+
+       private static ResultPartition createPartition(
+               ResultPartitionConsumableNotifier notifier,
+               ResultPartitionType type,
+               boolean sendScheduleOrUpdateConsumersMessage) {
+               return new ResultPartition(
+                       "TestTask",
+                       new JobID(),
+                       new ResultPartitionID(),
+                       type,
+                       1,
+                       mock(ResultPartitionManager.class),
+                       notifier,
+                       mock(IOManager.class),
+                       IOManager.IOMode.SYNC,
+                       sendScheduleOrUpdateConsumersMessage);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 88a3ff5..ee28b5a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -119,11 +119,12 @@ public class LocalInputChannelTest {
                                        jobId,
                                        partitionIds[i],
                                        ResultPartitionType.PIPELINED,
-                               parallelism,
+                                       parallelism,
                                        partitionManager,
                                        partitionConsumableNotifier,
                                        ioManager,
-                                       ASYNC);
+                                       ASYNC,
+                                       true);
 
                        // Create a buffer pool for this partition
                        partition.registerBufferPool(

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java
new file mode 100644
index 0000000..aa5d12c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ScheduleModeTest {
+
+       /**
+        * Test that schedule modes set the lazy deployment flag correctly.
+        */
+       @Test
+       public void testAllowLazyDeployment() throws Exception {
+               assertTrue(ScheduleMode.FROM_SOURCES.allowLazyDeployment());
+               assertTrue(ScheduleMode.BACKTRACKING.allowLazyDeployment());
+               assertFalse(ScheduleMode.ALL.allowLazyDeployment());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 779a17d..431cbb8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -630,7 +630,7 @@ public class TaskManagerTest extends TestLogger {
                                IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 
                                List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
-                               irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
+                               irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true));
 
                                InputGateDeploymentDescriptor ircdd =
                                                new InputGateDeploymentDescriptor(
@@ -775,7 +775,7 @@ public class TaskManagerTest extends TestLogger {
                                IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 
                                List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
-                               irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
+                               irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true));
 
                                InputGateDeploymentDescriptor ircdd =
                                                new InputGateDeploymentDescriptor(
@@ -1430,9 +1430,8 @@ public class TaskManagerTest extends TestLogger {
                                new IntermediateDataSetID(),
                                new IntermediateResultPartitionID(),
                                ResultPartitionType.PIPELINED,
-                               1
-                               // don't deploy eagerly but with the first completed memory buffer
-                       );
+                               1,
+                               true);
 
                        final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
                                "TestTask", 0, 1, 0, new Configuration(), new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 403836c..b5056ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -542,6 +542,9 @@ public class TaskTest extends TestLogger {
                }
 
                expected.put(ExecutionState.RUNNING, ExecutionState.RUNNING);
+               expected.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING);
+               expected.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
+               expected.put(ExecutionState.FINISHED, ExecutionState.RUNNING);
 
                expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
                expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING);
@@ -557,7 +560,7 @@ public class TaskTest extends TestLogger {
                        assertEquals(expected.get(state), newTaskState);
                }
 
-               verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
+               verify(inputGate, times(4)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
        }
 
        /**

Reply via email to