Repository: flink
Updated Branches:
  refs/heads/master a65cd8db1 -> 2c556f74e


[FLINK-3232] [runtime] Add option to eagerly deploy channels

Adds a flag to the ExecutionGraph's IntermediateResult class indicating whether
the result consumers should be deployed eagerly. If true, the consumers are
deployed as soon as the partition is registered at the ResultPartitionManager of
the task manager. In practice, the deployment boils down to updating unknown
input channels of the consumers (because the tasks themselves are all deployed
at once).

This behaviour is configured in the JobGraph generator and only activated for
streaming programs (StreamingJobGraphGenerator). It only makes sense for
pipelined results.

The motivation is to reduce the latency of the first records passing through a
pipeline. The initial update of the input channels causes a higher latency.

You can see this effect in the StreamingScalabilityAndLatency class (manual
test).

At the moment, this results in duplicate Akka messages when the first record
is produced (the message travels from the task to the job manager and from the
job manager to the task manager, where it is then ignored at the InputGate).

This closes #1503


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0937be0a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0937be0a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0937be0a

Branch: refs/heads/master
Commit: 0937be0a94eae0b47f0a5f0206e62a98bcbc8432
Parents: 8b8dfc1
Author: Ufuk Celebi <u...@apache.org>
Authored: Wed Jan 13 18:45:35 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 15 11:44:20 2016 +0100

----------------------------------------------------------------------
 .../ResultPartitionDeploymentDescriptor.java    |  25 +++-
 .../executiongraph/ExecutionJobVertex.java      |   6 +-
 .../executiongraph/IntermediateResult.java      |  11 +-
 .../runtime/io/network/NetworkEnvironment.java  |  13 +++
 .../io/network/partition/ResultPartition.java   |  20 ++++
 .../runtime/jobgraph/IntermediateDataSet.java   |  31 +++++
 .../flink/runtime/jobgraph/JobVertex.java       |  13 ++-
 .../apache/flink/runtime/taskmanager/Task.java  |   1 +
 ...ResultPartitionDeploymentDescriptorTest.java |  60 ++++++++++
 .../io/network/NetworkEnvironmentTest.java      | 113 ++++++++++++++++++-
 .../consumer/LocalInputChannelTest.java         |   1 +
 .../runtime/jobgraph/JobTaskVertexTest.java     |  33 +++++-
 .../runtime/taskmanager/TaskManagerTest.java    |   4 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  13 ++-
 14 files changed, 327 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0937be0a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 2b0bbc1..eadcd1c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -48,11 +48,20 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
        /** The number of subpartitions. */
        private final int numberOfSubpartitions;
 
+       /**
+        * Flag indicating whether to eagerly deploy consumers.
+        *
+        * <p>If <code>true</code>, the consumers are deployed as soon as the
+        * runtime result is registered at the result manager of the task 
manager.
+        */
+       private final boolean eagerlyDeployConsumers;
+
        public ResultPartitionDeploymentDescriptor(
                        IntermediateDataSetID resultId,
                        IntermediateResultPartitionID partitionId,
                        ResultPartitionType partitionType,
-                       int numberOfSubpartitions) {
+                       int numberOfSubpartitions,
+                       boolean eagerlyDeployConsumers) {
 
                this.resultId = checkNotNull(resultId);
                this.partitionId = checkNotNull(partitionId);
@@ -60,6 +69,7 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
 
                checkArgument(numberOfSubpartitions >= 1);
                this.numberOfSubpartitions = numberOfSubpartitions;
+               this.eagerlyDeployConsumers = eagerlyDeployConsumers;
        }
 
        public IntermediateDataSetID getResultId() {
@@ -78,6 +88,16 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
                return numberOfSubpartitions;
        }
 
+       /**
+        * Returns whether consumers should be deployed eagerly (as soon as the
+        * result is registered at the result manager of the task manager).
+        *
+        * @return Whether consumers should be deployed eagerly
+        */
+       public boolean getEagerlyDeployConsumers() {
+               return eagerlyDeployConsumers;
+       }
+
        @Override
        public String toString() {
                return String.format("ResultPartitionDeploymentDescriptor 
[result id: %s, "
@@ -109,6 +129,7 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
                }
 
                return new ResultPartitionDeploymentDescriptor(
-                               resultId, partitionId, partitionType, 
numberOfSubpartitions);
+                               resultId, partitionId, partitionType, 
numberOfSubpartitions,
+                               
partition.getIntermediateResult().getEagerlyDeployConsumers());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0937be0a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index aa17167..93ae7c1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -128,7 +128,11 @@ public class ExecutionJobVertex implements Serializable {
                        final IntermediateDataSet result = 
jobVertex.getProducedDataSets().get(i);
 
                        this.producedDataSets[i] = new IntermediateResult(
-                                       result.getId(), this, numTaskVertices, 
result.getResultType());
+                                       result.getId(),
+                                       this,
+                                       numTaskVertices,
+                                       result.getResultType(),
+                                       result.getEagerlyDeployConsumers());
                }
 
                // create all task vertices

