Repository: flink
Updated Branches:
  refs/heads/release-1.1 9a19ca115 -> 0bd8e0279
[FLINK-5040] [jobmanager] Set correct input channel types with eager scheduling This closes #2784. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55c506f2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55c506f2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55c506f2 Branch: refs/heads/release-1.1 Commit: 55c506f2ee58f70d9220d507256146df2a434381 Parents: b5a4cb6 Author: Ufuk Celebi <[email protected]> Authored: Wed Nov 9 18:25:06 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Thu Nov 10 21:53:30 2016 +0100 ---------------------------------------------------------------------- .../InputChannelDeploymentDescriptor.java | 12 +- .../ResultPartitionDeploymentDescriptor.java | 24 ++- .../runtime/executiongraph/ExecutionVertex.java | 14 +- .../io/network/partition/ResultPartition.java | 10 +- .../flink/runtime/jobgraph/ScheduleMode.java | 10 +- .../apache/flink/runtime/taskmanager/Task.java | 10 +- .../flink/runtime/jobmanager/JobManager.scala | 2 +- .../InputChannelDeploymentDescriptorTest.java | 202 +++++++++++++++++++ ...ResultPartitionDeploymentDescriptorTest.java | 6 +- .../ExecutionVertexDeploymentTest.java | 60 +++++- .../network/partition/ResultPartitionTest.java | 90 +++++++++ .../consumer/LocalInputChannelTest.java | 5 +- .../runtime/jobgraph/ScheduleModeTest.java | 37 ++++ .../runtime/taskmanager/TaskManagerTest.java | 9 +- .../flink/runtime/taskmanager/TaskTest.java | 5 +- 15 files changed, 454 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java index e00a480..6b87e69 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java @@ -85,7 +85,7 @@ public class InputChannelDeploymentDescriptor implements Serializable { * Creates an input channel deployment descriptor for each partition. */ public static InputChannelDeploymentDescriptor[] fromEdges( - ExecutionEdge[] edges, SimpleSlot consumerSlot) { + ExecutionEdge[] edges, SimpleSlot consumerSlot, boolean allowLazyDeployment) { final InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[edges.length]; @@ -101,8 +101,10 @@ public class InputChannelDeploymentDescriptor implements Serializable { // The producing task needs to be RUNNING or already FINISHED if (consumedPartition.isConsumable() && producerSlot != null && - (producerState == ExecutionState.RUNNING - || producerState == ExecutionState.FINISHED)) { + (producerState == ExecutionState.RUNNING || + producerState == ExecutionState.FINISHED || + producerState == ExecutionState.SCHEDULED || + producerState == ExecutionState.DEPLOYING)) { final Instance partitionInstance = producerSlot.getInstance(); @@ -119,9 +121,11 @@ public class InputChannelDeploymentDescriptor implements Serializable { partitionLocation = ResultPartitionLocation.createRemote(connectionId); } } - else { + else if (allowLazyDeployment) { // The producing task might not have registered the partition yet partitionLocation = ResultPartitionLocation.createUnknown(); + } else { + throw new IllegalStateException("Trying to eagerly schedule a task whose inputs are not ready."); } final ResultPartitionID consumedPartitionId = new ResultPartitionID( 
http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java index ecdacbb..2ecde80 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java @@ -47,12 +47,16 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { /** The number of subpartitions. */ private final int numberOfSubpartitions; + + /** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */ + private final boolean lazyScheduling; public ResultPartitionDeploymentDescriptor( - IntermediateDataSetID resultId, - IntermediateResultPartitionID partitionId, - ResultPartitionType partitionType, - int numberOfSubpartitions) { + IntermediateDataSetID resultId, + IntermediateResultPartitionID partitionId, + ResultPartitionType partitionType, + int numberOfSubpartitions, + boolean lazyScheduling) { this.resultId = checkNotNull(resultId); this.partitionId = checkNotNull(partitionId); @@ -60,6 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { checkArgument(numberOfSubpartitions >= 1); this.numberOfSubpartitions = numberOfSubpartitions; + this.lazyScheduling = lazyScheduling; } public IntermediateDataSetID getResultId() { @@ -78,6 +83,10 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { return numberOfSubpartitions; } + public boolean allowLazyScheduling() { + return lazyScheduling; + } + @Override public String toString() { return 
String.format("ResultPartitionDeploymentDescriptor [result id: %s, " @@ -87,7 +96,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { // ------------------------------------------------------------------------ - public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartition partition) { + public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartition partition, boolean lazyScheduling) { final IntermediateDataSetID resultId = partition.getIntermediateResult().getId(); final IntermediateResultPartitionID partitionId = partition.getPartitionId(); @@ -102,14 +111,13 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { if (!partition.getConsumers().isEmpty() && !partition.getConsumers().get(0).isEmpty()) { if (partition.getConsumers().size() > 1) { - new IllegalStateException("Currently, only a single consumer group per partition is supported."); + throw new IllegalStateException("Currently, only a single consumer group per partition is supported."); } numberOfSubpartitions = partition.getConsumers().get(0).size(); } return new ResultPartitionDeploymentDescriptor( - resultId, partitionId, partitionType, numberOfSubpartitions - ); + resultId, partitionId, partitionType, numberOfSubpartitions, lazyScheduling); } } http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 309548d..c101548 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -638,18 +638,20 @@ public class 
ExecutionVertex implements Serializable { int attemptNumber) { // Produced intermediate results - List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>(resultPartitions.size()); + List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<>(resultPartitions.size()); + + // Consumed intermediate results + List<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<>(inputEdges.length); + + boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment(); for (IntermediateResultPartition partition : resultPartitions.values()) { - producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition)); + producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, lazyScheduling)); } - // Consumed intermediate results - List<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<InputGateDeploymentDescriptor>(); - for (ExecutionEdge[] edges : inputEdges) { InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor - .fromEdges(edges, targetSlot); + .fromEdges(edges, targetSlot, lazyScheduling); // If the produced partition has multiple consumers registered, we // need to request the one matching our sub task index. 
http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index a60f95d..c30f333 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -93,6 +93,8 @@ public class ResultPartition implements BufferPoolOwner { private final ResultPartitionConsumableNotifier partitionConsumableNotifier; + private final boolean sendScheduleOrUpdateConsumersMessage; + // - Runtime state -------------------------------------------------------- private final AtomicBoolean isReleased = new AtomicBoolean(); @@ -129,7 +131,8 @@ public class ResultPartition implements BufferPoolOwner { ResultPartitionManager partitionManager, ResultPartitionConsumableNotifier partitionConsumableNotifier, IOManager ioManager, - IOMode defaultIoMode) { + IOMode defaultIoMode, + boolean sendScheduleOrUpdateConsumersMessage) { this.owningTaskName = checkNotNull(owningTaskName); this.jobId = checkNotNull(jobId); @@ -138,6 +141,7 @@ public class ResultPartition implements BufferPoolOwner { this.subpartitions = new ResultSubpartition[numberOfSubpartitions]; this.partitionManager = checkNotNull(partitionManager); this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier); + this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage; // Create the subpartitions. switch (partitionType) { @@ -417,8 +421,8 @@ public class ResultPartition implements BufferPoolOwner { /** * Notifies pipelined consumers of this result partition once. 
*/ - private void notifyPipelinedConsumers() throws IOException { - if (partitionType.isPipelined() && !hasNotifiedPipelinedConsumers) { + private void notifyPipelinedConsumers() { + if (sendScheduleOrUpdateConsumersMessage && !hasNotifiedPipelinedConsumers && partitionType.isPipelined()) { partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId); hasNotifiedPipelinedConsumers = true; http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java index 330519d..78b7b45 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java @@ -30,6 +30,14 @@ public enum ScheduleMode { /** * Schedule tasks all at once instead of lazy deployment of receiving tasks. */ - ALL + ALL; + + /** + * Returns whether we are allowed to deploy consumers lazily. 
+ */ + public boolean allowLazyDeployment() { + return this != ALL; + } } + http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index dd14aaf..2179fc1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -336,7 +336,8 @@ public class Task implements Runnable { networkEnvironment.getPartitionManager(), networkEnvironment.getPartitionConsumableNotifier(), ioManager, - networkEnvironment.getDefaultIOMode()); + networkEnvironment.getDefaultIOMode(), + desc.allowLazyScheduling()); writers[counter] = new ResultPartitionWriter(producedPartitions[counter]); @@ -1088,7 +1089,11 @@ public class Task implements Runnable { final SingleInputGate inputGate = inputGatesById.get(resultId); if (inputGate != null) { - if (partitionState == ExecutionState.RUNNING) { + if (partitionState == ExecutionState.RUNNING || + partitionState == ExecutionState.FINISHED || + partitionState == ExecutionState.SCHEDULED || + partitionState == ExecutionState.DEPLOYING) { + // Retrigger the partition request inputGate.retriggerPartitionRequest(partitionId); } @@ -1245,7 +1250,6 @@ public class Task implements Runnable { try { if (watchDogThread != null) { watchDogThread.start(); - logger.info("Started cancellation watch dog"); } // the user-defined cancel method may throw errors. 
http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 106ffb6..41218c9 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -855,7 +855,7 @@ class JobManager( if (execution != null) execution.getState else null case None => // Nothing to do. This is not an error, because the request is received when a sending - // task fails during a remote partition request. + // task fails or is not yet available during a remote partition request. log.debug(s"Cannot find execution graph for job $jobId.") null http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java new file mode 100644 index 0000000..cda0f4d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionEdge; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.junit.Test; + +import java.net.InetAddress; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class InputChannelDeploymentDescriptorTest { + + /** + * Tests the deployment descriptors for local, remote, and unknown partition + * locations (with lazy deployment allowed and all execution states 
for the + * producers). + */ + @Test + public void testMixedLocalRemoteUnknownDeployment() throws Exception { + boolean allowLazyDeployment = true; + + ExecutionVertex consumer = mock(ExecutionVertex.class); + SimpleSlot consumerSlot = mockSlot(createConnInfo(5000)); + + // Local and remote channel are only allowed for certain execution + // states. + for (ExecutionState state : ExecutionState.values()) { + // Local partition + ExecutionVertex localProducer = mockExecutionVertex(state, consumerSlot); + IntermediateResultPartition localPartition = mockPartition(localProducer); + ResultPartitionID localPartitionId = new ResultPartitionID(localPartition.getPartitionId(), localProducer.getCurrentExecutionAttempt().getAttemptId()); + ExecutionEdge localEdge = new ExecutionEdge(localPartition, consumer, 0); + + // Remote partition + InstanceConnectionInfo connInfo = createConnInfo(6000); + ExecutionVertex remoteProducer = mockExecutionVertex(state, mockSlot(connInfo)); // new slot + IntermediateResultPartition remotePartition = mockPartition(remoteProducer); + ResultPartitionID remotePartitionId = new ResultPartitionID(remotePartition.getPartitionId(), remoteProducer.getCurrentExecutionAttempt().getAttemptId()); + ConnectionID remoteConnectionId = new ConnectionID(connInfo, 0); + ExecutionEdge remoteEdge = new ExecutionEdge(remotePartition, consumer, 1); + + // Unknown partition + ExecutionVertex unknownProducer = mockExecutionVertex(state, null); // no assigned resource + IntermediateResultPartition unknownPartition = mockPartition(unknownProducer); + ResultPartitionID unknownPartitionId = new ResultPartitionID(unknownPartition.getPartitionId(), unknownProducer.getCurrentExecutionAttempt().getAttemptId()); + ExecutionEdge unknownEdge = new ExecutionEdge(unknownPartition, consumer, 2); + + InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges( + new ExecutionEdge[]{localEdge, remoteEdge, unknownEdge}, + consumerSlot, + allowLazyDeployment); 
+ + assertEquals(3, desc.length); + + // These states are allowed + if (state == ExecutionState.RUNNING || state == ExecutionState.FINISHED || + state == ExecutionState.SCHEDULED || state == ExecutionState.DEPLOYING) { + + // Create local or remote channels + assertEquals(localPartitionId, desc[0].getConsumedPartitionId()); + assertTrue(desc[0].getConsumedPartitionLocation().isLocal()); + assertNull(desc[0].getConsumedPartitionLocation().getConnectionId()); + + assertEquals(remotePartitionId, desc[1].getConsumedPartitionId()); + assertTrue(desc[1].getConsumedPartitionLocation().isRemote()); + assertEquals(remoteConnectionId, desc[1].getConsumedPartitionLocation().getConnectionId()); + } else { + // Unknown (lazy deployment allowed) + assertEquals(localPartitionId, desc[0].getConsumedPartitionId()); + assertTrue(desc[0].getConsumedPartitionLocation().isUnknown()); + assertNull(desc[0].getConsumedPartitionLocation().getConnectionId()); + + assertEquals(remotePartitionId, desc[1].getConsumedPartitionId()); + assertTrue(desc[1].getConsumedPartitionLocation().isUnknown()); + assertNull(desc[1].getConsumedPartitionLocation().getConnectionId()); + } + + assertEquals(unknownPartitionId, desc[2].getConsumedPartitionId()); + assertTrue(desc[2].getConsumedPartitionLocation().isUnknown()); + assertNull(desc[2].getConsumedPartitionLocation().getConnectionId()); + } + } + + @Test + public void testUnknownChannelWithoutLazyDeploymentThrows() throws Exception { + ExecutionVertex consumer = mock(ExecutionVertex.class); + SimpleSlot consumerSlot = mock(SimpleSlot.class); + + // Unknown partition + ExecutionVertex unknownProducer = mockExecutionVertex(ExecutionState.CREATED, null); // no assigned resource + IntermediateResultPartition unknownPartition = mockPartition(unknownProducer); + ResultPartitionID unknownPartitionId = new ResultPartitionID(unknownPartition.getPartitionId(), unknownProducer.getCurrentExecutionAttempt().getAttemptId()); + ExecutionEdge unknownEdge = new 
ExecutionEdge(unknownPartition, consumer, 2); + + // This should work if lazy deployment is allowed + boolean allowLazyDeployment = true; + + InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges( + new ExecutionEdge[]{unknownEdge}, + consumerSlot, + allowLazyDeployment); + + assertEquals(1, desc.length); + + assertEquals(unknownPartitionId, desc[0].getConsumedPartitionId()); + assertTrue(desc[0].getConsumedPartitionLocation().isUnknown()); + assertNull(desc[0].getConsumedPartitionLocation().getConnectionId()); + + + try { + // Fail if lazy deployment is *not* allowed + allowLazyDeployment = false; + + InputChannelDeploymentDescriptor.fromEdges( + new ExecutionEdge[]{unknownEdge}, + consumerSlot, + allowLazyDeployment); + + fail("Did not throw expected IllegalStateException"); + } catch (IllegalStateException ignored) { + } + } + + // ------------------------------------------------------------------------ + + private static InstanceConnectionInfo createConnInfo(int port) { + return new InstanceConnectionInfo(InetAddress.getLoopbackAddress(), port); + } + + private static SimpleSlot mockSlot(InstanceConnectionInfo connInfo) { + SimpleSlot slot = mock(SimpleSlot.class); + Instance instance = mock(Instance.class); + when(slot.getInstance()).thenReturn(instance); + when(instance.getInstanceConnectionInfo()).thenReturn(connInfo); + + return slot; + } + + private static ExecutionVertex mockExecutionVertex(ExecutionState state, SimpleSlot slot) { + ExecutionVertex vertex = mock(ExecutionVertex.class); + + Execution exec = mock(Execution.class); + when(exec.getState()).thenReturn(state); + when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID()); + + when(exec.getAssignedResource()).thenReturn(slot); + when(vertex.getCurrentAssignedResource()).thenReturn(slot); + + when(vertex.getCurrentExecutionAttempt()).thenReturn(exec); + + return vertex; + } + + private static IntermediateResultPartition mockPartition(ExecutionVertex producer) { + 
IntermediateResultPartition partition = mock(IntermediateResultPartition.class); + when(partition.isConsumable()).thenReturn(true); + + IntermediateResult result = mock(IntermediateResult.class); + when(result.getConnectionIndex()).thenReturn(0); + + when(partition.getIntermediateResult()).thenReturn(result); + when(partition.getPartitionId()).thenReturn(new IntermediateResultPartitionID()); + + when(partition.getProducer()).thenReturn(producer); + + return partition; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index 1986eae..4223b49 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class ResultPartitionDeploymentDescriptorTest { @@ -44,8 +45,8 @@ public class ResultPartitionDeploymentDescriptorTest { resultId, partitionId, partitionType, - numberOfSubpartitions - ); + numberOfSubpartitions, + true); ResultPartitionDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig); @@ -54,5 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest { assertEquals(partitionId, copy.getPartitionId()); assertEquals(partitionType, copy.getPartitionType()); assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions()); + 
assertTrue(copy.allowLazyScheduling()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index 81ec6c9..1f5c915 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -18,19 +18,37 @@ package org.apache.flink.runtime.executiongraph; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*; - -import static org.junit.Assert.*; - import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; import org.apache.flink.runtime.testingUtils.TestingUtils; - import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ERROR_MESSAGE; +import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleFailingActorGateway; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class ExecutionVertexDeploymentTest { @@ -331,4 +349,34 @@ public class ExecutionVertexDeploymentTest { fail(e.getMessage()); } } -} \ No newline at end of file + + /** + * Tests that the lazy scheduling flag is correctly forwarded to the produced partition descriptors. + */ + @Test + public void testTddProducedPartitionsLazyScheduling() throws Exception { + TestingUtils.QueuedActionExecutionContext context = TestingUtils.queuedActionExecutionContext(); + ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), context); + IntermediateResult result = new IntermediateResult(new IntermediateDataSetID(), jobVertex, 4, ResultPartitionType.PIPELINED); + ExecutionVertex vertex = new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, new FiniteDuration(1, TimeUnit.MINUTES)); + + Slot root = mock(Slot.class); + when(root.getSlotNumber()).thenReturn(1); + SimpleSlot slot = mock(SimpleSlot.class); + when(slot.getRoot()).thenReturn(root); + + for (ScheduleMode mode : ScheduleMode.values()) { + vertex.getExecutionGraph().setScheduleMode(mode); + + TaskDeploymentDescriptor tdd = vertex.createDeploymentDescriptor(new ExecutionAttemptID(), slot, null, null, 1); + + List<ResultPartitionDeploymentDescriptor> producedPartitions = tdd.getProducedPartitions(); + + assertEquals(1, 
producedPartitions.size()); + ResultPartitionDeploymentDescriptor desc = producedPartitions.get(0); + assertEquals(mode.allowLazyDeployment(), desc.allowLazyScheduling()); + } + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java new file mode 100644 index 0000000..302b667 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.util.TestBufferFactory; +import org.junit.Test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class ResultPartitionTest { + + /** + * Tests the schedule or update consumers message sending behaviour depending on the relevant flags. + */ + @Test + public void testSendScheduleOrUpdateConsumersMessage() throws Exception { + { + // Pipelined, send message => notify + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, true); + partition.add(TestBufferFactory.createBuffer(), 0); + verify(notifier, times(1)).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class)); + } + + { + // Pipelined, don't send message => don't notify + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, false); + partition.add(TestBufferFactory.createBuffer(), 0); + verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class)); + } + + { + // Blocking, send message => don't notify + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, true); + partition.add(TestBufferFactory.createBuffer(), 0); + verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class)); + } + + { + // Blocking, don't send message => don't notify + ResultPartitionConsumableNotifier 
notifier = mock(ResultPartitionConsumableNotifier.class); + ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, false); + partition.add(TestBufferFactory.createBuffer(), 0); + verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class)); + } + } + + // ------------------------------------------------------------------------ + + private static ResultPartition createPartition( + ResultPartitionConsumableNotifier notifier, + ResultPartitionType type, + boolean sendScheduleOrUpdateConsumersMessage) { + return new ResultPartition( + "TestTask", + new JobID(), + new ResultPartitionID(), + type, + 1, + mock(ResultPartitionManager.class), + notifier, + mock(IOManager.class), + IOManager.IOMode.SYNC, + sendScheduleOrUpdateConsumersMessage); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 88a3ff5..ee28b5a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -119,11 +119,12 @@ public class LocalInputChannelTest { jobId, partitionIds[i], ResultPartitionType.PIPELINED, - parallelism, + parallelism, partitionManager, partitionConsumableNotifier, ioManager, - ASYNC); + ASYNC, + true); // Create a buffer pool for this partition partition.registerBufferPool( 
http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java new file mode 100644 index 0000000..aa5d12c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ScheduleModeTest { + + /** + * Test that schedule modes set the lazy deployment flag correctly. 
+ */ + @Test + public void testAllowLazyDeployment() throws Exception { + assertTrue(ScheduleMode.FROM_SOURCES.allowLazyDeployment()); + assertTrue(ScheduleMode.BACKTRACKING.allowLazyDeployment()); + assertFalse(ScheduleMode.ALL.allowLazyDeployment()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 779a17d..431cbb8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -630,7 +630,7 @@ public class TaskManagerTest extends TestLogger { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>(); - irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1)); + irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true)); InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor( @@ -775,7 +775,7 @@ public class TaskManagerTest extends TestLogger { IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>(); - irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1)); + irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true)); 
InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor( @@ -1430,9 +1430,8 @@ public class TaskManagerTest extends TestLogger { new IntermediateDataSetID(), new IntermediateResultPartitionID(), ResultPartitionType.PIPELINED, - 1 - // don't deploy eagerly but with the first completed memory buffer - ); + 1, + true); final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig, "TestTask", 0, 1, 0, new Configuration(), new Configuration(), http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 403836c..b5056ed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -542,6 +542,9 @@ public class TaskTest extends TestLogger { } expected.put(ExecutionState.RUNNING, ExecutionState.RUNNING); + expected.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING); + expected.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING); + expected.put(ExecutionState.FINISHED, ExecutionState.RUNNING); expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING); expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING); @@ -557,7 +560,7 @@ public class TaskTest extends TestLogger { assertEquals(expected.get(state), newTaskState); } - verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId())); + verify(inputGate, times(4)).retriggerPartitionRequest(eq(partitionId.getPartitionId())); } /**
