This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7404c95  [FLINK-20619][runtime] Remove unused InputDependencyConstraint
7404c95 is described below

commit 7404c951ee1e449be916e718a9f2c51070d4864d
Author: Matthias Pohl <[email protected]>
AuthorDate: Thu Jan 7 13:34:20 2021 +0100

    [FLINK-20619][runtime] Remove unused InputDependencyConstraint
    
    This work is part of FLINK-20589
    
    This closes #14579
---
 .../apache/flink/api/common/ExecutionConfig.java   |  46 ++--
 .../api/common/InputDependencyConstraint.java      |   9 +-
 .../optimizer/plantranslate/JobGraphGenerator.java |   8 +-
 flink-python/pyflink/common/execution_config.py    |  16 ++
 .../pyflink/common/tests/test_execution_config.py  |  17 +-
 .../runtime/executiongraph/ExecutionJobVertex.java |   5 -
 .../runtime/executiongraph/ExecutionVertex.java    |  63 -----
 .../apache/flink/runtime/jobgraph/JobVertex.java   |  12 -
 .../adapter/DefaultExecutionTopology.java          |   5 +-
 .../scheduler/adapter/DefaultExecutionVertex.java  |  12 +-
 .../strategy/InputDependencyConstraintChecker.java | 194 ---------------
 .../strategy/SchedulingExecutionVertex.java        |   8 -
 .../runtime/scheduler/DefaultSchedulerTest.java    |  48 ----
 .../adapter/DefaultExecutionTopologyTest.java      |  16 +-
 .../adapter/DefaultExecutionVertexTest.java        |   7 +-
 .../DefaultSchedulingPipelinedRegionTest.java      |   4 +-
 .../InputDependencyConstraintCheckerTest.java      | 274 ---------------------
 .../strategy/TestingSchedulingExecutionVertex.java |  23 +-
 .../strategy/TestingSchedulingTopology.java        |  10 -
 .../api/graph/StreamingJobGraphGenerator.java      |   4 -
 20 files changed, 50 insertions(+), 731 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 8005e85..c6f7f58 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -151,9 +151,6 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
      * This flag defines if we use compression for the state snapshot data or 
not. Default: false
      */
     private boolean useSnapshotCompression = false;
-    /** The default input dependency constraint to schedule tasks. */
-    private InputDependencyConstraint defaultInputDependencyConstraint =
-            InputDependencyConstraint.ANY;
 
     // ------------------------------- User code values 
--------------------------------------------
 
@@ -528,36 +525,29 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
     }
 
     /**
-     * Sets the default input dependency constraint for vertex scheduling. It 
indicates when a task
-     * should be scheduled considering its inputs status.
+     * This method is deprecated. It was used to set the {@link 
InputDependencyConstraint} utilized
+     * by the old scheduler implementations which got removed as part of 
FLINK-20589. The current
+     * implementation has no effect.
      *
-     * <p>The default constraint is {@link InputDependencyConstraint#ANY}.
-     *
-     * @param inputDependencyConstraint The input dependency constraint.
+     * @param ignored Ignored parameter.
+     * @deprecated due to the deprecation of {@code InputDependencyConstraint}.
      */
     @PublicEvolving
-    public void setDefaultInputDependencyConstraint(
-            InputDependencyConstraint inputDependencyConstraint) {
-        if (inputDependencyConstraint != null) {
-            this.defaultInputDependencyConstraint = inputDependencyConstraint;
-        } else {
-            // defaultInputDependencyConstraint is not allowed to be null
-            // setting it to ANY to not break existing jobs
-            this.defaultInputDependencyConstraint = 
InputDependencyConstraint.ANY;
-        }
-    }
+    @Deprecated
+    public void setDefaultInputDependencyConstraint(InputDependencyConstraint 
ignored) {}
 
     /**
-     * Gets the default input dependency constraint for vertex scheduling. It 
indicates when a task
-     * should be scheduled considering its inputs status.
-     *
-     * <p>The default constraint is {@link InputDependencyConstraint#ANY}.
+     * This method is deprecated. It was used to return the {@link 
InputDependencyConstraint}
+     * utilized by the old scheduler implementations. These implementations 
were removed as part of
+     * FLINK-20589.
      *
-     * @return The input dependency constraint of this job.
+     * @return The previous default constraint {@link 
InputDependencyConstraint#ANY}.
+     * @deprecated due to the deprecation of {@code InputDependencyConstraint}.
      */
     @PublicEvolving
+    @Deprecated
     public InputDependencyConstraint getDefaultInputDependencyConstraint() {
-        return defaultInputDependencyConstraint;
+        return InputDependencyConstraint.ANY;
     }
 
     /**
@@ -913,8 +903,7 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
                     && registeredKryoTypes.equals(other.registeredKryoTypes)
                     && registeredPojoTypes.equals(other.registeredPojoTypes)
                     && taskCancellationIntervalMillis == 
other.taskCancellationIntervalMillis
-                    && useSnapshotCompression == other.useSnapshotCompression
-                    && defaultInputDependencyConstraint == 
other.defaultInputDependencyConstraint;
+                    && useSnapshotCompression == other.useSnapshotCompression;
 
         } else {
             return false;
@@ -940,8 +929,7 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
                 registeredKryoTypes,
                 registeredPojoTypes,
                 taskCancellationIntervalMillis,
-                useSnapshotCompression,
-                defaultInputDependencyConstraint);
+                useSnapshotCompression);
     }
 
     @Override
@@ -985,8 +973,6 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
                 + taskCancellationTimeoutMillis
                 + ", useSnapshotCompression="
                 + useSnapshotCompression
-                + ", defaultInputDependencyConstraint="
-                + defaultInputDependencyConstraint
                 + ", globalJobParameters="
                 + globalJobParameters
                 + ", registeredTypesWithKryoSerializers="
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
index 5b57495..40022c4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
@@ -20,8 +20,15 @@ package org.apache.flink.api.common;
 
 import org.apache.flink.annotation.PublicEvolving;
 
-/** This constraint indicates when a task should be scheduled considering its 
inputs status. */
+/**
+ * This constraint indicates when a task should be scheduled considering its 
inputs status.
+ *
+ * @deprecated {@code InputDependencyConstraint} is not used anymore and will 
be deleted in one of
+ *     the future versions. It was previously used in the scheduler 
implementations that were
+ *     removed as part of FLINK-20589.
+ */
 @PublicEvolving