http://git-wip-us.apache.org/repos/asf/flink/blob/0937be0a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 658a06b..59c70cb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -46,11 +46,14 @@ public class IntermediateResult {
 
        private final ResultPartitionType resultType;
 
+       private final boolean eagerlyDeployConsumers;
+
        public IntermediateResult(
                        IntermediateDataSetID id,
                        ExecutionJobVertex producer,
                        int numParallelProducers,
-                       ResultPartitionType resultType) {
+                       ResultPartitionType resultType,
+                       boolean eagerlyDeployConsumers) {
 
                this.id = checkNotNull(id);
                this.producer = checkNotNull(producer);
@@ -68,6 +71,8 @@ public class IntermediateResult {
 
                // The runtime type for this produced result
                this.resultType = checkNotNull(resultType);
+
+               this.eagerlyDeployConsumers = eagerlyDeployConsumers;
        }
 
        public void setPartition(int partitionNumber, 
IntermediateResultPartition partition) {
@@ -103,6 +108,10 @@ public class IntermediateResult {
                return resultType;
        }
 
+       public boolean getEagerlyDeployConsumers() {
+               return eagerlyDeployConsumers;
+       }
+
        public int registerConsumer() {
                final int index = numConsumers;
                numConsumers++;

http://git-wip-us.apache.org/repos/asf/flink/blob/0937be0a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 6a919bc..10fcc63 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -277,6 +277,8 @@ public class NetworkEnvironment {
                        throw new IllegalStateException("Unequal number of 
writers and partitions.");
                }
 
+               ResultPartitionConsumableNotifier jobManagerNotifier;
+
                synchronized (lock) {
                        if (isShutdown) {
                                throw new 
IllegalStateException("NetworkEnvironment is shut down");
@@ -338,6 +340,17 @@ public class NetworkEnvironment {
                                        }
                                }
                        }
+
+                       // Copy the reference to prevent races with concurrent 
shut downs
+                       jobManagerNotifier = partitionConsumableNotifier;
+               }
+
+               for (ResultPartition partition : producedPartitions) {
+                       // Eagerly notify consumers if required.
+                       if (partition.getEagerlyDeployConsumers()) {
+                               jobManagerNotifier.notifyPartitionConsumable(
+                                               partition.getJobId(), 
partition.getPartitionId());
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0937be0a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 98902f2..189ee62 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -86,6 +86,14 @@ public class ResultPartition implements BufferPoolOwner {
        /** Type of this partition. Defines the concrete subpartition 
implementation to use. */
        private final ResultPartitionType partitionType;
 
+       /**
+        * Flag indicating whether to eagerly deploy consumers.
+        *
+        * <p>If <code>true</code>, the consumers are deployed as soon as the
+        * runtime result is registered at the result manager of the task 
manager.
+        */
+       private final boolean eagerlyDeployConsumers;
+
        /** The subpartitions of this partition. At least one. */
        private final ResultSubpartition[] subpartitions;
 
@@ -125,6 +133,7 @@ public class ResultPartition implements BufferPoolOwner {
                        JobID jobId,
                        ResultPartitionID partitionId,
                        ResultPartitionType partitionType,
+                       boolean eagerlyDeployConsumers,
                        int numberOfSubpartitions,
                        ResultPartitionManager partitionManager,
                        ResultPartitionConsumableNotifier 
partitionConsumableNotifier,
@@ -135,6 +144,7 @@ public class ResultPartition implements BufferPoolOwner {
                this.jobId = checkNotNull(jobId);
                this.partitionId = checkNotNull(partitionId);
                this.partitionType = checkNotNull(partitionType);
+               this.eagerlyDeployConsumers = eagerlyDeployConsumers;
                this.subpartitions = new 
ResultSubpartition[numberOfSubpartitions];
                this.partitionManager = checkNotNull(partitionManager);
                this.partitionConsumableNotifier = 
checkNotNull(partitionConsumableNotifier);
@@ -201,6 +211,16 @@ public class ResultPartition implements BufferPoolOwner {
                return subpartitions.length;
        }
 
+       /**
+        * Returns whether consumers should be deployed eagerly (as soon as the
+        * result is registered at the result manager of the task manager).
+        *
+        * @return Whether consumers should be deployed eagerly
+        */
+       public boolean getEagerlyDeployConsumers() {
+               return eagerlyDeployConsumers;
+       }
+
        public BufferProvider getBufferProvider() {
                return bufferPool;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0937be0a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
index 7c8f32b..1a14ed5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
@@ -45,6 +45,14 @@ public class IntermediateDataSet implements 
java.io.Serializable {
 
        // The type of partition to use at runtime
        private final ResultPartitionType resultType;
+
+       /**
+        * Flag indicating whether to eagerly deploy consumers.
+        *
+        * <p>If <code>true</code>, the consumers are deployed as soon as the
+        * runtime result is registered at the result manager of the task 
manager.
+        */
+       private boolean eagerlyDeployConsumers;
        
        // 
--------------------------------------------------------------------------------------------
        
@@ -79,6 +87,29 @@ public class IntermediateDataSet implements 
java.io.Serializable {
        public ResultPartitionType getResultType() {
                return resultType;
        }
+
+       /**
+        * Sets the flag indicating whether to eagerly deploy consumers 
(default:
+        * <code>false</code>).
+        *
+        * @param eagerlyDeployConsumers If <code>true</code>, the consumers are
+        *                               deployed as soon as the runtime result 
is
+        *                               registered at the result manager of the
+        *                               task manager. Default is 
<code>false</code>.
+        */
+       public void setEagerlyDeployConsumers(boolean eagerlyDeployConsumers) {
+               this.eagerlyDeployConsumers = eagerlyDeployConsumers;
+       }
+
+       /**
+        * Returns whether consumers should be deployed eagerly (as soon as the
+        * result is registered at the result manager of the task manager).
+        *
+        * @return Whether consumers should be deployed eagerly
+        */
+       public boolean getEagerlyDeployConsumers() {
+               return eagerlyDeployConsumers;
+       }
        
        // 
--------------------------------------------------------------------------------------------
        

http://git-wip-us.apache.org/repos/asf/flink/blob/0937be0a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index f897f6c..8ebc30c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -352,7 +352,7 @@ public class JobVertex implements java.io.Serializable {
        }
 
        public JobEdge connectNewDataSetAsInput(JobVertex input, 
DistributionPattern distPattern) {
-               return connectNewDataSetAsInput(input, distPattern, 
ResultPartitionType.PIPELINED);
+               return connectNewDataSetAsInput(input, distPattern, 
ResultPartitionType.PIPELINED, false);
        }
 
        public JobEdge connectNewDataSetAsInput(
@@ -360,7 +360,18 @@ public class JobVertex implements java.io.Serializable {
                        DistributionPattern distPattern,
                        ResultPartitionType partitionType) {
 
+               return connectNewDataSetAsInput(input, distPattern, 
partitionType, false);
+       }
+
+       public JobEdge connectNewDataSetAsInput(
+                       JobVertex input,
+                       DistributionPattern distPattern,
+                       ResultPartitionType partitionType,
+                       boolean eagerlyDeployConsumers) {
+
                IntermediateDataSet dataSet = 
input.createAndAddResultDataSet(partitionType);
+               dataSet.setEagerlyDeployConsumers(eagerlyDeployConsumers);
+
                JobEdge edge = new JobEdge(dataSet, this, distPattern);
                this.inputs.add(edge);
                dataSet.addConsumer(edge);

http://git-wip-us.apache.org/repos/asf/flink/blob/0937be0a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 5c8e180..974e687 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -284,6 +284,7 @@ public class Task implements Runnable {
                                        jobId,
                                        partitionId,
                                        desc.getPartitionType(),
+                                       desc.getEagerlyDeployConsumers(),
                                        desc.getNumberOfSubpartitions(),
                                        
networkEnvironment.getPartitionManager(),
                                        
networkEnvironment.getPartitionConsumableNotifier(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0937be0a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
new file mode 100644
index 0000000..d2fcc7b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ResultPartitionDeploymentDescriptorTest {
+
+       /**
+        * Tests simple de/serialization.
+        */
+       @Test
+       public void testSerialization() throws Exception {
+               // Expected values
+               IntermediateDataSetID resultId = new IntermediateDataSetID();
+               IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
+               ResultPartitionType partitionType = 
ResultPartitionType.PIPELINED;
+               int numberOfSubpartitions = 24;
+               boolean eagerlyDeployConsumers = true;
+
+               ResultPartitionDeploymentDescriptor orig =
+                               new ResultPartitionDeploymentDescriptor(
+                                               resultId,
+                                               partitionId,
+                                               partitionType,
+                                               numberOfSubpartitions,
+                                               eagerlyDeployConsumers);
+
+               ResultPartitionDeploymentDescriptor copy =
+                               CommonTestUtils.createCopySerializable(orig);
+
+               assertEquals(resultId, copy.getResultId());
+               assertEquals(partitionId, copy.getPartitionId());
+               assertEquals(partitionType, copy.getPartitionType());
+               assertEquals(numberOfSubpartitions, 
copy.getNumberOfSubpartitions());
+               assertEquals(eagerlyDeployConsumers, 
copy.getEagerlyDeployConsumers());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0937be0a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index bcf8837..84f9f14 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -18,27 +18,47 @@
 
 package org.apache.flink.runtime.io.network;
 
-import static org.junit.Assert.*;
-
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.NetUtils;
-
 import org.junit.Test;
-
 import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.impl.Promise;
 
 import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 public class NetworkEnvironmentTest {
 
        @Test
@@ -59,8 +79,8 @@ public class NetworkEnvironmentTest {
                        NettyConfig nettyConf = new 
NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, new Configuration());
                        NetworkEnvironmentConfiguration config = new 
NetworkEnvironmentConfiguration(
                                        NUM_BUFFERS, BUFFER_SIZE, 
MemoryType.HEAP,
-                                       IOManager.IOMode.SYNC, new 
Some<NettyConfig>(nettyConf),
-                                       new Tuple2<Integer, Integer>(0, 0));
+                                       IOManager.IOMode.SYNC, new 
Some<>(nettyConf),
+                                       new Tuple2<>(0, 0));
 
                        NetworkEnvironment env = new NetworkEnvironment(
                                TestingUtils.defaultExecutionContext(),
@@ -131,4 +151,85 @@ public class NetworkEnvironmentTest {
                        fail(e.getMessage());
                }
        }
+
+
+       /**
+        * Registers a task with an eager and non-eager partition at the network
+        * environment and verifies that there is exactly one schedule or update
+        * message to the job manager for the eager partition.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testEagerlyDeployConsumers() throws Exception {
+               // Mock job manager => expected interactions will be verified
+               ActorGateway jobManager = mock(ActorGateway.class);
+               when(jobManager.ask(anyObject(), any(FiniteDuration.class)))
+                               .thenReturn(new 
Promise.DefaultPromise<>().future());
+
+               // Network environment setup
+               NetworkEnvironmentConfiguration config = new 
NetworkEnvironmentConfiguration(
+                               20,
+                               1024,
+                               MemoryType.HEAP,
+                               IOManager.IOMode.SYNC,
+                               Some.<NettyConfig>empty(),
+                               new Tuple2<>(0, 0));
+
+               NetworkEnvironment env = new NetworkEnvironment(
+                               TestingUtils.defaultExecutionContext(),
+                               new FiniteDuration(30, TimeUnit.SECONDS),
+                               config);
+
+               // Associate the environment with the mock actors
+               env.associateWithTaskManagerAndJobManager(
+                               jobManager,
+                               DummyActorGateway.INSTANCE);
+
+               // Register mock task
+               JobID jobId = new JobID();
+
+               ResultPartition[] partitions = new ResultPartition[2];
+               partitions[0] = createPartition("p1", jobId, true, env);
+               partitions[1] = createPartition("p2", jobId, false, env);
+
+               ResultPartitionWriter[] writers = new ResultPartitionWriter[2];
+               writers[0] = new ResultPartitionWriter(partitions[0]);
+               writers[1] = new ResultPartitionWriter(partitions[1]);
+
+               Task mockTask = mock(Task.class);
+               when(mockTask.getAllInputGates()).thenReturn(new 
SingleInputGate[0]);
+               when(mockTask.getAllWriters()).thenReturn(writers);
+               when(mockTask.getProducedPartitions()).thenReturn(partitions);
+
+               env.registerTask(mockTask);
+
+               // Verify
+               ResultPartitionID eagerPartitionId = 
partitions[0].getPartitionId();
+
+               verify(jobManager, times(1)).ask(
+                               eq(new ScheduleOrUpdateConsumers(jobId, 
eagerPartitionId)),
+                               any(FiniteDuration.class));
+       }
+
+       /**
+        * Helper to create a mock result partition.
+        */
+       private static ResultPartition createPartition(
+                       String name,
+                       JobID jobId,
+                       boolean eagerlyDeployConsumers,
+                       NetworkEnvironment env) {
+
+               return new ResultPartition(
+                               name,
+                               jobId,
+                               new ResultPartitionID(),
+                               ResultPartitionType.PIPELINED,
+                               eagerlyDeployConsumers,
+                               1,
+                               env.getPartitionManager(),
+                               env.getPartitionConsumableNotifier(),
+                               mock(IOManager.class),
+                               env.getDefaultIOMode());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0937be0a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index ea40a55..d4fcf16 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -118,6 +118,7 @@ public class LocalInputChannelTest {
                                        jobId,
                                        partitionIds[i],
                                        ResultPartitionType.PIPELINED,
+                                       false,
                                        parallelism,
                                        partitionManager,
                                        partitionConsumableNotifier,

http://git-wip-us.apache.org/repos/asf/flink/blob/0937be0a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
index 2511bd6..c3ba909 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
@@ -29,12 +29,12 @@ import 
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
-
 @SuppressWarnings("serial")
 public class JobTaskVertexTest {
 
@@ -130,7 +130,36 @@ public class JobTaskVertexTest {
                        fail(e.getMessage());
                }
        }
-       
+
+       /**
+        * Verifies correct setting of eager deploy settings.
+        *
+        * <p>Three separate consumer vertices are connected to the same producer
+        * vertex, each through a different overload of
+        * {@code connectNewDataSetAsInput}, and the flag is read back from the
+        * source of the returned edge:
+        * <ul>
+        *   <li>distribution pattern only: the flag is expected to be false,</li>
+        *   <li>distribution pattern plus {@code ResultPartitionType.PIPELINED}:
+        *       the flag is expected to be false,</li>
+        *   <li>distribution pattern, {@code ResultPartitionType.PIPELINED} and an
+        *       explicit {@code true}: the flag is expected to be true.</li>
+        * </ul>
+        */
+       @Test
+       public void testEagerlyDeployConsumers() throws Exception {
+               JobVertex producer = new JobVertex("producer");
+
+               {
+                       JobVertex consumer = new JobVertex("consumer");
+                       JobEdge edge = consumer.connectNewDataSetAsInput(
+                                       producer, 
DistributionPattern.ALL_TO_ALL);
+                       
assertFalse(edge.getSource().getEagerlyDeployConsumers());
+               }
+
+               {
+                       JobVertex consumer = new JobVertex("consumer");
+                       JobEdge edge = consumer.connectNewDataSetAsInput(
+                                       producer, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+                       
assertFalse(edge.getSource().getEagerlyDeployConsumers());
+               }
+
+               {
+                       JobVertex consumer = new JobVertex("consumer");
+                       JobEdge edge = consumer.connectNewDataSetAsInput(
+                                       producer, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED, true);
+                       
assertTrue(edge.getSource().getEagerlyDeployConsumers());
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
        
        private static final class TestingOutputFormat extends 
DiscardingOutputFormat<Object> implements InitializeOnMaster {

http://git-wip-us.apache.org/repos/asf/flink/blob/0937be0a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 0381c27..dbd047e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -499,7 +499,7 @@ public class TaskManagerTest {
                                IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
 
                                List<ResultPartitionDeploymentDescriptor> irpdd 
= new ArrayList<ResultPartitionDeploymentDescriptor>();
-                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1));
+                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1, false));
 
                                InputGateDeploymentDescriptor ircdd =
                                                new 
InputGateDeploymentDescriptor(
@@ -643,7 +643,7 @@ public class TaskManagerTest {
                                IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
 
                                List<ResultPartitionDeploymentDescriptor> irpdd 
= new ArrayList<ResultPartitionDeploymentDescriptor>();
-                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1));
+                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1, false));
 
                                InputGateDeploymentDescriptor ircdd =
                                                new 
InputGateDeploymentDescriptor(

http://git-wip-us.apache.org/repos/asf/flink/blob/0937be0a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 5a970fc..ad96cbf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -358,9 +359,17 @@ public class StreamingJobGraphGenerator {
 
                StreamPartitioner<?> partitioner = edge.getPartitioner();
                if (partitioner instanceof ForwardPartitioner) {
-                       downStreamVertex.connectNewDataSetAsInput(headVertex, 
DistributionPattern.POINTWISE);
+                       downStreamVertex.connectNewDataSetAsInput(
+                                       headVertex,
+                                       DistributionPattern.POINTWISE,
+                                       ResultPartitionType.PIPELINED,
+                                       true);
                } else {
-                       downStreamVertex.connectNewDataSetAsInput(headVertex, 
DistributionPattern.ALL_TO_ALL);
+                       downStreamVertex.connectNewDataSetAsInput(
+                                       headVertex,
+                                       DistributionPattern.ALL_TO_ALL,
+                                       ResultPartitionType.PIPELINED,
+                                       true);
                }
 
                if (LOG.isDebugEnabled()) {

Reply via email to