This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 37e6724813bd92f8d323ddec80308241f693a5e1 Author: Arvid Heise <[email protected]> AuthorDate: Thu Sep 5 15:25:12 2024 +0200 [FLINK-25920] Handle duplicate EOI in Sink In case of a failure after the final checkpoint, EOI is called twice. SinkWriter should ignore the second call to avoid emitting more dummy committables (transactional objects containing no data), since no data can arrive when recovering from the final checkpoint. The commit uses a boolean list state to remember if EOI has been emitted. The cases are discussed in code. Since rescaling may still result in these dummy committables, the committer needs to merge them into the CommittableCollector, as these committables still need to be committed because systems like Kafka don't provide transactional isolation. --- .../runtime/operators/sink/SinkWriterOperator.java | 58 ++++++++++++- .../CheckpointCommittableManagerImpl.java | 27 +++--- .../sink/committables/CommittableCollector.java | 2 +- .../CheckpointCommittableManagerImplTest.java | 14 ++-- .../flink/test/streaming/runtime/SinkITCase.java | 97 +++++++++++++++++++--- 5 files changed, 165 insertions(+), 33 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java index 93f5ba66198..35d6ca6f7ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema.Initializat import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import 
org.apache.flink.api.common.typeutils.base.BooleanSerializer; import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.connector.sink2.CommittingSinkWriter; import org.apache.flink.api.connector.sink2.Sink; @@ -53,6 +54,8 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.UserCodeClassLoader; +import org.apache.flink.shaded.guava32.com.google.common.collect.Lists; + import javax.annotation.Nullable; import java.io.IOException; @@ -67,7 +70,7 @@ import static org.apache.flink.util.Preconditions.checkState; /** * An operator that processes records to be written into a {@link - * org.apache.flink.api.connector.sink.Sink}. It also has a way to process committables with the + * org.apache.flink.api.connector.sink2.Sink}. It also has a way to process committables with the * same parallelism or send them downstream to a {@link CommitterOperator} with a different * parallelism. * @@ -90,6 +93,13 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab @Nullable private final SimpleVersionedSerializer<CommT> committableSerializer; private final List<CommT> legacyCommittables = new ArrayList<>(); + /** + * Used to remember that EOI has already happened so that we don't emit the last committables of + * the final checkpoints twice. + */ + private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC = + new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE); + /** The runtime information of the input element. */ private final Context<InputT> context; @@ -107,6 +117,10 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab private final MailboxExecutor mailboxExecutor; private boolean endOfInput = false; + /** + * Remembers the endOfInput state for (final) checkpoints iff the operator emits committables. 
+ */ + @Nullable private ListState<Boolean> endOfInputState; SinkWriterOperator( Sink<InputT> sink, @@ -160,17 +174,48 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab legacyCommitterState.clear(); } } + sinkWriter = writerStateHandler.createWriter(initContext, context); + + if (emitDownstream) { + // Figure out if we have seen end of input before and if we can suppress creating + // transactions and sending them downstream to the CommitterOperator. We have the + // following + // cases: + // 1. state is empty: + // - First time initialization + // - Restoring from a previous version of Flink that didn't handle EOI + // - Upscaled from a final or regular checkpoint + // In all cases, we regularly handle EOI, potentially resulting in duplicate summaries + // that the CommitterOperator needs to handle. + // 2. state is not empty: + // - This implies Flink restores from a version that handles EOI. + // - If there is one entry, no rescaling happened (for this subtask), so if it's true, + // we recover from a final checkpoint (for this subtask) and can ignore another EOI + // else we have a regular checkpoint. + // - If there are multiple entries, Flink downscaled, and we need to check if all are + // true and do the same as above. As soon as one entry is false, we regularly start + // the writer and potentially emit duplicate summaries if we indeed recovered from a + // final checkpoint. 
+ endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC); + ArrayList<Boolean> previousState = Lists.newArrayList(endOfInputState.get()); + endOfInput = !previousState.isEmpty() && !previousState.contains(false); + } } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); writerStateHandler.snapshotState(context.getCheckpointId()); + if (endOfInputState != null) { + endOfInputState.clear(); + endOfInputState.add(this.endOfInput); + } } @Override public void processElement(StreamRecord<InputT> element) throws Exception { + checkState(!endOfInput, "Received element after endOfInput: %s", element); context.element = element; sinkWriter.write(element.getValue(), context); } @@ -195,9 +240,14 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab @Override public void endInput() throws Exception { - endOfInput = true; - sinkWriter.flush(true); - emitCommittables(CommittableMessage.EOI); + if (!endOfInput) { + endOfInput = true; + if (endOfInputState != null) { + endOfInputState.add(true); + } + sinkWriter.flush(true); + emitCommittables(CommittableMessage.EOI); + } } private void emitCommittables(long checkpointId) throws IOException, InterruptedException { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index d98ec256e50..a217116055c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.sink.committables; import 
org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; @@ -72,18 +73,24 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa return subtasksCommittableManagers.values(); } - void upsertSummary(CommittableSummary<CommT> summary) { + void addSummary(CommittableSummary<CommT> summary) { + long checkpointId = summary.getCheckpointIdOrEOI(); SubtaskCommittableManager<CommT> manager = new SubtaskCommittableManager<>( - summary.getNumberOfCommittables(), - subtaskId, - summary.getCheckpointIdOrEOI(), - metricGroup); - SubtaskCommittableManager<CommT> existing = - subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager); - if (existing != null) { - throw new UnsupportedOperationException( - "Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920"); + summary.getNumberOfCommittables(), subtaskId, checkpointId, metricGroup); + if (checkpointId == CommittableMessage.EOI) { + SubtaskCommittableManager<CommT> merged = + subtasksCommittableManagers.merge( + summary.getSubtaskId(), manager, SubtaskCommittableManager::merge); + } else { + SubtaskCommittableManager<CommT> existing = + subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager); + if (existing != null) { + throw new UnsupportedOperationException( + String.format( + "Received duplicate committable summary for checkpoint %s + subtask %s (new=%s, old=%s). 
Please check the status of FLINK-25920", + checkpointId, summary.getSubtaskId(), manager, existing)); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java index 801c8446850..2dac78c71ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java @@ -241,7 +241,7 @@ public class CommittableCollector<CommT> { numberOfSubtasks, summary.getCheckpointIdOrEOI(), metricGroup)) - .upsertSummary(summary); + .addSummary(summary); } private void addCommittable(CommittableWithLineage<CommT> committable) { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java index 5bcebda497f..6c8687b63c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java @@ -47,7 +47,7 @@ class CheckpointCommittableManagerImplTest { assertThat(checkpointCommittables.getSubtaskCommittableManagers()).isEmpty(); final CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, 1L, 1, 0, 0); - checkpointCommittables.upsertSummary(first); + checkpointCommittables.addSummary(first); assertThat(checkpointCommittables.getSubtaskCommittableManagers()) .hasSize(1) .satisfiesExactly( @@ -60,7 +60,7 @@ class CheckpointCommittableManagerImplTest { // Add different subtask 
id final CommittableSummary<Integer> third = new CommittableSummary<>(2, 1, 2L, 2, 1, 1); - checkpointCommittables.upsertSummary(third); + checkpointCommittables.addSummary(third); assertThat(checkpointCommittables.getSubtaskCommittableManagers()).hasSize(2); } @@ -68,8 +68,8 @@ class CheckpointCommittableManagerImplTest { void testCommit() throws IOException, InterruptedException { final CheckpointCommittableManagerImpl<Integer> checkpointCommittables = new CheckpointCommittableManagerImpl<>(1, 1, 1L, METRIC_GROUP); - checkpointCommittables.upsertSummary(new CommittableSummary<>(1, 1, 1L, 1, 0, 0)); - checkpointCommittables.upsertSummary(new CommittableSummary<>(2, 1, 1L, 2, 0, 0)); + checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L, 1, 0, 0)); + checkpointCommittables.addSummary(new CommittableSummary<>(2, 1, 1L, 2, 0, 0)); checkpointCommittables.addCommittable(new CommittableWithLineage<>(3, 1L, 1)); checkpointCommittables.addCommittable(new CommittableWithLineage<>(4, 1L, 2)); @@ -96,10 +96,10 @@ class CheckpointCommittableManagerImplTest { void testUpdateCommittableSummary() { final CheckpointCommittableManagerImpl<Integer> checkpointCommittables = new CheckpointCommittableManagerImpl<>(1, 1, 1L, METRIC_GROUP); - checkpointCommittables.upsertSummary(new CommittableSummary<>(1, 1, 1L, 1, 0, 0)); + checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L, 1, 0, 0)); assertThatThrownBy( () -> - checkpointCommittables.upsertSummary( + checkpointCommittables.addSummary( new CommittableSummary<>(1, 1, 1L, 2, 0, 0))) .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining("FLINK-25920"); @@ -114,7 +114,7 @@ class CheckpointCommittableManagerImplTest { final CheckpointCommittableManagerImpl<Integer> original = new CheckpointCommittableManagerImpl<>( subtaskId, numberOfSubtasks, checkpointId, METRIC_GROUP); - original.upsertSummary( + original.addSummary( new CommittableSummary<>(subtaskId, numberOfSubtasks, checkpointId, 
1, 0, 0)); CheckpointCommittableManagerImpl<Integer> copy = original.copy(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java index 0a3de995c9a..383ea9cfd33 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java @@ -17,25 +17,38 @@ package org.apache.flink.test.streaming.runtime; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.connector.datagen.source.TestDataGenerators; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.runtime.operators.sink.TestSink; -import org.apache.flink.test.util.AbstractTestBaseJUnit4; +import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.testutils.junit.SharedObjectsExtension; +import org.apache.flink.testutils.junit.SharedReference; -import org.junit.Before; -import org.junit.Test; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import 
java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -48,7 +61,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; /** * Integration test for {@link org.apache.flink.api.connector.sink.Sink} run time implementation. */ -public class SinkITCase extends AbstractTestBaseJUnit4 { +class SinkITCase extends AbstractTestBase { static final List<Integer> SOURCE_DATA = Arrays.asList( 895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 630, 682, 765, 434, 970, @@ -109,14 +122,17 @@ public class SinkITCase extends AbstractTestBaseJUnit4 { COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean() && GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean(); - @Before + @RegisterExtension + private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create(); + + @BeforeEach public void init() { COMMIT_QUEUE.clear(); GLOBAL_COMMIT_QUEUE.clear(); } @Test - public void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws Exception { + void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws Exception { final StreamExecutionEnvironment env = buildStreamEnv(); final DataStream<Integer> stream = @@ -153,7 +169,7 @@ public class SinkITCase extends AbstractTestBaseJUnit4 { } @Test - public void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception { + void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception { final StreamExecutionEnvironment env = buildBatchEnv(); env.fromData(SOURCE_DATA) @@ -177,7 +193,7 @@ public class SinkITCase extends AbstractTestBaseJUnit4 { } @Test - public void writerAndCommitterExecuteInStreamingMode() throws Exception { + void writerAndCommitterExecuteInStreamingMode() throws Exception { final StreamExecutionEnvironment env = buildStreamEnv(); final DataStream<Integer> 
stream = @@ -198,8 +214,45 @@ public class SinkITCase extends AbstractTestBaseJUnit4 { containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray())); } + /** + * Creates a bounded stream with a failing committer. The test verifies that the Sink correctly + * recovers and handles multiple endInput(). + */ + @Test + void duplicateEndInput() throws Exception { + // we need at least 2 attempts but add a bit of a safety margin for unexpected retries + int maxAttempts = 10; + final Configuration conf = new Configuration(); + conf.set(CheckpointingOptions.TOLERABLE_FAILURE_NUMBER, maxAttempts); + conf.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); + conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, maxAttempts); + + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(conf); + env.enableCheckpointing(100); + + AtomicBoolean failedOnce = new AtomicBoolean(false); + List<String> committed = new ArrayList<>(); + FailingOnceCommitter committer = + new FailingOnceCommitter( + sharedObjects.add(failedOnce), sharedObjects.add(committed)); + env.<Object>fromData("bounded") + .sinkTo(TestSinkV2.newBuilder().setCommitter(committer).build()); + + JobClient jobClient = env.executeAsync(); + // wait for job to finish including restarts + jobClient.getJobExecutionResult().get(); + // Did we successfully finish? 
+ Assertions.assertThat(jobClient.getJobStatus().get()).isEqualTo(JobStatus.FINISHED); + + // check that we error'ed once as expected + Assertions.assertThat(failedOnce).isTrue(); + // but also eventually succeed to commit (size > 1 in case of unexpected retries) + Assertions.assertThat(committed).isNotEmpty(); + } + @Test - public void writerAndCommitterExecuteInBatchMode() throws Exception { + void writerAndCommitterExecuteInBatchMode() throws Exception { final StreamExecutionEnvironment env = buildBatchEnv(); env.fromData(SOURCE_DATA) @@ -214,7 +267,7 @@ public class SinkITCase extends AbstractTestBaseJUnit4 { } @Test - public void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception { + void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception { final StreamExecutionEnvironment env = buildStreamEnv(); final DataStream<Integer> stream = @@ -245,7 +298,7 @@ public class SinkITCase extends AbstractTestBaseJUnit4 { } @Test - public void writerAndGlobalCommitterExecuteInBatchMode() throws Exception { + void writerAndGlobalCommitterExecuteInBatchMode() throws Exception { final StreamExecutionEnvironment env = buildBatchEnv(); env.fromData(SOURCE_DATA) @@ -280,4 +333,26 @@ public class SinkITCase extends AbstractTestBaseJUnit4 { env.setRuntimeMode(RuntimeExecutionMode.BATCH); return env; } + + static class FailingOnceCommitter extends TestSinkV2.DefaultCommitter { + private final SharedReference<AtomicBoolean> failedOnce; + private final SharedReference<List<String>> committed; + + public FailingOnceCommitter( + SharedReference<AtomicBoolean> failedOnce, + SharedReference<List<String>> committed) { + this.failedOnce = failedOnce; + this.committed = committed; + } + + @Override + public void commit(Collection<CommitRequest<String>> committables) { + if (failedOnce.get().compareAndSet(false, true)) { + throw new RuntimeException("Fail to commit"); + } + for (CommitRequest<String> committable : committables) { + 
this.committed.get().add(committable.getCommittable()); + } + } + } }
