Repository: flink
Updated Branches:
  refs/heads/master 58204da13 -> 5d5637b01


Revert "[FLINK-3232] [runtime] Add option to eagerly deploy channels"

The reverted commit did not really fix anything, but hid the problem by
brute force, sending many more schedule or update consumers messages.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d2e8b29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d2e8b29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d2e8b29

Branch: refs/heads/master
Commit: 0d2e8b2964b58f5610772c6b5bf39a93b9b0fd95
Parents: 58204da
Author: Ufuk Celebi <[email protected]>
Authored: Wed Nov 9 16:07:22 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Thu Nov 10 22:59:44 2016 +0100

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java       |  17 ++-
 ...PartialInputChannelDeploymentDescriptor.java |   2 +-
 .../ResultPartitionDeploymentDescriptor.java    |  25 +---
 .../executiongraph/ExecutionJobVertex.java      |  10 +-
 .../executiongraph/IntermediateResult.java      |  11 +-
 .../runtime/io/network/NetworkEnvironment.java  |   3 -
 .../io/network/partition/ResultPartition.java   |  21 +--
 .../partition/ResultPartitionManager.java       |   2 -
 .../runtime/jobgraph/IntermediateDataSet.java   |  31 ----
 .../flink/runtime/jobgraph/JobVertex.java       |  12 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   1 -
 ...ResultPartitionDeploymentDescriptorTest.java |   4 +-
 .../ExecutionGraphDeploymentTest.java           |   2 +-
 .../io/network/NetworkEnvironmentTest.java      | 147 -------------------
 .../consumer/LocalInputChannelTest.java         |   3 -
 .../runtime/jobgraph/JobTaskVertexTest.java     |  36 +----
 .../runtime/taskmanager/TaskManagerTest.java    |  20 +--
 .../api/graph/StreamingJobGraphGenerator.java   |   9 +-
 18 files changed, 35 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 0912055..a72b92f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.ConnectionID;
