This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 32c51c96880400fdd3ded0292b09e5c8d8ea28bb
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu May 28 18:18:41 2020 +0200

    [FLINK-16986][coordination][refactor] Reduce dependencies of OperatorCoordinatorHolder and OperatorCoordinatorCheckpointContext
    
    This simplifies both testing and future refactoring.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |   7 +-
 .../OperatorCoordinatorCheckpointContext.java      |  41 +----
 .../checkpoint/OperatorCoordinatorCheckpoints.java |  20 +--
 .../runtime/checkpoint/PendingCheckpoint.java      |   3 +-
 .../coordination/OperatorCoordinatorHolder.java    | 172 +++++++++++++--------
 .../coordination/OperatorInfo.java}                |  30 +---
 .../runtime/checkpoint/PendingCheckpointTest.java  |  27 ++--
 ...kpointContext.java => TestingOperatorInfo.java} |  31 +---
 8 files changed, 151 insertions(+), 180 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 51b98f5..da518ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
@@ -626,7 +627,7 @@ public class CheckpointCoordinator {
                        checkpointID,
                        timestamp,
                        ackTasks,
-                       
OperatorCoordinatorCheckpointContext.getIds(coordinatorsToCheckpoint),
+                       OperatorInfo.getIds(coordinatorsToCheckpoint),
                        masterHooks.keySet(),
                        props,
                        checkpointStorageLocation,
@@ -1074,7 +1075,7 @@ public class CheckpointCoordinator {
 
                // commit coordinators
                for (OperatorCoordinatorCheckpointContext coordinatorContext : 
coordinatorsToCheckpoint) {
-                       
coordinatorContext.coordinator().checkpointComplete(checkpointId);
+                       coordinatorContext.checkpointComplete(checkpointId);
                }
        }
 
@@ -1496,7 +1497,7 @@ public class CheckpointCoordinator {
 
                        final ByteStreamStateHandle coordinatorState = 
state.getCoordinatorState();
                        if (coordinatorState != null) {
-                               
coordContext.coordinator().resetToCheckpoint(coordinatorState.getData());
+                               
coordContext.resetToCheckpoint(coordinatorState.getData());
                        }
                }
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
index 92fd2aa..abc15b8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -18,49 +18,24 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 
-import java.util.Collection;
-import java.util.stream.Collectors;
+import java.util.concurrent.CompletableFuture;
 
 /**
- * An {@link OperatorCoordinator} and its contextual information needed to 
trigger and
- * acknowledge a checkpoint.
+ * This context is the interface through which the {@link 
CheckpointCoordinator} interacts with an
+ * {@link OperatorCoordinator} during checkpointing and checkpoint restoring.
  */
-public interface OperatorCoordinatorCheckpointContext {
+public interface OperatorCoordinatorCheckpointContext extends OperatorInfo {
 
-       // 
------------------------------------------------------------------------
-       //  properties
-       // 
------------------------------------------------------------------------
-
-       OperatorCoordinator coordinator();
-
-       OperatorID operatorId();
-
-       int maxParallelism();
-
-       int currentParallelism();
-
-       // 
------------------------------------------------------------------------
-       //  checkpoint triggering callbacks
-       // 
------------------------------------------------------------------------
-
-       void onCallTriggerCheckpoint(long checkpointId);
-
-       void onCheckpointStateFutureComplete(long checkpointId);
+       CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) 
throws Exception;
 
        void afterSourceBarrierInjection(long checkpointId);
 
        void abortCurrentTriggering();
 
-       // 
------------------------------------------------------------------------
-       //  utils
-       // 
------------------------------------------------------------------------
+       void checkpointComplete(long checkpointId);
 
-       static Collection<OperatorID> 
getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
-               return infos.stream()
-                       .map(OperatorCoordinatorCheckpointContext::operatorId)
-                       .collect(Collectors.toList());
-       }
+       void resetToCheckpoint(byte[] checkpointData) throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
index 68fc8f1..6215123 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 import java.util.ArrayList;
@@ -39,15 +40,15 @@ import java.util.concurrent.Executor;
 final class OperatorCoordinatorCheckpoints {
 
        public static CompletableFuture<CoordinatorSnapshot> 
triggerCoordinatorCheckpoint(
-                       final OperatorCoordinatorCheckpointContext 
coordinatorInfo,
+                       final OperatorCoordinatorCheckpointContext 
coordinatorContext,
                        final long checkpointId) throws Exception {
 
                final CompletableFuture<byte[]> checkpointFuture =
-                               
coordinatorInfo.coordinator().checkpointCoordinator(checkpointId);
+                       coordinatorContext.checkpointCoordinator(checkpointId);
 
                return checkpointFuture.thenApply(
                                (state) -> new CoordinatorSnapshot(
-                                               coordinatorInfo, new 
ByteStreamStateHandle(coordinatorInfo.operatorId().toString(), state))
+                                               coordinatorContext, new 
ByteStreamStateHandle(coordinatorContext.operatorId().toString(), state))
                );
        }
 
@@ -59,16 +60,7 @@ final class OperatorCoordinatorCheckpoints {
 
                for (final OperatorCoordinatorCheckpointContext coordinator : 
coordinators) {
                        final CompletableFuture<CoordinatorSnapshot> 
checkpointFuture = triggerCoordinatorCheckpoint(coordinator, checkpointId);
-                       coordinator.onCallTriggerCheckpoint(checkpointId);
-
                        individualSnapshots.add(checkpointFuture);
-                       checkpointFuture.whenComplete((ignored, failure) -> {
-                               if (failure != null) {
-                                       coordinator.abortCurrentTriggering();
-                               } else {
-                                       
coordinator.onCheckpointStateFutureComplete(checkpointId);
-                               }
-                       });
                }
 
                return 
FutureUtils.combineAll(individualSnapshots).thenApply(AllCoordinatorSnapshots::new);
@@ -144,10 +136,10 @@ final class OperatorCoordinatorCheckpoints {
 
        static final class CoordinatorSnapshot {
 
-               final OperatorCoordinatorCheckpointContext coordinator;
+               final OperatorInfo coordinator;
                final ByteStreamStateHandle state;
 
-               CoordinatorSnapshot(OperatorCoordinatorCheckpointContext 
coordinator, ByteStreamStateHandle state) {
+               CoordinatorSnapshot(OperatorInfo coordinator, 
ByteStreamStateHandle state) {
                        this.coordinator = coordinator;
                        this.state = state;
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 2a0eba7..376301d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
@@ -431,7 +432,7 @@ public class PendingCheckpoint {
        }
 
        public TaskAcknowledgeResult acknowledgeCoordinatorState(
-                       OperatorCoordinatorCheckpointContext coordinatorInfo,
+                       OperatorInfo coordinatorInfo,
                        @Nullable ByteStreamStateHandle stateHandle) {
 
                synchronized (lock) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index c70e229..304eb20 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import 
org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -35,6 +36,7 @@ import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -67,7 +69,7 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
        private final int operatorParallelism;
        private final int operatorMaxParallelism;
 
-       private long currentlyTriggeredCheckpoint;
+       private volatile long currentlyTriggeredCheckpoint;
 
        private OperatorCoordinatorHolder(
                        final OperatorID operatorId,
@@ -88,21 +90,25 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
        }
 
        public void lazyInitialize(SchedulerNG scheduler, Executor 
schedulerExecutor) {
-               context.lazyInitialize(scheduler, schedulerExecutor);
+               lazyInitialize(scheduler::handleGlobalFailure, 
schedulerExecutor);
+       }
+
+       @VisibleForTesting
+       void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor 
schedulerExecutor) {
+               context.lazyInitialize(globalFailureHandler, schedulerExecutor);
        }
 
        // 
------------------------------------------------------------------------
        //  Properties
        // 
------------------------------------------------------------------------
 
-       @Override
-       public OperatorID operatorId() {
-               return operatorId;
+       public OperatorCoordinator coordinator() {
+               return coordinator;
        }
 
        @Override
-       public OperatorCoordinator coordinator() {
-               return coordinator;
+       public OperatorID operatorId() {
+               return operatorId;
        }
 
        @Override
@@ -116,39 +122,6 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
        }
 
        // 
------------------------------------------------------------------------
-       //  Checkpointing Callbacks
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void onCallTriggerCheckpoint(long checkpointId) {
-               checkCheckpointAlreadyHappening(checkpointId);
-               currentlyTriggeredCheckpoint = checkpointId;
-       }
-
-       @Override
-       public void onCheckpointStateFutureComplete(long checkpointId) {
-               checkCheckpointAlreadyHappening(checkpointId);
-               eventValve.shutValve();
-       }
-
-       @Override
-       public void afterSourceBarrierInjection(long checkpointId) {
-               checkCheckpointAlreadyHappening(checkpointId);
-               eventValve.openValve();
-               currentlyTriggeredCheckpoint = NO_CHECKPOINT;
-       }
-
-       @Override
-       public void abortCurrentTriggering() {
-               eventValve.openValve();
-               currentlyTriggeredCheckpoint = NO_CHECKPOINT;
-       }
-
-       private void checkCheckpointAlreadyHappening(long checkpointId) {
-               checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || 
currentlyTriggeredCheckpoint == checkpointId);
-       }
-
-       // 
------------------------------------------------------------------------
        //  OperatorCoordinator Interface
        // 
------------------------------------------------------------------------
 
@@ -177,7 +150,20 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
 
        @Override
        public CompletableFuture<byte[]> checkpointCoordinator(long 
checkpointId) throws Exception {
-               return coordinator.checkpointCoordinator(checkpointId);
+               setCurrentlyTriggeredCheckpoint(checkpointId);
+
+               final CompletableFuture<byte[]> checkpointFuture = 
coordinator.checkpointCoordinator(checkpointId);
+
+               // synchronously!!!, with the completion, we need to shut the 
event valve
+               checkpointFuture.whenComplete((ignored, failure) -> {
+                       if (failure != null) {
+                               abortCurrentTriggering();
+                       } else {
+                               onCheckpointStateFutureComplete(checkpointId);
+                       }
+               });
+
+               return checkpointFuture;
        }
 
        @Override
@@ -187,10 +173,47 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
 
        @Override
        public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+               resetCheckpointTriggeringCheck();
+               eventValve.reset();
                coordinator.resetToCheckpoint(checkpointData);
        }
 
        // 
------------------------------------------------------------------------
+       //  Checkpointing Callbacks
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void afterSourceBarrierInjection(long checkpointId) {
+               verifyNoOtherCheckpointBeingTriggered(checkpointId);
+               eventValve.openValve();
+               resetCheckpointTriggeringCheck();
+       }
+
+       @Override
+       public void abortCurrentTriggering() {
+               eventValve.openValve();
+               resetCheckpointTriggeringCheck();
+       }
+
+       void onCheckpointStateFutureComplete(long checkpointId) {
+               verifyNoOtherCheckpointBeingTriggered(checkpointId);
+               eventValve.shutValve();
+       }
+
+       private void verifyNoOtherCheckpointBeingTriggered(long checkpointId) {
+               checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || 
currentlyTriggeredCheckpoint == checkpointId);
+       }
+
+       private void setCurrentlyTriggeredCheckpoint(long checkpointId) {
+               verifyNoOtherCheckpointBeingTriggered(checkpointId);
+               currentlyTriggeredCheckpoint = checkpointId;
+       }
+
+       private void resetCheckpointTriggeringCheck() {
+               currentlyTriggeredCheckpoint = NO_CHECKPOINT;
+       }
+
+       // 
------------------------------------------------------------------------
        //  Factories
        // 
------------------------------------------------------------------------
 
@@ -209,20 +232,41 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
                                        return 
executionAttempt.sendOperatorEvent(opId, serializedEvent);
                                };
 
-                       final OperatorEventValve valve = new 
OperatorEventValve(eventSender);
-                       final LazyInitializedCoordinatorContext context = new 
LazyInitializedCoordinatorContext(opId, jobVertex, valve);
-                       final OperatorCoordinator coordinator = 
provider.create(context);
-
-                       return new OperatorCoordinatorHolder(
+                       return create(
                                        opId,
-                                       coordinator,
-                                       context,
-                                       valve,
+                                       provider,
+                                       eventSender,
+                                       jobVertex.getName(),
                                        jobVertex.getParallelism(),
                                        jobVertex.getMaxParallelism());
                }
        }
 
+       @VisibleForTesting
+       static OperatorCoordinatorHolder create(
+                       final OperatorID opId,
+                       final OperatorCoordinator.Provider coordinatorProvider,
+                       final BiFunction<SerializedValue<OperatorEvent>, 
Integer, CompletableFuture<Acknowledge>> eventSender,
+                       final String operatorName,
+                       final int operatorParallelism,
+                       final int operatorMaxParallelism) {
+
+               final OperatorEventValve valve = new 
OperatorEventValve(eventSender);
+
+               final LazyInitializedCoordinatorContext context = new 
LazyInitializedCoordinatorContext(
+                               opId, valve, operatorName, operatorParallelism);
+
+               final OperatorCoordinator coordinator = 
coordinatorProvider.create(context);
+
+               return new OperatorCoordinatorHolder(
+                               opId,
+                               coordinator,
+                               context,
+                               valve,
+                               operatorParallelism,
+                               operatorMaxParallelism);
+       }
+
        // 
------------------------------------------------------------------------
        //  Nested Classes
        // 
------------------------------------------------------------------------
@@ -239,33 +283,36 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
        private static final class LazyInitializedCoordinatorContext implements 
OperatorCoordinator.Context {
 
                private final OperatorID operatorId;
-               private final ExecutionJobVertex jobVertex;
                private final OperatorEventValve eventValve;
+               private final String operatorName;
+               private final int operatorParallelism;
 
-               private SchedulerNG scheduler;
+               private Consumer<Throwable> globalFailureHandler;
                private Executor schedulerExecutor;
 
                public LazyInitializedCoordinatorContext(
-                               OperatorID operatorId,
-                               ExecutionJobVertex jobVertex,
-                               OperatorEventValve eventValve) {
+                               final OperatorID operatorId,
+                               final OperatorEventValve eventValve,
+                               final String operatorName,
+                               final int operatorParallelism) {
                        this.operatorId = checkNotNull(operatorId);
-                       this.jobVertex = checkNotNull(jobVertex);
                        this.eventValve = checkNotNull(eventValve);
+                       this.operatorName = checkNotNull(operatorName);
+                       this.operatorParallelism = operatorParallelism;
                }
 
-               void lazyInitialize(SchedulerNG scheduler, Executor 
schedulerExecutor) {
-                       this.scheduler = checkNotNull(scheduler);
+               void lazyInitialize(Consumer<Throwable> globalFailureHandler, 
Executor schedulerExecutor) {
+                       this.globalFailureHandler = 
checkNotNull(globalFailureHandler);
                        this.schedulerExecutor = 
checkNotNull(schedulerExecutor);
                }
 
                void unInitialize() {
-                       this.scheduler = null;
+                       this.globalFailureHandler = null;
                        this.schedulerExecutor = null;
                }
 
                boolean isInitialized() {
-                       return jobVertex != null;
+                       return schedulerExecutor != null;
                }
 
                private void checkInitialized() {
@@ -309,15 +356,14 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
                        checkInitialized();
 
                        final FlinkException e = new FlinkException("Global 
failure triggered by OperatorCoordinator for '" +
-                               jobVertex.getName() + "' (operator " + 
operatorId + ").", cause);
+                               operatorName + "' (operator " + operatorId + 
").", cause);
 
-                       schedulerExecutor.execute(() -> 
scheduler.handleGlobalFailure(e));
+                       schedulerExecutor.execute(() -> 
globalFailureHandler.accept(e));
                }
 
                @Override
                public int currentParallelism() {
-                       checkInitialized();
-                       return jobVertex.getParallelism();
+                       return operatorParallelism;
                }
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java
similarity index 54%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java
index 92fd2aa..a146940 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java
@@ -16,25 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.checkpoint;
+package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 
 import java.util.Collection;
 import java.util.stream.Collectors;
 
 /**
- * An {@link OperatorCoordinator} and its contextual information needed to 
trigger and
- * acknowledge a checkpoint.
+ * An interface to access basic properties of an operator in the context of 
its coordinator.
  */
-public interface OperatorCoordinatorCheckpointContext {
-
-       // 
------------------------------------------------------------------------
-       //  properties
-       // 
------------------------------------------------------------------------
-
-       OperatorCoordinator coordinator();
+public interface OperatorInfo {
 
        OperatorID operatorId();
 
@@ -43,24 +35,12 @@ public interface OperatorCoordinatorCheckpointContext {
        int currentParallelism();
 
        // 
------------------------------------------------------------------------
-       //  checkpoint triggering callbacks
-       // 
------------------------------------------------------------------------
-
-       void onCallTriggerCheckpoint(long checkpointId);
-
-       void onCheckpointStateFutureComplete(long checkpointId);
-
-       void afterSourceBarrierInjection(long checkpointId);
-
-       void abortCurrentTriggering();
-
-       // 
------------------------------------------------------------------------
        //  utils
        // 
------------------------------------------------------------------------
 
-       static Collection<OperatorID> 
getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
+       static Collection<OperatorID> getIds(Collection<? extends OperatorInfo> 
infos) {
                return infos.stream()
-                       .map(OperatorCoordinatorCheckpointContext::operatorId)
+                       .map(OperatorInfo::operatorId)
                        .collect(Collectors.toList());
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index b454633..30c1dc2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -32,7 +32,8 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCheckpointContext;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
+import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.TestingStreamStateHandle;
@@ -438,7 +439,7 @@ public class PendingCheckpointTest {
        @Test
        public void testInitiallyUnacknowledgedCoordinatorStates() throws 
Exception {
                final PendingCheckpoint checkpoint = 
createPendingCheckpointWithCoordinators(
-                               createOperatorCoordinator(), 
createOperatorCoordinator());
+                               new TestingOperatorInfo(), new 
TestingOperatorInfo());
 
                assertEquals(2, 
checkpoint.getNumberOfNonAcknowledgedOperatorCoordinators());
                assertFalse(checkpoint.isFullyAcknowledged());
@@ -446,8 +447,8 @@ public class PendingCheckpointTest {
 
        @Test
        public void testAcknowledgedCoordinatorStates() throws Exception {
-               final OperatorCoordinatorCheckpointContext coord1 = 
createOperatorCoordinator();
-               final OperatorCoordinatorCheckpointContext coord2 = 
createOperatorCoordinator();
+               final OperatorInfo coord1 = new TestingOperatorInfo();
+               final OperatorInfo coord2 = new TestingOperatorInfo();
                final PendingCheckpoint checkpoint = 
createPendingCheckpointWithCoordinators(coord1, coord2);
 
                final TaskAcknowledgeResult ack1 = 
checkpoint.acknowledgeCoordinatorState(coord1, new TestingStreamStateHandle());
@@ -462,7 +463,7 @@ public class PendingCheckpointTest {
 
        @Test
        public void testDuplicateAcknowledgeCoordinator() throws Exception {
-               final OperatorCoordinatorCheckpointContext coordinator = 
createOperatorCoordinator();
+               final OperatorInfo coordinator = new TestingOperatorInfo();
                final PendingCheckpoint checkpoint = 
createPendingCheckpointWithCoordinators(coordinator);
 
                checkpoint.acknowledgeCoordinatorState(coordinator, new 
TestingStreamStateHandle());
@@ -473,9 +474,9 @@ public class PendingCheckpointTest {
 
        @Test
        public void testAcknowledgeUnknownCoordinator() throws Exception {
-               final PendingCheckpoint checkpoint = 
createPendingCheckpointWithCoordinators(createOperatorCoordinator());
+               final PendingCheckpoint checkpoint = 
createPendingCheckpointWithCoordinators(new TestingOperatorInfo());
 
-               final TaskAcknowledgeResult ack = 
checkpoint.acknowledgeCoordinatorState(createOperatorCoordinator(), null);
+               final TaskAcknowledgeResult ack = 
checkpoint.acknowledgeCoordinatorState(new TestingOperatorInfo(), null);
 
                assertEquals(TaskAcknowledgeResult.UNKNOWN, ack);
        }
@@ -507,11 +508,11 @@ public class PendingCheckpointTest {
        }
 
        private PendingCheckpoint createPendingCheckpointWithCoordinators(
-                       OperatorCoordinatorCheckpointContext... coordinators) 
throws IOException {
+                               OperatorInfo... coordinators) throws 
IOException {
 
                final PendingCheckpoint checkpoint = createPendingCheckpoint(
                                
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                               
OperatorCoordinatorCheckpointContext.getIds(Arrays.asList(coordinators)),
+                               
OperatorInfo.getIds(Arrays.asList(coordinators)),
                                Collections.emptyList(),
                                Executors.directExecutor());
 
@@ -520,9 +521,9 @@ public class PendingCheckpointTest {
        }
 
        private PendingCheckpoint 
createPendingCheckpointWithAcknowledgedCoordinators(ByteStreamStateHandle... 
handles) throws IOException {
-               OperatorCoordinatorCheckpointContext[] coords = new 
OperatorCoordinatorCheckpointContext[handles.length];
+               final OperatorInfo[] coords = new OperatorInfo[handles.length];
                for (int i = 0; i < handles.length; i++) {
-                       coords[i] = createOperatorCoordinator();
+                       coords[i] = new TestingOperatorInfo();
                }
 
                final PendingCheckpoint checkpoint = 
createPendingCheckpointWithCoordinators(coords);
@@ -562,10 +563,6 @@ public class PendingCheckpointTest {
                        new CompletableFuture<>());
        }
 
-       private static OperatorCoordinatorCheckpointContext 
createOperatorCoordinator() {
-               return new MockOperatorCoordinatorCheckpointContext();
-       }
-
        @SuppressWarnings("unchecked")
        static void setTaskState(PendingCheckpoint pending, OperatorState 
state) throws NoSuchFieldException, IllegalAccessException {
                Field field = 
PendingCheckpoint.class.getDeclaredField("operatorStates");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java
similarity index 60%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java
index 7cab82f..f8d4a1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java
@@ -18,40 +18,31 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
-import 
org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 
 /**
- * A testing mock implementation of the {@link 
OperatorCoordinatorCheckpointContext}.
+ * A testing implementation of the {@link OperatorInfo}.
  */
-public class MockOperatorCoordinatorCheckpointContext implements 
OperatorCoordinatorCheckpointContext {
+public class TestingOperatorInfo implements OperatorInfo {
 
        private final OperatorID operatorId;
-       private final OperatorCoordinator coordinator;
        private final int parallelism;
        private final int maxParallelism;
 
-       public MockOperatorCoordinatorCheckpointContext() {
-               this(new OperatorID(), new MockOperatorCoordinator(), 50, 256);
+       public TestingOperatorInfo() {
+               this(new OperatorID(), 50, 256);
        }
 
-       public MockOperatorCoordinatorCheckpointContext(
+       public TestingOperatorInfo(
                        OperatorID operatorId,
-                       OperatorCoordinator coordinator,
                        int parallelism,
                        int maxParallelism) {
                this.operatorId = operatorId;
-               this.coordinator = coordinator;
                this.parallelism = parallelism;
                this.maxParallelism = maxParallelism;
        }
 
        @Override
-       public OperatorCoordinator coordinator() {
-               return coordinator;
-       }
-
-       @Override
        public OperatorID operatorId() {
                return operatorId;
        }
@@ -65,16 +56,4 @@ public class MockOperatorCoordinatorCheckpointContext implements OperatorCoordin
        public int currentParallelism() {
                return parallelism;
        }
-
-       @Override
-       public void onCallTriggerCheckpoint(long checkpointId) {}
-
-       @Override
-       public void onCheckpointStateFutureComplete(long checkpointId) {}
-
-       @Override
-       public void afterSourceBarrierInjection(long checkpointId) {}
-
-       @Override
-       public void abortCurrentTriggering() {}
 }

Reply via email to