[FLINK-5040] [jobmanager] Set correct input channel types with eager scheduling
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2742d5c1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2742d5c1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2742d5c1 Branch: refs/heads/master Commit: 2742d5c1761ca02d871333e91a8ecbc6d0a52f6c Parents: 0d2e8b2 Author: Ufuk Celebi <[email protected]> Authored: Wed Nov 9 18:25:06 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Thu Nov 10 23:06:55 2016 +0100 ---------------------------------------------------------------------- .../ResultPartitionDeploymentDescriptor.java | 17 +- .../runtime/executiongraph/ExecutionVertex.java | 21 +- .../runtime/io/network/PartitionState.java | 18 +- .../io/network/partition/ResultPartition.java | 8 +- .../flink/runtime/jobgraph/ScheduleMode.java | 10 +- .../apache/flink/runtime/taskmanager/Task.java | 10 +- .../flink/runtime/jobmanager/JobManager.scala | 2 +- .../InputChannelDeploymentDescriptorTest.java | 206 +++++++++++++++++++ ...ResultPartitionDeploymentDescriptorTest.java | 6 +- .../ExecutionVertexDeploymentTest.java | 106 ++++++---- .../network/partition/ResultPartitionTest.java | 92 +++++++++ .../consumer/LocalInputChannelTest.java | 3 +- .../runtime/jobgraph/ScheduleModeTest.java | 36 ++++ .../runtime/taskmanager/TaskManagerTest.java | 19 +- .../flink/runtime/taskmanager/TaskTest.java | 5 +- 15 files changed, 491 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java index 2881dde..2ecde80 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java @@ -47,12 +47,16 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { /** The number of subpartitions. */ private final int numberOfSubpartitions; + + /** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */ + private final boolean lazyScheduling; public ResultPartitionDeploymentDescriptor( IntermediateDataSetID resultId, IntermediateResultPartitionID partitionId, ResultPartitionType partitionType, - int numberOfSubpartitions) { + int numberOfSubpartitions, + boolean lazyScheduling) { this.resultId = checkNotNull(resultId); this.partitionId = checkNotNull(partitionId); @@ -60,6 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { checkArgument(numberOfSubpartitions >= 1); this.numberOfSubpartitions = numberOfSubpartitions; + this.lazyScheduling = lazyScheduling; } public IntermediateDataSetID getResultId() { @@ -78,6 +83,10 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { return numberOfSubpartitions; } + public boolean allowLazyScheduling() { + return lazyScheduling; + } + @Override public String toString() { return String.format("ResultPartitionDeploymentDescriptor [result id: %s, " @@ -87,7 +96,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { // ------------------------------------------------------------------------ - public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartition partition) { + public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartition partition, boolean lazyScheduling) { final IntermediateDataSetID resultId = partition.getIntermediateResult().getId(); final IntermediateResultPartitionID partitionId = partition.getPartitionId(); @@ -102,13 +111,13 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { if (!partition.getConsumers().isEmpty() && !partition.getConsumers().get(0).isEmpty()) { if (partition.getConsumers().size() > 1) { - new IllegalStateException("Currently, only a single consumer group per partition is supported."); + throw new IllegalStateException("Currently, only a single consumer group per partition is supported."); } numberOfSubpartitions = partition.getConsumers().get(0).size(); } return new ResultPartitionDeploymentDescriptor( - resultId, partitionId, partitionType, numberOfSubpartitions); + resultId, partitionId, partitionType, numberOfSubpartitions, lazyScheduling); } } http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index e7f000c..01e8660 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -568,21 +568,24 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi ExecutionAttemptID executionId, SimpleSlot targetSlot, TaskStateHandles taskStateHandles, - int attemptNumber) { - + int attemptNumber) throws ExecutionGraphException { + // Produced intermediate results - List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>(resultPartitions.size()); + List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<>(resultPartitions.size()); + + // Consumed intermediate results + List<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<>(inputEdges.length); + + boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment(); for (IntermediateResultPartition partition : resultPartitions.values()) { - producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition)); + producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, lazyScheduling)); } - - // Consumed intermediate results - List<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<InputGateDeploymentDescriptor>(); - + + for (ExecutionEdge[] edges : inputEdges) { InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor - .fromEdges(edges, targetSlot); + .fromEdges(edges, targetSlot, lazyScheduling); // If the produced partition has multiple consumers registered, we // need to request the one matching our sub task index. http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java index 083412b..59357fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java @@ -23,18 +23,25 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; +import java.io.Serializable; + /** * Contains information about the state of a result partition. */ -public class PartitionState { +public class PartitionState implements Serializable { + + private static final long serialVersionUID = -4693651272083825031L; + private final IntermediateDataSetID intermediateDataSetID; private final IntermediateResultPartitionID intermediateResultPartitionID; private final ExecutionState executionState; public PartitionState( - IntermediateDataSetID intermediateDataSetID, - IntermediateResultPartitionID intermediateResultPartitionID, - ExecutionState executionState) { + IntermediateDataSetID intermediateDataSetID, + IntermediateResultPartitionID intermediateResultPartitionID, + @Nullable ExecutionState executionState) { + this.intermediateDataSetID = Preconditions.checkNotNull(intermediateDataSetID); this.intermediateResultPartitionID = Preconditions.checkNotNull(intermediateResultPartitionID); this.executionState = executionState; @@ -48,6 +55,9 @@ public class PartitionState { return intermediateResultPartitionID; } + /** + * Returns the execution state of the partition producer or <code>null</code> if it is not available. + */ public ExecutionState getExecutionState() { return executionState; } http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index 034b27a..834318c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -96,6 +96,8 @@ public class ResultPartition implements BufferPoolOwner { private final ResultPartitionConsumableNotifier partitionConsumableNotifier; + private final boolean sendScheduleOrUpdateConsumersMessage; + // - Runtime state -------------------------------------------------------- private final AtomicBoolean isReleased = new AtomicBoolean(); @@ -133,7 +135,8 @@ public class ResultPartition implements BufferPoolOwner { ResultPartitionManager partitionManager, ResultPartitionConsumableNotifier partitionConsumableNotifier, IOManager ioManager, - IOMode defaultIoMode) { + IOMode defaultIoMode, + boolean sendScheduleOrUpdateConsumersMessage) { this.owningTaskName = checkNotNull(owningTaskName); this.taskActions = checkNotNull(taskActions); @@ -143,6 +146,7 @@ public class ResultPartition implements BufferPoolOwner { this.subpartitions = new ResultSubpartition[numberOfSubpartitions]; this.partitionManager = checkNotNull(partitionManager); this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier); + this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage; // Create the subpartitions. switch (partitionType) { @@ -437,7 +441,7 @@ public class ResultPartition implements BufferPoolOwner { * Notifies pipelined consumers of this result partition once. */ private void notifyPipelinedConsumers() { - if (partitionType.isPipelined() && !hasNotifiedPipelinedConsumers) { + if (sendScheduleOrUpdateConsumersMessage && !hasNotifiedPipelinedConsumers && partitionType.isPipelined()) { partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId, taskActions); hasNotifiedPipelinedConsumers = true; http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java index 9405067..6a98e46 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java @@ -27,5 +27,13 @@ public enum ScheduleMode { LAZY_FROM_SOURCES, /** Schedules all tasks immediately. */ - EAGER + EAGER; + + /** + * Returns whether we are allowed to deploy consumers lazily. + */ + public boolean allowLazyDeployment() { + return this == LAZY_FROM_SOURCES; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 6907606..4f3dd54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -345,7 +345,8 @@ public class Task implements Runnable, TaskActions { networkEnvironment.getResultPartitionManager(), resultPartitionConsumableNotifier, ioManager, - networkEnvironment.getDefaultIOMode()); + networkEnvironment.getDefaultIOMode(), + desc.allowLazyScheduling()); writers[counter] = new ResultPartitionWriter(producedPartitions[counter]); @@ -568,6 +569,7 @@ public class Task implements Runnable, TaskActions { // ---------------------------------------------------------------- LOG.info("Registering task at network: " + this); + network.registerTask(this); // next, kick off the background copying of files for the distributed cache @@ -1135,7 +1137,11 @@ public class Task implements Runnable, TaskActions { final SingleInputGate inputGate = inputGatesById.get(resultId); if (inputGate != null) { - if (partitionState == ExecutionState.RUNNING) { + if (partitionState == ExecutionState.RUNNING || + partitionState == ExecutionState.FINISHED || + partitionState == ExecutionState.SCHEDULED || + partitionState == ExecutionState.DEPLOYING) { + // Retrigger the partition request inputGate.retriggerPartitionRequest(partitionId); } http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 9af5355..b2e1002 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -948,7 +948,7 @@ class JobManager( if (execution != null) execution.getState else null case None => // Nothing to do. This is not an error, because the request is received when a sending - // task fails during a remote partition request. + // task fails or is not yet available during a remote partition request. log.debug(s"Cannot find execution graph for job $jobId.") null http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java new file mode 100644 index 0000000..e9e8901 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionEdge; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.junit.Test; + +import java.net.InetAddress; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class InputChannelDeploymentDescriptorTest { + + /** + * Tests the deployment descriptors for local, remote, and unknown partition + * locations (with lazy deployment allowed and all execution states for the + * producers). + */ + @Test + public void testMixedLocalRemoteUnknownDeployment() throws Exception { + boolean allowLazyDeployment = true; + + ResourceID consumerResourceId = ResourceID.generate(); + ExecutionVertex consumer = mock(ExecutionVertex.class); + SimpleSlot consumerSlot = mockSlot(consumerResourceId); + + // Local and remote channel are only allowed for certain execution + // states. + for (ExecutionState state : ExecutionState.values()) { + // Local partition + ExecutionVertex localProducer = mockExecutionVertex(state, consumerResourceId); + IntermediateResultPartition localPartition = mockPartition(localProducer); + ResultPartitionID localPartitionId = new ResultPartitionID(localPartition.getPartitionId(), localProducer.getCurrentExecutionAttempt().getAttemptId()); + ExecutionEdge localEdge = new ExecutionEdge(localPartition, consumer, 0); + + // Remote partition + ExecutionVertex remoteProducer = mockExecutionVertex(state, ResourceID.generate()); // new resource ID + IntermediateResultPartition remotePartition = mockPartition(remoteProducer); + ResultPartitionID remotePartitionId = new ResultPartitionID(remotePartition.getPartitionId(), remoteProducer.getCurrentExecutionAttempt().getAttemptId()); + ConnectionID remoteConnectionId = new ConnectionID(remoteProducer.getCurrentAssignedResource().getTaskManagerLocation(), 0); + ExecutionEdge remoteEdge = new ExecutionEdge(remotePartition, consumer, 1); + + // Unknown partition + ExecutionVertex unknownProducer = mockExecutionVertex(state, null); // no assigned resource + IntermediateResultPartition unknownPartition = mockPartition(unknownProducer); + ResultPartitionID unknownPartitionId = new ResultPartitionID(unknownPartition.getPartitionId(), unknownProducer.getCurrentExecutionAttempt().getAttemptId()); + ExecutionEdge unknownEdge = new ExecutionEdge(unknownPartition, consumer, 2); + + InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges( + new ExecutionEdge[]{localEdge, remoteEdge, unknownEdge}, + consumerSlot, + allowLazyDeployment); + + assertEquals(3, desc.length); + + // These states are allowed + if (state == ExecutionState.RUNNING || state == ExecutionState.FINISHED || + state == ExecutionState.SCHEDULED || state == ExecutionState.DEPLOYING) { + + // Create local or remote channels + assertEquals(localPartitionId, desc[0].getConsumedPartitionId()); + assertTrue(desc[0].getConsumedPartitionLocation().isLocal()); + assertNull(desc[0].getConsumedPartitionLocation().getConnectionId()); + + assertEquals(remotePartitionId, desc[1].getConsumedPartitionId()); + assertTrue(desc[1].getConsumedPartitionLocation().isRemote()); + assertEquals(remoteConnectionId, desc[1].getConsumedPartitionLocation().getConnectionId()); + } else { + // Unknown (lazy deployment allowed) + assertEquals(localPartitionId, desc[0].getConsumedPartitionId()); + assertTrue(desc[0].getConsumedPartitionLocation().isUnknown()); + assertNull(desc[0].getConsumedPartitionLocation().getConnectionId()); + + assertEquals(remotePartitionId, desc[1].getConsumedPartitionId()); + assertTrue(desc[1].getConsumedPartitionLocation().isUnknown()); + assertNull(desc[1].getConsumedPartitionLocation().getConnectionId()); + } + + assertEquals(unknownPartitionId, desc[2].getConsumedPartitionId()); + assertTrue(desc[2].getConsumedPartitionLocation().isUnknown()); + assertNull(desc[2].getConsumedPartitionLocation().getConnectionId()); + } + } + + @Test + public void testUnknownChannelWithoutLazyDeploymentThrows() throws Exception { + ResourceID consumerResourceId = ResourceID.generate(); + ExecutionVertex consumer = mock(ExecutionVertex.class); + SimpleSlot consumerSlot = mockSlot(consumerResourceId); + + + // Unknown partition + ExecutionVertex unknownProducer = mockExecutionVertex(ExecutionState.CREATED, null); // no assigned resource + IntermediateResultPartition unknownPartition = mockPartition(unknownProducer); + ResultPartitionID unknownPartitionId = new ResultPartitionID(unknownPartition.getPartitionId(), unknownProducer.getCurrentExecutionAttempt().getAttemptId()); + ExecutionEdge unknownEdge = new ExecutionEdge(unknownPartition, consumer, 2); + + // This should work if lazy deployment is allowed + boolean allowLazyDeployment = true; + + InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges( + new ExecutionEdge[]{unknownEdge}, + consumerSlot, + allowLazyDeployment); + + assertEquals(1, desc.length); + + assertEquals(unknownPartitionId, desc[0].getConsumedPartitionId()); + assertTrue(desc[0].getConsumedPartitionLocation().isUnknown()); + assertNull(desc[0].getConsumedPartitionLocation().getConnectionId()); + + + try { + // Fail if lazy deployment is *not* allowed + allowLazyDeployment = false; + + InputChannelDeploymentDescriptor.fromEdges( + new ExecutionEdge[]{unknownEdge}, + consumerSlot, + allowLazyDeployment); + + fail("Did not throw expected ExecutionGraphException"); + } catch (ExecutionGraphException ignored) { + } + } + + // ------------------------------------------------------------------------ + + private static SimpleSlot mockSlot(ResourceID resourceId) { + SimpleSlot slot = mock(SimpleSlot.class); + when(slot.getTaskManagerLocation()).thenReturn(new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 5000)); + when(slot.getTaskManagerID()).thenReturn(resourceId); + + return slot; + } + + private static ExecutionVertex mockExecutionVertex(ExecutionState state, ResourceID resourceId) { + ExecutionVertex vertex = mock(ExecutionVertex.class); + + Execution exec = mock(Execution.class); + when(exec.getState()).thenReturn(state); + when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID()); + + if (resourceId != null) { + SimpleSlot slot = mockSlot(resourceId); + when(exec.getAssignedResource()).thenReturn(slot); + when(vertex.getCurrentAssignedResource()).thenReturn(slot); + } else { + when(exec.getAssignedResource()).thenReturn(null); // no resource + when(vertex.getCurrentAssignedResource()).thenReturn(null); + } + + when(vertex.getCurrentExecutionAttempt()).thenReturn(exec); + + return vertex; + } + + private static IntermediateResultPartition mockPartition(ExecutionVertex producer) { + IntermediateResultPartition partition = mock(IntermediateResultPartition.class); + when(partition.isConsumable()).thenReturn(true); + + IntermediateResult result = mock(IntermediateResult.class); + when(result.getConnectionIndex()).thenReturn(0); + + when(partition.getIntermediateResult()).thenReturn(result); + when(partition.getPartitionId()).thenReturn(new IntermediateResultPartitionID()); + + when(partition.getProducer()).thenReturn(producer); + + return partition; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index 4b1e546..4223b49 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class ResultPartitionDeploymentDescriptorTest { @@ -38,14 +39,14 @@ public class ResultPartitionDeploymentDescriptorTest { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); ResultPartitionType partitionType = ResultPartitionType.PIPELINED; int numberOfSubpartitions = 24; - boolean eagerlyDeployConsumers = true; ResultPartitionDeploymentDescriptor orig = new ResultPartitionDeploymentDescriptor( resultId, partitionId, partitionType, - numberOfSubpartitions); + numberOfSubpartitions, + true); ResultPartitionDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig); @@ -54,5 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest { assertEquals(partitionId, copy.getPartitionId()); assertEquals(partitionType, copy.getPartitionType()); assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions()); + assertTrue(copy.allowLazyScheduling()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index 54aeff9..8bc39a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -18,20 +18,37 @@ package org.apache.flink.runtime.executiongraph; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*; - -import static org.junit.Assert.*; - +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.testingUtils.TestingUtils; - import org.junit.Test; +import java.util.Collection; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ERROR_MESSAGE; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleFailingActorGateway; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class ExecutionVertexDeploymentTest { @Test @@ -48,7 +65,7 @@ public class ExecutionVertexDeploymentTest { final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); vertex.deployToSlot(slot); @@ -58,8 +75,7 @@ public class ExecutionVertexDeploymentTest { try { vertex.deployToSlot(slot); fail("Scheduled from wrong state"); - } - catch (IllegalStateException e) { + } catch (IllegalStateException e) { // as expected } @@ -67,8 +83,7 @@ public class ExecutionVertexDeploymentTest { assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -82,12 +97,12 @@ public class ExecutionVertexDeploymentTest { final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext()); final Instance instance = getInstance( - new ActorTaskManagerGateway( - new SimpleActorGateway(TestingUtils.directExecutionContext()))); + new ActorTaskManagerGateway( + new SimpleActorGateway(TestingUtils.directExecutionContext()))); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -99,8 +114,7 @@ public class ExecutionVertexDeploymentTest { try { vertex.deployToSlot(slot); fail("Scheduled from wrong state"); - } - catch (IllegalStateException e) { + } catch (IllegalStateException e) { // as expected } @@ -109,8 +123,7 @@ public class ExecutionVertexDeploymentTest { assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -123,7 +136,7 @@ public class ExecutionVertexDeploymentTest { final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout()); final Instance instance = getInstance( new ActorTaskManagerGateway( @@ -138,8 +151,7 @@ public class ExecutionVertexDeploymentTest { try { vertex.deployToSlot(slot); fail("Scheduled from wrong state"); - } - catch (IllegalStateException e) { + } catch (IllegalStateException e) { // as expected } @@ -149,16 +161,14 @@ public class ExecutionVertexDeploymentTest { try { vertex.deployToSlot(slot); fail("Scheduled from wrong state"); - } - catch (IllegalStateException e) { + } catch (IllegalStateException e) { // as expected } assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -171,7 +181,7 @@ public class ExecutionVertexDeploymentTest { final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout()); final Instance instance = getInstance( new ActorTaskManagerGateway( @@ -189,8 +199,7 @@ public class ExecutionVertexDeploymentTest { assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -202,7 +211,7 @@ public class ExecutionVertexDeploymentTest { final JobVertexID jid = new JobVertexID(); final ExecutionJobVertex ejv = getExecutionVertex(jid); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout()); final Instance instance = getInstance( new ActorTaskManagerGateway( @@ -229,8 +238,7 @@ public class ExecutionVertexDeploymentTest { assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -247,7 +255,7 @@ public class ExecutionVertexDeploymentTest { final ExecutionJobVertex ejv = getExecutionVertex(jid, ec); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout()); final Instance instance = getInstance( new ActorTaskManagerGateway( @@ -270,8 +278,7 @@ public class ExecutionVertexDeploymentTest { assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -288,7 +295,7 @@ public class ExecutionVertexDeploymentTest { final ExecutionJobVertex ejv = getExecutionVertex(jid, context); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + AkkaUtils.getDefaultTimeout()); final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId(); @@ -334,10 +341,37 @@ public class ExecutionVertexDeploymentTest { assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0); assertTrue(queue.isEmpty()); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } } + + /** + * Tests that the lazy scheduling flag is correctly forwarded to the produced partition descriptors. + */ + @Test + public void testTddProducedPartitionsLazyScheduling() throws Exception { + TestingUtils.QueuedActionExecutionContext context = TestingUtils.queuedActionExecutionContext(); + ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), context); + IntermediateResult result = new IntermediateResult(new IntermediateDataSetID(), jobVertex, 4, ResultPartitionType.PIPELINED); + ExecutionVertex vertex = new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1)); + + Slot root = mock(Slot.class); + when(root.getSlotNumber()).thenReturn(1); + SimpleSlot slot = mock(SimpleSlot.class); + when(slot.getRoot()).thenReturn(root); + + for (ScheduleMode mode : ScheduleMode.values()) { + vertex.getExecutionGraph().setScheduleMode(mode); + + TaskDeploymentDescriptor tdd = vertex.createDeploymentDescriptor(new ExecutionAttemptID(), slot, null, 1); + + Collection<ResultPartitionDeploymentDescriptor> producedPartitions = tdd.getProducedPartitions(); + + assertEquals(1, producedPartitions.size()); + ResultPartitionDeploymentDescriptor desc = producedPartitions.iterator().next(); + assertEquals(mode.allowLazyDeployment(), desc.sendScheduleOrUpdateConsumersMessage()); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java new file mode 100644 index 0000000..f6fddfa --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.util.TestBufferFactory; +import org.apache.flink.runtime.taskmanager.TaskActions; +import org.junit.Test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class ResultPartitionTest { + + /** + * Tests the schedule or update consumers message sending behaviour depending on the relevant flags. + */ + @Test + public void testSendScheduleOrUpdateConsumersMessage() throws Exception { + { + // Pipelined, send message => notify + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, true); + partition.add(TestBufferFactory.createBuffer(), 0); + verify(notifier, times(1)).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class)); + } + + { + // Pipelined, don't send message => don't notify + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, false); + partition.add(TestBufferFactory.createBuffer(), 0); + verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class)); + } + + { + // Blocking, send message => don't notify + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, true); + partition.add(TestBufferFactory.createBuffer(), 0); + verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class)); + } + + { + // Blocking, don't send message => don't notify + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, false); + partition.add(TestBufferFactory.createBuffer(), 0); + verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class)); + } + } + + // ------------------------------------------------------------------------ + + private static ResultPartition createPartition( + ResultPartitionConsumableNotifier notifier, + ResultPartitionType type, + boolean sendScheduleOrUpdateConsumersMessage) { + return new ResultPartition( + "TestTask", + mock(TaskActions.class), + new JobID(), + new ResultPartitionID(), + type, + 1, + mock(ResultPartitionManager.class), + notifier, + mock(IOManager.class), + IOManager.IOMode.SYNC, + sendScheduleOrUpdateConsumersMessage); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 2d3797d..4ca1d1f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -124,7 +124,8 @@ public class LocalInputChannelTest { partitionManager, partitionConsumableNotifier, ioManager, - ASYNC); + ASYNC, + true); // Create a buffer pool for this partition partition.registerBufferPool( http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java new file mode 100644 index 0000000..144ef12 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ScheduleModeTest { + + /** + * Test that schedule modes set the lazy deployment flag correctly. + */ + @Test + public void testAllowLazyDeployment() throws Exception { + assertTrue(ScheduleMode.LAZY_FROM_SOURCES.allowLazyDeployment()); + assertFalse(ScheduleMode.EAGER.allowLazyDeployment()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 15947f9..22f0c60 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -18,7 +18,11 @@ package org.apache.flink.runtime.taskmanager; -import akka.actor.*; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Kill; +import akka.actor.Props; +import akka.actor.Status; import akka.japi.Creator; import akka.testkit.JavaTestKit; import org.apache.flink.api.common.ExecutionConfig; @@ -31,7 +35,11 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; -import org.apache.flink.runtime.deployment.*; +import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionLocation; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.JobInformation; @@ -622,7 +630,7 @@ public class TaskManagerTest extends TestLogger { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>(); - irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1)); + irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true)); InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor( @@ -767,7 +775,7 @@ public class TaskManagerTest extends TestLogger { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>(); - irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1)); + irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true)); InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor( @@ -1419,7 +1427,8 @@ public class TaskManagerTest extends TestLogger { new IntermediateDataSetID(), new IntermediateResultPartitionID(), ResultPartitionType.PIPELINED, - 1); + 1, + true); final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig, "TestTask", 1, 0, 1, 0, new Configuration(), new Configuration(), http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 1eebe12..5d26050 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -553,6 +553,9 @@ public class TaskTest extends TestLogger { } expected.put(ExecutionState.RUNNING, ExecutionState.RUNNING); + expected.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING); + expected.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING); + expected.put(ExecutionState.FINISHED, ExecutionState.RUNNING); expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING); expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING); @@ -568,7 +571,7 @@ public class TaskTest extends TestLogger { assertEquals(expected.get(state), newTaskState); } - verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId())); + verify(inputGate, times(4)).retriggerPartitionRequest(eq(partitionId.getPartitionId())); } /**