@@ -88,7 +89,9 @@ public class InputChannelDeploymentDescriptor implements 
Serializable {
         * Creates an input channel deployment descriptor for each partition.
         */
        public static InputChannelDeploymentDescriptor[] fromEdges(
-                       ExecutionEdge[] edges, SimpleSlot consumerSlot) {
+                       ExecutionEdge[] edges,
+                       SimpleSlot consumerSlot,
+                       boolean allowLazyDeployment) throws 
ExecutionGraphException {
 
                final ResourceID consumerTaskManager = 
consumerSlot.getTaskManagerID();
                final InputChannelDeploymentDescriptor[] icdd = new 
InputChannelDeploymentDescriptor[edges.length];
@@ -105,9 +108,11 @@ public class InputChannelDeploymentDescriptor implements 
Serializable {
 
                        // The producing task needs to be RUNNING or already 
FINISHED
                        if (consumedPartition.isConsumable() && producerSlot != 
null &&
-                                       (producerState == ExecutionState.RUNNING
-                                                       || producerState == 
ExecutionState.FINISHED)) {
-
+                                       (producerState == 
ExecutionState.RUNNING ||
+                                               producerState == 
ExecutionState.FINISHED ||
+                                               producerState == 
ExecutionState.SCHEDULED ||
+                                               producerState == 
ExecutionState.DEPLOYING)) {
+                               
                                final TaskManagerLocation 
partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
                                final ResourceID partitionTaskManager = 
partitionTaskManagerLocation.getResourceID();
 
@@ -124,9 +129,11 @@ public class InputChannelDeploymentDescriptor implements 
Serializable {
                                        partitionLocation = 
ResultPartitionLocation.createRemote(connectionId);
                                }
                        }
-                       else {
+                       else if (allowLazyDeployment) {
                                // The producing task might not have registered 
the partition yet
                                partitionLocation = 
ResultPartitionLocation.createUnknown();
+                       } else {
+                               throw new ExecutionGraphException("Trying to 
eagerly schedule a task whose inputs are not ready.");
                        }
 
                        final ResultPartitionID consumedPartitionId = new 
ResultPartitionID(

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
index 0eac39d..c925f75 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.deployment;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index e72d468..2881dde 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -48,20 +48,11 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
        /** The number of subpartitions. */
        private final int numberOfSubpartitions;
 
-       /**
-        * Flag indicating whether to eagerly deploy consumers.
-        *
-        * <p>If <code>true</code>, the consumers are deployed as soon as the
-        * runtime result is registered at the result manager of the task 
manager.
-        */
-       private final boolean eagerlyDeployConsumers;
-
        public ResultPartitionDeploymentDescriptor(
                        IntermediateDataSetID resultId,
                        IntermediateResultPartitionID partitionId,
                        ResultPartitionType partitionType,
-                       int numberOfSubpartitions,
-                       boolean eagerlyDeployConsumers) {
+                       int numberOfSubpartitions) {
 
                this.resultId = checkNotNull(resultId);
                this.partitionId = checkNotNull(partitionId);
@@ -69,7 +60,6 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
 
                checkArgument(numberOfSubpartitions >= 1);
                this.numberOfSubpartitions = numberOfSubpartitions;
-               this.eagerlyDeployConsumers = eagerlyDeployConsumers;
        }
 
        public IntermediateDataSetID getResultId() {
@@ -88,16 +78,6 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
                return numberOfSubpartitions;
        }
 
-       /**
-        * Returns whether consumers should be deployed eagerly (as soon as they
-        * are registered at the result manager of the task manager).
-        *
-        * @return Whether consumers should be deployed eagerly
-        */
-       public boolean getEagerlyDeployConsumers() {
-               return eagerlyDeployConsumers;
-       }
-
        @Override
        public String toString() {
                return String.format("ResultPartitionDeploymentDescriptor 
[result id: %s, "
@@ -129,7 +109,6 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
                }
 
                return new ResultPartitionDeploymentDescriptor(
-                               resultId, partitionId, partitionType, 
numberOfSubpartitions,
-                               
partition.getIntermediateResult().getEagerlyDeployConsumers());
+                               resultId, partitionId, partitionType, 
numberOfSubpartitions);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 47cfde1..a62ed86 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
@@ -30,13 +32,11 @@ import 
org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
-
 import scala.Option;
 
 import java.io.IOException;
@@ -161,8 +160,7 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
                                        result.getId(),
                                        this,
                                        numTaskVertices,
-                                       result.getResultType(),
-                                       result.getEagerlyDeployConsumers());
+                                       result.getResultType());
                }
 
                // create all task vertices

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 9d57014..c2c19d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -46,14 +46,11 @@ public class IntermediateResult {
 
        private final ResultPartitionType resultType;
 
-       private final boolean eagerlyDeployConsumers;
-
        public IntermediateResult(
                        IntermediateDataSetID id,
                        ExecutionJobVertex producer,
                        int numParallelProducers,
-                       ResultPartitionType resultType,
-                       boolean eagerlyDeployConsumers) {
+                       ResultPartitionType resultType) {
 
                this.id = checkNotNull(id);
                this.producer = checkNotNull(producer);
@@ -71,8 +68,6 @@ public class IntermediateResult {
 
                // The runtime type for this produced result
                this.resultType = checkNotNull(resultType);
-
-               this.eagerlyDeployConsumers = eagerlyDeployConsumers;
        }
 
        public void setPartition(int partitionNumber, 
IntermediateResultPartition partition) {
@@ -108,10 +103,6 @@ public class IntermediateResult {
                return resultType;
        }
 
-       public boolean getEagerlyDeployConsumers() {
-               return eagerlyDeployConsumers;
-       }
-
        public int registerConsumer() {
                final int index = numConsumers;
                numConsumers++;

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index b221ec7..d0032d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -158,8 +157,6 @@ public class NetworkEnvironment {
                        throw new IllegalStateException("Unequal number of 
writers and partitions.");
                }
 
-               ResultPartitionConsumableNotifier jobManagerNotifier;
-
                synchronized (lock) {
                        if (isShutdown) {
                                throw new 
IllegalStateException("NetworkEnvironment is shut down");

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index f06cb43..034b27a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
@@ -28,7 +29,6 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.slf4j.Logger;
@@ -89,14 +89,6 @@ public class ResultPartition implements BufferPoolOwner {
        /** Type of this partition. Defines the concrete subpartition 
implementation to use. */
        private final ResultPartitionType partitionType;
 
-       /**
-        * Flag indicating whether to eagerly deploy consumers.
-        *
-        * <p>If <code>true</code>, the consumers are deployed as soon as the
-        * runtime result is registered at the result manager of the task 
manager.
-        */
-       private final boolean doEagerDeployment;
-
        /** The subpartitions of this partition. At least one. */
        private final ResultSubpartition[] subpartitions;
 
@@ -137,7 +129,6 @@ public class ResultPartition implements BufferPoolOwner {
                JobID jobId,
                ResultPartitionID partitionId,
                ResultPartitionType partitionType,
-               boolean doEagerDeployment,
                int numberOfSubpartitions,
                ResultPartitionManager partitionManager,
                ResultPartitionConsumableNotifier partitionConsumableNotifier,
@@ -149,7 +140,6 @@ public class ResultPartition implements BufferPoolOwner {
                this.jobId = checkNotNull(jobId);
                this.partitionId = checkNotNull(partitionId);
                this.partitionType = checkNotNull(partitionType);
-               this.doEagerDeployment = doEagerDeployment;
                this.subpartitions = new 
ResultSubpartition[numberOfSubpartitions];
                this.partitionManager = checkNotNull(partitionManager);
                this.partitionConsumableNotifier = 
checkNotNull(partitionConsumableNotifier);
@@ -366,15 +356,6 @@ public class ResultPartition implements BufferPoolOwner {
        }
 
        /**
-        * Deploys consumers if eager deployment is activated
-        */
-       public void deployConsumers() {
-               if (doEagerDeployment) {
-                       
partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId, 
taskActions);
-               }
-       }
-
-       /**
         * Releases buffers held by this result partition.
         *
         * <p> This is a callback from the buffer pool, which is registered for 
result partitions, which

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index 6edae6f..9da3e14 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -58,8 +58,6 @@ public class ResultPartitionManager implements 
ResultPartitionProvider {
                                throw new IllegalStateException("Result 
partition already registered.");
                        }
 
-                       partition.deployConsumers();
-
                        LOG.debug("Registered {}.", partition);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
index c30c78e..2d9faa8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
@@ -45,14 +45,6 @@ public class IntermediateDataSet implements 
java.io.Serializable {
 
        // The type of partition to use at runtime
        private final ResultPartitionType resultType;
-
-       /**
-        * Flag indicating whether to eagerly deploy consumers.
-        *
-        * <p>If <code>true</code>, the consumers are deployed as soon as the
-        * runtime result is registered at the result manager of the task 
manager.
-        */
-       private boolean eagerlyDeployConsumers;
        
        // 
--------------------------------------------------------------------------------------------
        
@@ -87,29 +79,6 @@ public class IntermediateDataSet implements 
java.io.Serializable {
        public ResultPartitionType getResultType() {
                return resultType;
        }
-
-       /**
-        * Sets the flag indicating whether to eagerly deploy consumers 
(default:
-        * <code>false</code>).
-        *
-        * @param eagerlyDeployConsumers If <code>true</code>, the consumers are
-        *                               deployed as soon as the runtime result 
is
-        *                               registered at the result manager of the
-        *                               task manager. Default is 
<code>false</code>.
-        */
-       public void setEagerlyDeployConsumers(boolean eagerlyDeployConsumers) {
-               this.eagerlyDeployConsumers = eagerlyDeployConsumers;
-       }
-
-       /**
-        * Returns whether consumers should be deployed eagerly (as soon as they
-        * are registered at the result manager of the task manager).
-        *
-        * @return Whether consumers should be deployed eagerly
-        */
-       public boolean getEagerlyDeployConsumers() {
-               return eagerlyDeployConsumers;
-       }
        
        // 
--------------------------------------------------------------------------------------------
        

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 8ddc9f5..2bda9d8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -382,7 +382,7 @@ public class JobVertex implements java.io.Serializable {
        }
 
        public JobEdge connectNewDataSetAsInput(JobVertex input, 
DistributionPattern distPattern) {
-               return connectNewDataSetAsInput(input, distPattern, 
ResultPartitionType.PIPELINED, false);
+               return connectNewDataSetAsInput(input, distPattern, 
ResultPartitionType.PIPELINED);
        }
 
        public JobEdge connectNewDataSetAsInput(
@@ -390,17 +390,7 @@ public class JobVertex implements java.io.Serializable {
                        DistributionPattern distPattern,
                        ResultPartitionType partitionType) {
 
-               return connectNewDataSetAsInput(input, distPattern, 
partitionType, false);
-       }
-
-       public JobEdge connectNewDataSetAsInput(
-                       JobVertex input,
-                       DistributionPattern distPattern,
-                       ResultPartitionType partitionType,
-                       boolean eagerlyDeployConsumers) {
-
                IntermediateDataSet dataSet = 
input.createAndAddResultDataSet(partitionType);
-               dataSet.setEagerlyDeployConsumers(eagerlyDeployConsumers);
 
                JobEdge edge = new JobEdge(dataSet, this, distPattern);
                this.inputs.add(edge);

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 827451e..6907606 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -341,7 +341,6 @@ public class Task implements Runnable, TaskActions {
                                jobId,
                                partitionId,
                                desc.getPartitionType(),
-                               desc.getEagerlyDeployConsumers(),
                                desc.getNumberOfSubpartitions(),
                                networkEnvironment.getResultPartitionManager(),
                                resultPartitionConsumableNotifier,

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index d2fcc7b..4b1e546 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -45,8 +45,7 @@ public class ResultPartitionDeploymentDescriptorTest {
                                                resultId,
                                                partitionId,
                                                partitionType,
-                                               numberOfSubpartitions,
-                                               eagerlyDeployConsumers);
+                                               numberOfSubpartitions);
 
                ResultPartitionDeploymentDescriptor copy =
                                CommonTestUtils.createCopySerializable(orig);
@@ -55,6 +54,5 @@ public class ResultPartitionDeploymentDescriptorTest {
                assertEquals(partitionId, copy.getPartitionId());
                assertEquals(partitionType, copy.getPartitionType());
                assertEquals(numberOfSubpartitions, 
copy.getNumberOfSubpartitions());
-               assertEquals(eagerlyDeployConsumers, 
copy.getEagerlyDeployConsumers());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 63da1ab..d4acd8c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -308,7 +308,7 @@ public class ExecutionGraphDeploymentTest {
                v1.setInvokableClass(BatchTask.class);
                v2.setInvokableClass(BatchTask.class);
 
-               v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING, false);
+               v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING);
 
                // execution graph that executes actions synchronously
                ExecutionGraph eg = new ExecutionGraph(

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
deleted file mode 100644
index 13da18e..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import 
org.apache.flink.runtime.taskmanager.ActorGatewayResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.Test;
-import scala.Some;
-import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.impl.Promise;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class NetworkEnvironmentTest {
-       /**
-        * Registers a task with an eager and non-eager partition at the network
-        * environment and verifies that there is exactly on schedule or update
-        * message to the job manager for the eager partition.
-        */
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testEagerlyDeployConsumers() throws Exception {
-               // Mock job manager => expected interactions will be verified
-               final ActorGateway jobManager = mock(ActorGateway.class);
-               when(jobManager.ask(anyObject(), any(FiniteDuration.class)))
-                               .thenReturn(new 
Promise.DefaultPromise<>().future());
-
-               // Network environment setup
-               NetworkEnvironmentConfiguration config = new 
NetworkEnvironmentConfiguration(
-                       20,
-                       1024,
-                       MemoryType.HEAP,
-                       IOManager.IOMode.SYNC,
-                       Some.<NettyConfig>empty(),
-                       0,
-                       0);
-
-               NetworkEnvironment env = new NetworkEnvironment(
-                       new NetworkBufferPool(config.numNetworkBuffers(), 
config.networkBufferSize(), config.memoryType()),
-                       new LocalConnectionManager(),
-                       new ResultPartitionManager(),
-                       new TaskEventDispatcher(),
-                       new KvStateRegistry(),
-                       null,
-                       config.ioMode(),
-                       config.partitionRequestInitialBackoff(),
-                       config.partitinRequestMaxBackoff());
-
-               env.start();
-
-               ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier = new 
ActorGatewayResultPartitionConsumableNotifier(
-                       TestingUtils.defaultExecutionContext(),
-                       jobManager,
-                       new FiniteDuration(30L, TimeUnit.SECONDS));
-
-               // Register mock task
-               JobID jobId = new JobID();
-               Task mockTask = mock(Task.class);
-
-               ResultPartition[] partitions = new ResultPartition[2];
-               partitions[0] = createPartition(mockTask, "p1", jobId, true, 
env, resultPartitionConsumableNotifier);
-               partitions[1] = createPartition(mockTask, "p2", jobId, false, 
env, resultPartitionConsumableNotifier);
-
-               ResultPartitionWriter[] writers = new ResultPartitionWriter[2];
-               writers[0] = new ResultPartitionWriter(partitions[0]);
-               writers[1] = new ResultPartitionWriter(partitions[1]);
-
-               when(mockTask.getAllInputGates()).thenReturn(new 
SingleInputGate[0]);
-               when(mockTask.getAllWriters()).thenReturn(writers);
-               when(mockTask.getProducedPartitions()).thenReturn(partitions);
-
-               env.registerTask(mockTask);
-
-               // Verify
-               ResultPartitionID eagerPartitionId = 
partitions[0].getPartitionId();
-
-               verify(jobManager, times(1)).ask(
-                               eq(new ScheduleOrUpdateConsumers(jobId, 
eagerPartitionId)),
-                               any(FiniteDuration.class));
-       }
-
-       /**
-        * Helper to create a mock result partition.
-        */
-       private static ResultPartition createPartition(
-               Task owningTask,
-               String name,
-               JobID jobId,
-               boolean eagerlyDeployConsumers,
-               NetworkEnvironment env,
-               ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier) {
-
-               return new ResultPartition(
-                       name,
-                       owningTask,
-                       jobId,
-                       new ResultPartitionID(),
-                       ResultPartitionType.PIPELINED,
-                       eagerlyDeployConsumers,
-                       1,
-                       env.getResultPartitionManager(),
-                       resultPartitionConsumableNotifier,
-                       mock(IOManager.class),
-                       env.getDefaultIOMode());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 19bb67e..2d3797d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Lists;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -41,7 +40,6 @@ import 
org.apache.flink.runtime.io.network.util.TestPartitionProducer;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
 import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
@@ -122,7 +120,6 @@ public class LocalInputChannelTest {
                                jobId,
                                partitionIds[i],
                                ResultPartitionType.PIPELINED,
-                               false,
                                parallelism,
                                partitionManager,
                                partitionConsumableNotifier,

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
index c3ba909..48f06b0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.jobgraph;
 
-import java.io.IOException;
-
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.common.io.InitializeOnMaster;
@@ -29,10 +27,11 @@ import 
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.junit.Test;
 
+import java.io.IOException;
+
 import static org.junit.Assert.*;
 
 @SuppressWarnings("serial")
@@ -130,36 +129,7 @@ public class JobTaskVertexTest {
                        fail(e.getMessage());
                }
        }
-
-       /**
-        * Verifies correct setting of eager deploy settings.
-        */
-       @Test
-       public void testEagerlyDeployConsumers() throws Exception {
-               JobVertex producer = new JobVertex("producer");
-
-               {
-                       JobVertex consumer = new JobVertex("consumer");
-                       JobEdge edge = consumer.connectNewDataSetAsInput(
-                                       producer, 
DistributionPattern.ALL_TO_ALL);
-                       
assertFalse(edge.getSource().getEagerlyDeployConsumers());
-               }
-
-               {
-                       JobVertex consumer = new JobVertex("consumer");
-                       JobEdge edge = consumer.connectNewDataSetAsInput(
-                                       producer, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-                       
assertFalse(edge.getSource().getEagerlyDeployConsumers());
-               }
-
-               {
-                       JobVertex consumer = new JobVertex("consumer");
-                       JobEdge edge = consumer.connectNewDataSetAsInput(
-                                       producer, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED, true);
-                       
assertTrue(edge.getSource().getEagerlyDeployConsumers());
-               }
-       }
-
+       
        // 
--------------------------------------------------------------------------------------------
        
        private static final class TestingOutputFormat extends 
DiscardingOutputFormat<Object> implements InitializeOnMaster {

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index ad107b1..15947f9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -18,11 +18,7 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Kill;
-import akka.actor.Props;
-import akka.actor.Status;
+import akka.actor.*;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -35,11 +31,7 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionLocation;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.*;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
@@ -630,7 +622,7 @@ public class TaskManagerTest extends TestLogger {
                                IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
 
                                List<ResultPartitionDeploymentDescriptor> irpdd 
= new ArrayList<ResultPartitionDeploymentDescriptor>();
-                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1, false));
+                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1));
 
                                InputGateDeploymentDescriptor ircdd =
                                                new 
InputGateDeploymentDescriptor(
@@ -775,7 +767,7 @@ public class TaskManagerTest extends TestLogger {
                                IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
 
                                List<ResultPartitionDeploymentDescriptor> irpdd 
= new ArrayList<ResultPartitionDeploymentDescriptor>();
-                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1, false));
+                               irpdd.add(new 
ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, 
ResultPartitionType.PIPELINED, 1));
 
                                InputGateDeploymentDescriptor ircdd =
                                                new 
InputGateDeploymentDescriptor(
@@ -1427,9 +1419,7 @@ public class TaskManagerTest extends TestLogger {
                                new IntermediateDataSetID(),
                                new IntermediateResultPartitionID(),
                                ResultPartitionType.PIPELINED,
-                               1,
-                               false // don't deploy eagerly but with the 
first completed memory buffer
-                       );
+                               1);
 
                        final TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
                                "TestTask", 1, 0, 1, 0, new Configuration(), 
new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 2065a16..48be2e9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -392,20 +392,17 @@ public class StreamingJobGraphGenerator {
                        jobEdge = downStreamVertex.connectNewDataSetAsInput(
                                headVertex,
                                DistributionPattern.POINTWISE,
-                               ResultPartitionType.PIPELINED,
-                               true);
+                               ResultPartitionType.PIPELINED);
                } else if (partitioner instanceof RescalePartitioner){
                        jobEdge = downStreamVertex.connectNewDataSetAsInput(
                                headVertex,
                                DistributionPattern.POINTWISE,
-                               ResultPartitionType.PIPELINED,
-                               true);
+                               ResultPartitionType.PIPELINED);
                } else {
                        jobEdge = downStreamVertex.connectNewDataSetAsInput(
                                        headVertex,
                                        DistributionPattern.ALL_TO_ALL,
-                                       ResultPartitionType.PIPELINED,
-                                       true);
+                                       ResultPartitionType.PIPELINED);
                }
                // set strategy name so that web interface can show it.
                jobEdge.setShipStrategyName(partitioner.toString());

Reply via email to