This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7404c95 [FLINK-20619][runtime] Remove unused InputDependencyConstraint
7404c95 is described below
commit 7404c951ee1e449be916e718a9f2c51070d4864d
Author: Matthias Pohl <[email protected]>
AuthorDate: Thu Jan 7 13:34:20 2021 +0100
[FLINK-20619][runtime] Remove unused InputDependencyConstraint
This work is part of FLINK-20589
This closes #14579
---
.../apache/flink/api/common/ExecutionConfig.java | 46 ++--
.../api/common/InputDependencyConstraint.java | 9 +-
.../optimizer/plantranslate/JobGraphGenerator.java | 8 +-
flink-python/pyflink/common/execution_config.py | 16 ++
.../pyflink/common/tests/test_execution_config.py | 17 +-
.../runtime/executiongraph/ExecutionJobVertex.java | 5 -
.../runtime/executiongraph/ExecutionVertex.java | 63 -----
.../apache/flink/runtime/jobgraph/JobVertex.java | 12 -
.../adapter/DefaultExecutionTopology.java | 5 +-
.../scheduler/adapter/DefaultExecutionVertex.java | 12 +-
.../strategy/InputDependencyConstraintChecker.java | 194 ---------------
.../strategy/SchedulingExecutionVertex.java | 8 -
.../runtime/scheduler/DefaultSchedulerTest.java | 48 ----
.../adapter/DefaultExecutionTopologyTest.java | 16 +-
.../adapter/DefaultExecutionVertexTest.java | 7 +-
.../DefaultSchedulingPipelinedRegionTest.java | 4 +-
.../InputDependencyConstraintCheckerTest.java | 274 ---------------------
.../strategy/TestingSchedulingExecutionVertex.java | 23 +-
.../strategy/TestingSchedulingTopology.java | 10 -
.../api/graph/StreamingJobGraphGenerator.java | 4 -
20 files changed, 50 insertions(+), 731 deletions(-)
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 8005e85..c6f7f58 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -151,9 +151,6 @@ public class ExecutionConfig implements Serializable,
Archiveable<ArchivedExecut
* This flag defines if we use compression for the state snapshot data or
not. Default: false
*/
private boolean useSnapshotCompression = false;
- /** The default input dependency constraint to schedule tasks. */
- private InputDependencyConstraint defaultInputDependencyConstraint =
- InputDependencyConstraint.ANY;
// ------------------------------- User code values
--------------------------------------------
@@ -528,36 +525,29 @@ public class ExecutionConfig implements Serializable,
Archiveable<ArchivedExecut
}
/**
- * Sets the default input dependency constraint for vertex scheduling. It
indicates when a task
- * should be scheduled considering its inputs status.
+ * This method is deprecated. It was used to set the {@link
InputDependencyConstraint} utilized
+ * by the old scheduler implementations which got removed as part of
FLINK-20589. The current
+ * implementation has no effect.
*
- * <p>The default constraint is {@link InputDependencyConstraint#ANY}.
- *
- * @param inputDependencyConstraint The input dependency constraint.
+ * @param ignored Ignored parameter.
+ * @deprecated due to the deprecation of {@code InputDependencyConstraint}.
*/
@PublicEvolving
- public void setDefaultInputDependencyConstraint(
- InputDependencyConstraint inputDependencyConstraint) {
- if (inputDependencyConstraint != null) {
- this.defaultInputDependencyConstraint = inputDependencyConstraint;
- } else {
- // defaultInputDependencyConstraint is not allowed to be null
- // setting it to ANY to not break existing jobs
- this.defaultInputDependencyConstraint =
InputDependencyConstraint.ANY;
- }
- }
+ @Deprecated
+ public void setDefaultInputDependencyConstraint(InputDependencyConstraint
ignored) {}
/**
- * Gets the default input dependency constraint for vertex scheduling. It
indicates when a task
- * should be scheduled considering its inputs status.
- *
- * <p>The default constraint is {@link InputDependencyConstraint#ANY}.
+ * This method is deprecated. It was used to return the {@link
InputDependencyConstraint}
+ * utilized by the old scheduler implementations. These implementations
were removed as part of
+ * FLINK-20589.
*
- * @return The input dependency constraint of this job.
+ * @return The previous default constraint {@link
InputDependencyConstraint#ANY}.
+ * @deprecated due to the deprecation of {@code InputDependencyConstraint}.
*/
@PublicEvolving
+ @Deprecated
public InputDependencyConstraint getDefaultInputDependencyConstraint() {
- return defaultInputDependencyConstraint;
+ return InputDependencyConstraint.ANY;
}
/**
@@ -913,8 +903,7 @@ public class ExecutionConfig implements Serializable,
Archiveable<ArchivedExecut
&& registeredKryoTypes.equals(other.registeredKryoTypes)
&& registeredPojoTypes.equals(other.registeredPojoTypes)
&& taskCancellationIntervalMillis ==
other.taskCancellationIntervalMillis
- && useSnapshotCompression == other.useSnapshotCompression
- && defaultInputDependencyConstraint ==
other.defaultInputDependencyConstraint;
+ && useSnapshotCompression == other.useSnapshotCompression;
} else {
return false;
@@ -940,8 +929,7 @@ public class ExecutionConfig implements Serializable,
Archiveable<ArchivedExecut
registeredKryoTypes,
registeredPojoTypes,
taskCancellationIntervalMillis,
- useSnapshotCompression,
- defaultInputDependencyConstraint);
+ useSnapshotCompression);
}
@Override
@@ -985,8 +973,6 @@ public class ExecutionConfig implements Serializable,
Archiveable<ArchivedExecut
+ taskCancellationTimeoutMillis
+ ", useSnapshotCompression="
+ useSnapshotCompression
- + ", defaultInputDependencyConstraint="
- + defaultInputDependencyConstraint
+ ", globalJobParameters="
+ globalJobParameters
+ ", registeredTypesWithKryoSerializers="
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
index 5b57495..40022c4 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
@@ -20,8 +20,15 @@ package org.apache.flink.api.common;
import org.apache.flink.annotation.PublicEvolving;
-/** This constraint indicates when a task should be scheduled considering its
inputs status. */
+/**
+ * This constraint indicates when a task should be scheduled considering its
inputs status.
+ *
+ * @deprecated {@code InputDependencyConstraint} is not used anymore and will
be deleted in one of
+ * the future versions. It was previously used in the scheduler
implementations that were
+ * removed as part of FLINK-20589.
+ */
@PublicEvolving
+@Deprecated
public enum InputDependencyConstraint {
/** Schedule the task if any input is consumable. */
diff --git
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index f80847c..4da3354 100644
---
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -255,13 +255,7 @@ public class JobGraphGenerator implements
Visitor<PlanNode> {
}
// add vertices to the graph
- for (JobVertex vertex : this.vertices.values()) {
- vertex.setInputDependencyConstraint(
- program.getOriginalPlan()
- .getExecutionConfig()
- .getDefaultInputDependencyConstraint());
- graph.addVertex(vertex);
- }
+ this.vertices.values().forEach(graph::addVertex);
for (JobVertex vertex : this.auxVertices) {
graph.addVertex(vertex);
diff --git a/flink-python/pyflink/common/execution_config.py
b/flink-python/pyflink/common/execution_config.py
index 9a08105..4fcf7b9 100644
--- a/flink-python/pyflink/common/execution_config.py
+++ b/flink-python/pyflink/common/execution_config.py
@@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+import warnings
+
from typing import Dict, List
from pyflink.common.execution_mode import ExecutionMode
@@ -328,7 +330,14 @@ class ExecutionConfig(object):
:param input_dependency_constraint: The input dependency constraint.
The constraints could
be
:data:`InputDependencyConstraint.ANY` or
:data:`InputDependencyConstraint.ALL`.
+
+ .. note:: Deprecated in 1.13. :class:`InputDependencyConstraint` is
not used anymore in the
+ current scheduler implementations.
"""
+ warnings.warn("Deprecated in 1.13. InputDependencyConstraint is not
used anywhere. "
+ "Therefore, the method call
set_default_input_dependency_constraint is "
+ "obsolete.", DeprecationWarning)
+
self._j_execution_config.setDefaultInputDependencyConstraint(
input_dependency_constraint._to_j_input_dependency_constraint())
return self
@@ -344,7 +353,14 @@ class ExecutionConfig(object):
:return: The input dependency constraint of this job. The possible
constraints are
:data:`InputDependencyConstraint.ANY` and
:data:`InputDependencyConstraint.ALL`.
+
+ .. note:: Deprecated in 1.13. :class:`InputDependencyConstraint` is
not used anymore in the
+ current scheduler implementations.
"""
+ warnings.warn("Deprecated in 1.13. InputDependencyConstraint is not
used anywhere. "
+ "Therefore, the method call
get_default_input_dependency_constraint is "
+ "obsolete.", DeprecationWarning)
+
j_input_dependency_constraint = self._j_execution_config\
.getDefaultInputDependencyConstraint()
return InputDependencyConstraint._from_j_input_dependency_constraint(
diff --git a/flink-python/pyflink/common/tests/test_execution_config.py
b/flink-python/pyflink/common/tests/test_execution_config.py
index a5aab74..8f3ba67 100644
--- a/flink-python/pyflink/common/tests/test_execution_config.py
+++ b/flink-python/pyflink/common/tests/test_execution_config.py
@@ -16,8 +16,7 @@
# limitations under the License.
################################################################################
from pyflink.dataset import ExecutionEnvironment
-from pyflink.common import (ExecutionConfig, RestartStrategies, ExecutionMode,
- InputDependencyConstraint)
+from pyflink.common import (ExecutionConfig, RestartStrategies, ExecutionMode)
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import PyFlinkTestCase
@@ -132,20 +131,6 @@ class ExecutionConfigTests(PyFlinkTestCase):
self.assertEqual(self.execution_config.get_execution_mode(),
ExecutionMode.PIPELINED_FORCED)
- def test_get_set_default_input_dependency_constraint(self):
-
- self.execution_config.set_default_input_dependency_constraint(
- InputDependencyConstraint.ALL)
-
-
self.assertEqual(self.execution_config.get_default_input_dependency_constraint(),
- InputDependencyConstraint.ALL)
-
- self.execution_config.set_default_input_dependency_constraint(
- InputDependencyConstraint.ANY)
-
-
self.assertEqual(self.execution_config.get_default_input_dependency_constraint(),
- InputDependencyConstraint.ANY)
-
def test_disable_enable_force_kryo(self):
self.execution_config.disable_force_kryo()
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 1db69e1..10dd636 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -365,10 +364,6 @@ public class ExecutionJobVertex
return inputs;
}
- public InputDependencyConstraint getInputDependencyConstraint() {
- return getJobVertex().getInputDependencyConstraint();
- }
-
public Collection<OperatorCoordinatorHolder> getOperatorCoordinators() {
return operatorCoordinators;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0acfb10..fe984b5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
-import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.io.InputSplit;
@@ -346,10 +345,6 @@ public class ExecutionVertex
return resultPartitions;
}
- public InputDependencyConstraint getInputDependencyConstraint() {
- return getJobVertex().getInputDependencyConstraint();
- }
-
//
--------------------------------------------------------------------------------------------
// Graph building
//
--------------------------------------------------------------------------------------------
@@ -777,64 +772,6 @@ public class ExecutionVertex
}
}
- /**
- * Check whether the InputDependencyConstraint is satisfied for this
vertex.
- *
- * @return whether the input constraint is satisfied
- */
- boolean checkInputDependencyConstraints() {
- if (inputEdges.length == 0) {
- return true;
- }
-
- final InputDependencyConstraint inputDependencyConstraint =
getInputDependencyConstraint();
- switch (inputDependencyConstraint) {
- case ANY:
- return isAnyInputConsumable();
- case ALL:
- return areAllInputsConsumable();
- default:
- throw new IllegalStateException(
- "Unknown InputDependencyConstraint " +
inputDependencyConstraint);
- }
- }
-
- private boolean isAnyInputConsumable() {
- for (int inputNumber = 0; inputNumber < inputEdges.length;
inputNumber++) {
- if (isInputConsumable(inputNumber)) {
- return true;
- }
- }
- return false;
- }
-
- private boolean areAllInputsConsumable() {
- for (int inputNumber = 0; inputNumber < inputEdges.length;
inputNumber++) {
- if (!isInputConsumable(inputNumber)) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Get whether an input of the vertex is consumable. An input is
consumable when any
- * partition in it is consumable.
- *
- * <p>Note that a BLOCKING result partition is only consumable when all
partitions in the result
- * are FINISHED.
- *
- * @return whether the input is consumable
- */
- boolean isInputConsumable(int inputNumber) {
- for (ExecutionEdge executionEdge : inputEdges[inputNumber]) {
- if (executionEdge.getSource().isConsumable()) {
- return true;
- }
- }
- return false;
- }
-
//
--------------------------------------------------------------------------------------------
// Notifications from the Execution Attempt
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index d482941..1100826 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.jobgraph;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitSource;
@@ -133,9 +132,6 @@ public class JobVertex implements java.io.Serializable {
*/
private String resultOptimizerProperties;
- /** The input dependency constraint to schedule this vertex. */
- private InputDependencyConstraint inputDependencyConstraint =
InputDependencyConstraint.ANY;
-
//
--------------------------------------------------------------------------------------------
/**
@@ -577,14 +573,6 @@ public class JobVertex implements java.io.Serializable {
this.resultOptimizerProperties = resultOptimizerProperties;
}
- public InputDependencyConstraint getInputDependencyConstraint() {
- return inputDependencyConstraint;
- }
-
- public void setInputDependencyConstraint(InputDependencyConstraint
inputDependencyConstraint) {
- this.inputDependencyConstraint = inputDependencyConstraint;
- }
-
//
--------------------------------------------------------------------------------------------
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
index be13c51..45a6de4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
@@ -194,10 +194,7 @@ public class DefaultExecutionTopology implements
SchedulingTopology {
DefaultExecutionVertex schedulingVertex =
new DefaultExecutionVertex(
- vertex.getID(),
- producedPartitions,
- vertex::getExecutionState,
- vertex.getInputDependencyConstraint());
+ vertex.getID(), producedPartitions,
vertex::getExecutionState);
producedPartitions.forEach(partition ->
partition.setProducer(schedulingVertex));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
index fa7ad8a..125c622 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.scheduler.adapter;
-import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
@@ -40,18 +39,14 @@ class DefaultExecutionVertex implements
SchedulingExecutionVertex {
private final Supplier<ExecutionState> stateSupplier;
- private final InputDependencyConstraint inputDependencyConstraint;
-
DefaultExecutionVertex(
ExecutionVertexID executionVertexId,
List<DefaultResultPartition> producedPartitions,
- Supplier<ExecutionState> stateSupplier,
- InputDependencyConstraint constraint) {
+ Supplier<ExecutionState> stateSupplier) {
this.executionVertexId = checkNotNull(executionVertexId);
this.consumedResults = new ArrayList<>();
this.stateSupplier = checkNotNull(stateSupplier);
this.producedResults = checkNotNull(producedPartitions);
- this.inputDependencyConstraint = checkNotNull(constraint);
}
@Override
@@ -74,11 +69,6 @@ class DefaultExecutionVertex implements
SchedulingExecutionVertex {
return producedResults;
}
- @Override
- public InputDependencyConstraint getInputDependencyConstraint() {
- return inputDependencyConstraint;
- }
-
void addConsumedResult(DefaultResultPartition result) {
consumedResults.add(result);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java
deleted file mode 100644
index 0b702d7..0000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.strategy;
-
-import org.apache.flink.api.common.InputDependencyConstraint;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
-
-/** A wrapper class for {@link InputDependencyConstraint} checker. */
-public class InputDependencyConstraintChecker {
- private final SchedulingIntermediateDataSetManager
intermediateDataSetManager =
- new SchedulingIntermediateDataSetManager();
-
- public boolean check(final SchedulingExecutionVertex
schedulingExecutionVertex) {
- if (Iterables.isEmpty(schedulingExecutionVertex.getConsumedResults()))
{
- return true;
- }
-
- final InputDependencyConstraint inputConstraint =
- schedulingExecutionVertex.getInputDependencyConstraint();
- switch (inputConstraint) {
- case ANY:
- return checkAny(schedulingExecutionVertex);
- case ALL:
- return checkAll(schedulingExecutionVertex);
- default:
- throw new IllegalStateException(
- "Unknown InputDependencyConstraint " +
inputConstraint);
- }
- }
-
- List<SchedulingResultPartition> markSchedulingResultPartitionFinished(
- SchedulingResultPartition srp) {
- return
intermediateDataSetManager.markSchedulingResultPartitionFinished(srp);
- }
-
- void resetSchedulingResultPartition(SchedulingResultPartition srp) {
- intermediateDataSetManager.resetSchedulingResultPartition(srp);
- }
-
- void addSchedulingResultPartition(SchedulingResultPartition srp) {
- intermediateDataSetManager.addSchedulingResultPartition(srp);
- }
-
- private boolean checkAll(final SchedulingExecutionVertex
schedulingExecutionVertex) {
- for (SchedulingResultPartition consumedResultPartition :
- schedulingExecutionVertex.getConsumedResults()) {
- if (!partitionConsumable(consumedResultPartition)) {
- return false;
- }
- }
- return true;
- }
-
- private boolean checkAny(final SchedulingExecutionVertex
schedulingExecutionVertex) {
- for (SchedulingResultPartition consumedResultPartition :
- schedulingExecutionVertex.getConsumedResults()) {
- if (partitionConsumable(consumedResultPartition)) {
- return true;
- }
- }
- return false;
- }
-
- private boolean partitionConsumable(SchedulingResultPartition partition) {
- if (BLOCKING.equals(partition.getResultType())) {
- return intermediateDataSetManager.allPartitionsFinished(partition);
- } else {
- final ResultPartitionState state = partition.getState();
- return ResultPartitionState.CONSUMABLE.equals(state);
- }
- }
-
- private static class SchedulingIntermediateDataSetManager {
-
- private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet>
- intermediateDataSets = new HashMap<>();
-
- List<SchedulingResultPartition> markSchedulingResultPartitionFinished(
- SchedulingResultPartition srp) {
- SchedulingIntermediateDataSet intermediateDataSet =
- getSchedulingIntermediateDataSet(srp.getResultId());
- if (intermediateDataSet.markPartitionFinished(srp.getId())) {
- return intermediateDataSet.getSchedulingResultPartitions();
- }
- return Collections.emptyList();
- }
-
- void resetSchedulingResultPartition(SchedulingResultPartition srp) {
- SchedulingIntermediateDataSet sid =
getSchedulingIntermediateDataSet(srp.getResultId());
- sid.resetPartition(srp.getId());
- }
-
- void addSchedulingResultPartition(SchedulingResultPartition srp) {
- SchedulingIntermediateDataSet sid =
-
getOrCreateSchedulingIntermediateDataSetIfAbsent(srp.getResultId());
- sid.addSchedulingResultPartition(srp);
- }
-
- boolean allPartitionsFinished(SchedulingResultPartition srp) {
- SchedulingIntermediateDataSet sid =
getSchedulingIntermediateDataSet(srp.getResultId());
- return sid.allPartitionsFinished();
- }
-
- private SchedulingIntermediateDataSet getSchedulingIntermediateDataSet(
- final IntermediateDataSetID intermediateDataSetId) {
- return
getSchedulingIntermediateDataSetInternal(intermediateDataSetId, false);
- }
-
- private SchedulingIntermediateDataSet
getOrCreateSchedulingIntermediateDataSetIfAbsent(
- final IntermediateDataSetID intermediateDataSetId) {
- return
getSchedulingIntermediateDataSetInternal(intermediateDataSetId, true);
- }
-
- private SchedulingIntermediateDataSet
getSchedulingIntermediateDataSetInternal(
- final IntermediateDataSetID intermediateDataSetId, boolean
createIfAbsent) {
-
- return intermediateDataSets.computeIfAbsent(
- intermediateDataSetId,
- (key) -> {
- if (createIfAbsent) {
- return new SchedulingIntermediateDataSet();
- } else {
- throw new IllegalArgumentException(
- "can not find data set for " +
intermediateDataSetId);
- }
- });
- }
- }
-
- /** Representation of {@link IntermediateDataSet}. */
- private static class SchedulingIntermediateDataSet {
-
- private final List<SchedulingResultPartition> partitions;
-
- private final Set<IntermediateResultPartitionID> producingPartitionIds;
-
- SchedulingIntermediateDataSet() {
- partitions = new ArrayList<>();
- producingPartitionIds = new HashSet<>();
- }
-
- boolean markPartitionFinished(IntermediateResultPartitionID
partitionId) {
- producingPartitionIds.remove(partitionId);
- return producingPartitionIds.isEmpty();
- }
-
- void resetPartition(IntermediateResultPartitionID partitionId) {
- producingPartitionIds.add(partitionId);
- }
-
- boolean allPartitionsFinished() {
- return producingPartitionIds.isEmpty();
- }
-
- void addSchedulingResultPartition(SchedulingResultPartition partition)
{
- partitions.add(partition);
- producingPartitionIds.add(partition.getId());
- }
-
- List<SchedulingResultPartition> getSchedulingResultPartitions() {
- return Collections.unmodifiableList(partitions);
- }
- }
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
index b2a2943..80204be 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.scheduler.strategy;
-import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -38,11 +37,4 @@ public interface SchedulingExecutionVertex
* @return state of the execution vertex
*/
ExecutionState getState();
-
- /**
- * Get {@link InputDependencyConstraint}.
- *
- * @return input dependency constraint
- */
- InputDependencyConstraint getInputDependencyConstraint();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 3f9689c..4bb7b07 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.scheduler;
-import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
@@ -28,7 +27,6 @@ import
org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -68,7 +66,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -91,7 +88,6 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
@@ -679,35 +675,6 @@ public class DefaultSchedulerTest extends TestLogger {
}
@Test
- public void testInputConstraintALLPerf() throws Exception {
- final int parallelism = 1000;
- final JobVertex v1 = createVertexWithAllInputConstraints("vertex1",
parallelism);
- final JobVertex v2 = createVertexWithAllInputConstraints("vertex2",
parallelism);
- final JobVertex v3 = createVertexWithAllInputConstraints("vertex3",
parallelism);
- v2.connectNewDataSetAsInput(
- v1, DistributionPattern.ALL_TO_ALL,
ResultPartitionType.BLOCKING);
- v2.connectNewDataSetAsInput(
- v3, DistributionPattern.ALL_TO_ALL,
ResultPartitionType.BLOCKING);
-
- final JobGraph jobGraph = new JobGraph(v1, v2, v3);
- final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
- final AccessExecutionJobVertex ejv1 =
- scheduler.requestJob().getAllVertices().get(v1.getID());
-
- for (int i = 0; i < parallelism - 1; i++) {
- finishSubtask(scheduler, ejv1, i);
- }
-
- final long startTime = System.nanoTime();
- finishSubtask(scheduler, ejv1, parallelism - 1);
-
- final Duration duration = Duration.ofNanos(System.nanoTime() -
startTime);
- final Duration timeout = Duration.ofSeconds(5);
-
- assertThat(duration, lessThan(timeout));
- }
-
- @Test
public void failJobWillIncrementVertexVersions() {
final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
@@ -931,12 +898,6 @@ public class DefaultSchedulerTest extends TestLogger {
assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(),
hasSize(0));
}
- private static JobVertex createVertexWithAllInputConstraints(String name,
int parallelism) {
- final JobVertex v = createVertex(name, parallelism);
- v.setInputDependencyConstraint(InputDependencyConstraint.ALL);
- return v;
- }
-
private static JobVertex createVertex(String name, int parallelism) {
final JobVertex v = new JobVertex(name);
v.setParallelism(parallelism);
@@ -944,15 +905,6 @@ public class DefaultSchedulerTest extends TestLogger {
return v;
}
- private static void finishSubtask(
- DefaultScheduler scheduler, AccessExecutionJobVertex vertex, int
subtask) {
- final ExecutionAttemptID attemptId =
-
vertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
- scheduler.updateTaskExecutionState(
- new TaskExecutionState(
- scheduler.getJobGraph().getJobID(), attemptId,
ExecutionState.FINISHED));
- }
-
private void waitForTermination(final DefaultScheduler scheduler) throws
Exception {
scheduler.getTerminationFuture().get(TIMEOUT_MS,
TimeUnit.MILLISECONDS);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
index 7109a1a..a7d6bbb 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
@@ -47,8 +47,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static junit.framework.TestCase.assertTrue;
-import static org.apache.flink.api.common.InputDependencyConstraint.ALL;
-import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
import static
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
import static
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
import static
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
@@ -71,8 +69,6 @@ public class DefaultExecutionTopologyTest extends TestLogger {
jobVertices[0] = createNoOpVertex(parallelism);
jobVertices[1] = createNoOpVertex(parallelism);
jobVertices[1].connectNewDataSetAsInput(jobVertices[0], ALL_TO_ALL,
PIPELINED);
- jobVertices[0].setInputDependencyConstraint(ALL);
- jobVertices[1].setInputDependencyConstraint(ANY);
executionGraph = createSimpleTestGraph(jobVertices);
adapter = DefaultExecutionTopology.fromExecutionGraph(executionGraph);
}
@@ -182,7 +178,7 @@ public class DefaultExecutionTopologyTest extends
TestLogger {
ExecutionVertex originalVertex = originalVertices.next();
DefaultExecutionVertex adaptedVertex = adaptedVertices.next();
- assertVertexEquals(originalVertex, adaptedVertex);
+ assertEquals(originalVertex.getID(), adaptedVertex.getId());
List<IntermediateResultPartition> originalConsumedPartitions =
IntStream.range(0, originalVertex.getNumberOfInputs())
@@ -262,15 +258,7 @@ public class DefaultExecutionTopologyTest extends
TestLogger {
assertEquals(
originalPartition.getIntermediateResult().getId(),
adaptedPartition.getResultId());
assertEquals(originalPartition.getResultType(),
adaptedPartition.getResultType());
- assertVertexEquals(originalPartition.getProducer(),
adaptedPartition.getProducer());
- }
-
- private static void assertVertexEquals(
- ExecutionVertex originalVertex, DefaultExecutionVertex
adaptedVertex) {
-
- assertEquals(originalVertex.getID(), adaptedVertex.getId());
assertEquals(
- originalVertex.getInputDependencyConstraint(),
- adaptedVertex.getInputDependencyConstraint());
+ originalPartition.getProducer().getID(),
adaptedPartition.getProducer().getId());
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
index 0b41b6b..7cf970c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
@@ -34,7 +34,6 @@ import org.junit.Test;
import java.util.Collections;
import java.util.function.Supplier;
-import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
import static
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
import static org.junit.Assert.assertEquals;
@@ -64,15 +63,13 @@ public class DefaultExecutionVertexTest extends TestLogger {
new DefaultExecutionVertex(
new ExecutionVertexID(new JobVertexID(), 0),
Collections.singletonList(schedulingResultPartition),
- stateSupplier,
- ANY);
+ stateSupplier);
schedulingResultPartition.setProducer(producerVertex);
consumerVertex =
new DefaultExecutionVertex(
new ExecutionVertexID(new JobVertexID(), 0),
Collections.emptyList(),
- stateSupplier,
- ANY);
+ stateSupplier);
consumerVertex.addConsumedResult(schedulingResultPartition);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
index 70094ea..5cdb6bd 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.scheduler.adapter;
-import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
@@ -70,8 +69,7 @@ public class DefaultSchedulingPipelinedRegionTest extends
TestLogger {
new DefaultExecutionVertex(
new ExecutionVertexID(new JobVertexID(), 0),
Collections.emptyList(),
- () -> ExecutionState.CREATED,
- InputDependencyConstraint.ANY);
+ () -> ExecutionState.CREATED);
final Set<DefaultExecutionVertex> vertices =
Collections.singleton(vertex);
final DefaultSchedulingPipelinedRegion pipelinedRegion =
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java
deleted file mode 100644
index 6b15bbf..0000000
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.strategy;
-
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.api.common.InputDependencyConstraint.ALL;
-import static
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
-import static
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/** Unit tests for {@link InputDependencyConstraintChecker}. */
-public class InputDependencyConstraintCheckerTest extends TestLogger {
-
- @Test
- public void testCheckInputVertex() {
- final TestingSchedulingExecutionVertex vertex =
- TestingSchedulingExecutionVertex.newBuilder().build();
- final InputDependencyConstraintChecker inputChecker =
-
createInputDependencyConstraintChecker(Collections.emptyList());
-
- assertTrue(inputChecker.check(vertex));
- }
-
- @Test
- public void testCheckCreatedPipelinedInput() {
- final List<TestingSchedulingResultPartition> partitions =
- addResultPartition()
- .withPartitionType(PIPELINED)
- .withPartitionState(ResultPartitionState.CREATED)
- .finish();
- final TestingSchedulingExecutionVertex vertex =
- TestingSchedulingExecutionVertex.newBuilder()
- .withConsumedPartitions(partitions)
- .build();
-
- final InputDependencyConstraintChecker inputChecker =
- createInputDependencyConstraintChecker(partitions);
-
- assertFalse(inputChecker.check(vertex));
- }
-
- @Test
- public void testCheckConsumablePipelinedInput() {
- final List<TestingSchedulingResultPartition> partitions =
- addResultPartition()
- .withPartitionType(PIPELINED)
- .withPartitionState(ResultPartitionState.CONSUMABLE)
- .finish();
- final TestingSchedulingExecutionVertex vertex =
- TestingSchedulingExecutionVertex.newBuilder()
- .withConsumedPartitions(partitions)
- .build();
-
- final InputDependencyConstraintChecker inputChecker =
- createInputDependencyConstraintChecker(partitions);
-
- assertTrue(inputChecker.check(vertex));
- }
-
- @Test
- public void testCheckDoneBlockingInput() {
- final List<TestingSchedulingResultPartition> partitions =
- addResultPartition().withPartitionCntPerDataSet(2).finish();
- final TestingSchedulingExecutionVertex vertex =
- TestingSchedulingExecutionVertex.newBuilder()
- .withConsumedPartitions(partitions)
- .build();
-
- final InputDependencyConstraintChecker inputChecker =
- createInputDependencyConstraintChecker(partitions);
-
- for (TestingSchedulingResultPartition srp : partitions) {
- inputChecker.markSchedulingResultPartitionFinished(srp);
- }
-
- assertTrue(inputChecker.check(vertex));
- }
-
- @Test
- public void testCheckPartialDoneBlockingInput() {
- final List<TestingSchedulingResultPartition> partitions =
- addResultPartition().withPartitionCntPerDataSet(2).finish();
- final TestingSchedulingExecutionVertex vertex =
- TestingSchedulingExecutionVertex.newBuilder()
- .withConsumedPartitions(partitions)
- .build();
-
- final InputDependencyConstraintChecker inputChecker =
- createInputDependencyConstraintChecker(partitions);
-
- inputChecker.markSchedulingResultPartitionFinished(partitions.get(0));
-
- assertFalse(inputChecker.check(vertex));
- }
-
- @Test
- public void testCheckResetBlockingInput() {
- final List<TestingSchedulingResultPartition> partitions =
- addResultPartition().withPartitionCntPerDataSet(2).finish();
- final TestingSchedulingExecutionVertex vertex =
- TestingSchedulingExecutionVertex.newBuilder()
- .withConsumedPartitions(partitions)
- .build();
-
- final InputDependencyConstraintChecker inputChecker =
- createInputDependencyConstraintChecker(partitions);
-
- for (TestingSchedulingResultPartition srp : partitions) {
- inputChecker.markSchedulingResultPartitionFinished(srp);
- }
-
- for (TestingSchedulingResultPartition srp : partitions) {
- inputChecker.resetSchedulingResultPartition(srp);
- }
-
- assertFalse(inputChecker.check(vertex));
- }
-
- @Test
- public void testCheckAnyBlockingInput() {
- final List<TestingSchedulingResultPartition> partitions =
- addResultPartition().withDataSetCnt(2).finish();
- final TestingSchedulingExecutionVertex vertex =
- TestingSchedulingExecutionVertex.newBuilder()
- .withConsumedPartitions(partitions)
- .build();
-
- final InputDependencyConstraintChecker inputChecker =
- createInputDependencyConstraintChecker(partitions);
-
- inputChecker.markSchedulingResultPartitionFinished(partitions.get(0));
-
- assertTrue(inputChecker.check(vertex));
- }
-
- @Test
- public void testCheckAllBlockingInput() {
- final List<TestingSchedulingResultPartition> partitions =
- addResultPartition().withDataSetCnt(2).finish();
- final TestingSchedulingExecutionVertex vertex =
- TestingSchedulingExecutionVertex.newBuilder()
- .withInputDependencyConstraint(ALL)
- .withConsumedPartitions(partitions)
- .build();
-
- final InputDependencyConstraintChecker inputChecker =
- createInputDependencyConstraintChecker(partitions);
-
- for (TestingSchedulingResultPartition srp : partitions) {
- inputChecker.markSchedulingResultPartitionFinished(srp);
- }
-
- assertTrue(inputChecker.check(vertex));
- }
-
- @Test
- public void testCheckAllPartialDatasetBlockingInput() {
- final List<TestingSchedulingResultPartition> partitions =
- addResultPartition().withDataSetCnt(2).finish();
- final TestingSchedulingExecutionVertex vertex =
- TestingSchedulingExecutionVertex.newBuilder()
- .withInputDependencyConstraint(ALL)
- .withConsumedPartitions(partitions)
- .build();
-
- final InputDependencyConstraintChecker inputChecker =
- createInputDependencyConstraintChecker(partitions);
-
- inputChecker.markSchedulingResultPartitionFinished(partitions.get(0));
- assertFalse(inputChecker.check(vertex));
- }
-
- @Test
- public void testCheckAllPartialPartitionBlockingInput() {
- final List<TestingSchedulingResultPartition> partitions =
-
addResultPartition().withDataSetCnt(2).withPartitionCntPerDataSet(2).finish();
- final TestingSchedulingExecutionVertex vertex =
- TestingSchedulingExecutionVertex.newBuilder()
- .withInputDependencyConstraint(ALL)
- .withConsumedPartitions(partitions)
- .build();
-
- final InputDependencyConstraintChecker inputChecker =
- createInputDependencyConstraintChecker(partitions);
-
- for (int idx = 0; idx < 3; idx++) {
-
inputChecker.markSchedulingResultPartitionFinished(partitions.get(idx));
- }
-
- assertFalse(inputChecker.check(vertex));
- }
-
- private static TestingSchedulingResultPartitionBuilder
addResultPartition() {
- return new TestingSchedulingResultPartitionBuilder();
- }
-
- private static InputDependencyConstraintChecker
createInputDependencyConstraintChecker(
- List<TestingSchedulingResultPartition> partitions) {
-
- InputDependencyConstraintChecker inputChecker = new
InputDependencyConstraintChecker();
- for (SchedulingResultPartition partition : partitions) {
- inputChecker.addSchedulingResultPartition(partition);
- }
- return inputChecker;
- }
-
- private static class TestingSchedulingResultPartitionBuilder {
- private int dataSetCnt = 1;
- private int partitionCntPerDataSet = 1;
- private ResultPartitionType partitionType = BLOCKING;
- private ResultPartitionState partitionState =
ResultPartitionState.CONSUMABLE;
-
- TestingSchedulingResultPartitionBuilder withDataSetCnt(int dataSetCnt)
{
- this.dataSetCnt = dataSetCnt;
- return this;
- }
-
- TestingSchedulingResultPartitionBuilder withPartitionCntPerDataSet(int
partitionCnt) {
- this.partitionCntPerDataSet = partitionCnt;
- return this;
- }
-
- TestingSchedulingResultPartitionBuilder
withPartitionType(ResultPartitionType type) {
- this.partitionType = type;
- return this;
- }
-
- TestingSchedulingResultPartitionBuilder
withPartitionState(ResultPartitionState state) {
- this.partitionState = state;
- return this;
- }
-
- List<TestingSchedulingResultPartition> finish() {
- List<TestingSchedulingResultPartition> partitions =
- new ArrayList<>(dataSetCnt * partitionCntPerDataSet);
- for (int dataSetIdx = 0; dataSetIdx < dataSetCnt; dataSetIdx++) {
- IntermediateDataSetID dataSetId = new IntermediateDataSetID();
- for (int partitionIdx = 0; partitionIdx <
partitionCntPerDataSet; partitionIdx++) {
- partitions.add(
- new TestingSchedulingResultPartition(
- dataSetId, partitionType, partitionState));
- }
- }
-
- return partitions;
- }
- }
-}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
index 166a904..593e363 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.scheduler.strategy;
-import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -26,7 +25,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A simple scheduling execution vertex for testing purposes. */
@@ -38,19 +36,15 @@ public class TestingSchedulingExecutionVertex implements
SchedulingExecutionVert
private final Collection<TestingSchedulingResultPartition>
producedPartitions;
- private final InputDependencyConstraint inputDependencyConstraint;
-
private ExecutionState executionState;
public TestingSchedulingExecutionVertex(
JobVertexID jobVertexId,
int subtaskIndex,
- InputDependencyConstraint constraint,
Collection<TestingSchedulingResultPartition> consumedPartitions,
ExecutionState executionState) {
this.executionVertexId = new ExecutionVertexID(jobVertexId,
subtaskIndex);
- this.inputDependencyConstraint = constraint;
this.consumedPartitions = checkNotNull(consumedPartitions);
this.producedPartitions = new ArrayList<>();
this.executionState = executionState;
@@ -80,11 +74,6 @@ public class TestingSchedulingExecutionVertex implements
SchedulingExecutionVert
return producedPartitions;
}
- @Override
- public InputDependencyConstraint getInputDependencyConstraint() {
- return inputDependencyConstraint;
- }
-
void addConsumedPartition(TestingSchedulingResultPartition partition) {
consumedPartitions.add(partition);
}
@@ -106,7 +95,6 @@ public class TestingSchedulingExecutionVertex implements
SchedulingExecutionVert
public static class Builder {
private JobVertexID jobVertexId = new JobVertexID();
private int subtaskIndex = 0;
- private InputDependencyConstraint inputDependencyConstraint = ANY;
private List<TestingSchedulingResultPartition> partitions = new
ArrayList<>();
private ExecutionState executionState = ExecutionState.CREATED;
@@ -116,11 +104,6 @@ public class TestingSchedulingExecutionVertex implements
SchedulingExecutionVert
return this;
}
- Builder withInputDependencyConstraint(InputDependencyConstraint
constraint) {
- this.inputDependencyConstraint = constraint;
- return this;
- }
-
public Builder
withConsumedPartitions(List<TestingSchedulingResultPartition> partitions) {
this.partitions = partitions;
return this;
@@ -133,11 +116,7 @@ public class TestingSchedulingExecutionVertex implements
SchedulingExecutionVert
public TestingSchedulingExecutionVertex build() {
return new TestingSchedulingExecutionVertex(
- jobVertexId,
- subtaskIndex,
- inputDependencyConstraint,
- partitions,
- executionState);
+ jobVertexId, subtaskIndex, partitions, executionState);
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
index e68fc20..806cfda 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.scheduler.strategy;
-import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.execution.ExecutionState;
import
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -352,19 +351,11 @@ public class TestingSchedulingTopology implements
SchedulingTopology {
private int parallelism = 1;
- private InputDependencyConstraint inputDependencyConstraint =
InputDependencyConstraint.ANY;
-
public SchedulingExecutionVerticesBuilder withParallelism(final int
parallelism) {
this.parallelism = parallelism;
return this;
}
- public SchedulingExecutionVerticesBuilder
withInputDependencyConstraint(
- final InputDependencyConstraint inputDependencyConstraint) {
- this.inputDependencyConstraint = inputDependencyConstraint;
- return this;
- }
-
public List<TestingSchedulingExecutionVertex> finish() {
final List<TestingSchedulingExecutionVertex> vertices = new
ArrayList<>();
for (int subtaskIndex = 0; subtaskIndex < parallelism;
subtaskIndex++) {
@@ -380,7 +371,6 @@ public class TestingSchedulingTopology implements
SchedulingTopology {
final int subtaskIndex) {
return TestingSchedulingExecutionVertex.newBuilder()
.withExecutionVertexID(jobVertexId, subtaskIndex)
- .withInputDependencyConstraint(inputDependencyConstraint)
.build();
}
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index d0da83b..26720ea 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -597,10 +597,6 @@ public class StreamingJobGraphGenerator {
LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
}
- // TODO: inherit InputDependencyConstraint from the head operator
- jobVertex.setInputDependencyConstraint(
-
streamGraph.getExecutionConfig().getDefaultInputDependencyConstraint());
-
jobVertices.put(streamNodeId, jobVertex);
builtVertices.add(streamNodeId);
jobGraph.addVertex(jobVertex);