Revert "[FLINK-3232] [runtime] Add option to eagerly deploy channels"

The reverted commit did not actually fix the underlying problem; it only hid it by
brute force, sending many more schedule-or-update-consumers messages to the JobManager.
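For context, the change being reverted made NetworkEnvironment.registerTask notify the
JobManager for every partition flagged for eager deployment, which is where the extra
schedule-or-update-consumers messages came from. A simplified sketch of that removed
path, condensed from the removed lines in the NetworkEnvironment.java hunk below (not
a verbatim excerpt):

    // Simplified from the code removed by this revert: one notification is sent per
    // eagerly-deployed partition as soon as the task's partitions are registered,
    // triggering a ScheduleOrUpdateConsumers message to the JobManager each time.
    for (ResultPartition partition : producedPartitions) {
        if (partition.getEagerlyDeployConsumers()) {
            jobManagerNotifier.notifyPartitionConsumable(
                    partition.getJobId(), partition.getPartitionId());
        }
    }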


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5a4cb6c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5a4cb6c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5a4cb6c

Branch: refs/heads/release-1.1
Commit: b5a4cb6cc9ba7099045f145a2fe3c58567253b4f
Parents: 9a19ca1
Author: Ufuk Celebi <[email protected]>
Authored: Thu Nov 10 14:01:22 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Thu Nov 10 21:53:30 2016 +0100

----------------------------------------------------------------------
 .../ResultPartitionDeploymentDescriptor.java    | 32 ++-----
 .../executiongraph/ExecutionJobVertex.java      |  3 +-
 .../executiongraph/IntermediateResult.java      | 17 +---
 .../runtime/io/network/NetworkEnvironment.java  | 13 ---
 .../io/network/partition/ResultPartition.java   | 40 ++------
 .../runtime/jobgraph/IntermediateDataSet.java   | 23 -----
 .../flink/runtime/jobgraph/JobVertex.java       | 12 +--
 .../apache/flink/runtime/taskmanager/Task.java  |  1 -
 .../NetworkEnvironmentConfiguration.scala       |  1 +
 ...ResultPartitionDeploymentDescriptorTest.java |  6 +-
 .../ExecutionGraphDeploymentTest.java           |  2 +-
 .../io/network/NetworkEnvironmentTest.java      | 98 --------------------
 .../consumer/LocalInputChannelTest.java         |  3 +-
 .../runtime/jobgraph/JobTaskVertexTest.java     | 40 ++------
 .../runtime/taskmanager/TaskManagerTest.java    |  8 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  9 +-
 16 files changed, 41 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index e72d468..ecdacbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -48,20 +48,11 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
        /** The number of subpartitions. */
        private final int numberOfSubpartitions;
 
-       /**
-        * Flag indicating whether to eagerly deploy consumers.
-        *
-        * <p>If <code>true</code>, the consumers are deployed as soon as the
-        * runtime result is registered at the result manager of the task manager.
-        */
-       private final boolean eagerlyDeployConsumers;
-
        public ResultPartitionDeploymentDescriptor(
-                       IntermediateDataSetID resultId,
-                       IntermediateResultPartitionID partitionId,
-                       ResultPartitionType partitionType,
-                       int numberOfSubpartitions,
-                       boolean eagerlyDeployConsumers) {
+               IntermediateDataSetID resultId,
+               IntermediateResultPartitionID partitionId,
+               ResultPartitionType partitionType,
+               int numberOfSubpartitions) {
 
                this.resultId = checkNotNull(resultId);
                this.partitionId = checkNotNull(partitionId);
@@ -69,7 +60,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 
                checkArgument(numberOfSubpartitions >= 1);
                this.numberOfSubpartitions = numberOfSubpartitions;
-               this.eagerlyDeployConsumers = eagerlyDeployConsumers;
        }
 
        public IntermediateDataSetID getResultId() {
@@ -88,16 +78,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
                return numberOfSubpartitions;
        }
 
-       /**
-        * Returns whether consumers should be deployed eagerly (as soon as they
-        * are registered at the result manager of the task manager).
-        *
-        * @return Whether consumers should be deployed eagerly
-        */
-       public boolean getEagerlyDeployConsumers() {
-               return eagerlyDeployConsumers;
-       }
-
        @Override
        public String toString() {
                return String.format("ResultPartitionDeploymentDescriptor [result id: %s, "
@@ -129,7 +109,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
                }
 
                return new ResultPartitionDeploymentDescriptor(
-                               resultId, partitionId, partitionType, numberOfSubpartitions,
-                               partition.getIntermediateResult().getEagerlyDeployConsumers());
+                               resultId, partitionId, partitionType, numberOfSubpartitions
+               );
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 9e175f1..65259ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -147,8 +147,7 @@ public class ExecutionJobVertex implements Serializable {
                                        result.getId(),
                                        this,
                                        numTaskVertices,
-                                       result.getResultType(),
-                                       result.getEagerlyDeployConsumers());
+                                       result.getResultType());
                }
 
                // create all task vertices

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 9d57014..67b1fe0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -46,14 +46,11 @@ public class IntermediateResult {
 
        private final ResultPartitionType resultType;
 
-       private final boolean eagerlyDeployConsumers;
-
        public IntermediateResult(
-                       IntermediateDataSetID id,
-                       ExecutionJobVertex producer,
-                       int numParallelProducers,
-                       ResultPartitionType resultType,
-                       boolean eagerlyDeployConsumers) {
+               IntermediateDataSetID id,
+               ExecutionJobVertex producer,
+               int numParallelProducers,
+               ResultPartitionType resultType) {
 
                this.id = checkNotNull(id);
                this.producer = checkNotNull(producer);
@@ -71,8 +68,6 @@ public class IntermediateResult {
 
                // The runtime type for this produced result
                this.resultType = checkNotNull(resultType);
-
-               this.eagerlyDeployConsumers = eagerlyDeployConsumers;
        }
 
        public void setPartition(int partitionNumber, IntermediateResultPartition partition) {
@@ -108,10 +103,6 @@ public class IntermediateResult {
                return resultType;
        }
 
-       public boolean getEagerlyDeployConsumers() {
-               return eagerlyDeployConsumers;
-       }
-
        public int registerConsumer() {
                final int index = numConsumers;
                numConsumers++;

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 30d2e38..11661cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -277,8 +277,6 @@ public class NetworkEnvironment {
                        throw new IllegalStateException("Unequal number of 
writers and partitions.");
                }
 
-               ResultPartitionConsumableNotifier jobManagerNotifier;
-
                synchronized (lock) {
                        if (isShutdown) {
                                throw new IllegalStateException("NetworkEnvironment is shut down");
@@ -340,17 +338,6 @@ public class NetworkEnvironment {
                                        }
                                }
                        }
-
-                       // Copy the reference to prevent races with concurrent shut downs
-                       jobManagerNotifier = partitionConsumableNotifier;
-               }
-
-               for (ResultPartition partition : producedPartitions) {
-                       // Eagerly notify consumers if required.
-                       if (partition.getEagerlyDeployConsumers()) {
-                               jobManagerNotifier.notifyPartitionConsumable(
-                                               partition.getJobId(), partition.getPartitionId());
-                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 7c109f3..a60f95d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
@@ -28,7 +29,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,14 +86,6 @@ public class ResultPartition implements BufferPoolOwner {
        /** Type of this partition. Defines the concrete subpartition implementation to use. */
        private final ResultPartitionType partitionType;
 
-       /**
-        * Flag indicating whether to eagerly deploy consumers.
-        *
-        * <p>If <code>true</code>, the consumers are deployed as soon as the
-        * runtime result is registered at the result manager of the task manager.
-        */
-       private final boolean eagerlyDeployConsumers;
-
        /** The subpartitions of this partition. At least one. */
        private final ResultSubpartition[] subpartitions;
 
@@ -129,22 +121,20 @@ public class ResultPartition implements BufferPoolOwner {
        private long totalNumberOfBytes;
 
        public ResultPartition(
-                       String owningTaskName,
-                       JobID jobId,
-                       ResultPartitionID partitionId,
-                       ResultPartitionType partitionType,
-                       boolean eagerlyDeployConsumers,
-                       int numberOfSubpartitions,
-                       ResultPartitionManager partitionManager,
-                       ResultPartitionConsumableNotifier partitionConsumableNotifier,
-                       IOManager ioManager,
-                       IOMode defaultIoMode) {
+               String owningTaskName,
+               JobID jobId,
+               ResultPartitionID partitionId,
+               ResultPartitionType partitionType,
+               int numberOfSubpartitions,
+               ResultPartitionManager partitionManager,
+               ResultPartitionConsumableNotifier partitionConsumableNotifier,
+               IOManager ioManager,
+               IOMode defaultIoMode) {
 
                this.owningTaskName = checkNotNull(owningTaskName);
                this.jobId = checkNotNull(jobId);
                this.partitionId = checkNotNull(partitionId);
                this.partitionType = checkNotNull(partitionType);
-               this.eagerlyDeployConsumers = eagerlyDeployConsumers;
                this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
                this.partitionManager = checkNotNull(partitionManager);
                this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
@@ -211,16 +201,6 @@ public class ResultPartition implements BufferPoolOwner {
                return subpartitions.length;
        }
 
-       /**
-        * Returns whether consumers should be deployed eagerly (as soon as they
-        * are registered at the result manager of the task manager).
-        *
-        * @return Whether consumers should be deployed eagerly
-        */
-       public boolean getEagerlyDeployConsumers() {
-               return eagerlyDeployConsumers;
-       }
-
        public BufferProvider getBufferProvider() {
                return bufferPool;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
index c30c78e..fdc5d1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
@@ -88,29 +88,6 @@ public class IntermediateDataSet implements java.io.Serializable {
                return resultType;
        }
 
-       /**
-        * Sets the flag indicating whether to eagerly deploy consumers (default:
-        * <code>false</code>).
-        *
-        * @param eagerlyDeployConsumers If <code>true</code>, the consumers are
-        *                               deployed as soon as the runtime result is
-        *                               registered at the result manager of the
-        *                               task manager. Default is <code>false</code>.
-        */
-       public void setEagerlyDeployConsumers(boolean eagerlyDeployConsumers) {
-               this.eagerlyDeployConsumers = eagerlyDeployConsumers;
-       }
-
-       /**
-        * Returns whether consumers should be deployed eagerly (as soon as they
-        * are registered at the result manager of the task manager).
-        *
-        * @return Whether consumers should be deployed eagerly
-        */
-       public boolean getEagerlyDeployConsumers() {
-               return eagerlyDeployConsumers;
-       }
-       
        // --------------------------------------------------------------------------------------------
        
        public void addConsumer(JobEdge edge) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 379a42a..47dcb36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -357,7 +357,7 @@ public class JobVertex implements java.io.Serializable {
        }
 
        public JobEdge connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern) {
-               return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED, false);
+               return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED);
        }
 
        public JobEdge connectNewDataSetAsInput(
@@ -365,17 +365,7 @@ public class JobVertex implements java.io.Serializable {
                        DistributionPattern distPattern,
                        ResultPartitionType partitionType) {
 
-               return connectNewDataSetAsInput(input, distPattern, partitionType, false);
-       }
-
-       public JobEdge connectNewDataSetAsInput(
-                       JobVertex input,
-                       DistributionPattern distPattern,
-                       ResultPartitionType partitionType,
-                       boolean eagerlyDeployConsumers) {
-
                IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
-               dataSet.setEagerlyDeployConsumers(eagerlyDeployConsumers);
 
                JobEdge edge = new JobEdge(dataSet, this, distPattern);
                this.inputs.add(edge);

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 00046b7..dd14aaf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -332,7 +332,6 @@ public class Task implements Runnable {
                                        jobId,
                                        partitionId,
                                        desc.getPartitionType(),
-                                       desc.getEagerlyDeployConsumers(),
                                        desc.getNumberOfSubpartitions(),
                                        networkEnvironment.getPartitionManager(),
                                        networkEnvironment.getPartitionConsumableNotifier(),

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 065211c..619da96 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager
 
 import org.apache.flink.core.memory.MemoryType
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
+
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 
 case class NetworkEnvironmentConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index d2fcc7b..1986eae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -38,15 +38,14 @@ public class ResultPartitionDeploymentDescriptorTest {
                IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
                ResultPartitionType partitionType = ResultPartitionType.PIPELINED;
                int numberOfSubpartitions = 24;
-               boolean eagerlyDeployConsumers = true;
 
                ResultPartitionDeploymentDescriptor orig =
                                new ResultPartitionDeploymentDescriptor(
                                                resultId,
                                                partitionId,
                                                partitionType,
-                                               numberOfSubpartitions,
-                                               eagerlyDeployConsumers);
+                                               numberOfSubpartitions
+                               );
 
                ResultPartitionDeploymentDescriptor copy =
                                CommonTestUtils.createCopySerializable(orig);
@@ -55,6 +54,5 @@ public class ResultPartitionDeploymentDescriptorTest {
                assertEquals(partitionId, copy.getPartitionId());
                assertEquals(partitionType, copy.getPartitionType());
                assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions());
-               assertEquals(eagerlyDeployConsumers, copy.getEagerlyDeployConsumers());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 332c8cd..cfbde6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -312,7 +312,7 @@ public class ExecutionGraphDeploymentTest {
                v1.setInvokableClass(BatchTask.class);
                v2.setInvokableClass(BatchTask.class);
 
-               v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING, false);
+               v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
 
                // execution graph that executes actions synchronously
                ExecutionGraph eg = new ExecutionGraph(

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index fca3ceb..a659be3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -18,29 +18,19 @@
 
 package org.apache.flink.runtime.io.network;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
-import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.NetUtils;
 import org.junit.Test;
 import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.impl.Promise;
 
 import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
@@ -51,13 +41,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 public class NetworkEnvironmentTest {
 
@@ -151,85 +134,4 @@ public class NetworkEnvironmentTest {
                        fail(e.getMessage());
                }
        }
-
-
-       /**
-        * Registers a task with an eager and non-eager partition at the network
-        * environment and verifies that there is exactly on schedule or update
-        * message to the job manager for the eager partition.
-        */
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testEagerlyDeployConsumers() throws Exception {
-               // Mock job manager => expected interactions will be verified
-               ActorGateway jobManager = mock(ActorGateway.class);
-               when(jobManager.ask(anyObject(), any(FiniteDuration.class)))
-                               .thenReturn(new Promise.DefaultPromise<>().future());
-
-               // Network environment setup
-               NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
-                               20,
-                               1024,
-                               MemoryType.HEAP,
-                               IOManager.IOMode.SYNC,
-                               Some.<NettyConfig>empty(),
-                               new Tuple2<>(0, 0));
-
-               NetworkEnvironment env = new NetworkEnvironment(
-                               TestingUtils.defaultExecutionContext(),
-                               new FiniteDuration(30, TimeUnit.SECONDS),
-                               config);
-
-               // Associate the environment with the mock actors
-               env.associateWithTaskManagerAndJobManager(
-                               jobManager,
-                               DummyActorGateway.INSTANCE);
-
-               // Register mock task
-               JobID jobId = new JobID();
-
-               ResultPartition[] partitions = new ResultPartition[2];
-               partitions[0] = createPartition("p1", jobId, true, env);
-               partitions[1] = createPartition("p2", jobId, false, env);
-
-               ResultPartitionWriter[] writers = new ResultPartitionWriter[2];
-               writers[0] = new ResultPartitionWriter(partitions[0]);
-               writers[1] = new ResultPartitionWriter(partitions[1]);
-
-               Task mockTask = mock(Task.class);
-               when(mockTask.getAllInputGates()).thenReturn(new SingleInputGate[0]);
-               when(mockTask.getAllWriters()).thenReturn(writers);
-               when(mockTask.getProducedPartitions()).thenReturn(partitions);
-
-               env.registerTask(mockTask);
-
-               // Verify
-               ResultPartitionID eagerPartitionId = partitions[0].getPartitionId();
-
-               verify(jobManager, times(1)).ask(
-                               eq(new ScheduleOrUpdateConsumers(jobId, eagerPartitionId)),
-                               any(FiniteDuration.class));
-       }
-
-       /**
-        * Helper to create a mock result partition.
-        */
-       private static ResultPartition createPartition(
-                       String name,
-                       JobID jobId,
-                       boolean eagerlyDeployConsumers,
-                       NetworkEnvironment env) {
-
-               return new ResultPartition(
-                               name,
-                               jobId,
-                               new ResultPartitionID(),
-                               ResultPartitionType.PIPELINED,
-                               eagerlyDeployConsumers,
-                               1,
-                               env.getPartitionManager(),
-                               env.getPartitionConsumableNotifier(),
-                               mock(IOManager.class),
-                               env.getDefaultIOMode());
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index f91a4ba..88a3ff5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -119,8 +119,7 @@ public class LocalInputChannelTest {
                                        jobId,
                                        partitionIds[i],
                                        ResultPartitionType.PIPELINED,
-                                       false,
-                                       parallelism,
+                               parallelism,
                                        partitionManager,
                                        partitionConsumableNotifier,
                                        ioManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
index c3ba909..4f2d807 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.jobgraph;
 
-import java.io.IOException;
-
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.common.io.InitializeOnMaster;
@@ -29,11 +27,16 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
 public class JobTaskVertexTest {
@@ -131,35 +134,6 @@ public class JobTaskVertexTest {
                }
        }
 
-       /**
-        * Verifies correct setting of eager deploy settings.
-        */
-       @Test
-       public void testEagerlyDeployConsumers() throws Exception {
-               JobVertex producer = new JobVertex("producer");
-
-               {
-                       JobVertex consumer = new JobVertex("consumer");
-                       JobEdge edge = consumer.connectNewDataSetAsInput(
-                                       producer, DistributionPattern.ALL_TO_ALL);
-                       assertFalse(edge.getSource().getEagerlyDeployConsumers());
-               }
-
-               {
-                       JobVertex consumer = new JobVertex("consumer");
-                       JobEdge edge = consumer.connectNewDataSetAsInput(
-                                       producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-                       assertFalse(edge.getSource().getEagerlyDeployConsumers());
-               }
-
-               {
-                       JobVertex consumer = new JobVertex("consumer");
-                       JobEdge edge = consumer.connectNewDataSetAsInput(
-                                       producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED, true);
-                       assertTrue(edge.getSource().getEagerlyDeployConsumers());
-               }
-       }
-
        // --------------------------------------------------------------------------------------------
        
        private static final class TestingOutputFormat extends DiscardingOutputFormat<Object> implements InitializeOnMaster {

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 53117d0..779a17d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -630,7 +630,7 @@ public class TaskManagerTest extends TestLogger {
                                IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 
                                List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
-                               irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, false));
+                               irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
 
                                InputGateDeploymentDescriptor ircdd =
                                                new InputGateDeploymentDescriptor(
@@ -775,7 +775,7 @@ public class TaskManagerTest extends TestLogger {
                                IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 
                                List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
-                               irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, false));
+                               irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
 
                                InputGateDeploymentDescriptor ircdd =
                                                new InputGateDeploymentDescriptor(
@@ -1430,8 +1430,8 @@ public class TaskManagerTest extends TestLogger {
                                new IntermediateDataSetID(),
                                new IntermediateResultPartitionID(),
                                ResultPartitionType.PIPELINED,
-                               1,
-                               false // don't deploy eagerly but with the first completed memory buffer
+                               1
+                               // don't deploy eagerly but with the first completed memory buffer
                        );
 
                        final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index a63b089..20f5981 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -371,20 +371,17 @@ public class StreamingJobGraphGenerator {
                        downStreamVertex.connectNewDataSetAsInput(
                                headVertex,
                                DistributionPattern.POINTWISE,
-                               ResultPartitionType.PIPELINED,
-                               true);
+                               ResultPartitionType.PIPELINED);
                } else if (partitioner instanceof RescalePartitioner){
                        downStreamVertex.connectNewDataSetAsInput(
                                headVertex,
                                DistributionPattern.POINTWISE,
-                               ResultPartitionType.PIPELINED,
-                               true);
+                               ResultPartitionType.PIPELINED);
                } else {
                        downStreamVertex.connectNewDataSetAsInput(
                                        headVertex,
                                        DistributionPattern.ALL_TO_ALL,
-                                       ResultPartitionType.PIPELINED,
-                                       true);
+                                       ResultPartitionType.PIPELINED);
                }
 
                if (LOG.isDebugEnabled()) {
