[ https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711599#comment-16711599 ]
ASF GitHub Bot commented on FLINK-10945:
----------------------------------------

zhuzhurk closed pull request #7252: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7252

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3b3132b3d11..6def1dfa488 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -159,6 +159,9 @@
     /** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: true */
     private boolean failTaskOnCheckpointError = true;

+    /** The input dependency constraint to schedule tasks. */
+    private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
+
     // ------------------------------- User code values --------------------------------------------

     private GlobalJobParameters globalJobParameters;
@@ -518,6 +521,30 @@ public ExecutionMode getExecutionMode() {
         return executionMode;
     }

+    /**
+     * Sets the input dependency constraint for vertex scheduling. It indicates when a task
+     * should be scheduled considering its inputs status.
+     *
+     * The default constraint is {@link InputDependencyConstraint#ANY}.
+     *
+     * @param inputDependencyConstraint The input dependency constraint.
+     */
+    public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
+        this.inputDependencyConstraint = inputDependencyConstraint;
+    }
+
+    /**
+     * Gets the input dependency constraint for vertex scheduling. It indicates when a task
+     * should be scheduled considering its inputs status.
+     *
+     * The default constraint is {@link InputDependencyConstraint#ANY}.
+     *
+     * @return The input dependency constraint of this job.
+     */
+    public InputDependencyConstraint getInputDependencyConstraint() {
+        return inputDependencyConstraint;
+    }
+
     /**
      * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
      * In some cases this might be preferable. For example, when using interfaces
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
new file mode 100644
index 00000000000..c7a1e6a3519
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * This constraint indicates when a task should be scheduled considering its inputs status.
+ */
+public enum InputDependencyConstraint {
+
+    /**
+     * Schedule the task if any input is consumable.
+     */
+    ANY,
+
+    /**
+     * Schedule the task if all the inputs are consumable.
+     */
+    ALL
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 7c2b30db32a..15bdb36f1d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -101,7 +101,8 @@ public String toString() {
             final ResultPartitionLocation partitionLocation;

             // The producing task needs to be RUNNING or already FINISHED
-            if (consumedPartition.isConsumable() && producerSlot != null &&
+            if ((consumedPartition.getResultType().isPipelined() || consumedPartition.isConsumable()) &&
+                producerSlot != null &&
                     (producerState == ExecutionState.RUNNING ||
                         producerState == ExecutionState.FINISHED ||
                         producerState == ExecutionState.SCHEDULED ||
@@ -136,7 +137,8 @@ else if (producerState == ExecutionState.CANCELING
                 }
                 else {
                     String msg = String.format("Trying to eagerly schedule a task whose inputs " +
-                        "are not ready (partition consumable? %s, producer state: %s, producer slot: %s).",
+                        "are not ready (result type: %s, partition consumable: %s, producer state: %s, producer slot: %s).",
+                        consumedPartition.getResultType(),
                         consumedPartition.isConsumable(),
                         producerState,
                         producerSlot);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index e3b501e52e8..59f76502ed6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -747,31 +747,34 @@ else if (numConsumers == 0) {
                 consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(
                         partition, partitionExecution));

-                // When deploying a consuming task, its task deployment descriptor will contain all
-                // deployment information available at the respective time. It is possible that some
-                // of the partitions to be consumed have not been created yet. These are updated
-                // runtime via the update messages.
-                //
-                // TODO The current approach may send many update messages even though the consuming
-                // task has already been deployed with all necessary information. We have to check
-                // whether this is a problem and fix it, if it is.
-                CompletableFuture.supplyAsync(
-                    () -> {
-                        try {
-                            final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
-                            consumerVertex.scheduleForExecution(
-                                executionGraph.getSlotProvider(),
-                                executionGraph.isQueuedSchedulingAllowed(),
-                                LocationPreferenceConstraint.ANY, // there must be at least one known location
-                                Collections.emptySet());
-                        } catch (Throwable t) {
-                            consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
+                // Schedule the consumer vertex if its inputs constraint is satisfied, otherwise postpone the scheduling
+                if (consumerVertex.checkInputDependencyConstraints()) {
+                    // When deploying a consuming task, its task deployment descriptor will contain all
+                    // deployment information available at the respective time. It is possible that some
+                    // of the partitions to be consumed have not been created yet. These are updated
+                    // runtime via the update messages.
+                    //
+                    // TODO The current approach may send many update messages even though the consuming
+                    // task has already been deployed with all necessary information. We have to check
+                    // whether this is a problem and fix it, if it is.
+                    CompletableFuture.supplyAsync(
+                        () -> {
+                            try {
+                                final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
+                                consumerVertex.scheduleForExecution(
+                                    executionGraph.getSlotProvider(),
+                                    executionGraph.isQueuedSchedulingAllowed(),
+                                    LocationPreferenceConstraint.ANY, // there must be at least one known location
+                                    Collections.emptySet());
+                            } catch (Throwable t) {
+                                consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
                                     "vertex " + consumerVertex, t));
-                        }
+                            }

-                        return null;
-                    },
-                    executor);
+                            return null;
+                        },
+                        executor);
+                }

                 // double check to resolve race conditions
                 if (consumerVertex.getExecutionState() == RUNNING) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 56315e07146..e89409f032c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -255,6 +256,9 @@
      * from results than need to be materialized. */
     private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;

+    /** The input dependency constraint to schedule tasks. */
+    private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
+
     // ------ Execution status and progress. These values are volatile, and accessed under the lock -------

     private final AtomicInteger verticesFinished;
@@ -456,6 +460,14 @@ public ScheduleMode getScheduleMode() {
         return scheduleMode;
     }

+    public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
+        this.inputDependencyConstraint = inputDependencyConstraint;
+    }
+
+    public InputDependencyConstraint getInputDependencyConstraint() {
+        return inputDependencyConstraint;
+    }
+
     public Time getAllocationTimeout() {
         return allocationTimeout;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index f1a861d2ca1..c0d877dcb28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -179,6 +179,13 @@ public static ExecutionGraph buildGraph(
         executionGraph.setScheduleMode(jobGraph.getScheduleMode());
         executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());

+        try {
+            executionGraph.setInputDependencyConstraint(
+                jobGraph.getSerializedExecutionConfig().deserializeValue(classLoader).getInputDependencyConstraint());
+        } catch (IOException | ClassNotFoundException e) {
+            throw new JobException("Fail to deserialize execution config.", e);
+        }
+
         try {
             executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a0747296c53..22e02e2b13d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -20,6 +20,7 @@

 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -686,6 +687,7 @@ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {

         if (partition.getIntermediateResult().getResultType().isPipelined()) {
             // Schedule or update receivers of this partition
+            partition.markDataProduced();
             execution.scheduleOrUpdateConsumers(partition.getConsumers());
         }
         else {
@@ -726,6 +728,56 @@ void sendPartitionInfos() {
         }
     }

+    /**
+     * Check whether the InputDependencyConstraint is satisfied for this vertex.
+     *
+     * @return whether the input constraint is satisfied
+     */
+    public boolean checkInputDependencyConstraints() {
+        if (getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
+            // InputDependencyConstraint == ANY
+            for (int i = 0; i < inputEdges.length; i++) {
+                if (isInputConsumable(i)) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            // InputDependencyConstraint == ALL
+            for (int i = 0; i < inputEdges.length; i++) {
+                if (!isInputConsumable(i)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+    /**
+     * An input is consumable when
+     * 1. the source result is PIPELINED and one of the result partition has produced data.
+     * 2. the source result is BLOCKING and is FINISHED(all partitions are FINISHED).
+     *
+     * @return whether the input is consumable
+     */
+    public boolean isInputConsumable(int inputNumber) {
+        IntermediateResult result = jobVertex.getInputs().get(inputNumber);
+
+        if (result.getResultType().isPipelined()) {
+            // For PIPELINED result, the input is consumable if any result partition has produced records or is finished
+            for (ExecutionEdge edge : inputEdges[inputNumber]) {
+                if (edge.getSource().hasDataProduced()) {
+                    return true;
+                }
+            }
+        } else {
+            // For BLOCKING result, the input is consumable if all the partitions in the result are finished
+            return result.areAllPartitionsFinished();
+        }
+
+        return false;
+    }
+
     // --------------------------------------------------------------------------------------------
     //   Notifications from the Execution Attempt
     // --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 313272cf86b..6e1d9ba69fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -156,19 +156,17 @@ public int getConnectionIndex() {

     void resetForNewExecution() {
         this.numberOfRunningProducers.set(numParallelProducers);
+        for (IntermediateResultPartition partition : partitions) {
+            partition.resetForNewExecution();
+        }
     }

     int decrementNumberOfRunningProducersAndGetRemaining() {
         return numberOfRunningProducers.decrementAndGet();
     }

-    boolean isConsumable() {
-        if (resultType.isPipelined()) {
-            return true;
-        }
-        else {
-            return numberOfRunningProducers.get() == 0;
-        }
+    boolean areAllPartitionsFinished() {
+        return numberOfRunningProducers.get() == 0;
     }

     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 124ceb2b6bc..56fb137eb59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -36,6 +36,11 @@

     private List<List<ExecutionEdge>> consumers;

+    /**
+     * Whether this partition has data produced. For pipelined result only.
+     */
+    private boolean dataProduced = false;
+
     public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVertex producer, int partitionNumber) {
         this.totalResult = totalResult;
         this.producer = producer;
@@ -60,7 +65,7 @@ public IntermediateResultPartitionID getPartitionId() {
         return partitionId;
     }

-    ResultPartitionType getResultType() {
+    public ResultPartitionType getResultType() {
         return totalResult.getResultType();
     }

@@ -68,8 +73,24 @@ ResultPartitionType getResultType() {
         return consumers;
     }

+    public void markDataProduced() {
+        dataProduced = true;
+    }
+
+    public boolean hasDataProduced() {
+        return dataProduced;
+    }
+
     public boolean isConsumable() {
-        return totalResult.isConsumable();
+        if (getResultType().isPipelined()) {
+            return dataProduced;
+        } else {
+            return totalResult.areAllPartitionsFinished();
+        }
+    }
+
+    void resetForNewExecution() {
+        dataProduced = false;
     }

     int addConsumerGroup() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
index 6aa36b70ab8..4f7417f27fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
@@ -29,6 +29,7 @@
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -192,6 +193,7 @@ private static ExecutionVertex mockExecutionVertex(ExecutionState state, Resourc

     private static IntermediateResultPartition mockPartition(ExecutionVertex producer) {
         IntermediateResultPartition partition = mock(IntermediateResultPartition.class);
+        when(partition.getResultType()).thenReturn(ResultPartitionType.PIPELINED);
         when(partition.isConsumable()).thenReturn(true);

         IntermediateResult result = mock(IntermediateResult.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
new file mode 100644
index 00000000000..8d131a0cb30
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.isInExecutionState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitForAllExecutionsPredicate;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the inputs constraint for {@link ExecutionVertex}.
+ */
+public class ExecutionVertexInputConstraintTest extends TestLogger {
+
+    @Test
+    public void testInputConsumable() throws Exception {
+        JobVertex v1 = new JobVertex("vertex1");
+        JobVertex v2 = new JobVertex("vertex2");
+        JobVertex v3 = new JobVertex("vertex3");
+        v1.setParallelism(2);
+        v2.setParallelism(2);
+        v3.setParallelism(2);
+        v1.setInvokableClass(AbstractInvokable.class);
+        v2.setInvokableClass(AbstractInvokable.class);
+        v3.setInvokableClass(AbstractInvokable.class);
+        v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+        List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+        ExecutionGraph eg = createExecutionGraph(ordered);
+
+        ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+        ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+        ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+        ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+        ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+        ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+
+        eg.scheduleForExecution();
+
+        // Inputs not consumable on init
+        assertFalse(ev31.isInputConsumable(0));
+        assertFalse(ev31.isInputConsumable(1));
+
+        // One pipelined input consumable on data produced
+        IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+        ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+            ev11.getCurrentExecutionAttempt().getAttemptId()));
+        assertTrue(ev31.isInputConsumable(0));
+
+        // The blocking input not consumable if only one partition is FINISHED
+        ev21.getCurrentExecutionAttempt().markFinished();
+        assertFalse(ev31.isInputConsumable(1));
+
+        // The blocking input consumable if all partitions are FINISHED
+        ev22.getCurrentExecutionAttempt().markFinished();
+        assertTrue(ev31.isInputConsumable(1));
+
+        // Inputs not consumable after failover
+        ev11.fail(new Exception());
+        waitUntilJobRestarted(eg);
+        assertFalse(ev31.isInputConsumable(0));
+        assertFalse(ev31.isInputConsumable(1));
+    }
+
+    @Test
+    public void testInputConstraintANY() throws Exception {
+        JobVertex v1 = new JobVertex("vertex1");
+        JobVertex v2 = new JobVertex("vertex2");
+        JobVertex v3 = new JobVertex("vertex3");
+        v1.setParallelism(2);
+        v2.setParallelism(2);
+        v3.setParallelism(2);
+        v1.setInvokableClass(AbstractInvokable.class);
+        v2.setInvokableClass(AbstractInvokable.class);
+        v3.setInvokableClass(AbstractInvokable.class);
+        v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+        List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+        ExecutionGraph eg = createExecutionGraph(ordered);
+        eg.setInputDependencyConstraint(InputDependencyConstraint.ANY);
+
+        ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+        ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+        ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+        ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+        ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+        ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+
+        eg.scheduleForExecution();
+
+        // Inputs constraint not satisfied on init
+        assertFalse(ev31.checkInputDependencyConstraints());
+
+        // Input1 consumable satisfies the constraint
+        IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+        ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+            ev11.getCurrentExecutionAttempt().getAttemptId()));
+        assertTrue(ev31.checkInputDependencyConstraints());
+
+        // Inputs constraint not satisfied after failover
+        ev11.fail(new Exception());
+        waitUntilJobRestarted(eg);
+        assertFalse(ev31.checkInputDependencyConstraints());
+
+        // Input2 consumable satisfies the constraint
+        waitUntilExecutionVertexState(ev21, ExecutionState.DEPLOYING, 2000L);
+        waitUntilExecutionVertexState(ev22, ExecutionState.DEPLOYING, 2000L);
+        ev21.getCurrentExecutionAttempt().markFinished();
+        ev22.getCurrentExecutionAttempt().markFinished();
+        assertTrue(ev31.isInputConsumable(1));
+    }
+
+    @Test
+    public void testInputConstraintALL() throws Exception {
+        JobVertex v1 = new JobVertex("vertex1");
+        JobVertex v2 = new JobVertex("vertex2");
+        JobVertex v3 = new JobVertex("vertex3");
+        v1.setParallelism(2);
+        v2.setParallelism(2);
+        v3.setParallelism(2);
+        v1.setInvokableClass(AbstractInvokable.class);
+        v2.setInvokableClass(AbstractInvokable.class);
+        v3.setInvokableClass(AbstractInvokable.class);
+        v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+        List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+        ExecutionGraph eg = createExecutionGraph(ordered);
+        eg.setInputDependencyConstraint(InputDependencyConstraint.ALL);
+
+        ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+        ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+        ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+        ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+        ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+        ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+
+        eg.scheduleForExecution();
+
+        // Inputs constraint not satisfied on init
+        assertFalse(ev31.checkInputDependencyConstraints());
+
+        // Input1 consumable does not satisfy the constraint
+        IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+        ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+            ev11.getCurrentExecutionAttempt().getAttemptId()));
+        assertFalse(ev31.checkInputDependencyConstraints());
+
+        // Input2 consumable satisfies the constraint
+        ev21.getCurrentExecutionAttempt().markFinished();
+        ev22.getCurrentExecutionAttempt().markFinished();
+        assertTrue(ev31.isInputConsumable(1));
+
+        // Inputs constraint not satisfied after failover
+        ev11.fail(new Exception());
+        waitUntilJobRestarted(eg);
+        assertFalse(ev31.checkInputDependencyConstraints());
+    }
+
+    private static ExecutionGraph createExecutionGraph(List<JobVertex> ordered) throws Exception {
+        final JobID jobId = new JobID();
+        final String jobName = "Test Job Sample Name";
+
+        final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20);
+
+        ExecutionGraph eg = new ExecutionGraph(
+            new DummyJobInformation(
+                jobId,
+                jobName),
+            TestingUtils.defaultExecutor(),
+            TestingUtils.defaultExecutor(),
+            AkkaUtils.getDefaultTimeout(),
+            new FixedDelayRestartStrategy(1, 0),
+            new RestartAllStrategy.Factory(),
+            slotProvider);
+
+        eg.attachJobGraph(ordered);
+
+        return eg;
+    }
+
+    private void waitUntilJobRestarted(ExecutionGraph eg) throws Exception {
+        waitForAllExecutionsPredicate(eg,
+            isInExecutionState(ExecutionState.CANCELING)
+                .or(isInExecutionState(ExecutionState.CANCELED))
+                .or(isInExecutionState(ExecutionState.FAILED))
+                .or(isInExecutionState(ExecutionState.FINISHED)),
+            2000L);
+
+        for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+            if (ev.getCurrentExecutionAttempt().getState() == ExecutionState.CANCELING) {
+                ev.getCurrentExecutionAttempt().cancelingComplete();
+            }
+        }
+        waitUntilJobStatus(eg, JobStatus.RUNNING, 2000L);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
new file mode 100644
index 00000000000..2cd00617d90
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link IntermediateResultPartition}.
+ */
+public class IntermediateResultPartitionTest extends TestLogger {
+
+    @Test
+    public void testPipelinedPartitionConsumable() throws Exception {
+        ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService());
+        IntermediateResult result =
+            new IntermediateResult(new IntermediateDataSetID(), jobVertex, 2, ResultPartitionType.PIPELINED);
+        ExecutionVertex vertex1 =
+            new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1));
+        ExecutionVertex vertex2 =
+            new ExecutionVertex(jobVertex, 1, new IntermediateResult[]{result}, Time.minutes(1));
+
+        IntermediateResultPartition partition1 = result.getPartitions()[0];
+        IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+        // Not consumable on init
+        assertFalse(partition1.isConsumable());
+        assertFalse(partition2.isConsumable());
+
+        // Partition 1 consumable after data are produced
+        partition1.markDataProduced();
+        assertTrue(partition1.isConsumable());
+        assertFalse(partition2.isConsumable());
+
+        // Not consumable if failover happens
+        result.resetForNewExecution();
+        assertFalse(partition1.isConsumable());
+        assertFalse(partition2.isConsumable());
+    }
+
+    @Test
+    public void testBlockingPartitionConsumable() throws Exception {
+        ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService());
+        IntermediateResult result =
+            new IntermediateResult(new IntermediateDataSetID(), jobVertex, 2, ResultPartitionType.BLOCKING);
+        ExecutionVertex vertex1 =
+            new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1));
+        ExecutionVertex vertex2 =
+            new ExecutionVertex(jobVertex, 1, new IntermediateResult[]{result}, Time.minutes(1));
+
+        IntermediateResultPartition partition1 = result.getPartitions()[0];
+        IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+        // Not consumable on init
+        assertFalse(partition1.isConsumable());
+        assertFalse(partition2.isConsumable());
+
+        // Not consumable if only one partition is FINISHED
+        partition1.markFinished();
+        assertFalse(partition1.isConsumable());
+        assertFalse(partition2.isConsumable());
+
+        // Consumable after all partitions are FINISHED
+        partition2.markFinished();
+        assertTrue(partition1.isConsumable());
+        assertTrue(partition2.isConsumable());
+
+        // Not consumable if failover happens
+        result.resetForNewExecution();
+        assertFalse(partition1.isConsumable());
+        assertFalse(partition2.isConsumable());
+    }
+}

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Avoid resource deadlocks for finite stream jobs when resources are limited
> --------------------------------------------------------------------------
>
>                 Key: FLINK-10945
>                 URL: https://issues.apache.org/jira/browse/FLINK-10945
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>    Affects Versions: 1.7.1
>            Reporter: Zhu Zhu
>            Assignee: Zhu Zhu
>            Priority: Major
>              Labels: pull-request-available
>
> Currently *resource deadlocks* can happen to finite stream jobs (or batch
> jobs) when resources are limited, in the 2 cases below:
> # Task Y is a pipelined downstream task of task X. When X takes all
> resources (slots), Y cannot acquire slots to start, so back pressure
> blocks X from finishing.
> # Task Y is an upstream task of task X. When X takes all resources (slots)
> and Y cannot start, X cannot finish because some of its inputs are not
> finished.
>
> We can avoid case 1 by setting all edges to be BLOCKING to avoid pipeline
> back pressure. However, case 2 cannot be avoided, as X (the downstream task)
> will be launched when any of its input results is ready.
> In detail, say task X has BLOCKING upstream tasks Y and Z. X can be
> launched when Z finishes, even though task Y has not been launched yet. This
> pre-launch behaviour can be beneficial when there are plenty of resources,
> since X can process data from Z before Y finishes its data processing.
> However, resource deadlocks may happen when resources are limited, e.g. in
> small sessions.
>
> I'd propose introducing a constraint named *InputDependencyConstraint* to
> control the scheduling of vertices. It has 2 values:
> # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
> # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
>
> The design doc is here:
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
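
For readers who want the ANY/ALL semantics from the issue description in isolation, here is a minimal standalone sketch. It is not the code from the pull request: the class `InputConstraintSketch`, the nested `Constraint` enum, and the `canSchedule` helper are hypothetical names, and each input's state is reduced to a plain boolean "consumable" flag instead of being derived from result partitions.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative model of the ANY/ALL check; hypothetical names, not Flink classes. */
public class InputConstraintSketch {

    /** Mirrors the two values proposed in the issue. */
    public enum Constraint { ANY, ALL }

    /**
     * Decides whether a vertex may be scheduled.
     *
     * @param constraint       the scheduling constraint
     * @param inputsConsumable one flag per input; a simplified stand-in for per-input state
     */
    public static boolean canSchedule(Constraint constraint, List<Boolean> inputsConsumable) {
        if (constraint == Constraint.ANY) {
            // ANY: at least one input must be consumable (false when there are no inputs).
            return inputsConsumable.contains(Boolean.TRUE);
        }
        // ALL: no input may be non-consumable (true when there are no inputs).
        return !inputsConsumable.contains(Boolean.FALSE);
    }

    public static void main(String[] args) {
        // X has two BLOCKING inputs: Z is finished, Y has not been launched yet.
        List<Boolean> inputs = Arrays.asList(true, false);
        System.out.println("ANY: " + canSchedule(Constraint.ANY, inputs));
        System.out.println("ALL: " + canSchedule(Constraint.ALL, inputs));
        System.out.println("ALL, no inputs: "
            + canSchedule(Constraint.ALL, Collections.<Boolean>emptyList()));
    }
}
```

With one finished and one unfinished input, ANY lets X start early (the pre-launch behaviour described in the issue), while ALL holds X back until Y has also finished, which is what avoids case 2 when slots are scarce.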