+@Deprecated
 public enum InputDependencyConstraint {
 
     /** Schedule the task if any input is consumable. */
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index f80847c..4da3354 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -255,13 +255,7 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
         }
 
         // add vertices to the graph
-        for (JobVertex vertex : this.vertices.values()) {
-            vertex.setInputDependencyConstraint(
-                    program.getOriginalPlan()
-                            .getExecutionConfig()
-                            .getDefaultInputDependencyConstraint());
-            graph.addVertex(vertex);
-        }
+        this.vertices.values().forEach(graph::addVertex);
 
         for (JobVertex vertex : this.auxVertices) {
             graph.addVertex(vertex);
diff --git a/flink-python/pyflink/common/execution_config.py 
b/flink-python/pyflink/common/execution_config.py
index 9a08105..4fcf7b9 100644
--- a/flink-python/pyflink/common/execution_config.py
+++ b/flink-python/pyflink/common/execution_config.py
@@ -15,6 +15,8 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
+import warnings
+
 from typing import Dict, List
 
 from pyflink.common.execution_mode import ExecutionMode
@@ -328,7 +330,14 @@ class ExecutionConfig(object):
         :param input_dependency_constraint: The input dependency constraint. 
The constraints could
                                             be 
:data:`InputDependencyConstraint.ANY` or
                                             
:data:`InputDependencyConstraint.ALL`.
+
+        .. note:: Deprecated in 1.13. :class:`InputDependencyConstraint` is 
not used anymore in the
+                  current scheduler implementations.
         """
+        warnings.warn("Deprecated in 1.13. InputDependencyConstraint is not 
used anywhere. "
+                      "Therefore, the method call 
set_default_input_dependency_constraint is "
+                      "obsolete.", DeprecationWarning)
+
         self._j_execution_config.setDefaultInputDependencyConstraint(
             input_dependency_constraint._to_j_input_dependency_constraint())
         return self
@@ -344,7 +353,14 @@ class ExecutionConfig(object):
 
         :return: The input dependency constraint of this job. The possible 
constraints are
                  :data:`InputDependencyConstraint.ANY` and 
:data:`InputDependencyConstraint.ALL`.
+
+        .. note:: Deprecated in 1.13. :class:`InputDependencyConstraint` is 
not used anymore in the
+                  current scheduler implementations.
         """
+        warnings.warn("Deprecated in 1.13. InputDependencyConstraint is not 
used anywhere. "
+                      "Therefore, the method call 
get_default_input_dependency_constraint is "
+                      "obsolete.", DeprecationWarning)
+
         j_input_dependency_constraint = self._j_execution_config\
             .getDefaultInputDependencyConstraint()
         return InputDependencyConstraint._from_j_input_dependency_constraint(
diff --git a/flink-python/pyflink/common/tests/test_execution_config.py 
b/flink-python/pyflink/common/tests/test_execution_config.py
index a5aab74..8f3ba67 100644
--- a/flink-python/pyflink/common/tests/test_execution_config.py
+++ b/flink-python/pyflink/common/tests/test_execution_config.py
@@ -16,8 +16,7 @@
 # limitations under the License.
 
################################################################################
 from pyflink.dataset import ExecutionEnvironment
-from pyflink.common import (ExecutionConfig, RestartStrategies, ExecutionMode,
-                            InputDependencyConstraint)
+from pyflink.common import (ExecutionConfig, RestartStrategies, ExecutionMode)
 from pyflink.java_gateway import get_gateway
 from pyflink.testing.test_case_utils import PyFlinkTestCase
 
@@ -132,20 +131,6 @@ class ExecutionConfigTests(PyFlinkTestCase):
 
         self.assertEqual(self.execution_config.get_execution_mode(), 
ExecutionMode.PIPELINED_FORCED)
 
-    def test_get_set_default_input_dependency_constraint(self):
-
-        self.execution_config.set_default_input_dependency_constraint(
-            InputDependencyConstraint.ALL)
-
-        
self.assertEqual(self.execution_config.get_default_input_dependency_constraint(),
-                         InputDependencyConstraint.ALL)
-
-        self.execution_config.set_default_input_dependency_constraint(
-            InputDependencyConstraint.ANY)
-
-        
self.assertEqual(self.execution_config.get_default_input_dependency_constraint(),
-                         InputDependencyConstraint.ANY)
-
     def test_disable_enable_force_kryo(self):
 
         self.execution_config.disable_force_kryo()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 1db69e1..10dd636 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -365,10 +364,6 @@ public class ExecutionJobVertex
         return inputs;
     }
 
-    public InputDependencyConstraint getInputDependencyConstraint() {
-        return getJobVertex().getInputDependencyConstraint();
-    }
-
     public Collection<OperatorCoordinatorHolder> getOperatorCoordinators() {
         return operatorCoordinators;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0acfb10..fe984b5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
-import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.io.InputSplit;
@@ -346,10 +345,6 @@ public class ExecutionVertex
         return resultPartitions;
     }
 
-    public InputDependencyConstraint getInputDependencyConstraint() {
-        return getJobVertex().getInputDependencyConstraint();
-    }
-
     // 
--------------------------------------------------------------------------------------------
     //  Graph building
     // 
--------------------------------------------------------------------------------------------
@@ -777,64 +772,6 @@ public class ExecutionVertex
         }
     }
 
