This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 32c51c96880400fdd3ded0292b09e5c8d8ea28bb Author: Stephan Ewen <se...@apache.org> AuthorDate: Thu May 28 18:18:41 2020 +0200 [FLINK-16986][coordination][refactor] Reduce dependencies of OperatorCoordinatorHolder and OperatorCoordinatorCheckpointContext This simplifies both testing and future refactoring. --- .../runtime/checkpoint/CheckpointCoordinator.java | 7 +- .../OperatorCoordinatorCheckpointContext.java | 41 +---- .../checkpoint/OperatorCoordinatorCheckpoints.java | 20 +-- .../runtime/checkpoint/PendingCheckpoint.java | 3 +- .../coordination/OperatorCoordinatorHolder.java | 172 +++++++++++++-------- .../coordination/OperatorInfo.java} | 30 +--- .../runtime/checkpoint/PendingCheckpointTest.java | 27 ++-- ...kpointContext.java => TestingOperatorInfo.java} | 31 +--- 8 files changed, 151 insertions(+), 180 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 51b98f5..da518ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.operators.coordination.OperatorInfo; import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; @@ -626,7 +627,7 @@ public class CheckpointCoordinator { checkpointID, timestamp, ackTasks, - OperatorCoordinatorCheckpointContext.getIds(coordinatorsToCheckpoint), + OperatorInfo.getIds(coordinatorsToCheckpoint), masterHooks.keySet(), props, checkpointStorageLocation, @@ -1074,7 +1075,7 @@ public class CheckpointCoordinator { // commit coordinators for (OperatorCoordinatorCheckpointContext coordinatorContext : coordinatorsToCheckpoint) { - coordinatorContext.coordinator().checkpointComplete(checkpointId); + coordinatorContext.checkpointComplete(checkpointId); } } @@ -1496,7 +1497,7 @@ public class CheckpointCoordinator { final ByteStreamStateHandle coordinatorState = state.getCoordinatorState(); if (coordinatorState != null) { - coordContext.coordinator().resetToCheckpoint(coordinatorState.getData()); + coordContext.resetToCheckpoint(coordinatorState.getData()); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java index 92fd2aa..abc15b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java @@ -18,49 +18,24 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorInfo; -import java.util.Collection; -import java.util.stream.Collectors; +import java.util.concurrent.CompletableFuture; /** - * An {@link OperatorCoordinator} and its contextual information needed to trigger and - * acknowledge a checkpoint. + * This context is the interface through which the {@link CheckpointCoordinator} interacts with an + * {@link OperatorCoordinator} during checkpointing and checkpoint restoring. */ -public interface OperatorCoordinatorCheckpointContext { +public interface OperatorCoordinatorCheckpointContext extends OperatorInfo { - // ------------------------------------------------------------------------ - // properties - // ------------------------------------------------------------------------ - - OperatorCoordinator coordinator(); - - OperatorID operatorId(); - - int maxParallelism(); - - int currentParallelism(); - - // ------------------------------------------------------------------------ - // checkpoint triggering callbacks - // ------------------------------------------------------------------------ - - void onCallTriggerCheckpoint(long checkpointId); - - void onCheckpointStateFutureComplete(long checkpointId); + CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception; void afterSourceBarrierInjection(long checkpointId); void abortCurrentTriggering(); - // ------------------------------------------------------------------------ - // utils - // ------------------------------------------------------------------------ + void checkpointComplete(long checkpointId); - static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) { - return infos.stream() - .map(OperatorCoordinatorCheckpointContext::operatorId) - .collect(Collectors.toList()); - } + void resetToCheckpoint(byte[] checkpointData) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java index 68fc8f1..6215123 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorInfo; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import java.util.ArrayList; @@ -39,15 +40,15 @@ import java.util.concurrent.Executor; final class OperatorCoordinatorCheckpoints { public static CompletableFuture<CoordinatorSnapshot> triggerCoordinatorCheckpoint( - final OperatorCoordinatorCheckpointContext coordinatorInfo, + final OperatorCoordinatorCheckpointContext coordinatorContext, final long checkpointId) throws Exception { final CompletableFuture<byte[]> checkpointFuture = - coordinatorInfo.coordinator().checkpointCoordinator(checkpointId); + coordinatorContext.checkpointCoordinator(checkpointId); return checkpointFuture.thenApply( (state) -> new CoordinatorSnapshot( - coordinatorInfo, new ByteStreamStateHandle(coordinatorInfo.operatorId().toString(), state)) + coordinatorContext, new ByteStreamStateHandle(coordinatorContext.operatorId().toString(), state)) ); } @@ -59,16 +60,7 @@ final class OperatorCoordinatorCheckpoints { for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) { final CompletableFuture<CoordinatorSnapshot> checkpointFuture = triggerCoordinatorCheckpoint(coordinator, checkpointId); - coordinator.onCallTriggerCheckpoint(checkpointId); - individualSnapshots.add(checkpointFuture); - checkpointFuture.whenComplete((ignored, failure) -> { - if (failure != null) { - coordinator.abortCurrentTriggering(); - } else { - coordinator.onCheckpointStateFutureComplete(checkpointId); - } - }); } return FutureUtils.combineAll(individualSnapshots).thenApply(AllCoordinatorSnapshots::new); @@ -144,10 +136,10 @@ final class OperatorCoordinatorCheckpoints { static final class CoordinatorSnapshot { - final OperatorCoordinatorCheckpointContext coordinator; + final OperatorInfo coordinator; final ByteStreamStateHandle state; - CoordinatorSnapshot(OperatorCoordinatorCheckpointContext coordinator, ByteStreamStateHandle state) { + CoordinatorSnapshot(OperatorInfo coordinator, ByteStreamStateHandle state) { this.coordinator = coordinator; this.state = state; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 2a0eba7..376301d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.OperatorInfo; import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; @@ -431,7 +432,7 @@ public class PendingCheckpoint { } public TaskAcknowledgeResult acknowledgeCoordinatorState( - OperatorCoordinatorCheckpointContext coordinatorInfo, + OperatorInfo coordinatorInfo, @Nullable ByteStreamStateHandle stateHandle) { synchronized (lock) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index c70e229..304eb20 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.operators.coordination; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -35,6 +36,7 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.function.BiFunction; +import java.util.function.Consumer; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -67,7 +69,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC private final int operatorParallelism; private final int operatorMaxParallelism; - private long currentlyTriggeredCheckpoint; + private volatile long currentlyTriggeredCheckpoint; private OperatorCoordinatorHolder( final OperatorID operatorId, @@ -88,21 +90,25 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC } public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) { - context.lazyInitialize(scheduler, schedulerExecutor); + lazyInitialize(scheduler::handleGlobalFailure, schedulerExecutor); + } + + @VisibleForTesting + void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor schedulerExecutor) { + context.lazyInitialize(globalFailureHandler, schedulerExecutor); } // ------------------------------------------------------------------------ // Properties // ------------------------------------------------------------------------ - @Override - public OperatorID operatorId() { - return operatorId; + public OperatorCoordinator coordinator() { + return coordinator; } @Override - public OperatorCoordinator coordinator() { - return coordinator; + public OperatorID operatorId() { + return operatorId; } @Override @@ -116,39 +122,6 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC } // ------------------------------------------------------------------------ - // Checkpointing Callbacks - // ------------------------------------------------------------------------ - - @Override - public void onCallTriggerCheckpoint(long checkpointId) { - checkCheckpointAlreadyHappening(checkpointId); - currentlyTriggeredCheckpoint = checkpointId; - } - - @Override - public void onCheckpointStateFutureComplete(long checkpointId) { - checkCheckpointAlreadyHappening(checkpointId); - eventValve.shutValve(); - } - - @Override - public void afterSourceBarrierInjection(long checkpointId) { - checkCheckpointAlreadyHappening(checkpointId); - eventValve.openValve(); - currentlyTriggeredCheckpoint = NO_CHECKPOINT; - } - - @Override - public void abortCurrentTriggering() { - eventValve.openValve(); - currentlyTriggeredCheckpoint = NO_CHECKPOINT; - } - - private void checkCheckpointAlreadyHappening(long checkpointId) { - checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId); - } - - // ------------------------------------------------------------------------ // OperatorCoordinator Interface // ------------------------------------------------------------------------ @@ -177,7 +150,20 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC @Override public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception { - return coordinator.checkpointCoordinator(checkpointId); + setCurrentlyTriggeredCheckpoint(checkpointId); + + final CompletableFuture<byte[]> checkpointFuture = coordinator.checkpointCoordinator(checkpointId); + + // synchronously!!!, with the completion, we need to shut the event valve + checkpointFuture.whenComplete((ignored, failure) -> { + if (failure != null) { + abortCurrentTriggering(); + } else { + onCheckpointStateFutureComplete(checkpointId); + } + }); + + return checkpointFuture; } @Override @@ -187,10 +173,47 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC @Override public void resetToCheckpoint(byte[] checkpointData) throws Exception { + resetCheckpointTriggeringCheck(); + eventValve.reset(); coordinator.resetToCheckpoint(checkpointData); } // ------------------------------------------------------------------------ + // Checkpointing Callbacks + // ------------------------------------------------------------------------ + + @Override + public void afterSourceBarrierInjection(long checkpointId) { + verifyNoOtherCheckpointBeingTriggered(checkpointId); + eventValve.openValve(); + resetCheckpointTriggeringCheck(); + } + + @Override + public void abortCurrentTriggering() { + eventValve.openValve(); + resetCheckpointTriggeringCheck(); + } + + void onCheckpointStateFutureComplete(long checkpointId) { + verifyNoOtherCheckpointBeingTriggered(checkpointId); + eventValve.shutValve(); + } + + private void verifyNoOtherCheckpointBeingTriggered(long checkpointId) { + checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId); + } + + private void setCurrentlyTriggeredCheckpoint(long checkpointId) { + verifyNoOtherCheckpointBeingTriggered(checkpointId); + currentlyTriggeredCheckpoint = checkpointId; + } + + private void resetCheckpointTriggeringCheck() { + currentlyTriggeredCheckpoint = NO_CHECKPOINT; + } + + // ------------------------------------------------------------------------ // Factories // ------------------------------------------------------------------------ @@ -209,20 +232,41 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC return executionAttempt.sendOperatorEvent(opId, serializedEvent); }; - final OperatorEventValve valve = new OperatorEventValve(eventSender); - final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(opId, jobVertex, valve); - final OperatorCoordinator coordinator = provider.create(context); - - return new OperatorCoordinatorHolder( + return create( opId, - coordinator, - context, - valve, + provider, + eventSender, + jobVertex.getName(), jobVertex.getParallelism(), jobVertex.getMaxParallelism()); } } + @VisibleForTesting + static OperatorCoordinatorHolder create( + final OperatorID opId, + final OperatorCoordinator.Provider coordinatorProvider, + final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender, + final String operatorName, + final int operatorParallelism, + final int operatorMaxParallelism) { + + final OperatorEventValve valve = new OperatorEventValve(eventSender); + + final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext( + opId, valve, operatorName, operatorParallelism); + + final OperatorCoordinator coordinator = coordinatorProvider.create(context); + + return new OperatorCoordinatorHolder( + opId, + coordinator, + context, + valve, + operatorParallelism, + operatorMaxParallelism); + } + // ------------------------------------------------------------------------ // Nested Classes // ------------------------------------------------------------------------ @@ -239,33 +283,36 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC private static final class LazyInitializedCoordinatorContext implements OperatorCoordinator.Context { private final OperatorID operatorId; - private final ExecutionJobVertex jobVertex; private final OperatorEventValve eventValve; + private final String operatorName; + private final int operatorParallelism; - private SchedulerNG scheduler; + private Consumer<Throwable> globalFailureHandler; private Executor schedulerExecutor; public LazyInitializedCoordinatorContext( - OperatorID operatorId, - ExecutionJobVertex jobVertex, - OperatorEventValve eventValve) { + final OperatorID operatorId, + final OperatorEventValve eventValve, + final String operatorName, + final int operatorParallelism) { this.operatorId = checkNotNull(operatorId); - this.jobVertex = checkNotNull(jobVertex); this.eventValve = checkNotNull(eventValve); + this.operatorName = checkNotNull(operatorName); + this.operatorParallelism = operatorParallelism; } - void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) { - this.scheduler = checkNotNull(scheduler); + void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor schedulerExecutor) { + this.globalFailureHandler = checkNotNull(globalFailureHandler); this.schedulerExecutor = checkNotNull(schedulerExecutor); } void unInitialize() { - this.scheduler = null; + this.globalFailureHandler = null; this.schedulerExecutor = null; } boolean isInitialized() { - return jobVertex != null; + return schedulerExecutor != null; } private void checkInitialized() { @@ -309,15 +356,14 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC checkInitialized(); final FlinkException e = new FlinkException("Global failure triggered by OperatorCoordinator for '" + - jobVertex.getName() + "' (operator " + operatorId + ").", cause); + operatorName + "' (operator " + operatorId + ").", cause); - schedulerExecutor.execute(() -> scheduler.handleGlobalFailure(e)); + schedulerExecutor.execute(() -> globalFailureHandler.accept(e)); } @Override public int currentParallelism() { - checkInitialized(); - return jobVertex.getParallelism(); + return operatorParallelism; } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java similarity index 54% copy from flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java index 92fd2aa..a146940 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java @@ -16,25 +16,17 @@ * limitations under the License. */ -package org.apache.flink.runtime.checkpoint; +package org.apache.flink.runtime.operators.coordination; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import java.util.Collection; import java.util.stream.Collectors; /** - * An {@link OperatorCoordinator} and its contextual information needed to trigger and - * acknowledge a checkpoint. + * An interface to access basic properties of an operator in the context of its coordinator. */ -public interface OperatorCoordinatorCheckpointContext { - - // ------------------------------------------------------------------------ - // properties - // ------------------------------------------------------------------------ - - OperatorCoordinator coordinator(); +public interface OperatorInfo { OperatorID operatorId(); @@ -43,24 +35,12 @@ public interface OperatorCoordinatorCheckpointContext { int currentParallelism(); // ------------------------------------------------------------------------ - // checkpoint triggering callbacks - // ------------------------------------------------------------------------ - - void onCallTriggerCheckpoint(long checkpointId); - - void onCheckpointStateFutureComplete(long checkpointId); - - void afterSourceBarrierInjection(long checkpointId); - - void abortCurrentTriggering(); - - // ------------------------------------------------------------------------ // utils // ------------------------------------------------------------------------ - static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) { + static Collection<OperatorID> getIds(Collection<? extends OperatorInfo> infos) { return infos.stream() - .map(OperatorCoordinatorCheckpointContext::operatorId) + .map(OperatorInfo::operatorId) .collect(Collectors.toList()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index b454633..30c1dc2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -32,7 +32,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCheckpointContext; +import org.apache.flink.runtime.operators.coordination.OperatorInfo; +import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.TestingStreamStateHandle; @@ -438,7 +439,7 @@ public class PendingCheckpointTest { @Test public void testInitiallyUnacknowledgedCoordinatorStates() throws Exception { final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators( - createOperatorCoordinator(), createOperatorCoordinator()); + new TestingOperatorInfo(), new TestingOperatorInfo()); assertEquals(2, checkpoint.getNumberOfNonAcknowledgedOperatorCoordinators()); assertFalse(checkpoint.isFullyAcknowledged()); @@ -446,8 +447,8 @@ public class PendingCheckpointTest { @Test public void testAcknowledgedCoordinatorStates() throws Exception { - final OperatorCoordinatorCheckpointContext coord1 = createOperatorCoordinator(); - final OperatorCoordinatorCheckpointContext coord2 = createOperatorCoordinator(); + final OperatorInfo coord1 = new TestingOperatorInfo(); + final OperatorInfo coord2 = new TestingOperatorInfo(); final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coord1, coord2); final TaskAcknowledgeResult ack1 = checkpoint.acknowledgeCoordinatorState(coord1, new TestingStreamStateHandle()); @@ -462,7 +463,7 @@ public class PendingCheckpointTest { @Test public void testDuplicateAcknowledgeCoordinator() throws Exception { - final OperatorCoordinatorCheckpointContext coordinator = createOperatorCoordinator(); + final OperatorInfo coordinator = new TestingOperatorInfo(); final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coordinator); checkpoint.acknowledgeCoordinatorState(coordinator, new TestingStreamStateHandle()); @@ -473,9 +474,9 @@ public class PendingCheckpointTest { @Test public void testAcknowledgeUnknownCoordinator() throws Exception { - final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(createOperatorCoordinator()); + final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(new TestingOperatorInfo()); - final TaskAcknowledgeResult ack = checkpoint.acknowledgeCoordinatorState(createOperatorCoordinator(), null); + final TaskAcknowledgeResult ack = checkpoint.acknowledgeCoordinatorState(new TestingOperatorInfo(), null); assertEquals(TaskAcknowledgeResult.UNKNOWN, ack); } @@ -507,11 +508,11 @@ public class PendingCheckpointTest { } private PendingCheckpoint createPendingCheckpointWithCoordinators( - OperatorCoordinatorCheckpointContext... coordinators) throws IOException { + OperatorInfo... coordinators) throws IOException { final PendingCheckpoint checkpoint = createPendingCheckpoint( CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - OperatorCoordinatorCheckpointContext.getIds(Arrays.asList(coordinators)), + OperatorInfo.getIds(Arrays.asList(coordinators)), Collections.emptyList(), Executors.directExecutor()); @@ -520,9 +521,9 @@ public class PendingCheckpointTest { } private PendingCheckpoint createPendingCheckpointWithAcknowledgedCoordinators(ByteStreamStateHandle... handles) throws IOException { - OperatorCoordinatorCheckpointContext[] coords = new OperatorCoordinatorCheckpointContext[handles.length]; + final OperatorInfo[] coords = new OperatorInfo[handles.length]; for (int i = 0; i < handles.length; i++) { - coords[i] = createOperatorCoordinator(); + coords[i] = new TestingOperatorInfo(); } final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coords); @@ -562,10 +563,6 @@ public class PendingCheckpointTest { new CompletableFuture<>()); } - private static OperatorCoordinatorCheckpointContext createOperatorCoordinator() { - return new MockOperatorCoordinatorCheckpointContext(); - } - @SuppressWarnings("unchecked") static void setTaskState(PendingCheckpoint pending, OperatorState state) throws NoSuchFieldException, IllegalAccessException { Field field = PendingCheckpoint.class.getDeclaredField("operatorStates"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java similarity index 60% rename from flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java index 7cab82f..f8d4a1f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java @@ -18,40 +18,31 @@ package org.apache.flink.runtime.operators.coordination; -import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext; import org.apache.flink.runtime.jobgraph.OperatorID; /** - * A testing mock implementation of the {@link OperatorCoordinatorCheckpointContext}. + * A testing implementation of the {@link OperatorInfo}. */ -public class MockOperatorCoordinatorCheckpointContext implements OperatorCoordinatorCheckpointContext { +public class TestingOperatorInfo implements OperatorInfo { private final OperatorID operatorId; - private final OperatorCoordinator coordinator; private final int parallelism; private final int maxParallelism; - public MockOperatorCoordinatorCheckpointContext() { - this(new OperatorID(), new MockOperatorCoordinator(), 50, 256); + public TestingOperatorInfo() { + this(new OperatorID(), 50, 256); } - public MockOperatorCoordinatorCheckpointContext( + public TestingOperatorInfo( OperatorID operatorId, - OperatorCoordinator coordinator, int parallelism, int maxParallelism) { this.operatorId = operatorId; - this.coordinator = coordinator; this.parallelism = parallelism; this.maxParallelism = maxParallelism; } @Override - public OperatorCoordinator coordinator() { - return coordinator; - } - - @Override public OperatorID operatorId() { return operatorId; } @@ -65,16 +56,4 @@ public class MockOperatorCoordinatorCheckpointContext implements OperatorCoordin public int currentParallelism() { return parallelism; } - - @Override - public void onCallTriggerCheckpoint(long checkpointId) {} - - @Override - public void onCheckpointStateFutureComplete(long checkpointId) {} - - @Override - public void afterSourceBarrierInjection(long checkpointId) {} - - @Override - public void abortCurrentTriggering() {} }