This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9dce8f0dcf68492240933a74dbce760ae25ce07f Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Sun Oct 25 10:07:16 2020 +0800 Backport of "[FLINK-19698][connector/common] Let SplitEnumerator/SourceReader/SourceCoordinator implement CheckpointListener." The backport does not move the 'CheckpointListener' interface, to not break this between minor versions. Instead, it lets the Reader and Enumerator offer the same methods directly. This is compatible with inheriting these methods in 1.12 (and 1.12 adds the interface to the class inheritance signature, which is also non-breaking). --- .../base/source/reader/SourceReaderBase.java | 2 +- .../base/source/reader/SourceReaderBaseTest.java | 6 ++- .../base/source/reader/SourceReaderTestBase.java | 2 +- .../base/source/reader/mocks/MockSourceReader.java | 5 ++ .../flink/api/connector/source/SourceReader.java | 19 +++++++- .../api/connector/source/SplitEnumerator.java | 17 ++++++- .../source/lib/util/IteratorSourceReader.java | 2 +- .../source/lib/NumberSequenceSourceTest.java | 2 +- .../connector/source/mocks/MockSourceReader.java | 22 ++++++++- .../source/mocks/MockSplitEnumerator.java | 21 +++++++-- .../runtime/checkpoint/CheckpointCoordinator.java | 7 ++- .../OperatorCoordinatorCheckpointContext.java | 19 +++++++- .../coordination/OperatorCoordinator.java | 40 +++++++--------- .../coordination/OperatorCoordinatorHolder.java | 18 +++++--- .../RecreateOnResetOperatorCoordinator.java | 4 +- .../source/coordinator/SourceCoordinator.java | 22 +++++++-- .../checkpoint/CheckpointCoordinatorTest.java | 53 ++++++++++++++++++++++ .../CheckpointCoordinatorTestingUtils.java | 19 +++++++- .../CoordinatorEventsExactlyOnceITCase.java | 2 +- .../coordination/MockOperatorCoordinator.java | 2 +- .../OperatorCoordinatorHolderTest.java | 2 +- .../coordination/TestingOperatorCoordinator.java | 2 +- .../source/coordinator/SourceCoordinatorTest.java | 12 ++++- .../streaming/api/operators/SourceOperator.java | 17 ++++++- .../collect/CollectSinkOperatorCoordinator.java | 2 +- .../api/operators/SourceOperatorTest.java | 53 +++++++++++++++++++++- .../source/SourceOperatorEventTimeTest.java | 2 +- .../operators/source/TestingSourceOperator.java | 13 ++++++ 28 files changed, 321 insertions(+), 66 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index e1da654..2c6c1812 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -204,7 +204,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt } @Override - public List<SplitT> snapshotState() { + public List<SplitT> snapshotState(long checkpointId) { List<SplitT> splits = new ArrayList<>(); splitStates.forEach((id, context) -> splits.add(toSplitType(id, context.state))); return splits; diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index 6396b2e..da057da 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -185,8 +185,10 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> new TestingReaderContext()) { @Override - protected void onSplitFinished(Collection<String> finishedSplitIds) { - } + public void notifyCheckpointComplete(long checkpointId) throws Exception {} + + @Override + protected void onSplitFinished(Collection<String> finishedSplitIds) {} @Override protected TestingSourceSplit initializedState(TestingSourceSplit split) { diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java index 22dda52..015cd89 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java @@ -132,7 +132,7 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T List<SplitT> splits = getSplits(NUM_SPLITS, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED); try (SourceReader<Integer, SplitT> reader = consumeRecords(splits, output, NUM_SPLITS * NUM_RECORDS_PER_SPLIT)) { - List<SplitT> state = reader.snapshotState(); + List<SplitT> state = reader.snapshotState(1L); assertEquals("The snapshot should only have 10 splits. ", NUM_SPLITS, state.size()); for (int i = 0; i < NUM_SPLITS; i++) { assertEquals("The first four splits should have been fully consumed.", diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java index 66022db..9de22aa 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java @@ -57,4 +57,9 @@ public class MockSourceReader protected MockSourceSplit toSplitType(String splitId, AtomicInteger splitState) { return new MockSourceSplit(Integer.parseInt(splitId), splitState.get()); } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + + } } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java index 3e623dd..0aac1b1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java @@ -32,7 +32,8 @@ import java.util.concurrent.CompletableFuture; * @param <SplitT> The type of the the source splits. */ @PublicEvolving -public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable { +public interface SourceReader<T, SplitT extends SourceSplit> + extends AutoCloseable { /** * Start the reader. @@ -58,7 +59,7 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab * * @return the state of the source. */ - List<SplitT> snapshotState(); + List<SplitT> snapshotState(long checkpointId); /** * @return a future that will be completed once there is a record available to poll. @@ -78,4 +79,18 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab * @param sourceEvent the event sent by the {@link SplitEnumerator}. */ void handleSourceEvents(SourceEvent sourceEvent); + + /** + * We have an empty default implementation here because most source readers do not have + * to implement the method. + */ + default void notifyCheckpointComplete(long checkpointId) throws Exception {} + + /** + * Called when a checkpoint is aborted. + * + * <p>NOTE: This method is here only in the backport to the Flink 1.11 branch. + * In 1.12, this default method is inherited from the CheckpointListener interface. + */ + default void notifyCheckpointAborted(long checkpointId) {} } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java index 1e429bb..e97f5e1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java @@ -29,7 +29,8 @@ import java.util.List; * 2. assign the splits to the source reader. */ @PublicEvolving -public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable { +public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> + extends AutoCloseable { /** * Start the split enumerator. @@ -76,4 +77,18 @@ public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extend */ @Override void close() throws IOException; + + /** + * We have an empty default implementation here because most source readers do not have + * to implement the method. + */ + default void notifyCheckpointComplete(long checkpointId) throws Exception {} + + /** + * Called when a checkpoint is aborted. + * + * <p>NOTE: This method is here only in the backport to the Flink 1.11 branch. + * In 1.12, this default method is inherited from the CheckpointListener interface. + */ + default void notifyCheckpointAborted(long checkpointId) {} } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java index 54ddb35..3af75e1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java @@ -122,7 +122,7 @@ public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends I } @Override - public List<SplitT> snapshotState() { + public List<SplitT> snapshotState(long checkpointId) { final ArrayList<SplitT> allSplits = new ArrayList<>(1 + remainingSplits.size()); if (iterator != null) { @SuppressWarnings("unchecked") diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java index 450983e..48cda2d 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java @@ -54,7 +54,7 @@ public class NumberSequenceSourceTest { if (--remainingInCycle <= 0) { remainingInCycle = elementsPerCycle; // checkpoint - List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(); + List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L); // re-create and restore reader = createReader(); diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java index e48f827..b0a24dd 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java @@ -35,6 +35,8 @@ import java.util.concurrent.CompletableFuture; public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> { private final List<MockSourceSplit> assignedSplits = new ArrayList<>(); private final List<SourceEvent> receivedSourceEvents = new ArrayList<>(); + private final List<Long> completedCheckpoints = new ArrayList<>(); + private final List<Long> abortedCheckpoints = new ArrayList<>(); private final boolean markIdleOnNoSplits; private int currentSplitIndex = 0; @@ -90,7 +92,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> } @Override - public List<MockSourceSplit> snapshotState() { + public List<MockSourceSplit> snapshotState(long checkpointId) { return assignedSplits; } @@ -119,6 +121,16 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> this.closed = true; } + @Override + public void notifyCheckpointComplete(long checkpointId) { + completedCheckpoints.add(checkpointId); + } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + abortedCheckpoints.add(checkpointId); + } + private synchronized void markUnavailable() { if (availableFuture.isDone()) { availableFuture = new CompletableFuture<>(); @@ -155,6 +167,14 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> return receivedSourceEvents; } + public List<Long> getCompletedCheckpoints() { + return completedCheckpoints; + } + + public List<Long> getAbortedCheckpoints() { + return abortedCheckpoints; + } + /** * Simple event allowing {@link MockSourceReader} to finish when requested. */ diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java index 93f6c77..0525533 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java @@ -39,11 +39,12 @@ import java.util.TreeSet; * A mock {@link SplitEnumerator} for unit tests. */ public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> { - private SortedSet<MockSourceSplit> unassignedSplits; - private SplitEnumeratorContext<MockSourceSplit> enumContext; - private List<SourceEvent> handledSourceEvent; - private boolean started; - private boolean closed; + private final SortedSet<MockSourceSplit> unassignedSplits; + private final SplitEnumeratorContext<MockSourceSplit> enumContext; + private final List<SourceEvent> handledSourceEvent; + private final List<Long> successfulCheckpoints; + private volatile boolean started; + private volatile boolean closed; public MockSplitEnumerator(int numSplits, SplitEnumeratorContext<MockSourceSplit> enumContext) { this(new HashSet<>(), enumContext); @@ -59,6 +60,7 @@ public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Set this.unassignedSplits.addAll(unassignedSplits); this.enumContext = enumContext; this.handledSourceEvent = new ArrayList<>(); + this.successfulCheckpoints = new ArrayList<>(); this.started = false; this.closed = false; } @@ -96,6 +98,11 @@ public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Set } @Override + public void notifyCheckpointComplete(long checkpointId) { + successfulCheckpoints.add(checkpointId); + } + + @Override public void close() throws IOException { this.closed = true; } @@ -123,6 +130,10 @@ public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Set return handledSourceEvent; } + public List<Long> getSuccessfulCheckpoints() { + return successfulCheckpoints; + } + // -------------------- private void assignAllSplits() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 26cf391..0e61a11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1144,7 +1144,7 @@ public class CheckpointCoordinator { // commit coordinators for (OperatorCoordinatorCheckpointContext coordinatorContext : coordinatorsToCheckpoint) { - coordinatorContext.checkpointComplete(checkpointId); + coordinatorContext.notifyCheckpointComplete(checkpointId); } } @@ -1159,6 +1159,11 @@ public class CheckpointCoordinator { } } }); + + // commit coordinators + for (OperatorCoordinatorCheckpointContext coordinatorContext : coordinatorsToCheckpoint) { + coordinatorContext.notifyCheckpointAborted(checkpointId); + } } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java index 94d4e07..f5f72d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorInfo; +import org.apache.flink.runtime.state.CheckpointListener; import java.util.concurrent.CompletableFuture; @@ -27,7 +28,7 @@ import java.util.concurrent.CompletableFuture; * This context is the interface through which the {@link CheckpointCoordinator} interacts with an * {@link OperatorCoordinator} during checkpointing and checkpoint restoring. */ -public interface OperatorCoordinatorCheckpointContext extends OperatorInfo { +public interface OperatorCoordinatorCheckpointContext extends OperatorInfo, CheckpointListener { void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception; @@ -35,7 +36,21 @@ public interface OperatorCoordinatorCheckpointContext extends OperatorInfo { void abortCurrentTriggering(); - void checkpointComplete(long checkpointId); + /** + * We override the method here to remove the checked exception. Please check the + * Java docs of {@link CheckpointListener#notifyCheckpointComplete(long)} for more + * detail semantic of the method. + */ + @Override + void notifyCheckpointComplete(long checkpointId); + + /** + * We override the method here to remove the checked exception. Please check the + * Java docs of {@link CheckpointListener#notifyCheckpointAborted(long)} for more + * detail semantic of the method. + */ + @Override + default void notifyCheckpointAborted(long checkpointId) {} void resetToCheckpoint(byte[] checkpointData) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java index fff88c7..bc280f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.operators.coordination; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.state.CheckpointListener; import javax.annotation.Nullable; @@ -43,7 +44,7 @@ import java.util.concurrent.CompletableFuture; * The methods on the {@link Context} are safe to be called from another thread than the thread that * calls the Coordinator's methods. */ -public interface OperatorCoordinator extends AutoCloseable { +public interface OperatorCoordinator extends CheckpointListener, AutoCloseable { /** * Starts the coordinator. This method is called once at the beginning, before any other methods. @@ -102,27 +103,20 @@ public interface OperatorCoordinator extends AutoCloseable { void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception; /** - * Notifies the coordinator that the checkpoint with the given checkpointId completes and - * was committed. - * - * <h3>Checkpoint Subsuming</h3> - * - * <p>Checkpoint IDs are strictly increasing. A checkpoint with higher ID always subsumes - * a checkpoint with lower ID. For example, when checkpoint T is confirmed complete, the - * code should treat all checkpoints with lower ID (T-1, T-2, etc.) also as confirmed. - * - * <h3>Exceptions</h3> - * - * <p>This method is not supposed to throw an exception indicating the the checkpoint cannot - * be completed. By the time we notify that the checkpoint is complete, the checkpoint is - * committed and cannot be aborted any more. - * - * <p>If the coordinator gets into an inconsistent state internally, as a result of logic that - * runs after this notification, it should fail the job ({@link Context#failJob(Throwable)}) - * instead. Any exception propagating from this method may be treated as a fatal error for the - * JobManager, crashing the JobManager, and leading to an expensive "master failover" procedure. + * We override the method here to remove the checked exception. Please check the + * Java docs of {@link CheckpointListener#notifyCheckpointComplete(long)} for more + * detail semantic of the method. */ - void checkpointComplete(long checkpointId); + @Override + void notifyCheckpointComplete(long checkpointId); + + /** + * We override the method here to remove the checked exception. Please check the + * Java docs of {@link CheckpointListener#notifyCheckpointAborted(long)} for more + * detail semantic of the method. + */ + @Override + default void notifyCheckpointAborted(long checkpointId) {} /** * Resets the coordinator to the given checkpoint. @@ -139,9 +133,9 @@ public interface OperatorCoordinator extends AutoCloseable { * It is safe to commit side-effects that are predicated on checkpoint completion after this * call. * - * <p>Even if no call to {@link #checkpointComplete(long)} happened, the checkpoint can still be + * <p>Even if no call to {@link #notifyCheckpointComplete(long)} happened, the checkpoint can still be * complete (for example when a system failure happened directly after committing the checkpoint, - * before calling the {@link #checkpointComplete(long)} method). + * before calling the {@link #notifyCheckpointComplete(long)} method). */ void resetToCheckpoint(byte[] checkpointData) throws Exception; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index 4e4e8f5..9819754 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -208,12 +208,21 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC } @Override - public void checkpointComplete(long checkpointId) { + public void notifyCheckpointComplete(long checkpointId) { // unfortunately, this method does not run in the scheduler executor, but in the // checkpoint coordinator time thread. // we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's // main thread executor - mainThreadExecutor.execute(() -> checkpointCompleteInternal(checkpointId)); + mainThreadExecutor.execute(() -> coordinator.notifyCheckpointComplete(checkpointId)); + } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + // unfortunately, this method does not run in the scheduler executor, but in the + // checkpoint coordinator time thread. + // we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's + // main thread executor + mainThreadExecutor.execute(() -> coordinator.notifyCheckpointAborted(checkpointId)); } @Override @@ -225,11 +234,6 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC coordinator.resetToCheckpoint(checkpointData); } - private void checkpointCompleteInternal(long checkpointId) { - mainThreadExecutor.assertRunningInMainThread(); - coordinator.checkpointComplete(checkpointId); - } - private void checkpointCoordinatorInternal(final long checkpointId, final CompletableFuture<byte[]> result) { mainThreadExecutor.assertRunningInMainThread(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java index ea91156..c19997c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java @@ -73,8 +73,8 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { } @Override - public void checkpointComplete(long checkpointId) { - coordinator.checkpointComplete(checkpointId); + public void notifyCheckpointComplete(long checkpointId) { + coordinator.notifyCheckpointComplete(checkpointId); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index 3aa095d..a22a239 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -181,15 +181,31 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements } @Override - public void checkpointComplete(long checkpointId) { + public void notifyCheckpointComplete(long checkpointId) { ensureStarted(); coordinatorExecutor.execute(() -> { try { LOG.info("Marking checkpoint {} as completed for source {}.", checkpointId, operatorName); context.onCheckpointComplete(checkpointId); + enumerator.notifyCheckpointComplete(checkpointId); } catch (Exception e) { - LOG.error("Failing the job due to exception when completing the checkpoint {} for source {}.", - checkpointId, operatorName, e); + LOG.error("Failing the job due to exception when notifying the completion of the " + + "checkpoint {} for source {}.", checkpointId, operatorName, e); + context.failJob(e); + } + }); + } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + ensureStarted(); + coordinatorExecutor.execute(() -> { + try { + LOG.info("Marking checkpoint {} as aborted for source {}.", checkpointId, operatorName); + enumerator.notifyCheckpointAborted(checkpointId); + } catch (Exception e) { + LOG.error("Failing the job due to exception when notifying abortion of the " + + "checkpoint {} for source {}.", checkpointId, operatorName, e); context.failJob(e); } }); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 78e5336..1d89596 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; @@ -94,6 +95,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionJobVertex; @@ -2562,6 +2564,57 @@ public class CheckpointCoordinatorTest extends TestLogger { assertTrue(checkpointCoordinator.getSuccessfulCheckpoints().isEmpty()); } + @Test + public void testNotifyCheckpointAbortionInOperatorCoordinator() throws Exception { + JobID jobId = new JobID(); + final ExecutionAttemptID attemptID = new ExecutionAttemptID(); + ExecutionVertex executionVertex = mockExecutionVertex(attemptID); + + CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext context = + new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder() + .setOperatorID(new OperatorID()) + .setOnCallingCheckpointCoordinator((ignored, future) -> future.complete(new byte[0])) + .build(); + + // set up the coordinator and validate the initial state + CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder() + .setJobId(jobId) + .setTasks(new ExecutionVertex[]{executionVertex}) + .setCheckpointCoordinatorConfiguration( + CheckpointCoordinatorConfiguration + .builder() + .setMaxConcurrentCheckpoints(Integer.MAX_VALUE) + .build()) + .setTimer(manuallyTriggeredScheduledExecutor) + .setCoordinatorsToCheckpoint(Collections.singleton(context)) + .build(); + try { + // Trigger checkpoint 1. + checkpointCoordinator.triggerCheckpoint(false); + manuallyTriggeredScheduledExecutor.triggerAll(); + long checkpointId1 = Collections.max(checkpointCoordinator + .getPendingCheckpoints() + .keySet()); + // Trigger checkpoint 2. + checkpointCoordinator.triggerCheckpoint(false); + manuallyTriggeredScheduledExecutor.triggerAll(); + + // Acknowledge checkpoint 2. This should abort checkpoint 1. + long checkpointId2 = Collections.max(checkpointCoordinator + .getPendingCheckpoints() + .keySet()); + AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint( + jobId, attemptID, checkpointId2, new CheckpointMetrics(), null); + checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, ""); + + // OperatorCoordinator should have been notified of the abortion of checkpoint 1. + assertEquals(Collections.singletonList(1L), context.getAbortedCheckpoints()); + assertEquals(Collections.singletonList(2L), context.getCompletedCheckpoints()); + } finally { + checkpointCoordinator.shutdown(JobStatus.FINISHED); + } + } + private CheckpointCoordinator getCheckpointCoordinator( JobID jobId, ExecutionVertex vertex1, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java index df1395f..7e7e731 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java @@ -791,6 +791,8 @@ public class CheckpointCoordinatorTestingUtils { private final BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator; private final Consumer<Long> onCallingAfterSourceBarrierInjection; private final OperatorID operatorID; + private final List<Long> completedCheckpoints; + private final List<Long> abortedCheckpoints; private MockOperatorCoordinatorCheckpointContext( BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator, @@ -799,6 +801,8 @@ public class CheckpointCoordinatorTestingUtils { this.onCallingCheckpointCoordinator = onCallingCheckpointCoordinator; this.onCallingAfterSourceBarrierInjection = onCallingAfterSourceBarrierInjection; this.operatorID = operatorID; + this.completedCheckpoints = new ArrayList<>(); + this.abortedCheckpoints = new ArrayList<>(); } @Override @@ -821,8 +825,13 @@ public class CheckpointCoordinatorTestingUtils { } @Override - public void checkpointComplete(long checkpointId) { + public void notifyCheckpointComplete(long checkpointId) { + completedCheckpoints.add(checkpointId); + } + @Override + public void notifyCheckpointAborted(long checkpointId) { + abortedCheckpoints.add(checkpointId); } @Override @@ -844,6 +853,14 @@ public class CheckpointCoordinatorTestingUtils { public int currentParallelism() { return 1; } + + public List<Long> getCompletedCheckpoints() { + return completedCheckpoints; + } + + public List<Long> getAbortedCheckpoints() { + return abortedCheckpoints; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java index be6f07c..e3047d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java @@ -299,7 +299,7 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { } @Override - public void checkpointComplete(long checkpointId) {} + public void notifyCheckpointComplete(long checkpointId) {} @SuppressWarnings("CallToPrintStackTrace") @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java index f4e8444..8c2c6ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java @@ -54,7 +54,7 @@ public final class MockOperatorCoordinator implements OperatorCoordinator { } @Override - public void checkpointComplete(long checkpointId) { + public void notifyCheckpointComplete(long checkpointId) { throw new UnsupportedOperationException(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java index 29ec382..2c34de6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java @@ -532,7 +532,7 @@ public class OperatorCoordinatorHolderTest extends TestLogger { public abstract void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception; @Override - public void checkpointComplete(long checkpointId) {} + public void notifyCheckpointComplete(long checkpointId) {} @Override public void resetToCheckpoint(byte[] checkpointData) throws Exception {} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java index e91bce3..e4d8b04 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java @@ -81,7 +81,7 @@ class TestingOperatorCoordinator implements OperatorCoordinator { } @Override - public void checkpointComplete(long checkpointId) { + public void notifyCheckpointComplete(long checkpointId) { lastCheckpointComplete.offer(checkpointId); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java index 3e48a93..6f6b5ba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java @@ -30,12 +30,14 @@ import org.apache.flink.runtime.source.event.SourceEventWrapper; import org.junit.Test; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil; import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment; import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyException; import static org.junit.Assert.assertEquals; @@ -52,7 +54,7 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { public void testThrowExceptionWhenNotStarted() { // The following methods should only be invoked after the source coordinator has started. String failureMessage = "Call should fail when source coordinator has not started yet."; - verifyException(() -> sourceCoordinator.checkpointComplete(100L), + verifyException(() -> sourceCoordinator.notifyCheckpointComplete(100L), failureMessage, "The coordinator has not started yet."); verifyException(() -> sourceCoordinator.handleEventFromOperator(0, null), failureMessage, "The coordinator has not started yet."); @@ -209,7 +211,13 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { checkpointFuture.get(); // Complete checkpoint 100. - sourceCoordinator.checkpointComplete(100L); + sourceCoordinator.notifyCheckpointComplete(100L); + waitUtil( + () -> !enumerator.getSuccessfulCheckpoints().isEmpty(), + Duration.ofMillis(1000L), + "The enumerator failed to process the successful checkpoint " + + "before times out."); + assertEquals(100L, (long) enumerator.getSuccessfulCheckpoints().get(0)); // Fail reader 0. sourceCoordinator.subtaskFailed(0, null); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 46d4239..b18d85e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -225,8 +225,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> @Override public void snapshotState(StateSnapshotContext context) throws Exception { - LOG.debug("Taking a snapshot for checkpoint {}", context.getCheckpointId()); - readerState.update(sourceReader.snapshotState()); + long checkpointId = context.getCheckpointId(); + LOG.debug("Taking a snapshot for checkpoint {}", checkpointId); + readerState.update(sourceReader.snapshotState(checkpointId)); } @Override @@ -241,6 +242,18 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> readerState = new SimpleVersionedListState<>(rawState, splitSerializer); } + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + super.notifyCheckpointComplete(checkpointId); + sourceReader.notifyCheckpointComplete(checkpointId); + } + + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + super.notifyCheckpointAborted(checkpointId); + sourceReader.notifyCheckpointAborted(checkpointId); + } + @SuppressWarnings("unchecked") public void handleOperatorEvent(OperatorEvent event) { if (event instanceof AddSplitEvent) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java index 695e4bd..1423415 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java @@ -199,7 +199,7 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor } @Override - public void checkpointComplete(long checkpointId) { + public void notifyCheckpointComplete(long checkpointId) { } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index 3b381fd..65228f0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -18,17 +18,21 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.mocks.MockSourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.source.event.AddSplitEvent; import org.apache.flink.runtime.source.event.ReaderRegistrationEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; @@ -36,10 +40,15 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateInitializationContextImpl; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.operators.source.TestingSourceOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; +import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; import org.apache.flink.util.CollectionUtil; import org.junit.After; @@ -47,6 +56,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -69,15 +79,24 @@ public class SourceOperatorTest { private SourceOperator<Integer, MockSourceSplit> operator; @Before - public void setup() { + public void setup() throws Exception { this.mockSourceReader = new MockSourceReader(); this.mockGateway = new MockOperatorEventGateway(); this.operator = new TestingSourceOperator<>(mockSourceReader, mockGateway, SUBTASK_INDEX); + Environment env = getTestingEnvironment(); + this.operator.setup( + new SourceOperatorStreamTask<Integer>(env), + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(new ArrayList<>())); + this.operator.initializeState(new StreamTaskStateInitializerImpl(env, new MemoryStateBackend())); } @After public void cleanUp() throws Exception { operator.close(); + if (((TestingSourceOperator<Integer>) operator).isReaderCreated()) { + assertTrue(mockSourceReader.isClosed()); + } } @Test @@ -104,7 +123,6 @@ public class SourceOperatorTest { OperatorEvent operatorEvent = mockGateway.getEventsSent().get(0); assertTrue(operatorEvent instanceof ReaderRegistrationEvent); assertEquals(SUBTASK_INDEX, ((ReaderRegistrationEvent) operatorEvent).subtaskId()); - assertTrue(mockSourceReader.isClosed()); } @Test @@ -155,6 +173,26 @@ public class SourceOperatorTest { assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), splitsInState); } + @Test + public void testNotifyCheckpointComplete() throws Exception { + StateInitializationContext stateContext = getStateContext(); + operator.initializeState(stateContext); + operator.open(); + operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L)); + operator.notifyCheckpointComplete(100L); + assertEquals(100L, (long) mockSourceReader.getCompletedCheckpoints().get(0)); + } + + @Test + public void testNotifyCheckpointAborted() throws Exception { + StateInitializationContext stateContext = getStateContext(); + operator.initializeState(stateContext); + operator.open(); + operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L)); + operator.notifyCheckpointAborted(100L); + assertEquals(100L, (long) mockSourceReader.getAbortedCheckpoints().get(0)); + } + // ---------------- helper methods ------------------------- private StateInitializationContext getStateContext() throws Exception { @@ -190,4 +228,15 @@ public class SourceOperatorTest { return abstractStateBackend.createOperatorStateBackend( env, "test-operator", Collections.emptyList(), cancelStreamRegistry); } + + private Environment getTestingEnvironment() { + return new StreamMockEnvironment( + new Configuration(), + new Configuration(), + new ExecutionConfig(), + 1L, + new MockInputSplitProvider(), + 1, + new TestTaskStateManager()); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java index c189001..d48ed13 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java @@ -239,7 +239,7 @@ public class SourceOperatorEventTimeTest { } @Override - public List<MockSourceSplit> snapshotState() { + public List<MockSourceSplit> snapshotState(long checkpointId) { throw new UnsupportedOperationException(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java index a133ace..14b94eb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java @@ -43,6 +43,8 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit private final int subtaskIndex; private final int parallelism; + private volatile boolean readerCreated; + public TestingSourceOperator( SourceReader<T, MockSourceSplit> reader, WatermarkStrategy<T> watermarkStrategy, @@ -79,6 +81,13 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit this.subtaskIndex = subtaskIndex; this.parallelism = parallelism; this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); + this.readerCreated = false; + } + + @Override + public void open() throws Exception { + super.open(); + readerCreated = true; } @Override @@ -93,4 +102,8 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit cfg.setAutoWatermarkInterval(100); return cfg; } + + public boolean isReaderCreated() { + return readerCreated; + } }
