This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ad4857e49b35c756c8a800a13fc74f8770381d1b
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Apr 3 09:32:00 2025 +0200

    [FLINK-37605][runtime] Infer checkpoint id on endInput in sink

    So far, we used a special value for the final checkpoint on endInput. However, as shown in
    the description of this ticket, final doesn't mean final. Hence, multiple committables with
    EOI could be created at different times.

    With this commit, we stop using a special value for such committables and instead try to
    guess the checkpoint id of the next checkpoint. There are various factors that influence the
    checkpoint id, but we can mostly ignore them all because we just need to pick a checkpoint
    id that is
    - higher than all checkpoint ids of the previous, successful checkpoints of this attempt,
    - higher than the checkpoint id of the restored checkpoint,
    - lower than any future checkpoint id.

    Hence, we just remember the last observed checkpoint id (initialized with max(0, restored
    id)) and use last id + 1 for endInput. Naturally, multiple endInput calls happening through
    restarts will result in unique checkpoint ids.

    Note that aborted checkpoints before endInput may result in diverged checkpoint ids across
    subtasks. However, each of the ids satisfies the above requirements, and any id of endInput1
    will be smaller than any id of endInput2. Thus, diverged checkpoint ids will not [...]

    (cherry picked from commit 93025452714570a4d461519510375dd72af3a2c0)
---
 .../sink/compactor/operator/CompactorOperator.java | 12 +++--
 .../api/connector/sink2/CommittableMessage.java    |  6 ++-
 .../runtime/operators/sink/CommitterOperator.java  | 23 ++++-----
 .../runtime/operators/sink/SinkWriterOperator.java | 58 ++++------------------
 .../CheckpointCommittableManagerImpl.java          |  1 +
 .../sink/committables/CommittableCollector.java    | 11 ----
 .../sink2/GlobalCommitterOperatorTest.java         | 33 ------------
 .../operators/sink/CommitterOperatorTestBase.java  | 40 ---------------
 .../operators/sink/SinkWriterOperatorTestBase.java | 40 ++++++++++++++-
 .../committables/CommittableCollectorTest.java     | 20 --------
 .../util/AbstractStreamOperatorTestHarness.java    | 12 +++--
 11 files changed, 81 insertions(+), 175 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
index 0cb573d466f..69ee4b07914 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -106,6 +107,8 @@ public class CompactorOperator
     // submitted again while restoring
     private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;
 
+    private long lastKnownCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
+
     public CompactorOperator(
             FileCompactStrategy strategy,
             SimpleVersionedSerializer<FileSinkCommittable> committableSerializer,
@@ -136,15 +139,16 @@ public class CompactorOperator
     @Override
     public void endInput() throws Exception {
         // add collecting requests into the final snapshot
-        checkpointRequests.put(CommittableMessage.EOI, collectingRequests);
+        long checkpointId = lastKnownCheckpointId + 1;
+        checkpointRequests.put(checkpointId, collectingRequests);
         collectingRequests = new ArrayList<>();
 
         // submit all requests and wait until they are done
-        submitUntil(CommittableMessage.EOI);
+        submitUntil(checkpointId);
         assert checkpointRequests.isEmpty();
 
         getAllTasksFuture().join();
-        emitCompacted(CommittableMessage.EOI);
+        emitCompacted(checkpointId);
         assert compactingRequests.isEmpty();
     }
 
@@ -222,6 +226,8 @@ public class CompactorOperator
     }
 
     private void emitCompacted(long checkpointId) throws Exception {
+        lastKnownCheckpointId = checkpointId;
+
         List<FileSinkCommittable> compacted = new ArrayList<>();
         Iterator<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> iter =
                 compactingRequests.iterator();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
index 7db0c29ecc6..4a2049dbce8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
@@ -28,8 +28,10 @@ public interface CommittableMessage<CommT> {
     /**
      * Special value for checkpointId for the end of input in case of batch commit or final
      * checkpoint.
+     *
+     * @deprecated the special value is not used anymore at all (remove with Flink 2.2)
      */
-    long EOI = Long.MAX_VALUE;
+    @Deprecated long EOI = Long.MAX_VALUE;
 
     /** The subtask that created this committable. */
     int getSubtaskId();
 
@@ -49,6 +51,8 @@ public interface CommittableMessage<CommT> {
     /**
      * Returns the checkpoint id or EOI if this message belong to the final checkpoint or the batch
      * commit.
+     *
+     * @deprecated the special value EOI is not used anymore
      */
     long getCheckpointIdOrEOI();
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 10ae86cf10d..6954ad24e36 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.configuration.SinkOptions;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -51,7 +52,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.OptionalLong;
 
-import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -76,11 +76,9 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
     private SinkCommitterMetricGroup metricGroup;
     private Committer<CommT> committer;
     private CommittableCollector<CommT> committableCollector;
-    private long lastCompletedCheckpointId = -1;
+    private long lastCompletedCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
     private int maxRetries;
 
-    private boolean endInput = false;
-
     /** The operator's state descriptor. */
     private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
             new ListStateDescriptor<>(
@@ -131,11 +129,11 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
                                 getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
                                 getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
                                 metricGroup));
-        if (context.isRestored()) {
+        if (checkpointId.isPresent()) {
            committableCollectorState.get().forEach(cc -> committableCollector.merge(cc));
             lastCompletedCheckpointId = checkpointId.getAsLong();
             // try to re-commit recovered transactions as quickly as possible
-            commitAndEmitCheckpoints();
+            commitAndEmitCheckpoints(lastCompletedCheckpointId);
         }
     }
 
@@ -148,24 +146,23 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
 
     @Override
     public void endInput() throws Exception {
-        endInput = true;
         if (!isCheckpointingEnabled || isBatchMode) {
             // There will be no final checkpoint, all committables should be committed here
-            commitAndEmitCheckpoints();
+            commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
         }
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
-        commitAndEmitCheckpoints();
+        commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, checkpointId));
     }
 
-    private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
-        long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
+    private void commitAndEmitCheckpoints(long checkpointId)
+            throws IOException, InterruptedException {
+        lastCompletedCheckpointId = checkpointId;
         for (CheckpointCommittableManager<CommT> checkpointManager :
-                committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
+                committableCollector.getCheckpointCommittablesUpTo(checkpointId)) {
             // ensure that all committables of the first checkpoint are fully committed before
             // attempting the next committable
             commitAndEmit(checkpointManager);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index 7fb78f37c0d..31397f48b37 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.serialization.SerializationSchema.Initializat
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
 import org.apache.flink.api.connector.sink2.Sink;
@@ -52,8 +51,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.UserCodeClassLoader;
 
-import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
-
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -62,6 +59,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.OptionalLong;
 
+import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -91,13 +89,6 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
     @Nullable private final SimpleVersionedSerializer<CommT> committableSerializer;
     private final List<CommT> legacyCommittables = new ArrayList<>();
 
-    /**
-     * Used to remember that EOI has already happened so that we don't emit the last committables of
-     * the final checkpoints twice.
-     */
-    private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC =
-            new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE);
-
     /** The runtime information of the input element. */
     private final Context<InputT> context;
 
@@ -115,10 +106,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
     private final MailboxExecutor mailboxExecutor;
 
     private boolean endOfInput = false;
 
-    /**
-     * Remembers the endOfInput state for (final) checkpoints iff the operator emits committables.
-     */
-    @Nullable private ListState<Boolean> endOfInputState;
+    private long lastKnownCheckpointId = INITIAL_CHECKPOINT_ID - 1;
 
     SinkWriterOperator(
             Sink<InputT> sink,
@@ -146,8 +134,10 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
     @Override
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
-        WriterInitContext initContext = createInitContext(context.getRestoredCheckpointId());
-        if (context.isRestored()) {
+        OptionalLong restoredCheckpointId = context.getRestoredCheckpointId();
+        WriterInitContext initContext = createInitContext(restoredCheckpointId);
+        if (restoredCheckpointId.isPresent()) {
+            lastKnownCheckpointId = restoredCheckpointId.getAsLong();
             if (committableSerializer != null) {
                 final ListState<List<CommT>> legacyCommitterState =
                         new SimpleVersionedListState<>(
@@ -161,41 +151,12 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
         }
 
         sinkWriter = writerStateHandler.createWriter(initContext, context);
-
-        if (emitDownstream) {
-            // Figure out if we have seen end of input before and if we can suppress creating
-            // transactions and sending them downstream to the CommitterOperator. We have the
-            // following
-            // cases:
-            // 1. state is empty:
-            // - First time initialization
-            // - Restoring from a previous version of Flink that didn't handle EOI
-            // - Upscaled from a final or regular checkpoint
-            // In all cases, we regularly handle EOI, potentially resulting in duplicate summaries
-            // that the CommitterOperator needs to handle.
-            // 2. state is not empty:
-            // - This implies Flink restores from a version that handles EOI.
-            // - If there is one entry, no rescaling happened (for this subtask), so if it's true,
-            // we recover from a final checkpoint (for this subtask) and can ignore another EOI
-            // else we have a regular checkpoint.
-            // - If there are multiple entries, Flink downscaled, and we need to check if all are
-            // true and do the same as above. As soon as one entry is false, we regularly start
-            // the writer and potentially emit duplicate summaries if we indeed recovered from a
-            // final checkpoint.
-            endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
-            ArrayList<Boolean> previousState = Lists.newArrayList(endOfInputState.get());
-            endOfInput = !previousState.isEmpty() && !previousState.contains(false);
-        }
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
         writerStateHandler.snapshotState(context.getCheckpointId());
-        if (endOfInputState != null) {
-            endOfInputState.clear();
-            endOfInputState.add(this.endOfInput);
-        }
     }
 
     @Override
@@ -225,17 +186,16 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
 
     @Override
     public void endInput() throws Exception {
+        LOG.info("Received endInput");
         if (!endOfInput) {
             endOfInput = true;
-            if (endOfInputState != null) {
-                endOfInputState.add(true);
-            }
             sinkWriter.flush(true);
-            emitCommittables(CommittableMessage.EOI);
+            emitCommittables(lastKnownCheckpointId + 1);
         }
     }
 
     private void emitCommittables(long checkpointId) throws IOException, InterruptedException {
+        lastKnownCheckpointId = checkpointId;
         if (!emitDownstream) {
             // To support SinkV1 topologies with only a writer we have to call prepareCommit
             // although no committables are forwarded
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index da4491cda61..816bd55543e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -95,6 +95,7 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa
                         summary.getSubtaskId(),
                         checkpointId,
                         metricGroup);
+        // Remove branch once CommittableMessage.EOI has been removed (earliest 2.2)
         if (checkpointId == CommittableMessage.EOI) {
             SubtaskCommittableManager<CommT> merged =
                     subtasksCommittableManagers.merge(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
index 4e49d73279e..96585a632d1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
@@ -33,7 +33,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
@@ -49,7 +48,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 @Internal
 public class CommittableCollector<CommT> {
-    private static final long EOI = Long.MAX_VALUE;
     /** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}. */
     private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>>
             checkpointCommittables;
@@ -143,15 +141,6 @@ public class CommittableCollector<CommT> {
         return new ArrayList<>(checkpointCommittables.headMap(checkpointId, true).values());
     }
 
-    /**
-     * Returns {@link CheckpointCommittableManager} belonging to the last input.
-     *
-     * @return {@link CheckpointCommittableManager}
-     */
-    public Optional<CheckpointCommittableManager<CommT>> getEndOfInputCommittable() {
-        return Optional.ofNullable(checkpointCommittables.get(EOI));
-    }
-
     /**
      * Returns whether all {@link CheckpointCommittableManager} currently hold by the collector are
      * either committed or failed.
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
index 641a651e2e4..24f9422d30b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
@@ -32,7 +32,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class GlobalCommitterOperatorTest {
@@ -138,38 +137,6 @@ class GlobalCommitterOperatorTest {
         }
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testCommitAllCommittablesOnFinalCheckpoint(boolean commitOnInput) throws Exception {
-        final MockCommitter committer = new MockCommitter();
-        final OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> testHarness =
-                createTestHarness(committer, commitOnInput);
-        testHarness.open();
-
-        final CommittableSummary<Integer> committableSummary =
-                new CommittableSummary<>(1, 2, EOI, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableSummary<Integer> committableSummary2 =
-                new CommittableSummary<>(2, 2, EOI, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary2));
-
-        final CommittableWithLineage<Integer> first = new CommittableWithLineage<>(1, EOI, 1);
-        testHarness.processElement(new StreamRecord<>(first));
-        final CommittableWithLineage<Integer> second = new CommittableWithLineage<>(2, EOI, 2);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        // commitOnInput implies that the global committer is not using notifyCheckpointComplete
-        if (commitOnInput) {
-            assertThat(committer.committed).containsExactly(1, 2);
-        } else {
-            assertThat(committer.committed).isEmpty();
-            testHarness.notifyOfCompletedCheckpoint(EOI);
-            assertThat(committer.committed).containsExactly(1, 2);
-        }
-
-        assertThat(testHarness.getOutput()).isEmpty();
-    }
-
     private OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> createTestHarness(
             Committer<Integer> committer, boolean commitOnInput) throws Exception {
         return new OneInputStreamOperatorTestHarness<>(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
index 756ea0c8022..c8b37943846 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
@@ -35,7 +35,6 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.function.IntSupplier;
 
-import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
 import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
 import static org.assertj.core.api.Assertions.as;
@@ -126,45 +125,6 @@ abstract class CommitterOperatorTestBase {
         testHarness.close();
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception {
-        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(sinkAndCounters.sink, isBatchMode, !isBatchMode);
-        testHarness.open();
-
-        final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(1, 2, EOI, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableSummary<String> committableSummary2 =
-                new CommittableSummary<>(2, 2, EOI, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary2));
-
-        final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", EOI, 1);
-        testHarness.processElement(new StreamRecord<>(first));
-        final CommittableWithLineage<String> second = new CommittableWithLineage<>("1", EOI, 2);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        testHarness.endInput();
-        if (!isBatchMode) {
-            assertThat(testHarness.getOutput()).isEmpty();
-            // notify final checkpoint complete
-            testHarness.notifyOfCompletedCheckpoint(1);
-        }
-
-        ListAssert<CommittableMessage<String>> records =
-                assertThat(testHarness.extractOutputValues()).hasSize(3);
-        records.element(0, as(committableSummary()))
-                .hasFailedCommittables(0)
-                .hasOverallCommittables(2)
-                .hasPendingCommittables(0);
-        records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0));
-        records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0));
-        testHarness.close();
-    }
-
     @Test
     void testStateRestore() throws Exception {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
index d6513ab738a..e0425dc56c8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
@@ -66,7 +66,6 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID;
-import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
 import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
 import static org.assertj.core.api.Assertions.as;
@@ -178,7 +177,7 @@ abstract class SinkWriterOperatorTestBase {
         testHarness.processElement(1, 1);
         testHarness.endInput();
 
-        assertBasicOutput(testHarness.extractOutputValues(), 1, EOI);
+        assertBasicOutput(testHarness.extractOutputValues(), 1, 1L);
     }
 
     @ParameterizedTest
@@ -467,6 +466,43 @@ abstract class SinkWriterOperatorTestBase {
         testHarness.close();
     }
 
+    @Test
+    void testDoubleEndOfInput() throws Exception {
+        InspectableSink sink = sinkWithCommitter();
+
+        OperatorSubtaskState snapshot;
+        try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sink.getSink()))) {
+            testHarness.open();
+            testHarness.processElement(1, 1);
+
+            testHarness.endInput();
+            testHarness.prepareSnapshotPreBarrier(1);
+            snapshot = testHarness.snapshot(1, 1);
+
+            assertBasicOutput(testHarness.extractOutputValues(), 1, 1L);
+        }
+
+        final InspectableSink restoredSink = sinkWithCommitter();
+        try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>
+                restoredTestHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new SinkWriterOperatorFactory<>(restoredSink.getSink()))) {
+            restoredTestHarness.setRestoredCheckpointId(1L);
+            restoredTestHarness.initializeState(snapshot);
+            restoredTestHarness.open();
+            restoredTestHarness.processElement(2, 2);
+
+            restoredTestHarness.endInput();
+            restoredTestHarness.prepareSnapshotPreBarrier(3);
+            restoredTestHarness.snapshot(3, 1);
+
+            // asserts the guessed checkpoint id which needs
+            assertBasicOutput(restoredTestHarness.extractOutputValues(), 1, 2L);
+        }
+    }
+
     private static void assertContextsEqual(
             Sink.InitContext initContext, WriterInitContext original) {
         assertThat(initContext.getUserCodeClassLoader().asClassLoader())
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
index 6e55adcc0c5..3181c21361a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
@@ -24,9 +24,6 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Optional;
-
-import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class CommittableCollectorTest {
@@ -44,22 +41,5 @@ class CommittableCollectorTest {
         committableCollector.addMessage(new CommittableSummary<>(1, 1, 3L, 1, 0, 0));
 
         assertThat(committableCollector.getCheckpointCommittablesUpTo(2)).hasSize(2);
-
-        assertThat(committableCollector.getEndOfInputCommittable()).isNotPresent();
-    }
-
-    @Test
-    void testGetEndOfInputCommittable() {
-        final CommittableCollector<Integer> committableCollector =
-                new CommittableCollector<>(METRIC_GROUP);
-        CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, EOI, 1, 0, 0);
-        committableCollector.addMessage(first);
-
-        Optional<CheckpointCommittableManager<Integer>> endOfInputCommittable =
-                committableCollector.getEndOfInputCommittable();
-        assertThat(endOfInputCommittable).isPresent();
-        assertThat(endOfInputCommittable)
-                .get()
-                .returns(EOI, CheckpointCommittableManager::getCheckpointId);
     }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index aa58c4ea8cc..8935c61865d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -176,6 +176,8 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 
     private volatile boolean wasFailedExternally = false;
 
+    private long restoredCheckpointId = 0;
+
     public AbstractStreamOperatorTestHarness(
             StreamOperator<OUT> operator, int maxParallelism, int parallelism, int subtaskIndex)
             throws Exception {
@@ -388,6 +390,10 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
         return config;
     }
 
+    public void setRestoredCheckpointId(long restoredCheckpointId) {
+        this.restoredCheckpointId = restoredCheckpointId;
+    }
+
     /** Get all the output from the task. This contains StreamRecords and Events interleaved. */
     public ConcurrentLinkedQueue<Object> getOutput() {
         return outputList;
@@ -596,16 +602,16 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
             jmTaskStateSnapshot.putSubtaskStateByOperatorID(
                     operator.getOperatorID(), jmOperatorStateHandles);
 
-            taskStateManager.setReportedCheckpointId(0);
+            taskStateManager.setReportedCheckpointId(restoredCheckpointId);
             taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(
-                    Collections.singletonMap(0L, jmTaskStateSnapshot));
+                    Collections.singletonMap(restoredCheckpointId, jmTaskStateSnapshot));
 
             if (tmOperatorStateHandles != null) {
                 TaskStateSnapshot tmTaskStateSnapshot = new TaskStateSnapshot();
                 tmTaskStateSnapshot.putSubtaskStateByOperatorID(
                         operator.getOperatorID(), tmOperatorStateHandles);
                 taskStateManager.setTaskManagerTaskStateSnapshotsByCheckpointId(
-                        Collections.singletonMap(0L, tmTaskStateSnapshot));
+                        Collections.singletonMap(restoredCheckpointId, tmTaskStateSnapshot));
             }
         }
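For quick reference, the checkpoint-id inference described in the commit message reduces to the
small pattern sketched below. This is only an illustrative sketch: the class and method names
here are hypothetical, and only lastKnownCheckpointId and INITIAL_CHECKPOINT_ID mirror
identifiers from the diff; it is not the actual operator code.

    import java.util.OptionalLong;

    /** Sketch of how the sink operators now guess a checkpoint id for endInput. */
    class CheckpointIdInferenceSketch {

        // Mirrors CheckpointIDCounter.INITIAL_CHECKPOINT_ID (assumed to be 1), so the field
        // below starts at 0, matching "initialized with max(0, restored id)".
        private static final long INITIAL_CHECKPOINT_ID = 1;

        // Highest checkpoint id observed so far by this subtask.
        private long lastKnownCheckpointId = INITIAL_CHECKPOINT_ID - 1;

        /** On restore, continue from the restored checkpoint id. */
        void initializeState(OptionalLong restoredCheckpointId) {
            if (restoredCheckpointId.isPresent()) {
                lastKnownCheckpointId = restoredCheckpointId.getAsLong();
            }
        }

        /** Every checkpoint whose committables are emitted bumps the last known id. */
        void emitCommittables(long checkpointId) {
            lastKnownCheckpointId = checkpointId;
            // ... forward committables tagged with checkpointId downstream ...
        }

        /** On endInput, guess the next checkpoint id instead of using the EOI sentinel. */
        void endInput() {
            emitCommittables(lastKnownCheckpointId + 1);
        }
    }

The guessed id is higher than every checkpoint id completed by this attempt and than the
restored id, and lower than any future checkpoint id, which is all the committer side needs to
order the end-of-input committables correctly across restarts.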
