This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fb4b63  [FLINK-13055][runtime] Leverage PartitionTracker for checking 
partition availability
4fb4b63 is described below

commit 4fb4b6318411fabf7d6167f02ad834e85ad66845
Author: Zhu Zhu <[email protected]>
AuthorDate: Wed Jul 24 19:11:08 2019 +0800

    [FLINK-13055][runtime] Leverage PartitionTracker for checking partition 
availability
---
 .../runtime/executiongraph/ExecutionGraph.java     | 13 +++-
 ...ionGraphResultPartitionAvailabilityChecker.java | 53 ++++++++++++++++
 .../AdaptedRestartPipelinedRegionStrategyNG.java   |  2 +-
 .../flip1/ResultPartitionAvailabilityChecker.java  |  2 +-
 .../io/network/partition/PartitionTracker.java     |  5 ++
 .../io/network/partition/PartitionTrackerImpl.java |  7 +++
 ...inedRegionStrategyNGConcurrentFailoverTest.java | 44 +++++++-------
 ...startPipelinedRegionStrategyNGFailoverTest.java | 10 +++
 ...raphResultPartitionAvailabilityCheckerTest.java | 71 ++++++++++++++++++++++
 .../io/network/partition/NoOpPartitionTracker.java |  5 ++
 .../network/partition/TestingPartitionTracker.java | 10 +++
 11 files changed, 196 insertions(+), 26 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9ed7b0b..0c85b52 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -48,6 +48,7 @@ import 
org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.NotReleasingPartitionReleaseStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
 import 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
@@ -297,6 +298,8 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
 
        private final PartitionTracker partitionTracker;
 
+       private final ResultPartitionAvailabilityChecker 
resultPartitionAvailabilityChecker;
+
        /**
         * Future for an ongoing or completed scheduling action.
         */
@@ -513,6 +516,10 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
 
                this.partitionTracker = checkNotNull(partitionTracker);
 
+               this.resultPartitionAvailabilityChecker = new 
ExecutionGraphResultPartitionAvailabilityChecker(
+                       this::createResultPartitionId,
+                       partitionTracker);
+
                LOG.info("Job recovers via failover strategy: {}", 
failoverStrategy.getStrategyName());
        }
 
@@ -1567,7 +1574,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                }
        }
 
