Repository: flink
Updated Branches:
  refs/heads/master 58204da13 -> 5d5637b01
Revert "[FLINK-3232] [runtime] Add option to eagerly deploy channels" The reverted commit did not really fix anything, but hid the problem by brute force, sending many more schedule or update consumers messages. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d2e8b29 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d2e8b29 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d2e8b29 Branch: refs/heads/master Commit: 0d2e8b2964b58f5610772c6b5bf39a93b9b0fd95 Parents: 58204da Author: Ufuk Celebi <[email protected]> Authored: Wed Nov 9 16:07:22 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Thu Nov 10 22:59:44 2016 +0100 ---------------------------------------------------------------------- .../InputChannelDeploymentDescriptor.java | 17 ++- ...PartialInputChannelDeploymentDescriptor.java | 2 +- .../ResultPartitionDeploymentDescriptor.java | 25 +--- .../executiongraph/ExecutionJobVertex.java | 10 +- .../executiongraph/IntermediateResult.java | 11 +- .../runtime/io/network/NetworkEnvironment.java | 3 - .../io/network/partition/ResultPartition.java | 21 +-- .../partition/ResultPartitionManager.java | 2 - .../runtime/jobgraph/IntermediateDataSet.java | 31 ---- .../flink/runtime/jobgraph/JobVertex.java | 12 +- .../apache/flink/runtime/taskmanager/Task.java | 1 - ...ResultPartitionDeploymentDescriptorTest.java | 4 +- .../ExecutionGraphDeploymentTest.java | 2 +- .../io/network/NetworkEnvironmentTest.java | 147 ------------------- .../consumer/LocalInputChannelTest.java | 3 - .../runtime/jobgraph/JobTaskVertexTest.java | 36 +---- .../runtime/taskmanager/TaskManagerTest.java | 20 +-- .../api/graph/StreamingJobGraphGenerator.java | 9 +- 18 files changed, 35 insertions(+), 321 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java index 0912055..a72b92f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionEdge; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.io.network.ConnectionID; @@ -88,7 +89,9 @@ public class InputChannelDeploymentDescriptor implements Serializable { * Creates an input channel deployment descriptor for each partition. 
*/ public static InputChannelDeploymentDescriptor[] fromEdges( - ExecutionEdge[] edges, SimpleSlot consumerSlot) { + ExecutionEdge[] edges, + SimpleSlot consumerSlot, + boolean allowLazyDeployment) throws ExecutionGraphException { final ResourceID consumerTaskManager = consumerSlot.getTaskManagerID(); final InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[edges.length]; @@ -105,9 +108,11 @@ public class InputChannelDeploymentDescriptor implements Serializable { // The producing task needs to be RUNNING or already FINISHED if (consumedPartition.isConsumable() && producerSlot != null && - (producerState == ExecutionState.RUNNING - || producerState == ExecutionState.FINISHED)) { - + (producerState == ExecutionState.RUNNING || + producerState == ExecutionState.FINISHED || + producerState == ExecutionState.SCHEDULED || + producerState == ExecutionState.DEPLOYING)) { + final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation(); final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID(); @@ -124,9 +129,11 @@ public class InputChannelDeploymentDescriptor implements Serializable { partitionLocation = ResultPartitionLocation.createRemote(connectionId); } } - else { + else if (allowLazyDeployment) { // The producing task might not have registered the partition yet partitionLocation = ResultPartitionLocation.createUnknown(); + } else { + throw new ExecutionGraphException("Trying to eagerly schedule a task whose inputs are not ready."); } final ResultPartitionID consumedPartitionId = new ResultPartitionID( http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java index 0eac39d..c925f75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java @@ -21,10 +21,10 @@ package org.apache.flink.runtime.deployment; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.IntermediateResult; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import static org.apache.flink.util.Preconditions.checkNotNull; http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java index e72d468..2881dde 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java @@ -48,20 +48,11 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { /** The number of subpartitions. */ private final int numberOfSubpartitions; - /** - * Flag indicating whether to eagerly deploy consumers. - * - * <p>If <code>true</code>, the consumers are deployed as soon as the - * runtime result is registered at the result manager of the task manager. - */ - private final boolean eagerlyDeployConsumers; - public ResultPartitionDeploymentDescriptor( IntermediateDataSetID resultId, IntermediateResultPartitionID partitionId, ResultPartitionType partitionType, - int numberOfSubpartitions, - boolean eagerlyDeployConsumers) { + int numberOfSubpartitions) { this.resultId = checkNotNull(resultId); this.partitionId = checkNotNull(partitionId); @@ -69,7 +60,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { checkArgument(numberOfSubpartitions >= 1); this.numberOfSubpartitions = numberOfSubpartitions; - this.eagerlyDeployConsumers = eagerlyDeployConsumers; } public IntermediateDataSetID getResultId() { @@ -88,16 +78,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { return numberOfSubpartitions; } - /** - * Returns whether consumers should be deployed eagerly (as soon as they - * are registered at the result manager of the task manager). - * - * @return Whether consumers should be deployed eagerly - */ - public boolean getEagerlyDeployConsumers() { - return eagerlyDeployConsumers; - } - @Override public String toString() { return String.format("ResultPartitionDeploymentDescriptor [result id: %s, " @@ -129,7 +109,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { } return new ResultPartitionDeploymentDescriptor( - resultId, partitionId, partitionType, numberOfSubpartitions, - partition.getIntermediateResult().getEagerlyDeployConsumers()); + resultId, partitionId, partitionType, numberOfSubpartitions); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 47cfde1..a62ed86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.executiongraph; +import org.apache.flink.api.common.Archiveable; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.time.Time; @@ -30,13 +32,11 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.api.common.Archiveable; import org.apache.flink.runtime.instance.SlotProvider; -import 
org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.IntermediateDataSet; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobEdge; -import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; @@ -45,7 +45,6 @@ import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; - import scala.Option; import java.io.IOException; @@ -161,8 +160,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable result.getId(), this, numTaskVertices, - result.getResultType(), - result.getEagerlyDeployConsumers()); + result.getResultType()); } // create all task vertices http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java index 9d57014..c2c19d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java @@ -46,14 +46,11 @@ public class IntermediateResult { private final ResultPartitionType resultType; - private final boolean eagerlyDeployConsumers; - public IntermediateResult( IntermediateDataSetID id, ExecutionJobVertex producer, int numParallelProducers, - ResultPartitionType resultType, - boolean eagerlyDeployConsumers) { + ResultPartitionType resultType) { this.id = checkNotNull(id); this.producer = checkNotNull(producer); @@ -71,8 +68,6 @@ public class IntermediateResult { // The runtime type for this produced result this.resultType = checkNotNull(resultType); - - this.eagerlyDeployConsumers = eagerlyDeployConsumers; } public void setPartition(int partitionNumber, IntermediateResultPartition partition) { @@ -108,10 +103,6 @@ public class IntermediateResult { return resultType; } - public boolean getEagerlyDeployConsumers() { - return eagerlyDeployConsumers; - } - public int registerConsumer() { final int index = numConsumers; numConsumers++; http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index b221ec7..d0032d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import 
org.apache.flink.runtime.io.network.partition.ResultPartition; -import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -158,8 +157,6 @@ public class NetworkEnvironment { throw new IllegalStateException("Unequal number of writers and partitions."); } - ResultPartitionConsumableNotifier jobManagerNotifier; - synchronized (lock) { if (isShutdown) { throw new IllegalStateException("NetworkEnvironment is shut down"); http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index f06cb43..034b27a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; @@ -28,7 +29,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.taskmanager.TaskActions; import org.apache.flink.runtime.taskmanager.TaskManager; import org.slf4j.Logger; @@ -89,14 +89,6 @@ public class ResultPartition implements BufferPoolOwner { /** Type of this partition. Defines the concrete subpartition implementation to use. */ private final ResultPartitionType partitionType; - /** - * Flag indicating whether to eagerly deploy consumers. - * - * <p>If <code>true</code>, the consumers are deployed as soon as the - * runtime result is registered at the result manager of the task manager. - */ - private final boolean doEagerDeployment; - /** The subpartitions of this partition. At least one. 
*/ private final ResultSubpartition[] subpartitions; @@ -137,7 +129,6 @@ public class ResultPartition implements BufferPoolOwner { JobID jobId, ResultPartitionID partitionId, ResultPartitionType partitionType, - boolean doEagerDeployment, int numberOfSubpartitions, ResultPartitionManager partitionManager, ResultPartitionConsumableNotifier partitionConsumableNotifier, @@ -149,7 +140,6 @@ public class ResultPartition implements BufferPoolOwner { this.jobId = checkNotNull(jobId); this.partitionId = checkNotNull(partitionId); this.partitionType = checkNotNull(partitionType); - this.doEagerDeployment = doEagerDeployment; this.subpartitions = new ResultSubpartition[numberOfSubpartitions]; this.partitionManager = checkNotNull(partitionManager); this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier); @@ -366,15 +356,6 @@ public class ResultPartition implements BufferPoolOwner { } /** - * Deploys consumers if eager deployment is activated - */ - public void deployConsumers() { - if (doEagerDeployment) { - partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId, taskActions); - } - } - - /** * Releases buffers held by this result partition. * * <p> This is a callback from the buffer pool, which is registered for result partitions, which http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java index 6edae6f..9da3e14 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java @@ -58,8 +58,6 @@ public class ResultPartitionManager implements ResultPartitionProvider { throw new IllegalStateException("Result partition already registered."); } - partition.deployConsumers(); - LOG.debug("Registered {}.", partition); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java index c30c78e..2d9faa8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java @@ -45,14 +45,6 @@ public class IntermediateDataSet implements java.io.Serializable { // The type of partition to use at runtime private final ResultPartitionType resultType; - - /** - * Flag indicating whether to eagerly deploy consumers. - * - * <p>If <code>true</code>, the consumers are deployed as soon as the - * runtime result is registered at the result manager of the task manager. 
- */ - private boolean eagerlyDeployConsumers; // -------------------------------------------------------------------------------------------- @@ -87,29 +79,6 @@ public class IntermediateDataSet implements java.io.Serializable { public ResultPartitionType getResultType() { return resultType; } - - /** - * Sets the flag indicating whether to eagerly deploy consumers (default: - * <code>false</code>). - * - * @param eagerlyDeployConsumers If <code>true</code>, the consumers are - * deployed as soon as the runtime result is - * registered at the result manager of the - * task manager. Default is <code>false</code>. - */ - public void setEagerlyDeployConsumers(boolean eagerlyDeployConsumers) { - this.eagerlyDeployConsumers = eagerlyDeployConsumers; - } - - /** - * Returns whether consumers should be deployed eagerly (as soon as they - * are registered at the result manager of the task manager). - * - * @return Whether consumers should be deployed eagerly - */ - public boolean getEagerlyDeployConsumers() { - return eagerlyDeployConsumers; - } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index 8ddc9f5..2bda9d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -382,7 +382,7 @@ public class JobVertex implements java.io.Serializable { } public JobEdge connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern) { - return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED, false); + return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED); } public JobEdge connectNewDataSetAsInput( @@ -390,17 +390,7 @@ public class JobVertex implements java.io.Serializable { DistributionPattern distPattern, ResultPartitionType partitionType) { - return connectNewDataSetAsInput(input, distPattern, partitionType, false); - } - - public JobEdge connectNewDataSetAsInput( - JobVertex input, - DistributionPattern distPattern, - ResultPartitionType partitionType, - boolean eagerlyDeployConsumers) { - IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType); - dataSet.setEagerlyDeployConsumers(eagerlyDeployConsumers); JobEdge edge = new JobEdge(dataSet, this, distPattern); this.inputs.add(edge); http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 827451e..6907606 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -341,7 +341,6 @@ public class Task implements Runnable, TaskActions { jobId, partitionId, desc.getPartitionType(), - desc.getEagerlyDeployConsumers(), desc.getNumberOfSubpartitions(), networkEnvironment.getResultPartitionManager(), 
resultPartitionConsumableNotifier, http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index d2fcc7b..4b1e546 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -45,8 +45,7 @@ public class ResultPartitionDeploymentDescriptorTest { resultId, partitionId, partitionType, - numberOfSubpartitions, - eagerlyDeployConsumers); + numberOfSubpartitions); ResultPartitionDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig); @@ -55,6 +54,5 @@ public class ResultPartitionDeploymentDescriptorTest { assertEquals(partitionId, copy.getPartitionId()); assertEquals(partitionType, copy.getPartitionType()); assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions()); - assertEquals(eagerlyDeployConsumers, copy.getEagerlyDeployConsumers()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 63da1ab..d4acd8c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -308,7 +308,7 @@ public class ExecutionGraphDeploymentTest { v1.setInvokableClass(BatchTask.class); v2.setInvokableClass(BatchTask.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING, false); + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); // execution graph that executes actions synchronously ExecutionGraph eg = new ExecutionGraph( http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java deleted file mode 100644 index 13da18e..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.io.network.partition.ResultPartition; -import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.taskmanager.ActorGatewayResultPartitionConsumableNotifier; -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; -import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.Test; -import scala.Some; -import scala.concurrent.duration.FiniteDuration; -import scala.concurrent.impl.Promise; - -import java.util.concurrent.TimeUnit; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class NetworkEnvironmentTest { - /** - * Registers a task with an eager and non-eager partition at the network - * environment and verifies that there is exactly on schedule or update - * message to the job manager for the eager partition. 
- */ - @Test - @SuppressWarnings("unchecked") - public void testEagerlyDeployConsumers() throws Exception { - // Mock job manager => expected interactions will be verified - final ActorGateway jobManager = mock(ActorGateway.class); - when(jobManager.ask(anyObject(), any(FiniteDuration.class))) - .thenReturn(new Promise.DefaultPromise<>().future()); - - // Network environment setup - NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration( - 20, - 1024, - MemoryType.HEAP, - IOManager.IOMode.SYNC, - Some.<NettyConfig>empty(), - 0, - 0); - - NetworkEnvironment env = new NetworkEnvironment( - new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize(), config.memoryType()), - new LocalConnectionManager(), - new ResultPartitionManager(), - new TaskEventDispatcher(), - new KvStateRegistry(), - null, - config.ioMode(), - config.partitionRequestInitialBackoff(), - config.partitinRequestMaxBackoff()); - - env.start(); - - ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new ActorGatewayResultPartitionConsumableNotifier( - TestingUtils.defaultExecutionContext(), - jobManager, - new FiniteDuration(30L, TimeUnit.SECONDS)); - - // Register mock task - JobID jobId = new JobID(); - Task mockTask = mock(Task.class); - - ResultPartition[] partitions = new ResultPartition[2]; - partitions[0] = createPartition(mockTask, "p1", jobId, true, env, resultPartitionConsumableNotifier); - partitions[1] = createPartition(mockTask, "p2", jobId, false, env, resultPartitionConsumableNotifier); - - ResultPartitionWriter[] writers = new ResultPartitionWriter[2]; - writers[0] = new ResultPartitionWriter(partitions[0]); - writers[1] = new ResultPartitionWriter(partitions[1]); - - when(mockTask.getAllInputGates()).thenReturn(new SingleInputGate[0]); - when(mockTask.getAllWriters()).thenReturn(writers); - when(mockTask.getProducedPartitions()).thenReturn(partitions); - - env.registerTask(mockTask); - - // Verify - ResultPartitionID eagerPartitionId = partitions[0].getPartitionId(); - - verify(jobManager, times(1)).ask( - eq(new ScheduleOrUpdateConsumers(jobId, eagerPartitionId)), - any(FiniteDuration.class)); - } - - /** - * Helper to create a mock result partition. 
- */ - private static ResultPartition createPartition( - Task owningTask, - String name, - JobID jobId, - boolean eagerlyDeployConsumers, - NetworkEnvironment env, - ResultPartitionConsumableNotifier resultPartitionConsumableNotifier) { - - return new ResultPartition( - name, - owningTask, - jobId, - new ResultPartitionID(), - ResultPartitionType.PIPELINED, - eagerlyDeployConsumers, - 1, - env.getResultPartitionManager(), - resultPartitionConsumableNotifier, - mock(IOManager.class), - env.getDefaultIOMode()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 19bb67e..2d3797d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import com.google.common.collect.Lists; - import org.apache.flink.api.common.JobID; import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.execution.CancelTaskException; @@ -41,7 +40,6 @@ import org.apache.flink.runtime.io.network.util.TestPartitionProducer; import org.apache.flink.runtime.io.network.util.TestProducerSource; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; - import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.taskmanager.TaskActions; import org.junit.Test; @@ -122,7 +120,6 @@ public class LocalInputChannelTest { jobId, partitionIds[i], ResultPartitionType.PIPELINED, - false, parallelism, partitionManager, partitionConsumableNotifier, http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java index c3ba909..48f06b0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.jobgraph; -import java.io.IOException; - import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.api.common.io.GenericInputFormat; import org.apache.flink.api.common.io.InitializeOnMaster; @@ -29,10 +27,11 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.operators.util.TaskConfig; import org.junit.Test; +import java.io.IOException; + import static org.junit.Assert.*; @SuppressWarnings("serial") @@ -130,36 
+129,7 @@ public class JobTaskVertexTest { fail(e.getMessage()); } } - - /** - * Verifies correct setting of eager deploy settings. - */ - @Test - public void testEagerlyDeployConsumers() throws Exception { - JobVertex producer = new JobVertex("producer"); - - { - JobVertex consumer = new JobVertex("consumer"); - JobEdge edge = consumer.connectNewDataSetAsInput( - producer, DistributionPattern.ALL_TO_ALL); - assertFalse(edge.getSource().getEagerlyDeployConsumers()); - } - - { - JobVertex consumer = new JobVertex("consumer"); - JobEdge edge = consumer.connectNewDataSetAsInput( - producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); - assertFalse(edge.getSource().getEagerlyDeployConsumers()); - } - - { - JobVertex consumer = new JobVertex("consumer"); - JobEdge edge = consumer.connectNewDataSetAsInput( - producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED, true); - assertTrue(edge.getSource().getEagerlyDeployConsumers()); - } - } - + // -------------------------------------------------------------------------------------------- private static final class TestingOutputFormat extends DiscardingOutputFormat<Object> implements InitializeOnMaster { http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index ad107b1..15947f9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -18,11 +18,7 @@ package org.apache.flink.runtime.taskmanager; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Kill; -import akka.actor.Props; -import akka.actor.Status; +import akka.actor.*; import akka.japi.Creator; import akka.testkit.JavaTestKit; import org.apache.flink.api.common.ExecutionConfig; @@ -35,11 +31,7 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; -import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; -import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; -import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; -import org.apache.flink.runtime.deployment.ResultPartitionLocation; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.deployment.*; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.JobInformation; @@ -630,7 +622,7 @@ public class TaskManagerTest extends TestLogger { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>(); - irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, false)); + irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1)); 
InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor( @@ -775,7 +767,7 @@ public class TaskManagerTest extends TestLogger { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>(); - irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, false)); + irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1)); InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor( @@ -1427,9 +1419,7 @@ public class TaskManagerTest extends TestLogger { new IntermediateDataSetID(), new IntermediateResultPartitionID(), ResultPartitionType.PIPELINED, - 1, - false // don't deploy eagerly but with the first completed memory buffer - ); + 1); final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig, "TestTask", 1, 0, 1, 0, new Configuration(), new Configuration(), http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 2065a16..48be2e9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -392,20 +392,17 @@ public class StreamingJobGraphGenerator { jobEdge = downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED, - true); + ResultPartitionType.PIPELINED); } else if (partitioner instanceof RescalePartitioner){ jobEdge = downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED, - true); + ResultPartitionType.PIPELINED); } else { jobEdge = downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.ALL_TO_ALL, - ResultPartitionType.PIPELINED, - true); + ResultPartitionType.PIPELINED); } // set strategy name so that web interface can show it. jobEdge.setShipStrategyName(partitioner.toString());
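----------------------------------------------------------------------

For context on the commit message: before this revert, registering a result partition at the TaskManager could immediately notify the JobManager whenever the eagerlyDeployConsumers flag was set, which is where the extra "schedule or update consumers" messages came from. The following stand-alone sketch uses simplified placeholder types (EagerDeploymentSketch, ConsumableNotifier are illustrative names, not the actual Flink runtime classes) to show the pattern that the revert removes.

// Simplified stand-in types illustrating the "brute force" behavior the commit
// message refers to: with the eager flag, every partition registration at the
// TaskManager sent a ScheduleOrUpdateConsumers message to the JobManager,
// whether or not any data was available yet.
class EagerDeploymentSketch {

    interface ConsumableNotifier {
        // Stands in for ResultPartitionConsumableNotifier#notifyPartitionConsumable,
        // which results in a ScheduleOrUpdateConsumers message to the JobManager.
        void notifyPartitionConsumable(String partitionId);
    }

    static void registerResultPartition(
            String partitionId, boolean eagerlyDeployConsumers, ConsumableNotifier notifier) {

        // Pre-revert: ResultPartitionManager#registerResultPartition called
        // ResultPartition#deployConsumers(), which fired the notification
        // immediately when the eager flag was set.
        if (eagerlyDeployConsumers) {
            notifier.notifyPartitionConsumable(partitionId);
        }

        // Post-revert: nothing happens at registration time; for pipelined results
        // the notification is sent lazily, e.g. once the first buffer of the
        // partition becomes consumable.
    }
}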
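The behavioral core of the change is the InputChannelDeploymentDescriptor.fromEdges hunk: with allowLazyDeployment set to false, a consumed partition whose producer is not at least SCHEDULED now fails fast with an ExecutionGraphException instead of being deployed with an UNKNOWN partition location. A condensed sketch of that per-edge decision, using simplified stand-in types (PartitionLocationDecision, ProducerState, Location are illustrative names; the real code returns a ResultPartitionLocation and throws ExecutionGraphException):

// Condensed, self-contained sketch of the per-edge decision made in
// InputChannelDeploymentDescriptor#fromEdges after this revert.
public class PartitionLocationDecision {

    enum ProducerState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, FAILED }

    enum Location { LOCAL, REMOTE, UNKNOWN }

    static Location decide(
            boolean partitionConsumable,
            ProducerState producerState,
            boolean producerOnConsumerTaskManager,
            boolean allowLazyDeployment) throws Exception {

        // The producing task needs to be scheduled, deploying, running, or finished.
        boolean producerReady =
                producerState == ProducerState.SCHEDULED
                || producerState == ProducerState.DEPLOYING
                || producerState == ProducerState.RUNNING
                || producerState == ProducerState.FINISHED;

        if (partitionConsumable && producerReady) {
            // Producer slot is known: consume locally if both tasks share a
            // TaskManager, otherwise go through the network stack
            // (ResultPartitionLocation.createRemote in the real code).
            return producerOnConsumerTaskManager ? Location.LOCAL : Location.REMOTE;
        }
        else if (allowLazyDeployment) {
            // The producing task might not have registered the partition yet.
            return Location.UNKNOWN;
        }
        else {
            throw new Exception("Trying to eagerly schedule a task whose inputs are not ready.");
        }
    }
}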
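On the API surface, JobVertex.connectNewDataSetAsInput loses its boolean eagerlyDeployConsumers parameter and ResultPartitionDeploymentDescriptor goes back to a four-argument constructor. A rough caller-side sketch against the flink-runtime classes touched by this commit (the class ConnectAfterRevert and the printed output are illustrative only, and assume the flink-runtime version of this commit is on the classpath):

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;

public class ConnectAfterRevert {

    public static void main(String[] args) {
        JobVertex producer = new JobVertex("producer");
        JobVertex consumer = new JobVertex("consumer");

        // Pre-revert there was a fourth boolean argument (eagerlyDeployConsumers);
        // post-revert the three-argument overload is the most specific one, as used
        // by StreamingJobGraphGenerator in the hunks above.
        JobEdge edge = consumer.connectNewDataSetAsInput(
                producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        System.out.println("result type: " + edge.getSource().getResultType());
    }
}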
