Revert "[FLINK-3232] [runtime] Add option to eagerly deploy channels"
The reverted commit did not really fix anything, but hid the problem by brute force, sending many more schedule or update consumers messages. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5a4cb6c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5a4cb6c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5a4cb6c Branch: refs/heads/release-1.1 Commit: b5a4cb6cc9ba7099045f145a2fe3c58567253b4f Parents: 9a19ca1 Author: Ufuk Celebi <[email protected]> Authored: Thu Nov 10 14:01:22 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Thu Nov 10 21:53:30 2016 +0100 ---------------------------------------------------------------------- .../ResultPartitionDeploymentDescriptor.java | 32 ++----- .../executiongraph/ExecutionJobVertex.java | 3 +- .../executiongraph/IntermediateResult.java | 17 +--- .../runtime/io/network/NetworkEnvironment.java | 13 --- .../io/network/partition/ResultPartition.java | 40 ++------ .../runtime/jobgraph/IntermediateDataSet.java | 23 ----- .../flink/runtime/jobgraph/JobVertex.java | 12 +-- .../apache/flink/runtime/taskmanager/Task.java | 1 - .../NetworkEnvironmentConfiguration.scala | 1 + ...ResultPartitionDeploymentDescriptorTest.java | 6 +- .../ExecutionGraphDeploymentTest.java | 2 +- .../io/network/NetworkEnvironmentTest.java | 98 -------------------- .../consumer/LocalInputChannelTest.java | 3 +- .../runtime/jobgraph/JobTaskVertexTest.java | 40 ++------ .../runtime/taskmanager/TaskManagerTest.java | 8 +- .../api/graph/StreamingJobGraphGenerator.java | 9 +- 16 files changed, 41 insertions(+), 267 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java index e72d468..ecdacbb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java @@ -48,20 +48,11 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { /** The number of subpartitions. */ private final int numberOfSubpartitions; - /** - * Flag indicating whether to eagerly deploy consumers. - * - * <p>If <code>true</code>, the consumers are deployed as soon as the - * runtime result is registered at the result manager of the task manager. - */ - private final boolean eagerlyDeployConsumers; - public ResultPartitionDeploymentDescriptor( - IntermediateDataSetID resultId, - IntermediateResultPartitionID partitionId, - ResultPartitionType partitionType, - int numberOfSubpartitions, - boolean eagerlyDeployConsumers) { + IntermediateDataSetID resultId, + IntermediateResultPartitionID partitionId, + ResultPartitionType partitionType, + int numberOfSubpartitions) { this.resultId = checkNotNull(resultId); this.partitionId = checkNotNull(partitionId); @@ -69,7 +60,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { checkArgument(numberOfSubpartitions >= 1); this.numberOfSubpartitions = numberOfSubpartitions; - this.eagerlyDeployConsumers = eagerlyDeployConsumers; } public IntermediateDataSetID getResultId() { @@ -88,16 +78,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { return numberOfSubpartitions; } - /** - * Returns whether consumers should be deployed eagerly (as soon as they - * are registered at the result manager of the task manager). - * - * @return Whether consumers should be deployed eagerly - */ - public boolean getEagerlyDeployConsumers() { - return eagerlyDeployConsumers; - } - @Override public String toString() { return String.format("ResultPartitionDeploymentDescriptor [result id: %s, " @@ -129,7 +109,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { } return new ResultPartitionDeploymentDescriptor( - resultId, partitionId, partitionType, numberOfSubpartitions, - partition.getIntermediateResult().getEagerlyDeployConsumers()); + resultId, partitionId, partitionType, numberOfSubpartitions + ); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 9e175f1..65259ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -147,8 +147,7 @@ public class ExecutionJobVertex implements Serializable { result.getId(), this, numTaskVertices, - result.getResultType(), - result.getEagerlyDeployConsumers()); + result.getResultType()); } // create all task vertices http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java index 9d57014..67b1fe0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java @@ -46,14 +46,11 @@ public class IntermediateResult { private final ResultPartitionType resultType; - private final boolean eagerlyDeployConsumers; - public IntermediateResult( - IntermediateDataSetID id, - ExecutionJobVertex producer, - int numParallelProducers, - ResultPartitionType resultType, - boolean eagerlyDeployConsumers) { + IntermediateDataSetID id, + ExecutionJobVertex producer, + int numParallelProducers, + ResultPartitionType resultType) { this.id = checkNotNull(id); this.producer = checkNotNull(producer); @@ -71,8 +68,6 @@ public class IntermediateResult { // The runtime type for this produced result this.resultType = checkNotNull(resultType); - - this.eagerlyDeployConsumers = eagerlyDeployConsumers; } public void setPartition(int partitionNumber, IntermediateResultPartition partition) { @@ -108,10 +103,6 @@ public class IntermediateResult { return resultType; } - public boolean getEagerlyDeployConsumers() { - return eagerlyDeployConsumers; - } - public int registerConsumer() { final int index = numConsumers; numConsumers++; http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 30d2e38..11661cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -277,8 +277,6 @@ public class NetworkEnvironment { throw new IllegalStateException("Unequal number of writers and partitions."); } - ResultPartitionConsumableNotifier jobManagerNotifier; - synchronized (lock) { if (isShutdown) { throw new IllegalStateException("NetworkEnvironment is shut down"); @@ -340,17 +338,6 @@ public class NetworkEnvironment { } } } - - // Copy the reference to prevent races with concurrent shut downs - jobManagerNotifier = partitionConsumableNotifier; - } - - for (ResultPartition partition : producedPartitions) { - // Eagerly notify consumers if required. - if (partition.getEagerlyDeployConsumers()) { - jobManagerNotifier.notifyPartitionConsumable( - partition.getJobId(), partition.getPartitionId()); - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index 7c109f3..a60f95d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; @@ -28,7 +29,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.taskmanager.TaskManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,14 +86,6 @@ public class ResultPartition implements BufferPoolOwner { /** Type of this partition. Defines the concrete subpartition implementation to use. */ private final ResultPartitionType partitionType; - /** - * Flag indicating whether to eagerly deploy consumers. - * - * <p>If <code>true</code>, the consumers are deployed as soon as the - * runtime result is registered at the result manager of the task manager. - */ - private final boolean eagerlyDeployConsumers; - /** The subpartitions of this partition. At least one. */ private final ResultSubpartition[] subpartitions; @@ -129,22 +121,20 @@ public class ResultPartition implements BufferPoolOwner { private long totalNumberOfBytes; public ResultPartition( - String owningTaskName, - JobID jobId, - ResultPartitionID partitionId, - ResultPartitionType partitionType, - boolean eagerlyDeployConsumers, - int numberOfSubpartitions, - ResultPartitionManager partitionManager, - ResultPartitionConsumableNotifier partitionConsumableNotifier, - IOManager ioManager, - IOMode defaultIoMode) { + String owningTaskName, + JobID jobId, + ResultPartitionID partitionId, + ResultPartitionType partitionType, + int numberOfSubpartitions, + ResultPartitionManager partitionManager, + ResultPartitionConsumableNotifier partitionConsumableNotifier, + IOManager ioManager, + IOMode defaultIoMode) { this.owningTaskName = checkNotNull(owningTaskName); this.jobId = checkNotNull(jobId); this.partitionId = checkNotNull(partitionId); this.partitionType = checkNotNull(partitionType); - this.eagerlyDeployConsumers = eagerlyDeployConsumers; this.subpartitions = new ResultSubpartition[numberOfSubpartitions]; this.partitionManager = checkNotNull(partitionManager); this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier); @@ -211,16 +201,6 @@ public class ResultPartition implements BufferPoolOwner { return subpartitions.length; } - /** - * Returns whether consumers should be deployed eagerly (as soon as they - * are registered at the result manager of the task manager). - * - * @return Whether consumers should be deployed eagerly - */ - public boolean getEagerlyDeployConsumers() { - return eagerlyDeployConsumers; - } - public BufferProvider getBufferProvider() { return bufferPool; } http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java index c30c78e..fdc5d1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java @@ -88,29 +88,6 @@ public class IntermediateDataSet implements java.io.Serializable { return resultType; } - /** - * Sets the flag indicating whether to eagerly deploy consumers (default: - * <code>false</code>). - * - * @param eagerlyDeployConsumers If <code>true</code>, the consumers are - * deployed as soon as the runtime result is - * registered at the result manager of the - * task manager. Default is <code>false</code>. - */ - public void setEagerlyDeployConsumers(boolean eagerlyDeployConsumers) { - this.eagerlyDeployConsumers = eagerlyDeployConsumers; - } - - /** - * Returns whether consumers should be deployed eagerly (as soon as they - * are registered at the result manager of the task manager). - * - * @return Whether consumers should be deployed eagerly - */ - public boolean getEagerlyDeployConsumers() { - return eagerlyDeployConsumers; - } - // -------------------------------------------------------------------------------------------- public void addConsumer(JobEdge edge) { http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index 379a42a..47dcb36 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -357,7 +357,7 @@ public class JobVertex implements java.io.Serializable { } public JobEdge connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern) { - return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED, false); + return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED); } public JobEdge connectNewDataSetAsInput( @@ -365,17 +365,7 @@ public class JobVertex implements java.io.Serializable { DistributionPattern distPattern, ResultPartitionType partitionType) { - return connectNewDataSetAsInput(input, distPattern, partitionType, false); - } - - public JobEdge connectNewDataSetAsInput( - JobVertex input, - DistributionPattern distPattern, - ResultPartitionType partitionType, - boolean eagerlyDeployConsumers) { - IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType); - dataSet.setEagerlyDeployConsumers(eagerlyDeployConsumers); JobEdge edge = new JobEdge(dataSet, this, distPattern); this.inputs.add(edge); http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 00046b7..dd14aaf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -332,7 +332,6 @@ public class Task implements Runnable { jobId, partitionId, desc.getPartitionType(), - desc.getEagerlyDeployConsumers(), desc.getNumberOfSubpartitions(), networkEnvironment.getPartitionManager(), networkEnvironment.getPartitionConsumableNotifier(), http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala index 065211c..619da96 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala @@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager import org.apache.flink.core.memory.MemoryType import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode + import org.apache.flink.runtime.io.network.netty.NettyConfig case class NetworkEnvironmentConfiguration( http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index d2fcc7b..1986eae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -38,15 +38,14 @@ public class ResultPartitionDeploymentDescriptorTest { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); ResultPartitionType partitionType = ResultPartitionType.PIPELINED; int numberOfSubpartitions = 24; - boolean eagerlyDeployConsumers = true; ResultPartitionDeploymentDescriptor orig = new ResultPartitionDeploymentDescriptor( resultId, partitionId, partitionType, - numberOfSubpartitions, - eagerlyDeployConsumers); + numberOfSubpartitions + ); ResultPartitionDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig); @@ -55,6 +54,5 @@ public class ResultPartitionDeploymentDescriptorTest { assertEquals(partitionId, copy.getPartitionId()); assertEquals(partitionType, copy.getPartitionType()); assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions()); - assertEquals(eagerlyDeployConsumers, copy.getEagerlyDeployConsumers()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 332c8cd..cfbde6a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -312,7 +312,7 @@ public class ExecutionGraphDeploymentTest { v1.setInvokableClass(BatchTask.class); v2.setInvokableClass(BatchTask.class); - v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING, false); + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); // execution graph that executes actions synchronously ExecutionGraph eg = new ExecutionGraph( http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index fca3ceb..a659be3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -18,29 +18,19 @@ package org.apache.flink.runtime.io.network; -import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.DummyActorGateway; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.io.network.partition.ResultPartition; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; -import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.NetUtils; import org.junit.Test; import scala.Some; import scala.Tuple2; import scala.concurrent.duration.FiniteDuration; -import scala.concurrent.impl.Promise; import java.net.InetAddress; import java.util.concurrent.TimeUnit; @@ -51,13 +41,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class NetworkEnvironmentTest { @@ -151,85 +134,4 @@ public class NetworkEnvironmentTest { fail(e.getMessage()); } } - - - /** - * Registers a task with an eager and non-eager partition at the network - * environment and verifies that there is exactly on schedule or update - * message to the job manager for the eager partition. - */ - @Test - @SuppressWarnings("unchecked") - public void testEagerlyDeployConsumers() throws Exception { - // Mock job manager => expected interactions will be verified - ActorGateway jobManager = mock(ActorGateway.class); - when(jobManager.ask(anyObject(), any(FiniteDuration.class))) - .thenReturn(new Promise.DefaultPromise<>().future()); - - // Network environment setup - NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration( - 20, - 1024, - MemoryType.HEAP, - IOManager.IOMode.SYNC, - Some.<NettyConfig>empty(), - new Tuple2<>(0, 0)); - - NetworkEnvironment env = new NetworkEnvironment( - TestingUtils.defaultExecutionContext(), - new FiniteDuration(30, TimeUnit.SECONDS), - config); - - // Associate the environment with the mock actors - env.associateWithTaskManagerAndJobManager( - jobManager, - DummyActorGateway.INSTANCE); - - // Register mock task - JobID jobId = new JobID(); - - ResultPartition[] partitions = new ResultPartition[2]; - partitions[0] = createPartition("p1", jobId, true, env); - partitions[1] = createPartition("p2", jobId, false, env); - - ResultPartitionWriter[] writers = new ResultPartitionWriter[2]; - writers[0] = new ResultPartitionWriter(partitions[0]); - writers[1] = new ResultPartitionWriter(partitions[1]); - - Task mockTask = mock(Task.class); - when(mockTask.getAllInputGates()).thenReturn(new SingleInputGate[0]); - when(mockTask.getAllWriters()).thenReturn(writers); - when(mockTask.getProducedPartitions()).thenReturn(partitions); - - env.registerTask(mockTask); - - // Verify - ResultPartitionID eagerPartitionId = partitions[0].getPartitionId(); - - verify(jobManager, times(1)).ask( - eq(new ScheduleOrUpdateConsumers(jobId, eagerPartitionId)), - any(FiniteDuration.class)); - } - - /** - * Helper to create a mock result partition. - */ - private static ResultPartition createPartition( - String name, - JobID jobId, - boolean eagerlyDeployConsumers, - NetworkEnvironment env) { - - return new ResultPartition( - name, - jobId, - new ResultPartitionID(), - ResultPartitionType.PIPELINED, - eagerlyDeployConsumers, - 1, - env.getPartitionManager(), - env.getPartitionConsumableNotifier(), - mock(IOManager.class), - env.getDefaultIOMode()); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index f91a4ba..88a3ff5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -119,8 +119,7 @@ public class LocalInputChannelTest { jobId, partitionIds[i], ResultPartitionType.PIPELINED, - false, - parallelism, + parallelism, partitionManager, partitionConsumableNotifier, ioManager, http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java index c3ba909..4f2d807 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.jobgraph; -import java.io.IOException; - import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.api.common.io.GenericInputFormat; import org.apache.flink.api.common.io.InitializeOnMaster; @@ -29,11 +27,16 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.operators.util.TaskConfig; import org.junit.Test; -import static org.junit.Assert.*; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @SuppressWarnings("serial") public class JobTaskVertexTest { @@ -131,35 +134,6 @@ public class JobTaskVertexTest { } } - /** - * Verifies correct setting of eager deploy settings. - */ - @Test - public void testEagerlyDeployConsumers() throws Exception { - JobVertex producer = new JobVertex("producer"); - - { - JobVertex consumer = new JobVertex("consumer"); - JobEdge edge = consumer.connectNewDataSetAsInput( - producer, DistributionPattern.ALL_TO_ALL); - assertFalse(edge.getSource().getEagerlyDeployConsumers()); - } - - { - JobVertex consumer = new JobVertex("consumer"); - JobEdge edge = consumer.connectNewDataSetAsInput( - producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); - assertFalse(edge.getSource().getEagerlyDeployConsumers()); - } - - { - JobVertex consumer = new JobVertex("consumer"); - JobEdge edge = consumer.connectNewDataSetAsInput( - producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED, true); - assertTrue(edge.getSource().getEagerlyDeployConsumers()); - } - } - // -------------------------------------------------------------------------------------------- private static final class TestingOutputFormat extends DiscardingOutputFormat<Object> implements InitializeOnMaster { http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 53117d0..779a17d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -630,7 +630,7 @@ public class TaskManagerTest extends TestLogger { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>(); - irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, false)); + irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1)); InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor( @@ -775,7 +775,7 @@ public class TaskManagerTest extends TestLogger { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>(); - irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, false)); + irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1)); InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor( @@ -1430,8 +1430,8 @@ public class TaskManagerTest extends TestLogger { new IntermediateDataSetID(), new IntermediateResultPartitionID(), ResultPartitionType.PIPELINED, - 1, - false // don't deploy eagerly but with the first completed memory buffer + 1 + // don't deploy eagerly but with the first completed memory buffer ); final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig, http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index a63b089..20f5981 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -371,20 +371,17 @@ public class StreamingJobGraphGenerator { downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED, - true); + ResultPartitionType.PIPELINED); } else if (partitioner instanceof RescalePartitioner){ downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED, - true); + ResultPartitionType.PIPELINED); } else { downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.ALL_TO_ALL, - ResultPartitionType.PIPELINED, - true); + ResultPartitionType.PIPELINED); } if (LOG.isDebugEnabled()) {
