[ https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711340#comment-16711340 ]
ASF GitHub Bot commented on FLINK-10945:
----------------------------------------
zhuzhurk closed pull request #7250: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7250
This is a PR merged from a forked repository. As GitHub hides the
original diff on merge, it is displayed below for the sake of provenance:
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3b3132b3d11..6def1dfa488 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -159,6 +159,9 @@
     /** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: true */
     private boolean failTaskOnCheckpointError = true;
 
+    /** The input dependency constraint to schedule tasks. */
+    private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
+
     // ------------------------------- User code values --------------------------------------------
 
     private GlobalJobParameters globalJobParameters;
@@ -518,6 +521,30 @@ public ExecutionMode getExecutionMode() {
         return executionMode;
     }
 
+    /**
+     * Sets the input dependency constraint for vertex scheduling. It indicates when a task
+     * should be scheduled considering its inputs status.
+     *
+     * The default constraint is {@link InputDependencyConstraint#ANY}.
+     *
+     * @param inputDependencyConstraint The input dependency constraint.
+     */
+    public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
+        this.inputDependencyConstraint = inputDependencyConstraint;
+    }
+
+    /**
+     * Gets the input dependency constraint for vertex scheduling. It indicates when a task
+     * should be scheduled considering its inputs status.
+     *
+     * The default constraint is {@link InputDependencyConstraint#ANY}.
+     *
+     * @return The input dependency constraint of this job.
+     */
+    public InputDependencyConstraint getInputDependencyConstraint() {
+        return inputDependencyConstraint;
+    }
+
     /**
      * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
      * In some cases this might be preferable. For example, when using interfaces
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
new file mode 100644
index 00000000000..c7a1e6a3519
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * This constraint indicates when a task should be scheduled considering its inputs status.
+ */
+public enum InputDependencyConstraint {
+
+ /**
+ * Schedule the task if any input is consumable.
+ */
+ ANY,
+
+ /**
+ * Schedule the task if all the inputs are consumable.
+ */
+ ALL
+}
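
For illustration, a minimal sketch of how a job could opt into the new constraint
through the ExecutionConfig accessors added above (the surrounding class and main
method are illustrative only; the two constraint accessors are what this PR adds):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.InputDependencyConstraint;

    public class InputConstraintExample {

        public static void main(String[] args) {
            ExecutionConfig config = new ExecutionConfig();

            // Only schedule a vertex once ALL of its inputs are consumable,
            // trading the pre-launch optimisation for deadlock safety.
            config.setInputDependencyConstraint(InputDependencyConstraint.ALL);

            // The default is ANY; after the call above this prints ALL.
            System.out.println(config.getInputDependencyConstraint());
        }
    }
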
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 7c2b30db32a..15bdb36f1d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -101,7 +101,8 @@ public String toString() {
             final ResultPartitionLocation partitionLocation;
 
             // The producing task needs to be RUNNING or already FINISHED
-            if (consumedPartition.isConsumable() && producerSlot != null &&
+            if ((consumedPartition.getResultType().isPipelined() || consumedPartition.isConsumable()) &&
+                    producerSlot != null &&
                     (producerState == ExecutionState.RUNNING ||
                         producerState == ExecutionState.FINISHED ||
                         producerState == ExecutionState.SCHEDULED ||
@@ -136,7 +137,8 @@ else if (producerState == ExecutionState.CANCELING
             }
             else {
                 String msg = String.format("Trying to eagerly schedule a task whose inputs " +
-                    "are not ready (partition consumable? %s, producer state: %s, producer slot: %s).",
+                    "are not ready (result type: %s, partition consumable: %s, producer state: %s, producer slot: %s).",
+                    consumedPartition.getResultType(),
                     consumedPartition.isConsumable(),
                     producerState,
                     producerSlot);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index e3b501e52e8..59f76502ed6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -747,31 +747,34 @@ else if (numConsumers == 0) {
                     consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(
                         partition, partitionExecution));
 
-                    // When deploying a consuming task, its task deployment descriptor will contain all
-                    // deployment information available at the respective time. It is possible that some
-                    // of the partitions to be consumed have not been created yet. These are updated
-                    // runtime via the update messages.
-                    //
-                    // TODO The current approach may send many update messages even though the consuming
-                    // task has already been deployed with all necessary information. We have to check
-                    // whether this is a problem and fix it, if it is.
-                    CompletableFuture.supplyAsync(
-                        () -> {
-                            try {
-                                final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
-                                consumerVertex.scheduleForExecution(
-                                    executionGraph.getSlotProvider(),
-                                    executionGraph.isQueuedSchedulingAllowed(),
-                                    LocationPreferenceConstraint.ANY, // there must be at least one known location
-                                    Collections.emptySet());
-                            } catch (Throwable t) {
-                                consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
+                    // Schedule the consumer vertex if its inputs constraint is satisfied, otherwise postpone the scheduling
+                    if (consumerVertex.checkInputDependencyConstraints()) {
+                        // When deploying a consuming task, its task deployment descriptor will contain all
+                        // deployment information available at the respective time. It is possible that some
+                        // of the partitions to be consumed have not been created yet. These are updated
+                        // runtime via the update messages.
+                        //
+                        // TODO The current approach may send many update messages even though the consuming
+                        // task has already been deployed with all necessary information. We have to check
+                        // whether this is a problem and fix it, if it is.
+                        CompletableFuture.supplyAsync(
+                            () -> {
+                                try {
+                                    final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
+                                    consumerVertex.scheduleForExecution(
+                                        executionGraph.getSlotProvider(),
+                                        executionGraph.isQueuedSchedulingAllowed(),
+                                        LocationPreferenceConstraint.ANY, // there must be at least one known location
+                                        Collections.emptySet());
+                                } catch (Throwable t) {
+                                    consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
                                         "vertex " + consumerVertex, t));
-                            }
+                                }
 
-                            return null;
-                        },
-                        executor);
+                                return null;
+                            },
+                            executor);
+                    }
 
                     // double check to resolve race conditions
                     if (consumerVertex.getExecutionState() == RUNNING) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 56315e07146..e89409f032c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -255,6 +256,9 @@
      * from results than need to be materialized. */
     private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
+    /** The input dependency constraint to schedule tasks. */
+    private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
+
     // ------ Execution status and progress. These values are volatile, and accessed under the lock -------
 
     private final AtomicInteger verticesFinished;
@@ -456,6 +460,14 @@ public ScheduleMode getScheduleMode() {
         return scheduleMode;
     }
 
+    public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
+        this.inputDependencyConstraint = inputDependencyConstraint;
+    }
+
+    public InputDependencyConstraint getInputDependencyConstraint() {
+        return inputDependencyConstraint;
+    }
+
     public Time getAllocationTimeout() {
         return allocationTimeout;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index f1a861d2ca1..c0d877dcb28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -179,6 +179,13 @@ public static ExecutionGraph buildGraph(
         executionGraph.setScheduleMode(jobGraph.getScheduleMode());
         executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
 
+        try {
+            executionGraph.setInputDependencyConstraint(
+                jobGraph.getSerializedExecutionConfig().deserializeValue(classLoader).getInputDependencyConstraint());
+        } catch (IOException | ClassNotFoundException e) {
+            throw new JobException("Fail to deserialize execution config.", e);
+        }
+
         try {
             executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a0747296c53..22e02e2b13d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -686,6 +687,7 @@ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
 
         if (partition.getIntermediateResult().getResultType().isPipelined()) {
             // Schedule or update receivers of this partition
+            partition.markDataProduced();
             execution.scheduleOrUpdateConsumers(partition.getConsumers());
         }
         else {
@@ -726,6 +728,56 @@ void sendPartitionInfos() {
         }
     }
 
+    /**
+     * Check whether the InputDependencyConstraint is satisfied for this vertex.
+     *
+     * @return whether the input constraint is satisfied
+     */
+    public boolean checkInputDependencyConstraints() {
+        if (getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
+            // InputDependencyConstraint == ANY
+            for (int i = 0; i < inputEdges.length; i++) {
+                if (isInputConsumable(i)) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            // InputDependencyConstraint == ALL
+            for (int i = 0; i < inputEdges.length; i++) {
+                if (!isInputConsumable(i)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+    /**
+     * An input is consumable when
+     * 1. the source result is PIPELINED and one of the result partition has produced data.
+     * 2. the source result is BLOCKING and is FINISHED(all partitions are FINISHED).
+     *
+     * @return whether the input is consumable
+     */
+    public boolean isInputConsumable(int inputNumber) {
+        IntermediateResult result = jobVertex.getInputs().get(inputNumber);
+
+        if (result.getResultType().isPipelined()) {
+            // For PIPELINED result, the input is consumable if any result partition has produced records or is finished
+            for (ExecutionEdge edge : inputEdges[inputNumber]) {
+                if (edge.getSource().hasDataProduced()) {
+                    return true;
+                }
+            }
+        } else {
+            // For BLOCKING result, the input is consumable if all the partitions in the result are finished
+            return result.areAllPartitionsFinished();
+        }
+
+        return false;
+    }
+
     // --------------------------------------------------------------------------------------------
     //  Notifications from the Execution Attempt
     // --------------------------------------------------------------------------------------------
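
The two new methods above reduce to simple quantifiers over the vertex's inputs:
ANY needs at least one consumable input, ALL needs every input to be consumable
(so an input-free vertex satisfies ALL vacuously and never satisfies ANY). A
standalone sketch of the same decision logic, useful for checking those edge
cases; the IntPredicate stands in for ExecutionVertex#isInputConsumable and is
an assumption of this sketch, not PR API:

    import java.util.function.IntPredicate;

    final class InputConstraintCheck {

        // Mirrors ExecutionVertex#checkInputDependencyConstraints: ANY needs at
        // least one consumable input, ALL needs every input to be consumable.
        static boolean satisfied(int numInputs, IntPredicate isInputConsumable, boolean requireAll) {
            for (int i = 0; i < numInputs; i++) {
                boolean consumable = isInputConsumable.test(i);
                if (requireAll && !consumable) {
                    return false; // ALL: one unready input postpones scheduling
                }
                if (!requireAll && consumable) {
                    return true; // ANY: one ready input suffices
                }
            }
            return requireAll; // no early exit: ALL passed, ANY failed
        }
    }
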
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 313272cf86b..6e1d9ba69fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -156,19 +156,17 @@ public int getConnectionIndex() {
 
     void resetForNewExecution() {
         this.numberOfRunningProducers.set(numParallelProducers);
+        for (IntermediateResultPartition partition : partitions) {
+            partition.resetForNewExecution();
+        }
     }
 
     int decrementNumberOfRunningProducersAndGetRemaining() {
         return numberOfRunningProducers.decrementAndGet();
     }
 
-    boolean isConsumable() {
-        if (resultType.isPipelined()) {
-            return true;
-        }
-        else {
-            return numberOfRunningProducers.get() == 0;
-        }
+    boolean areAllPartitionsFinished() {
+        return numberOfRunningProducers.get() == 0;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 124ceb2b6bc..56fb137eb59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -36,6 +36,11 @@
 
     private List<List<ExecutionEdge>> consumers;
 
+    /**
+     * Whether this partition has data produced. For pipelined result only.
+     */
+    private boolean dataProduced = false;
+
     public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVertex producer, int partitionNumber) {
         this.totalResult = totalResult;
         this.producer = producer;
@@ -60,7 +65,7 @@ public IntermediateResultPartitionID getPartitionId() {
         return partitionId;
     }
 
-    ResultPartitionType getResultType() {
+    public ResultPartitionType getResultType() {
         return totalResult.getResultType();
     }
 
@@ -68,8 +73,24 @@ ResultPartitionType getResultType() {
         return consumers;
     }
 
+    public void markDataProduced() {
+        dataProduced = true;
+    }
+
+    public boolean hasDataProduced() {
+        return dataProduced;
+    }
+
     public boolean isConsumable() {
-        return totalResult.isConsumable();
+        if (getResultType().isPipelined()) {
+            return dataProduced;
+        } else {
+            return totalResult.areAllPartitionsFinished();
+        }
+    }
+
+    void resetForNewExecution() {
+        dataProduced = false;
     }
 
     int addConsumerGroup() {
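
In effect, partition consumability becomes a small per-partition state machine:
a pipelined partition turns consumable on the first produced data and resets on
failover, while a blocking partition is consumable only once every producer of
the result has finished. A hypothetical, self-contained model of those rules
(PartitionModel and its method names are illustrative, not Flink API):

    final class PartitionModel {

        private final boolean pipelined;
        private boolean dataProduced;  // meaningful for pipelined results only
        private int runningProducers;  // meaningful for blocking results only

        PartitionModel(boolean pipelined, int numProducers) {
            this.pipelined = pipelined;
            this.runningProducers = numProducers;
        }

        void onDataProduced() { dataProduced = true; }

        void onProducerFinished() { runningProducers--; }

        void onFailover(int numProducers) {
            dataProduced = false;
            runningProducers = numProducers;
        }

        boolean isConsumable() {
            return pipelined ? dataProduced : runningProducers == 0;
        }
    }
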
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
index 6aa36b70ab8..4f7417f27fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
@@ -29,6 +29,7 @@
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -192,6 +193,7 @@ private static ExecutionVertex mockExecutionVertex(ExecutionState state, Resourc
 
     private static IntermediateResultPartition mockPartition(ExecutionVertex producer) {
         IntermediateResultPartition partition = mock(IntermediateResultPartition.class);
+        when(partition.getResultType()).thenReturn(ResultPartitionType.PIPELINED);
         when(partition.isConsumable()).thenReturn(true);
 
         IntermediateResult result = mock(IntermediateResult.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
new file mode 100644
index 00000000000..8d131a0cb30
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.isInExecutionState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitForAllExecutionsPredicate;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the inputs constraint for {@link ExecutionVertex}.
+ */
+public class ExecutionVertexInputConstraintTest extends TestLogger {
+
+    @Test
+    public void testInputConsumable() throws Exception {
+        JobVertex v1 = new JobVertex("vertex1");
+        JobVertex v2 = new JobVertex("vertex2");
+        JobVertex v3 = new JobVertex("vertex3");
+        v1.setParallelism(2);
+        v2.setParallelism(2);
+        v3.setParallelism(2);
+        v1.setInvokableClass(AbstractInvokable.class);
+        v2.setInvokableClass(AbstractInvokable.class);
+        v3.setInvokableClass(AbstractInvokable.class);
+        v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+        List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+        ExecutionGraph eg = createExecutionGraph(ordered);
+
+        ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+        ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+        ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+        ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+        ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+        ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+
+        eg.scheduleForExecution();
+
+        // Inputs not consumable on init
+        assertFalse(ev31.isInputConsumable(0));
+        assertFalse(ev31.isInputConsumable(1));
+
+        // One pipelined input consumable on data produced
+        IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+        ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+            ev11.getCurrentExecutionAttempt().getAttemptId()));
+        assertTrue(ev31.isInputConsumable(0));
+
+        // The blocking input not consumable if only one partition is FINISHED
+        ev21.getCurrentExecutionAttempt().markFinished();
+        assertFalse(ev31.isInputConsumable(1));
+
+        // The blocking input consumable if all partitions are FINISHED
+        ev22.getCurrentExecutionAttempt().markFinished();
+        assertTrue(ev31.isInputConsumable(1));
+
+        // Inputs not consumable after failover
+        ev11.fail(new Exception());
+        waitUntilJobRestarted(eg);
+        assertFalse(ev31.isInputConsumable(0));
+        assertFalse(ev31.isInputConsumable(1));
+    }
+
+    @Test
+    public void testInputConstraintANY() throws Exception {
+        JobVertex v1 = new JobVertex("vertex1");
+        JobVertex v2 = new JobVertex("vertex2");
+        JobVertex v3 = new JobVertex("vertex3");
+        v1.setParallelism(2);
+        v2.setParallelism(2);
+        v3.setParallelism(2);
+        v1.setInvokableClass(AbstractInvokable.class);
+        v2.setInvokableClass(AbstractInvokable.class);
+        v3.setInvokableClass(AbstractInvokable.class);
+        v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+        List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+        ExecutionGraph eg = createExecutionGraph(ordered);
+        eg.setInputDependencyConstraint(InputDependencyConstraint.ANY);
+
+        ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+        ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+        ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+        ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+        ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+        ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+
+        eg.scheduleForExecution();
+
+        // Inputs constraint not satisfied on init
+        assertFalse(ev31.checkInputDependencyConstraints());
+
+        // Input1 consumable satisfies the constraint
+        IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+        ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+            ev11.getCurrentExecutionAttempt().getAttemptId()));
+        assertTrue(ev31.checkInputDependencyConstraints());
+
+        // Inputs constraint not satisfied after failover
+        ev11.fail(new Exception());
+        waitUntilJobRestarted(eg);
+        assertFalse(ev31.checkInputDependencyConstraints());
+
+        // Input2 consumable satisfies the constraint
+        waitUntilExecutionVertexState(ev21, ExecutionState.DEPLOYING, 2000L);
+        waitUntilExecutionVertexState(ev22, ExecutionState.DEPLOYING, 2000L);
+        ev21.getCurrentExecutionAttempt().markFinished();
+        ev22.getCurrentExecutionAttempt().markFinished();
+        assertTrue(ev31.isInputConsumable(1));
+    }
+
+    @Test
+    public void testInputConstraintALL() throws Exception {
+        JobVertex v1 = new JobVertex("vertex1");
+        JobVertex v2 = new JobVertex("vertex2");
+        JobVertex v3 = new JobVertex("vertex3");
+        v1.setParallelism(2);
+        v2.setParallelism(2);
+        v3.setParallelism(2);
+        v1.setInvokableClass(AbstractInvokable.class);
+        v2.setInvokableClass(AbstractInvokable.class);
+        v3.setInvokableClass(AbstractInvokable.class);
+        v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+        List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+        ExecutionGraph eg = createExecutionGraph(ordered);
+        eg.setInputDependencyConstraint(InputDependencyConstraint.ALL);
+
+        ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+        ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+        ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+        ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+        ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+        ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+
+        eg.scheduleForExecution();
+
+        // Inputs constraint not satisfied on init
+        assertFalse(ev31.checkInputDependencyConstraints());
+
+        // Input1 consumable does not satisfy the constraint
+        IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+        ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+            ev11.getCurrentExecutionAttempt().getAttemptId()));
+        assertFalse(ev31.checkInputDependencyConstraints());
+
+        // Input2 consumable satisfies the constraint
+        ev21.getCurrentExecutionAttempt().markFinished();
+        ev22.getCurrentExecutionAttempt().markFinished();
+        assertTrue(ev31.isInputConsumable(1));
+
+        // Inputs constraint not satisfied after failover
+        ev11.fail(new Exception());
+        waitUntilJobRestarted(eg);
+        assertFalse(ev31.checkInputDependencyConstraints());
+    }
+
+    private static ExecutionGraph createExecutionGraph(List<JobVertex> ordered) throws Exception {
+        final JobID jobId = new JobID();
+        final String jobName = "Test Job Sample Name";
+
+        final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20);
+
+        ExecutionGraph eg = new ExecutionGraph(
+            new DummyJobInformation(
+                jobId,
+                jobName),
+            TestingUtils.defaultExecutor(),
+            TestingUtils.defaultExecutor(),
+            AkkaUtils.getDefaultTimeout(),
+            new FixedDelayRestartStrategy(1, 0),
+            new RestartAllStrategy.Factory(),
+            slotProvider);
+
+        eg.attachJobGraph(ordered);
+
+        return eg;
+    }
+
+    private void waitUntilJobRestarted(ExecutionGraph eg) throws Exception {
+        waitForAllExecutionsPredicate(eg,
+            isInExecutionState(ExecutionState.CANCELING)
+                .or(isInExecutionState(ExecutionState.CANCELED))
+                .or(isInExecutionState(ExecutionState.FAILED))
+                .or(isInExecutionState(ExecutionState.FINISHED)),
+            2000L);
+
+        for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+            if (ev.getCurrentExecutionAttempt().getState() == ExecutionState.CANCELING) {
+                ev.getCurrentExecutionAttempt().cancelingComplete();
+            }
+        }
+        waitUntilJobStatus(eg, JobStatus.RUNNING, 2000L);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
new file mode 100644
index 00000000000..2cd00617d90
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link IntermediateResultPartition}.
+ */
+public class IntermediateResultPartitionTest extends TestLogger {
+
+    @Test
+    public void testPipelinedPartitionConsumable() throws Exception {
+        ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService());
+        IntermediateResult result =
+            new IntermediateResult(new IntermediateDataSetID(), jobVertex, 2, ResultPartitionType.PIPELINED);
+        ExecutionVertex vertex1 =
+            new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1));
+        ExecutionVertex vertex2 =
+            new ExecutionVertex(jobVertex, 1, new IntermediateResult[]{result}, Time.minutes(1));
+
+        IntermediateResultPartition partition1 = result.getPartitions()[0];
+        IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+        // Not consumable on init
+        assertFalse(partition1.isConsumable());
+        assertFalse(partition2.isConsumable());
+
+        // Partition 1 consumable after data are produced
+        partition1.markDataProduced();
+        assertTrue(partition1.isConsumable());
+        assertFalse(partition2.isConsumable());
+
+        // Not consumable if failover happens
+        result.resetForNewExecution();
+        assertFalse(partition1.isConsumable());
+        assertFalse(partition2.isConsumable());
+    }
+
+    @Test
+    public void testBlockingPartitionConsumable() throws Exception {
+        ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService());
+        IntermediateResult result =
+            new IntermediateResult(new IntermediateDataSetID(), jobVertex, 2, ResultPartitionType.BLOCKING);
+        ExecutionVertex vertex1 =
+            new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1));
+        ExecutionVertex vertex2 =
+            new ExecutionVertex(jobVertex, 1, new IntermediateResult[]{result}, Time.minutes(1));
+
+        IntermediateResultPartition partition1 = result.getPartitions()[0];
+        IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+        // Not consumable on init
+        assertFalse(partition1.isConsumable());
+        assertFalse(partition2.isConsumable());
+
+        // Not consumable if only one partition is FINISHED
+        partition1.markFinished();
+        assertFalse(partition1.isConsumable());
+        assertFalse(partition2.isConsumable());
+
+        // Consumable after all partitions are FINISHED
+        partition2.markFinished();
+        assertTrue(partition1.isConsumable());
+        assertTrue(partition2.isConsumable());
+
+        // Not consumable if failover happens
+        result.resetForNewExecution();
+        assertFalse(partition1.isConsumable());
+        assertFalse(partition2.isConsumable());
+    }
+}
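
Putting it together, a resource-constrained batch job can trade the pre-launch
optimisation for deadlock safety. A hedged end-to-end sketch (the environment
and job wiring are standard Flink batch API of that era and purely illustrative;
the constraint setter is what this PR adds):

    import org.apache.flink.api.common.InputDependencyConstraint;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class DeadlockSafeBatchJob {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // With ALL, a vertex with several BLOCKING inputs is scheduled only
            // once every input is consumable, so it cannot hold slots while one
            // of its producers is still waiting for resources (case 2 in the
            // issue description below).
            env.getConfig().setInputDependencyConstraint(InputDependencyConstraint.ALL);

            env.fromElements(1, 2, 3)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                })
                .print();
        }
    }
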
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Avoid resource deadlocks for finite stream jobs when resources are limited
> --------------------------------------------------------------------------
>
> Key: FLINK-10945
> URL: https://issues.apache.org/jira/browse/FLINK-10945
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.7.1
> Reporter: Zhu Zhu
> Assignee: Zhu Zhu
> Priority: Major
> Labels: pull-request-available
>
> Currently *resource deadlocks* can happen in finite stream jobs (or batch
> jobs) when resources are limited, in the two cases below:
> # Task Y is a pipelined downstream task of task X. When X takes all
> resources (slots), Y cannot acquire slots to start, so the back pressure
> blocks X from finishing
> # Task Y is an upstream task of task X. When X takes all resources (slots)
> and Y cannot start, X cannot finish because some of its inputs are not
> finished
>
> We can avoid case 1 by setting all edges to be BLOCKING to avoid pipeline
> back pressure. However, case 2 cannot be avoided, as X (the downstream
> task) will be launched when any of its input results is ready.
> To be more detailed, say task X has BLOCKING upstream tasks Y and Z. X can
> be launched when Z finishes, even though task Y is not launched yet. This
> pre-launch behaviour can be beneficial when there are plenty of resources,
> as X can process data from Z earlier, before Y finishes its data
> processing. However, resource deadlocks may happen when resources are
> limited, e.g. in small sessions.
>
> I’d propose introducing a constraint named *InputDependencyConstraint* to
> control the scheduling of vertices. It has two values:
> # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
> # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
>
> The design doc is here:
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)