-       private ResultPartitionID createResultPartitionId(final 
IntermediateResultPartitionID resultPartitionId) {
+       ResultPartitionID createResultPartitionId(final 
IntermediateResultPartitionID resultPartitionId) {
                final SchedulingResultPartition schedulingResultPartition = 
schedulingTopology.getResultPartitionOrThrow(resultPartitionId);
                final SchedulingExecutionVertex producer = 
schedulingResultPartition.getProducer();
                final ExecutionVertexID producerId = producer.getId();
@@ -1743,6 +1750,10 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                return partitionTracker;
        }
 
+       public ResultPartitionAvailabilityChecker 
getResultPartitionAvailabilityChecker() {
+               return resultPartitionAvailabilityChecker;
+       }
+
        PartitionReleaseStrategy getPartitionReleaseStrategy() {
                return partitionReleaseStrategy;
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityChecker.java
new file mode 100644
index 0000000..2ec0a34
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityChecker.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ResultPartitionAvailabilityChecker} which decides the intermediate 
result partition availability
+ * based on whether the corresponding result partition in the execution graph 
is tracked.
+ */
+public class ExecutionGraphResultPartitionAvailabilityChecker implements 
ResultPartitionAvailabilityChecker {
+
+       /** The function maps an IntermediateResultPartitionID to a 
ResultPartitionID. */
+       private final Function<IntermediateResultPartitionID, 
ResultPartitionID> partitionIDMapper;
+
+       /** The tracker that tracks all available result partitions. */
+       private final PartitionTracker partitionTracker;
+
+       ExecutionGraphResultPartitionAvailabilityChecker(
+                       final Function<IntermediateResultPartitionID, 
ResultPartitionID> partitionIDMapper,
+                       final PartitionTracker partitionTracker) {
+
+               this.partitionIDMapper = checkNotNull(partitionIDMapper);
+               this.partitionTracker = checkNotNull(partitionTracker);
+       }
+
+       @Override
+       public boolean isAvailable(final IntermediateResultPartitionID 
resultPartitionID) {
+               return 
partitionTracker.isPartitionTracked(partitionIDMapper.apply(resultPartitionID));
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
index 337534f..ddb60af 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
@@ -290,7 +290,7 @@ public class AdaptedRestartPipelinedRegionStrategyNG 
extends FailoverStrategy {
                // currently it's safe to add it here, as this method is 
invoked only once in production code.
                checkState(restartPipelinedRegionStrategy == null, 
"notifyNewVertices() must be called only once");
                this.restartPipelinedRegionStrategy = new 
RestartPipelinedRegionStrategy(
-                       new DefaultFailoverTopology(executionGraph));
+                       new DefaultFailoverTopology(executionGraph), 
executionGraph.getResultPartitionAvailabilityChecker());
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java
index 286ad3e..b10724d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 /**
  * This checker helps to query result partition availability.
  */
-interface ResultPartitionAvailabilityChecker {
+public interface ResultPartitionAvailabilityChecker {
 
        /**
         * Returns whether the given partition is available.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
index 4c432f1..3697fb8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
@@ -54,4 +54,9 @@ public interface PartitionTracker {
         * Returns whether any partition is being tracked for the given task 
executor ID.
         */
        boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId);
+
+       /**
+        * Returns whether the given partition is being tracked.
+        */
+       boolean isPartitionTracked(ResultPartitionID resultPartitionID);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
index 53e7d3f..c52e8b1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
@@ -122,6 +122,13 @@ public class PartitionTrackerImpl implements 
PartitionTracker {
                return 
partitionTable.hasTrackedPartitions(producingTaskExecutorId);
        }
 
+       @Override
+       public boolean isPartitionTracked(final ResultPartitionID 
resultPartitionID) {
+               Preconditions.checkNotNull(resultPartitionID);
+
+               return partitionInfos.containsKey(resultPartitionID);
+       }
+
        private Optional<PartitionInfo> 
internalStopTrackingPartition(ResultPartitionID resultPartitionId) {
                final PartitionInfo partitionInfo = 
partitionInfos.remove(resultPartitionId);
                if (partitionInfo == null) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
index f5461f0..b23ec26 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
@@ -19,8 +19,6 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -29,12 +27,14 @@ import 
org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStra
 import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
 
@@ -42,6 +42,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Iterator;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.assertEquals;
@@ -250,34 +251,31 @@ public class 
AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest exten
         */
        private ExecutionGraph createExecutionGraph() throws Exception {
 
-               final JobInformation jobInformation = new 
DummyJobInformation(TEST_JOB_ID, "test job");
-               final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(TEST_JOB_ID, DEFAULT_PARALLELISM);
-
-               final Time timeout = Time.seconds(10L);
-               final ExecutionGraph graph = new ExecutionGraph(
-                       jobInformation,
-                       TestingUtils.defaultExecutor(),
-                       TestingUtils.defaultExecutor(),
-                       timeout,
-                       manuallyTriggeredRestartStrategy,
-                       TestAdaptedRestartPipelinedRegionStrategyNG::new,
-                       slotProvider,
-                       getClass().getClassLoader(),
-                       VoidBlobWriter.getInstance(),
-                       timeout);
-
-               JobVertex v1 = new JobVertex("vertex1");
+               final JobVertex v1 = new JobVertex("vertex1");
                v1.setInvokableClass(NoOpInvokable.class);
                v1.setParallelism(DEFAULT_PARALLELISM);
 
-               JobVertex v2 = new JobVertex("vertex2");
+               final JobVertex v2 = new JobVertex("vertex2");
                v2.setInvokableClass(NoOpInvokable.class);
                v2.setParallelism(DEFAULT_PARALLELISM);
 
                v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
 
-               JobGraph jg = new JobGraph(TEST_JOB_ID, "testjob", v1, v2);
-               
graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
+               final JobGraph jg = new JobGraph(TEST_JOB_ID, "testjob", v1, 
v2);
+
+               final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(TEST_JOB_ID, DEFAULT_PARALLELISM);
+
+               final PartitionTracker partitionTracker = new 
PartitionTrackerImpl(
+                       jg.getJobID(),
+                       NettyShuffleMaster.INSTANCE,
+                       ignored -> Optional.empty());
+
+               final ExecutionGraph graph = new 
ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jg)
+                       .setRestartStrategy(manuallyTriggeredRestartStrategy)
+                       
.setFailoverStrategyFactory(TestAdaptedRestartPipelinedRegionStrategyNG::new)
+                       .setSlotProvider(slotProvider)
+                       .setPartitionTracker(partitionTracker)
+                       .build();
 
                graph.start(componentMainThreadExecutor);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
index 3c1ca2b..acf9869 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
@@ -33,6 +33,8 @@ import 
org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
@@ -47,6 +49,7 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
@@ -57,6 +60,7 @@ import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
@@ -379,10 +383,16 @@ public class 
AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
                        final JobGraph jobGraph,
                        final RestartStrategy restartStrategy) throws Exception 
{
 
+               final PartitionTracker partitionTracker = new 
PartitionTrackerImpl(
+                       jobGraph.getJobID(),
+                       NettyShuffleMaster.INSTANCE,
+                       ignored -> Optional.empty());
+
                final ExecutionGraph eg = new 
ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobGraph)
                        .setRestartStrategy(restartStrategy)
                        
.setFailoverStrategyFactory(TestAdaptedRestartPipelinedRegionStrategyNG::new)
                        .setSlotProvider(slotProvider)
+                       .setPartitionTracker(partitionTracker)
                        .build();
 
                eg.start(componentMainThreadExecutor);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityCheckerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityCheckerTest.java
new file mode 100644
index 0000000..ccbd439
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityCheckerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ExecutionGraphResultPartitionAvailabilityChecker}.
+ */
+public class ExecutionGraphResultPartitionAvailabilityCheckerTest extends 
TestLogger {
+
+       @Test
+       public void testPartitionAvailabilityCheck() {
+
+               final IntermediateResultPartitionID irp1ID = new 
IntermediateResultPartitionID();
+               final IntermediateResultPartitionID irp2ID = new 
IntermediateResultPartitionID();
+               final IntermediateResultPartitionID irp3ID = new 
IntermediateResultPartitionID();
+               final IntermediateResultPartitionID irp4ID = new 
IntermediateResultPartitionID();
+
+               final Map<IntermediateResultPartitionID, Boolean> 
expectedAvailability =
+                       new HashMap<IntermediateResultPartitionID, Boolean>() {{
+                               put(irp1ID, true);
+                               put(irp2ID, false);
+                               put(irp3ID, false);
+                               put(irp4ID, true);
+                       }};
+
+               // let the partition tracker respect the expected availability 
result
+               final TestingPartitionTracker partitionTracker = new 
TestingPartitionTracker();
+               partitionTracker.setIsPartitionTrackedFunction(rpID -> 
expectedAvailability.get(rpID.getPartitionId()));
+
+               // the execution attempt ID should make no difference in this 
case
+               final Function<IntermediateResultPartitionID, 
ResultPartitionID> partitionIDMapper =
+                       intermediateResultPartitionID -> new 
ResultPartitionID(intermediateResultPartitionID, new ExecutionAttemptID());
+
+               final ResultPartitionAvailabilityChecker 
resultPartitionAvailabilityChecker =
+                       new 
ExecutionGraphResultPartitionAvailabilityChecker(partitionIDMapper, 
partitionTracker);
+
+               for (IntermediateResultPartitionID irpID : 
expectedAvailability.keySet()) {
+                       assertEquals(expectedAvailability.get(irpID), 
resultPartitionAvailabilityChecker.isAvailable(irpID));
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
index 30895b2..0d1a160 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
@@ -50,4 +50,9 @@ public enum NoOpPartitionTracker implements PartitionTracker {
        public boolean isTrackingPartitionsFor(ResourceID 
producingTaskExecutorId) {
                return false;
        }
+
+       @Override
+       public boolean isPartitionTracked(final ResultPartitionID 
resultPartitionID) {
+               return false;
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
index 2bcda7e..2d85ff6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
@@ -31,6 +31,7 @@ import java.util.function.Function;
 public class TestingPartitionTracker implements PartitionTracker {
 
        private Function<ResourceID, Boolean> isTrackingPartitionsForFunction = 
ignored -> false;
+       private Function<ResultPartitionID, Boolean> isPartitionTrackedFunction 
= ignored -> false;
        private Consumer<ResourceID> stopTrackingAllPartitionsConsumer = 
ignored -> {};
        private Consumer<ResourceID> 
stopTrackingAndReleaseAllPartitionsConsumer = ignored -> {};
        private BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> 
startTrackingPartitionsConsumer = (ignoredA, ignoredB) -> {};
@@ -44,6 +45,10 @@ public class TestingPartitionTracker implements 
PartitionTracker {
                this.isTrackingPartitionsForFunction = 
isTrackingPartitionsForFunction;
        }
 
+       public void setIsPartitionTrackedFunction(Function<ResultPartitionID, 
Boolean> isPartitionTrackedFunction) {
+               this.isPartitionTrackedFunction = isPartitionTrackedFunction;
+       }
+
        public void setStopTrackingAllPartitionsConsumer(Consumer<ResourceID> 
stopTrackingAllPartitionsConsumer) {
                this.stopTrackingAllPartitionsConsumer = 
stopTrackingAllPartitionsConsumer;
        }
@@ -80,4 +85,9 @@ public class TestingPartitionTracker implements 
PartitionTracker {
        public boolean isTrackingPartitionsFor(ResourceID 
producingTaskExecutorId) {
                return 
isTrackingPartitionsForFunction.apply(producingTaskExecutorId);
        }
+
+       @Override
+       public boolean isPartitionTracked(final ResultPartitionID 
resultPartitionID) {
+               return isPartitionTrackedFunction.apply(resultPartitionID);
+       }
 }

Reply via email to