-    /**
-     * Check whether the InputDependencyConstraint is satisfied for this 
vertex.
-     *
-     * @return whether the input constraint is satisfied
-     */
-    boolean checkInputDependencyConstraints() {
-        if (inputEdges.length == 0) {
-            return true;
-        }
-
-        final InputDependencyConstraint inputDependencyConstraint = 
getInputDependencyConstraint();
-        switch (inputDependencyConstraint) {
-            case ANY:
-                return isAnyInputConsumable();
-            case ALL:
-                return areAllInputsConsumable();
-            default:
-                throw new IllegalStateException(
-                        "Unknown InputDependencyConstraint " + 
inputDependencyConstraint);
-        }
-    }
-
-    private boolean isAnyInputConsumable() {
-        for (int inputNumber = 0; inputNumber < inputEdges.length; 
inputNumber++) {
-            if (isInputConsumable(inputNumber)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private boolean areAllInputsConsumable() {
-        for (int inputNumber = 0; inputNumber < inputEdges.length; 
inputNumber++) {
-            if (!isInputConsumable(inputNumber)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * Get whether an input of the vertex is consumable. An input is 
consumable when when any
-     * partition in it is consumable.
-     *
-     * <p>Note that a BLOCKING result partition is only consumable when all 
partitions in the result
-     * are FINISHED.
-     *
-     * @return whether the input is consumable
-     */
-    boolean isInputConsumable(int inputNumber) {
-        for (ExecutionEdge executionEdge : inputEdges[inputNumber]) {
-            if (executionEdge.getSource().isConsumable()) {
-                return true;
-            }
-        }
-        return false;
-    }
-
     // 
--------------------------------------------------------------------------------------------
     //   Notifications from the Execution Attempt
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index d482941..1100826 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.jobgraph;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitSource;
@@ -133,9 +132,6 @@ public class JobVertex implements java.io.Serializable {
      */
     private String resultOptimizerProperties;
 
-    /** The input dependency constraint to schedule this vertex. */
-    private InputDependencyConstraint inputDependencyConstraint = 
InputDependencyConstraint.ANY;
-
     // 
--------------------------------------------------------------------------------------------
 
     /**
@@ -577,14 +573,6 @@ public class JobVertex implements java.io.Serializable {
         this.resultOptimizerProperties = resultOptimizerProperties;
     }
 
-    public InputDependencyConstraint getInputDependencyConstraint() {
-        return inputDependencyConstraint;
-    }
-
-    public void setInputDependencyConstraint(InputDependencyConstraint 
inputDependencyConstraint) {
-        this.inputDependencyConstraint = inputDependencyConstraint;
-    }
-
     // 
--------------------------------------------------------------------------------------------
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
index be13c51..45a6de4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
@@ -194,10 +194,7 @@ public class DefaultExecutionTopology implements 
SchedulingTopology {
 
         DefaultExecutionVertex schedulingVertex =
                 new DefaultExecutionVertex(
-                        vertex.getID(),
-                        producedPartitions,
-                        vertex::getExecutionState,
-                        vertex.getInputDependencyConstraint());
+                        vertex.getID(), producedPartitions, 
vertex::getExecutionState);
 
         producedPartitions.forEach(partition -> 
partition.setProducer(schedulingVertex));
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
index fa7ad8a..125c622 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.scheduler.adapter;
 
-import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
@@ -40,18 +39,14 @@ class DefaultExecutionVertex implements 
SchedulingExecutionVertex {
 
     private final Supplier<ExecutionState> stateSupplier;
 
-    private final InputDependencyConstraint inputDependencyConstraint;
-
     DefaultExecutionVertex(
             ExecutionVertexID executionVertexId,
             List<DefaultResultPartition> producedPartitions,
-            Supplier<ExecutionState> stateSupplier,
-            InputDependencyConstraint constraint) {
+            Supplier<ExecutionState> stateSupplier) {
         this.executionVertexId = checkNotNull(executionVertexId);
         this.consumedResults = new ArrayList<>();
         this.stateSupplier = checkNotNull(stateSupplier);
         this.producedResults = checkNotNull(producedPartitions);
-        this.inputDependencyConstraint = checkNotNull(constraint);
     }
 
     @Override
@@ -74,11 +69,6 @@ class DefaultExecutionVertex implements 
SchedulingExecutionVertex {
         return producedResults;
     }
 
-    @Override
-    public InputDependencyConstraint getInputDependencyConstraint() {
-        return inputDependencyConstraint;
-    }
-
     void addConsumedResult(DefaultResultPartition result) {
         consumedResults.add(result);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java
deleted file mode 100644
index 0b702d7..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.strategy;
-
-import org.apache.flink.api.common.InputDependencyConstraint;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
-
-/** A wrapper class for {@link InputDependencyConstraint} checker. */
-public class InputDependencyConstraintChecker {
-    private final SchedulingIntermediateDataSetManager 
intermediateDataSetManager =
-            new SchedulingIntermediateDataSetManager();
-
-    public boolean check(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
-        if (Iterables.isEmpty(schedulingExecutionVertex.getConsumedResults())) 
{
-            return true;
-        }
-
-        final InputDependencyConstraint inputConstraint =
-                schedulingExecutionVertex.getInputDependencyConstraint();
-        switch (inputConstraint) {
-            case ANY:
-                return checkAny(schedulingExecutionVertex);
-            case ALL:
-                return checkAll(schedulingExecutionVertex);
-            default:
-                throw new IllegalStateException(
-                        "Unknown InputDependencyConstraint " + 
inputConstraint);
-        }
-    }
-
-    List<SchedulingResultPartition> markSchedulingResultPartitionFinished(
-            SchedulingResultPartition srp) {
-        return 
intermediateDataSetManager.markSchedulingResultPartitionFinished(srp);
-    }
-
-    void resetSchedulingResultPartition(SchedulingResultPartition srp) {
-        intermediateDataSetManager.resetSchedulingResultPartition(srp);
-    }
-
-    void addSchedulingResultPartition(SchedulingResultPartition srp) {
-        intermediateDataSetManager.addSchedulingResultPartition(srp);
-    }
-
-    private boolean checkAll(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
-        for (SchedulingResultPartition consumedResultPartition :
-                schedulingExecutionVertex.getConsumedResults()) {
-            if (!partitionConsumable(consumedResultPartition)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private boolean checkAny(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
-        for (SchedulingResultPartition consumedResultPartition :
-                schedulingExecutionVertex.getConsumedResults()) {
-            if (partitionConsumable(consumedResultPartition)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private boolean partitionConsumable(SchedulingResultPartition partition) {
-        if (BLOCKING.equals(partition.getResultType())) {
-            return intermediateDataSetManager.allPartitionsFinished(partition);
-        } else {
-            final ResultPartitionState state = partition.getState();
-            return ResultPartitionState.CONSUMABLE.equals(state);
-        }
-    }
-
-    private static class SchedulingIntermediateDataSetManager {
-
-        private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet>
-                intermediateDataSets = new HashMap<>();
-
-        List<SchedulingResultPartition> markSchedulingResultPartitionFinished(
-                SchedulingResultPartition srp) {
-            SchedulingIntermediateDataSet intermediateDataSet =
-                    getSchedulingIntermediateDataSet(srp.getResultId());
-            if (intermediateDataSet.markPartitionFinished(srp.getId())) {
-                return intermediateDataSet.getSchedulingResultPartitions();
-            }
-            return Collections.emptyList();
-        }
-
-        void resetSchedulingResultPartition(SchedulingResultPartition srp) {
-            SchedulingIntermediateDataSet sid = 
getSchedulingIntermediateDataSet(srp.getResultId());
-            sid.resetPartition(srp.getId());
-        }
-
-        void addSchedulingResultPartition(SchedulingResultPartition srp) {
-            SchedulingIntermediateDataSet sid =
-                    
getOrCreateSchedulingIntermediateDataSetIfAbsent(srp.getResultId());
-            sid.addSchedulingResultPartition(srp);
-        }
-
-        boolean allPartitionsFinished(SchedulingResultPartition srp) {
-            SchedulingIntermediateDataSet sid = 
getSchedulingIntermediateDataSet(srp.getResultId());
-            return sid.allPartitionsFinished();
-        }
-
-        private SchedulingIntermediateDataSet getSchedulingIntermediateDataSet(
-                final IntermediateDataSetID intermediateDataSetId) {
-            return 
getSchedulingIntermediateDataSetInternal(intermediateDataSetId, false);
-        }
-
-        private SchedulingIntermediateDataSet 
getOrCreateSchedulingIntermediateDataSetIfAbsent(
-                final IntermediateDataSetID intermediateDataSetId) {
-            return 
getSchedulingIntermediateDataSetInternal(intermediateDataSetId, true);
-        }
-
-        private SchedulingIntermediateDataSet 
getSchedulingIntermediateDataSetInternal(
-                final IntermediateDataSetID intermediateDataSetId, boolean 
createIfAbsent) {
-
-            return intermediateDataSets.computeIfAbsent(
-                    intermediateDataSetId,
-                    (key) -> {
-                        if (createIfAbsent) {
-                            return new SchedulingIntermediateDataSet();
-                        } else {
-                            throw new IllegalArgumentException(
-                                    "can not find data set for " + 
intermediateDataSetId);
-                        }
-                    });
-        }
-    }
-
-    /** Representation of {@link IntermediateDataSet}. */
-    private static class SchedulingIntermediateDataSet {
-
-        private final List<SchedulingResultPartition> partitions;
-
-        private final Set<IntermediateResultPartitionID> producingPartitionIds;
-
-        SchedulingIntermediateDataSet() {
-            partitions = new ArrayList<>();
-            producingPartitionIds = new HashSet<>();
-        }
-
-        boolean markPartitionFinished(IntermediateResultPartitionID 
partitionId) {
-            producingPartitionIds.remove(partitionId);
-            return producingPartitionIds.isEmpty();
-        }
-
-        void resetPartition(IntermediateResultPartitionID partitionId) {
-            producingPartitionIds.add(partitionId);
-        }
-
-        boolean allPartitionsFinished() {
-            return producingPartitionIds.isEmpty();
-        }
-
-        void addSchedulingResultPartition(SchedulingResultPartition partition) 
{
-            partitions.add(partition);
-            producingPartitionIds.add(partition.getId());
-        }
-
-        List<SchedulingResultPartition> getSchedulingResultPartitions() {
-            return Collections.unmodifiableList(partitions);
-        }
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
index b2a2943..80204be 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.scheduler.strategy;
 
-import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -38,11 +37,4 @@ public interface SchedulingExecutionVertex
      * @return state of the execution vertex
      */
     ExecutionState getState();
-
-    /**
-     * Get {@link InputDependencyConstraint}.
-     *
-     * @return input dependency constraint
-     */
-    InputDependencyConstraint getInputDependencyConstraint();
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 3f9689c..4bb7b07 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.runtime.scheduler;
 
-import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
@@ -28,7 +27,6 @@ import 
org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -68,7 +66,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -91,7 +88,6 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
@@ -679,35 +675,6 @@ public class DefaultSchedulerTest extends TestLogger {
     }
 
     @Test
-    public void testInputConstraintALLPerf() throws Exception {
-        final int parallelism = 1000;
-        final JobVertex v1 = createVertexWithAllInputConstraints("vertex1", 
parallelism);
-        final JobVertex v2 = createVertexWithAllInputConstraints("vertex2", 
parallelism);
-        final JobVertex v3 = createVertexWithAllInputConstraints("vertex3", 
parallelism);
-        v2.connectNewDataSetAsInput(
-                v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
-        v2.connectNewDataSetAsInput(
-                v3, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
-
-        final JobGraph jobGraph = new JobGraph(v1, v2, v3);
-        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
-        final AccessExecutionJobVertex ejv1 =
-                scheduler.requestJob().getAllVertices().get(v1.getID());
-
-        for (int i = 0; i < parallelism - 1; i++) {
-            finishSubtask(scheduler, ejv1, i);
-        }
-
-        final long startTime = System.nanoTime();
-        finishSubtask(scheduler, ejv1, parallelism - 1);
-
-        final Duration duration = Duration.ofNanos(System.nanoTime() - 
startTime);
-        final Duration timeout = Duration.ofSeconds(5);
-
-        assertThat(duration, lessThan(timeout));
-    }
-
-    @Test
     public void failJobWillIncrementVertexVersions() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
@@ -931,12 +898,6 @@ public class DefaultSchedulerTest extends TestLogger {
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(0));
     }
 
-    private static JobVertex createVertexWithAllInputConstraints(String name, 
int parallelism) {
-        final JobVertex v = createVertex(name, parallelism);
-        v.setInputDependencyConstraint(InputDependencyConstraint.ALL);
-        return v;
-    }
-
     private static JobVertex createVertex(String name, int parallelism) {
         final JobVertex v = new JobVertex(name);
         v.setParallelism(parallelism);
@@ -944,15 +905,6 @@ public class DefaultSchedulerTest extends TestLogger {
         return v;
     }
 
-    private static void finishSubtask(
-            DefaultScheduler scheduler, AccessExecutionJobVertex vertex, int 
subtask) {
-        final ExecutionAttemptID attemptId =
-                
vertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
-        scheduler.updateTaskExecutionState(
-                new TaskExecutionState(
-                        scheduler.getJobGraph().getJobID(), attemptId, 
ExecutionState.FINISHED));
-    }
-
     private void waitForTermination(final DefaultScheduler scheduler) throws 
Exception {
         scheduler.getTerminationFuture().get(TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
index 7109a1a..a7d6bbb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
@@ -47,8 +47,6 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static junit.framework.TestCase.assertTrue;
-import static org.apache.flink.api.common.InputDependencyConstraint.ALL;
-import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
 import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
@@ -71,8 +69,6 @@ public class DefaultExecutionTopologyTest extends TestLogger {
         jobVertices[0] = createNoOpVertex(parallelism);
         jobVertices[1] = createNoOpVertex(parallelism);
         jobVertices[1].connectNewDataSetAsInput(jobVertices[0], ALL_TO_ALL, 
PIPELINED);
-        jobVertices[0].setInputDependencyConstraint(ALL);
-        jobVertices[1].setInputDependencyConstraint(ANY);
         executionGraph = createSimpleTestGraph(jobVertices);
         adapter = DefaultExecutionTopology.fromExecutionGraph(executionGraph);
     }
@@ -182,7 +178,7 @@ public class DefaultExecutionTopologyTest extends 
TestLogger {
             ExecutionVertex originalVertex = originalVertices.next();
             DefaultExecutionVertex adaptedVertex = adaptedVertices.next();
 
-            assertVertexEquals(originalVertex, adaptedVertex);
+            assertEquals(originalVertex.getID(), adaptedVertex.getId());
 
             List<IntermediateResultPartition> originalConsumedPartitions =
                     IntStream.range(0, originalVertex.getNumberOfInputs())
@@ -262,15 +258,7 @@ public class DefaultExecutionTopologyTest extends 
TestLogger {
         assertEquals(
                 originalPartition.getIntermediateResult().getId(), 
adaptedPartition.getResultId());
         assertEquals(originalPartition.getResultType(), 
adaptedPartition.getResultType());
-        assertVertexEquals(originalPartition.getProducer(), 
adaptedPartition.getProducer());
-    }
-
-    private static void assertVertexEquals(
-            ExecutionVertex originalVertex, DefaultExecutionVertex 
adaptedVertex) {
-
-        assertEquals(originalVertex.getID(), adaptedVertex.getId());
         assertEquals(
-                originalVertex.getInputDependencyConstraint(),
-                adaptedVertex.getInputDependencyConstraint());
+                originalPartition.getProducer().getID(), 
adaptedPartition.getProducer().getId());
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
index 0b41b6b..7cf970c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
@@ -34,7 +34,6 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.function.Supplier;
 
-import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
 import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
 import static org.junit.Assert.assertEquals;
 
@@ -64,15 +63,13 @@ public class DefaultExecutionVertexTest extends TestLogger {
                 new DefaultExecutionVertex(
                         new ExecutionVertexID(new JobVertexID(), 0),
                         Collections.singletonList(schedulingResultPartition),
-                        stateSupplier,
-                        ANY);
+                        stateSupplier);
         schedulingResultPartition.setProducer(producerVertex);
         consumerVertex =
                 new DefaultExecutionVertex(
                         new ExecutionVertexID(new JobVertexID(), 0),
                         Collections.emptyList(),
-                        stateSupplier,
-                        ANY);
+                        stateSupplier);
         consumerVertex.addConsumedResult(schedulingResultPartition);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
index 70094ea..5cdb6bd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.runtime.scheduler.adapter;
 
-import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
@@ -70,8 +69,7 @@ public class DefaultSchedulingPipelinedRegionTest extends 
TestLogger {
                 new DefaultExecutionVertex(
                         new ExecutionVertexID(new JobVertexID(), 0),
                         Collections.emptyList(),
-                        () -> ExecutionState.CREATED,
-                        InputDependencyConstraint.ANY);
+                        () -> ExecutionState.CREATED);
 
         final Set<DefaultExecutionVertex> vertices = 
Collections.singleton(vertex);
         final DefaultSchedulingPipelinedRegion pipelinedRegion =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java
deleted file mode 100644
index 6b15bbf..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.strategy;
-
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.api.common.InputDependencyConstraint.ALL;
-import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
-import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/** Unit tests for {@link InputDependencyConstraintChecker}. */
-public class InputDependencyConstraintCheckerTest extends TestLogger {
-
-    @Test
-    public void testCheckInputVertex() {
-        final TestingSchedulingExecutionVertex vertex =
-                TestingSchedulingExecutionVertex.newBuilder().build();
-        final InputDependencyConstraintChecker inputChecker =
-                
createInputDependencyConstraintChecker(Collections.emptyList());
-
-        assertTrue(inputChecker.check(vertex));
-    }
-
-    @Test
-    public void testCheckCreatedPipelinedInput() {
-        final List<TestingSchedulingResultPartition> partitions =
-                addResultPartition()
-                        .withPartitionType(PIPELINED)
-                        .withPartitionState(ResultPartitionState.CREATED)
-                        .finish();
-        final TestingSchedulingExecutionVertex vertex =
-                TestingSchedulingExecutionVertex.newBuilder()
-                        .withConsumedPartitions(partitions)
-                        .build();
-
-        final InputDependencyConstraintChecker inputChecker =
-                createInputDependencyConstraintChecker(partitions);
-
-        assertFalse(inputChecker.check(vertex));
-    }
-
-    @Test
-    public void testCheckConsumablePipelinedInput() {
-        final List<TestingSchedulingResultPartition> partitions =
-                addResultPartition()
-                        .withPartitionType(PIPELINED)
-                        .withPartitionState(ResultPartitionState.CONSUMABLE)
-                        .finish();
-        final TestingSchedulingExecutionVertex vertex =
-                TestingSchedulingExecutionVertex.newBuilder()
-                        .withConsumedPartitions(partitions)
-                        .build();
-
-        final InputDependencyConstraintChecker inputChecker =
-                createInputDependencyConstraintChecker(partitions);
-
-        assertTrue(inputChecker.check(vertex));
-    }
-
-    @Test
-    public void testCheckDoneBlockingInput() {
-        final List<TestingSchedulingResultPartition> partitions =
-                addResultPartition().withPartitionCntPerDataSet(2).finish();
-        final TestingSchedulingExecutionVertex vertex =
-                TestingSchedulingExecutionVertex.newBuilder()
-                        .withConsumedPartitions(partitions)
-                        .build();
-
-        final InputDependencyConstraintChecker inputChecker =
-                createInputDependencyConstraintChecker(partitions);
-
-        for (TestingSchedulingResultPartition srp : partitions) {
-            inputChecker.markSchedulingResultPartitionFinished(srp);
-        }
-
-        assertTrue(inputChecker.check(vertex));
-    }
-
-    @Test
-    public void testCheckPartialDoneBlockingInput() {
-        final List<TestingSchedulingResultPartition> partitions =
-                addResultPartition().withPartitionCntPerDataSet(2).finish();
-        final TestingSchedulingExecutionVertex vertex =
-                TestingSchedulingExecutionVertex.newBuilder()
-                        .withConsumedPartitions(partitions)
-                        .build();
-
-        final InputDependencyConstraintChecker inputChecker =
-                createInputDependencyConstraintChecker(partitions);
-
-        inputChecker.markSchedulingResultPartitionFinished(partitions.get(0));
-
-        assertFalse(inputChecker.check(vertex));
-    }
-
-    @Test
-    public void testCheckResetBlockingInput() {
-        final List<TestingSchedulingResultPartition> partitions =
-                addResultPartition().withPartitionCntPerDataSet(2).finish();
-        final TestingSchedulingExecutionVertex vertex =
-                TestingSchedulingExecutionVertex.newBuilder()
-                        .withConsumedPartitions(partitions)
-                        .build();
-
-        final InputDependencyConstraintChecker inputChecker =
-                createInputDependencyConstraintChecker(partitions);
-
-        for (TestingSchedulingResultPartition srp : partitions) {
-            inputChecker.markSchedulingResultPartitionFinished(srp);
-        }
-
-        for (TestingSchedulingResultPartition srp : partitions) {
-            inputChecker.resetSchedulingResultPartition(srp);
-        }
-
-        assertFalse(inputChecker.check(vertex));
-    }
-
-    @Test
-    public void testCheckAnyBlockingInput() {
-        final List<TestingSchedulingResultPartition> partitions =
-                addResultPartition().withDataSetCnt(2).finish();
-        final TestingSchedulingExecutionVertex vertex =
-                TestingSchedulingExecutionVertex.newBuilder()
-                        .withConsumedPartitions(partitions)
-                        .build();
-
-        final InputDependencyConstraintChecker inputChecker =
-                createInputDependencyConstraintChecker(partitions);
-
-        inputChecker.markSchedulingResultPartitionFinished(partitions.get(0));
-
-        assertTrue(inputChecker.check(vertex));
-    }
-
-    @Test
-    public void testCheckAllBlockingInput() {
-        final List<TestingSchedulingResultPartition> partitions =
-                addResultPartition().withDataSetCnt(2).finish();
-        final TestingSchedulingExecutionVertex vertex =
-                TestingSchedulingExecutionVertex.newBuilder()
-                        .withInputDependencyConstraint(ALL)
-                        .withConsumedPartitions(partitions)
-                        .build();
-
-        final InputDependencyConstraintChecker inputChecker =
-                createInputDependencyConstraintChecker(partitions);
-
-        for (TestingSchedulingResultPartition srp : partitions) {
-            inputChecker.markSchedulingResultPartitionFinished(srp);
-        }
-
-        assertTrue(inputChecker.check(vertex));
-    }
-
-    @Test
-    public void testCheckAllPartialDatasetBlockingInput() {
-        final List<TestingSchedulingResultPartition> partitions =
-                addResultPartition().withDataSetCnt(2).finish();
-        final TestingSchedulingExecutionVertex vertex =
-                TestingSchedulingExecutionVertex.newBuilder()
-                        .withInputDependencyConstraint(ALL)
-                        .withConsumedPartitions(partitions)
-                        .build();
-
-        final InputDependencyConstraintChecker inputChecker =
-                createInputDependencyConstraintChecker(partitions);
-
-        inputChecker.markSchedulingResultPartitionFinished(partitions.get(0));
-        assertFalse(inputChecker.check(vertex));
-    }
-
-    @Test
-    public void testCheckAllPartialPartitionBlockingInput() {
-        final List<TestingSchedulingResultPartition> partitions =
-                
addResultPartition().withDataSetCnt(2).withPartitionCntPerDataSet(2).finish();
-        final TestingSchedulingExecutionVertex vertex =
-                TestingSchedulingExecutionVertex.newBuilder()
-                        .withInputDependencyConstraint(ALL)
-                        .withConsumedPartitions(partitions)
-                        .build();
-
-        final InputDependencyConstraintChecker inputChecker =
-                createInputDependencyConstraintChecker(partitions);
-
-        for (int idx = 0; idx < 3; idx++) {
-            
inputChecker.markSchedulingResultPartitionFinished(partitions.get(idx));
-        }
-
-        assertFalse(inputChecker.check(vertex));
-    }
-
-    private static TestingSchedulingResultPartitionBuilder 
addResultPartition() {
-        return new TestingSchedulingResultPartitionBuilder();
-    }
-
-    private static InputDependencyConstraintChecker 
createInputDependencyConstraintChecker(
-            List<TestingSchedulingResultPartition> partitions) {
-
-        InputDependencyConstraintChecker inputChecker = new 
InputDependencyConstraintChecker();
-        for (SchedulingResultPartition partition : partitions) {
-            inputChecker.addSchedulingResultPartition(partition);
-        }
-        return inputChecker;
-    }
-
-    private static class TestingSchedulingResultPartitionBuilder {
-        private int dataSetCnt = 1;
-        private int partitionCntPerDataSet = 1;
-        private ResultPartitionType partitionType = BLOCKING;
-        private ResultPartitionState partitionState = 
ResultPartitionState.CONSUMABLE;
-
-        TestingSchedulingResultPartitionBuilder withDataSetCnt(int dataSetCnt) 
{
-            this.dataSetCnt = dataSetCnt;
-            return this;
-        }
-
-        TestingSchedulingResultPartitionBuilder withPartitionCntPerDataSet(int 
partitionCnt) {
-            this.partitionCntPerDataSet = partitionCnt;
-            return this;
-        }
-
-        TestingSchedulingResultPartitionBuilder 
withPartitionType(ResultPartitionType type) {
-            this.partitionType = type;
-            return this;
-        }
-
-        TestingSchedulingResultPartitionBuilder 
withPartitionState(ResultPartitionState state) {
-            this.partitionState = state;
-            return this;
-        }
-
-        List<TestingSchedulingResultPartition> finish() {
-            List<TestingSchedulingResultPartition> partitions =
-                    new ArrayList<>(dataSetCnt * partitionCntPerDataSet);
-            for (int dataSetIdx = 0; dataSetIdx < dataSetCnt; dataSetIdx++) {
-                IntermediateDataSetID dataSetId = new IntermediateDataSetID();
-                for (int partitionIdx = 0; partitionIdx < 
partitionCntPerDataSet; partitionIdx++) {
-                    partitions.add(
-                            new TestingSchedulingResultPartition(
-                                    dataSetId, partitionType, partitionState));
-                }
-            }
-
-            return partitions;
-        }
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
index 166a904..593e363 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.scheduler.strategy;
 
-import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -26,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A simple scheduling execution vertex for testing purposes. */
@@ -38,19 +36,15 @@ public class TestingSchedulingExecutionVertex implements 
SchedulingExecutionVert
 
     private final Collection<TestingSchedulingResultPartition> 
producedPartitions;
 
-    private final InputDependencyConstraint inputDependencyConstraint;
-
     private ExecutionState executionState;
 
     public TestingSchedulingExecutionVertex(
             JobVertexID jobVertexId,
             int subtaskIndex,
-            InputDependencyConstraint constraint,
             Collection<TestingSchedulingResultPartition> consumedPartitions,
             ExecutionState executionState) {
 
         this.executionVertexId = new ExecutionVertexID(jobVertexId, 
subtaskIndex);
-        this.inputDependencyConstraint = constraint;
         this.consumedPartitions = checkNotNull(consumedPartitions);
         this.producedPartitions = new ArrayList<>();
         this.executionState = executionState;
@@ -80,11 +74,6 @@ public class TestingSchedulingExecutionVertex implements 
SchedulingExecutionVert
         return producedPartitions;
     }
 
-    @Override
-    public InputDependencyConstraint getInputDependencyConstraint() {
-        return inputDependencyConstraint;
-    }
-
     void addConsumedPartition(TestingSchedulingResultPartition partition) {
         consumedPartitions.add(partition);
     }
@@ -106,7 +95,6 @@ public class TestingSchedulingExecutionVertex implements 
SchedulingExecutionVert
     public static class Builder {
         private JobVertexID jobVertexId = new JobVertexID();
         private int subtaskIndex = 0;
-        private InputDependencyConstraint inputDependencyConstraint = ANY;
         private List<TestingSchedulingResultPartition> partitions = new 
ArrayList<>();
         private ExecutionState executionState = ExecutionState.CREATED;
 
@@ -116,11 +104,6 @@ public class TestingSchedulingExecutionVertex implements 
SchedulingExecutionVert
             return this;
         }
 
-        Builder withInputDependencyConstraint(InputDependencyConstraint 
constraint) {
-            this.inputDependencyConstraint = constraint;
-            return this;
-        }
-
         public Builder 
withConsumedPartitions(List<TestingSchedulingResultPartition> partitions) {
             this.partitions = partitions;
             return this;
@@ -133,11 +116,7 @@ public class TestingSchedulingExecutionVertex implements 
SchedulingExecutionVert
 
         public TestingSchedulingExecutionVertex build() {
             return new TestingSchedulingExecutionVertex(
-                    jobVertexId,
-                    subtaskIndex,
-                    inputDependencyConstraint,
-                    partitions,
-                    executionState);
+                    jobVertexId, subtaskIndex, partitions, executionState);
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
index e68fc20..806cfda 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.scheduler.strategy;
 
-import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.runtime.execution.ExecutionState;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -352,19 +351,11 @@ public class TestingSchedulingTopology implements 
SchedulingTopology {
 
         private int parallelism = 1;
 
-        private InputDependencyConstraint inputDependencyConstraint = 
InputDependencyConstraint.ANY;
-
         public SchedulingExecutionVerticesBuilder withParallelism(final int 
parallelism) {
             this.parallelism = parallelism;
             return this;
         }
 
-        public SchedulingExecutionVerticesBuilder 
withInputDependencyConstraint(
-                final InputDependencyConstraint inputDependencyConstraint) {
-            this.inputDependencyConstraint = inputDependencyConstraint;
-            return this;
-        }
-
         public List<TestingSchedulingExecutionVertex> finish() {
             final List<TestingSchedulingExecutionVertex> vertices = new 
ArrayList<>();
             for (int subtaskIndex = 0; subtaskIndex < parallelism; 
subtaskIndex++) {
@@ -380,7 +371,6 @@ public class TestingSchedulingTopology implements 
SchedulingTopology {
                 final int subtaskIndex) {
             return TestingSchedulingExecutionVertex.newBuilder()
                     .withExecutionVertexID(jobVertexId, subtaskIndex)
-                    .withInputDependencyConstraint(inputDependencyConstraint)
                     .build();
         }
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index d0da83b..26720ea 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -597,10 +597,6 @@ public class StreamingJobGraphGenerator {
             LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
         }
 
-        // TODO: inherit InputDependencyConstraint from the head operator
-        jobVertex.setInputDependencyConstraint(
-                
streamGraph.getExecutionConfig().getDefaultInputDependencyConstraint());
-
         jobVertices.put(streamNodeId, jobVertex);
         builtVertices.add(streamNodeId);
         jobGraph.addVertex(jobVertex);

Reply via email to