This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2c6521a0a62ba302cd821f1fecb12191e6e913e7 Author: Zhu Zhu <[email protected]> AuthorDate: Wed Dec 23 16:11:36 2020 +0800 [FLINK-20439][runtime] Rework scheduleOrUpdateConsumers in ExecutionGraph and its sub-components notifyPartitionDataAvailable() is introduced to replace scheduleOrUpdateConsumers() in the RPC code path. Execution#scheduleOrUpdateConsumers is renamed to #updatePartitionConsumers to be aligned with what it does. --- .../flink/runtime/executiongraph/Execution.java | 21 +-- .../runtime/executiongraph/ExecutionGraph.java | 23 +-- .../runtime/executiongraph/ExecutionVertex.java | 31 +--- .../flink/runtime/scheduler/SchedulerBase.java | 7 +- .../ExecutionGraphVariousFailuesTest.java | 8 +- .../utils/SimpleAckingTaskManagerGateway.java | 9 ++ .../scheduler/ScheduleOrUpdateConsumersTest.java | 174 -------------------- .../scheduler/UpdatePartitionConsumersTest.java | 178 +++++++++++++++++++++ 8 files changed, 211 insertions(+), 240 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 7792794..98d428c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -70,7 +70,6 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -671,17 +670,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution return releaseFuture; } - void scheduleOrUpdateConsumers(List<List<ExecutionEdge>> allConsumers) { - assertRunningInJobMasterMainThread(); - - final HashSet<ExecutionVertex> consumerDeduplicator = new HashSet<>(); - scheduleOrUpdateConsumers(allConsumers, consumerDeduplicator); - } - - 
private void scheduleOrUpdateConsumers( - final List<List<ExecutionEdge>> allConsumers, - final HashSet<ExecutionVertex> consumerDeduplicator) { - + private void updatePartitionConsumers(final List<List<ExecutionEdge>> allConsumers) { if (allConsumers.size() == 0) { return; } @@ -888,7 +877,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution if (transitionState(current, FINISHED)) { try { - finishPartitionsAndScheduleOrUpdateConsumers(); + finishPartitionsAndUpdateConsumers(); updateAccumulatorsAndMetrics(userAccumulators, metrics); releaseAssignedResource(null); vertex.getExecutionGraph().deregisterExecution(this); @@ -919,20 +908,18 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } } - private void finishPartitionsAndScheduleOrUpdateConsumers() { + private void finishPartitionsAndUpdateConsumers() { final List<IntermediateResultPartition> newlyFinishedResults = getVertex().finishAllBlockingPartitions(); if (newlyFinishedResults.isEmpty()) { return; } - final HashSet<ExecutionVertex> consumerDeduplicator = new HashSet<>(); - for (IntermediateResultPartition finishedPartition : newlyFinishedResults) { final IntermediateResultPartition[] allPartitionsOfNewlyFinishedResults = finishedPartition.getIntermediateResult().getPartitions(); for (IntermediateResultPartition partition : allPartitionsOfNewlyFinishedResults) { - scheduleOrUpdateConsumers(partition.getConsumers(), consumerDeduplicator); + updatePartitionConsumers(partition.getConsumers()); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index e1daefc..4e3b54a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -1320,27 +1320,20 @@ public class 
ExecutionGraph implements AccessExecutionGraph { } /** - * Schedule or updates consumers of the given result partition. + * Mark the data of a result partition to be available. Note that only PIPELINED partitions are accepted + * because it is for the case that a TM side PIPELINED result partition has data produced and notifies JM. * - * @param partitionId specifying the result partition whose consumer shall be scheduled or updated - * @throws ExecutionGraphException if the schedule or update consumers operation could not be executed + * @param partitionId specifying the result partition whose data have become available */ - public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) throws ExecutionGraphException { - + public void notifyPartitionDataAvailable(ResultPartitionID partitionId) { assertRunningInJobMasterMainThread(); final Execution execution = currentExecutions.get(partitionId.getProducerId()); - if (execution == null) { - throw new ExecutionGraphException("Cannot find execution for execution Id " + - partitionId.getPartitionId() + '.'); - } - else if (execution.getVertex() == null){ - throw new ExecutionGraphException("Execution with execution Id " + - partitionId.getPartitionId() + " has no vertex assigned."); - } else { - execution.getVertex().scheduleOrUpdateConsumers(partitionId); - } + checkState(execution != null, "Cannot find execution for execution Id " + + partitionId.getPartitionId() + "."); + + execution.getVertex().notifyPartitionDataAvailable(partitionId); } public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 55727f4..c70da77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -59,6 +59,8 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkState; /** * The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of @@ -705,34 +707,15 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi currentExecution.markFailed(t); } - /** - * Schedules or updates the consumer tasks of the result partition with the given ID. - */ - void scheduleOrUpdateConsumers(ResultPartitionID partitionId) { - - final Execution execution = currentExecution; - - // Abort this request if there was a concurrent reset - if (!partitionId.getProducerId().equals(execution.getAttemptId())) { - return; - } + void notifyPartitionDataAvailable(ResultPartitionID partitionId) { + checkArgument(partitionId.getProducerId().equals(currentExecution.getAttemptId())); final IntermediateResultPartition partition = resultPartitions.get(partitionId.getPartitionId()); - - if (partition == null) { - throw new IllegalStateException("Unknown partition " + partitionId + "."); - } + checkState(partition != null, "Unknown partition " + partitionId + "."); + checkState(partition.getResultType().isPipelined(), "partition data available notification is " + + "only valid for pipelined partitions."); partition.markDataProduced(); - - if (partition.getIntermediateResult().getResultType().isPipelined()) { - // Schedule or update receivers of this partition - execution.scheduleOrUpdateConsumers(partition.getConsumers()); - } - else { - throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for" + - "pipelined partitions."); - } } void cachePartitionInfo(PartitionInfo partitionInfo){ diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 93da3f4..a5738cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -47,7 +47,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; -import org.apache.flink.runtime.executiongraph.ExecutionGraphException; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener; import org.apache.flink.runtime.executiongraph.ExecutionVertex; @@ -689,11 +688,7 @@ public abstract class SchedulerBase implements SchedulerNG { public final void scheduleOrUpdateConsumers(final ResultPartitionID partitionId) { mainThreadExecutor.assertRunningInMainThread(); - try { - executionGraph.scheduleOrUpdateConsumers(partitionId); - } catch (ExecutionGraphException e) { - throw new RuntimeException(e); - } + executionGraph.notifyPartitionDataAvailable(partitionId); scheduleOrUpdateConsumersInternal(partitionId.getPartitionId()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java index b20c0ce..493415b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java @@ -29,7 +29,7 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; 
-import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -60,10 +60,10 @@ public class ExecutionGraphVariousFailuesTest extends TestLogger { try { scheduler.scheduleOrUpdateConsumers(resultPartitionId); - fail("Expected ExecutionGraphException."); - } catch (RuntimeException e) { + fail("Error expected."); + } catch (IllegalStateException e) { // we've expected this exception to occur - assertThat(e.getCause(), instanceOf(ExecutionGraphException.class)); + assertThat(e.getMessage(), containsString("Cannot find execution for execution Id")); } assertEquals(JobStatus.RUNNING, eg.getState()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java index 59aee81..d23c5ae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.TaskBackPressureResponse; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.function.TriConsumer; import java.util.Collection; import java.util.Set; @@ -66,6 +67,9 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { checkpointOptions, advanceToEndOfEventTime) -> { }; + private TriConsumer<ExecutionAttemptID, Iterable<PartitionInfo>, Time> updatePartitionsConsumer = + (ignore1, ignore2, ignore3) -> { }; + public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> submitConsumer) { 
this.submitConsumer = submitConsumer; } @@ -86,6 +90,10 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { this.checkpointConsumer = checkpointConsumer; } + public void setUpdatePartitionsConsumer(TriConsumer<ExecutionAttemptID, Iterable<PartitionInfo>, Time> updatePartitionsConsumer) { + this.updatePartitionsConsumer = updatePartitionsConsumer; + } + @Override public String getAddress() { return address; @@ -113,6 +121,7 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { @Override public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) { + updatePartitionsConsumer.accept(executionAttemptID, partitionInfos, timeout); return CompletableFuture.completedFuture(Acknowledge.get()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java deleted file mode 100644 index ece1c80..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager.scheduler; - -import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.testutils.MiniClusterResource; -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.types.IntValue; -import org.apache.flink.util.TestLogger; - -import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; - -import org.junit.ClassRule; -import org.junit.Test; - -import java.util.List; - -import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY; - -/** - * Tests for the lazy scheduling/updating of consumers depending on the - * producers result. 
- */ -public class ScheduleOrUpdateConsumersTest extends TestLogger { - - private static final int NUMBER_OF_TMS = 2; - private static final int NUMBER_OF_SLOTS_PER_TM = 2; - private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM; - - @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResourceConfiguration.Builder() - .setConfiguration(getFlinkConfiguration()) - .setNumberTaskManagers(NUMBER_OF_TMS) - .setNumberSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM) - .build()); - - private static Configuration getFlinkConfiguration() { - final Configuration config = new Configuration(); - config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); - - return config; - } - - /** - * Tests notifications of multiple receivers when a task produces both a pipelined and blocking - * result. - * - * <pre> - * +----------+ - * +-- pipelined -> | Receiver | - * +--------+ | +----------+ - * | Sender |-| - * +--------+ | +----------+ - * +-- blocking --> | Receiver | - * +----------+ - * </pre> - * - * <p>The pipelined receiver gets deployed after the first buffer is available and the blocking - * one after all subtasks are finished. 
- */ - @Test - public void testMixedPipelinedAndBlockingResults() throws Exception { - final JobVertex sender = new JobVertex("Sender"); - sender.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class); - sender.getConfiguration().setInteger(BinaryRoundRobinSubtaskIndexSender.CONFIG_KEY, PARALLELISM); - sender.setParallelism(PARALLELISM); - - final JobVertex pipelinedReceiver = new JobVertex("Pipelined Receiver"); - pipelinedReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class); - pipelinedReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM); - pipelinedReceiver.setParallelism(PARALLELISM); - - pipelinedReceiver.connectNewDataSetAsInput( - sender, - DistributionPattern.ALL_TO_ALL, - ResultPartitionType.PIPELINED); - - final JobVertex blockingReceiver = new JobVertex("Blocking Receiver"); - blockingReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class); - blockingReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM); - blockingReceiver.setParallelism(PARALLELISM); - - blockingReceiver.connectNewDataSetAsInput(sender, - DistributionPattern.ALL_TO_ALL, - ResultPartitionType.BLOCKING); - - SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); - - sender.setSlotSharingGroup(slotSharingGroup); - pipelinedReceiver.setSlotSharingGroup(slotSharingGroup); - blockingReceiver.setSlotSharingGroup(slotSharingGroup); - - final JobGraph jobGraph = new JobGraph( - "Mixed pipelined and blocking result", - sender, - pipelinedReceiver, - blockingReceiver); - - MINI_CLUSTER_RESOURCE.getMiniCluster().executeJobBlocking(jobGraph); - } - - // --------------------------------------------------------------------------------------------- - - /** - * Invokable which writes a configurable number of events to a pipelined - * and blocking partition alternatingly. 
- */ - public static class BinaryRoundRobinSubtaskIndexSender extends AbstractInvokable { - - static final String CONFIG_KEY = "number-of-times-to-send"; - - public BinaryRoundRobinSubtaskIndexSender(Environment environment) { - super(environment); - } - - @Override - public void invoke() throws Exception { - List<RecordWriter<IntValue>> writers = Lists.newArrayListWithCapacity(2); - - // The order of intermediate result creation in the job graph specifies which produced - // result partition is pipelined/blocking. - final RecordWriter<IntValue> pipelinedWriter = new RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(0)); - final RecordWriter<IntValue> blockingWriter = new RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(1)); - writers.add(pipelinedWriter); - writers.add(blockingWriter); - - final int numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0); - - final IntValue subtaskIndex = new IntValue( - getEnvironment().getTaskInfo().getIndexOfThisSubtask()); - - // Produce the first intermediate result and then the second in a serial fashion. - for (RecordWriter<IntValue> writer : writers) { - try { - for (int i = 0; i < numberOfTimesToSend; i++) { - writer.emit(subtaskIndex); - } - writer.flushAll(); - } - finally { - writer.close(); - } - } - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java new file mode 100644 index 0000000..13ed1c5 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.scheduler; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.scheduler.SchedulerBase; +import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; +import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; +import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.util.IterableUtils; +import 
org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertThat; + +/** + * Tests for the updating of consumers depending on the + * producers result. + */ +public class UpdatePartitionConsumersTest extends TestLogger { + + private static final long TIMEOUT = 5000L; + + private JobGraph jobGraph; + private JobVertex v1; + private JobVertex v2; + private JobVertex v3; + private JobVertex v4; + + @Before + public void setUp() { + buildJobGraphWithBlockingEdgeWithinRegion(); + } + + /** + * Build a graph which allows consumer vertex v4 to be deployed before its BLOCKING input v3 finishes. 
+ * + * <pre> + * +----+ + * +-- pipelined -> | v2 | -- pipelined -+ + * +----+ | +----+ | +----+ + * | v1 |-| | -> | v4 | + * +----+ | +----+ | +----+ + * +-- pipelined -> | v3 | -- blocking --+ + * +----+ + * </pre> + */ + private void buildJobGraphWithBlockingEdgeWithinRegion() { + v1 = new JobVertex("v1"); + v1.setInvokableClass(AbstractInvokable.class); + v1.setParallelism(1); + + v2 = new JobVertex("v2"); + v2.setInvokableClass(AbstractInvokable.class); + v2.setParallelism(1); + + v3 = new JobVertex("v3"); + v3.setInvokableClass(AbstractInvokable.class); + v3.setParallelism(1); + + v4 = new JobVertex("v4"); + v4.setInvokableClass(AbstractInvokable.class); + v4.setParallelism(1); + + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + jobGraph = new JobGraph(v1, v2, v3, v4); + } + + /** + * Test BLOCKING partition information are properly updated to consumers when its producer finishes. 
+ */ + @Test + public void testUpdatePartitionConsumers() throws Exception { + final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + + final SchedulerBase scheduler = SchedulerTestingUtils + .newSchedulerBuilder(jobGraph) + .setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway)) + .build(); + scheduler.initialize(ComponentMainThreadExecutorServiceAdapter.forMainThread()); + + final ExecutionVertex ev1 = scheduler.getExecutionVertex(new ExecutionVertexID(v1.getID(), 0)); + final ExecutionVertex ev2 = scheduler.getExecutionVertex(new ExecutionVertexID(v2.getID(), 0)); + final ExecutionVertex ev3 = scheduler.getExecutionVertex(new ExecutionVertexID(v3.getID(), 0)); + final ExecutionVertex ev4 = scheduler.getExecutionVertex(new ExecutionVertexID(v4.getID(), 0)); + + final CompletableFuture<TaskDeploymentDescriptor> ev4TddFuture = new CompletableFuture<>(); + taskManagerGateway.setSubmitConsumer(tdd -> { + if (tdd.getExecutionAttemptId().equals(ev4.getCurrentExecutionAttempt().getAttemptId())) { + ev4TddFuture.complete(tdd); + } + }); + + scheduler.startScheduling(); + + assertThat(ev1.getExecutionState(), is(ExecutionState.DEPLOYING)); + assertThat(ev2.getExecutionState(), is(ExecutionState.DEPLOYING)); + assertThat(ev3.getExecutionState(), is(ExecutionState.DEPLOYING)); + assertThat(ev4.getExecutionState(), is(ExecutionState.DEPLOYING)); + + updateState(scheduler, ev1, ExecutionState.RUNNING); + updateState(scheduler, ev2, ExecutionState.RUNNING); + updateState(scheduler, ev3, ExecutionState.RUNNING); + updateState(scheduler, ev4, ExecutionState.RUNNING); + + final InputGateDeploymentDescriptor ev4Igdd2 = ev4TddFuture.get(TIMEOUT, TimeUnit.MILLISECONDS).getInputGates().get(1); + assertThat(ev4Igdd2.getShuffleDescriptors()[0], instanceOf(UnknownShuffleDescriptor.class)); + + final CompletableFuture<Void> updatePartitionFuture = new CompletableFuture<>(); + 
taskManagerGateway.setUpdatePartitionsConsumer((attemptId, partitionInfos, time) -> { + assertThat(attemptId, equalTo(ev4.getCurrentExecutionAttempt().getAttemptId())); + final List<PartitionInfo> partitionInfoList = IterableUtils.toStream(partitionInfos).collect(Collectors.toList()); + assertThat(partitionInfoList, hasSize(1)); + final PartitionInfo partitionInfo = partitionInfoList.get(0); + assertThat(partitionInfo.getIntermediateDataSetID(), equalTo(v3.getProducedDataSets().get(0).getId())); + assertThat(partitionInfo.getShuffleDescriptor(), instanceOf(NettyShuffleDescriptor.class)); + updatePartitionFuture.complete(null); + }); + + updateState(scheduler, ev1, ExecutionState.FINISHED); + updateState(scheduler, ev3, ExecutionState.FINISHED); + + updatePartitionFuture.get(TIMEOUT, TimeUnit.MILLISECONDS); + } + + private void updateState(SchedulerBase scheduler, ExecutionVertex vertex, ExecutionState state) { + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), + vertex.getCurrentExecutionAttempt().getAttemptId(), + state)); + } +}
