This is an automated email from the ASF dual-hosted git repository.
corgy-w pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f725cf6e66 [Fix][Flink] Cache writer states during commit preparation
to fix streaming mode lost data. (#10158)
f725cf6e66 is described below
commit f725cf6e66cbb031340621976a7fba9a94ca70ae
Author: Adam Wang <[email protected]>
AuthorDate: Wed May 20 10:20:03 2026 +0800
[Fix][Flink] Cache writer states during commit preparation to fix streaming
mode lost data. (#10158)
---
.../translation/flink/sink/FlinkSinkWriter.java | 39 ++++
.../flink/sink/FlinkSinkWriterTest.java | 217 +++++++++++++++++++++
2 files changed, 256 insertions(+)
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index e7cc3c7860..cd60a52614 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -70,6 +70,19 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
private MultiTableResourceManager resourceManager;
+ /**
+ * Cached writer states produced together with {@link #prepareCommit(boolean)}.
+ *
+ * <p>Flink 1.13+ calls {@code prepareCommit(..)} in {@code prepareSnapshotPreBarrier} and
+ * {@code snapshotState(..)} afterwards. To guarantee that SeaTunnel {@link
+ * org.apache.seatunnel.api.sink.SinkWriter} always sees {@code prepareCommit(checkpointId)}
+ * followed immediately by {@code snapshotState(checkpointId)} without any further writes to the
+ * same transaction, we invoke {@code snapshotState(checkpointId)} inside {@link
+ * #prepareCommit(boolean)} and cache the result here. The subsequent Flink {@code
+ * snapshotState(..)} call simply consumes this cached state.
+ */
+ private List<FlinkWriterState<WriterStateT>> pendingStates;
+
FlinkSinkWriter(
org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT,
WriterStateT> sinkWriter,
long checkpointId,
@@ -202,7 +215,22 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
@Override
public List<CommitWrapper<CommT>> prepareCommit(boolean flush) throws
IOException {
+ // 1. Let SeaTunnel SinkWriter prepare the commit for the current checkpointId
Optional<CommT> commTOptional = sinkWriter.prepareCommit(checkpointId);
+
+ // 2. Immediately snapshot state for the same checkpointId, so from SeaTunnel's
+ // perspective prepareCommit(checkpointId) and snapshotState(checkpointId) are
+ // executed back-to-back with no further writes to the same transaction.
+ List<FlinkWriterState<WriterStateT>> states =
+ sinkWriter.snapshotState(this.checkpointId).stream()
+ .map(state -> new
FlinkWriterState<>(this.checkpointId, state))
+ .collect(Collectors.toList());
+ this.pendingStates = states;
+
+ // 3. Advance internal checkpointId for the next round.
+ this.checkpointId++;
+
+ // 4. Wrap commit info as before.
return commTOptional
.map(CommitWrapper::new)
.map(Collections::singletonList)
@@ -211,6 +239,17 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
@Override
public List<FlinkWriterState<WriterStateT>> snapshotState() throws
IOException {
+ // If we have already snapshotted state in prepareCommit for this checkpoint,
+ // just return the cached value to Flink and avoid calling the underlying
+ // SeaTunnel SinkWriter.snapshotState(..) a second time.
+ if (pendingStates != null) {
+ List<FlinkWriterState<WriterStateT>> states = pendingStates;
+ pendingStates = null;
+ return states;
+ }
+
+ // Fallback: in some edge cases (e.g., sinks without 2PC) Flink might call snapshotState
+ // without a preceding prepareCommit. Preserve the original behaviour then.
List<FlinkWriterState<WriterStateT>> states =
sinkWriter.snapshotState(this.checkpointId).stream()
.map(state -> new
FlinkWriterState<>(this.checkpointId, state))
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterTest.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterTest.java
new file mode 100644
index 0000000000..50be1a39d2
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.event.EventListener;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+class FlinkSinkWriterTest {
+
+ @Test
+ void testPrepareCommitSnapshotsStateAndAdvancesCheckpoint() throws
Exception {
+ RecordingSinkWriter delegate = new RecordingSinkWriter();
+ RecordingContext context = new RecordingContext();
+
+ FlinkSinkWriter<SeaTunnelRow, String, String> flinkSinkWriter =
+ new FlinkSinkWriter<>(delegate, 1L, context);
+
+ // first checkpoint
+ List<CommitWrapper<String>> commits =
flinkSinkWriter.prepareCommit(false);
+ List<FlinkWriterState<String>> states =
flinkSinkWriter.snapshotState();
+
+ // prepareCommit should call delegate.prepareCommit with checkpointId 1
+ Assertions.assertEquals(Collections.singletonList(1L),
delegate.prepareCommitCalls);
+ Assertions.assertEquals("commit-1", commits.get(0).getCommit());
+
+ // snapshotState should have been called exactly once for checkpointId 1
+ Assertions.assertEquals(Collections.singletonList(1L),
delegate.snapshotCalls);
+ Assertions.assertEquals(1, states.size());
+ Assertions.assertEquals(1L, states.get(0).getCheckpointId());
+ Assertions.assertEquals("state-1", states.get(0).getState());
+
+ // internal checkpointId should have advanced to 2 for next round
+ commits = flinkSinkWriter.prepareCommit(false);
+ states = flinkSinkWriter.snapshotState();
+
+ Assertions.assertEquals(2, delegate.prepareCommitCalls.size());
+ Assertions.assertEquals(2, delegate.snapshotCalls.size());
+ Assertions.assertEquals("commit-2", commits.get(0).getCommit());
+ Assertions.assertEquals(2L, states.get(0).getCheckpointId());
+ Assertions.assertEquals("state-2", states.get(0).getState());
+ }
+
+ @Test
+ void testSnapshotStateWithoutPrepareCommitFallsBack() throws Exception {
+ RecordingSinkWriter delegate = new RecordingSinkWriter();
+ RecordingContext context = new RecordingContext();
+
+ FlinkSinkWriter<SeaTunnelRow, String, String> flinkSinkWriter =
+ new FlinkSinkWriter<>(delegate, 3L, context);
+
+ // Direct snapshotState should call delegate.snapshotState with checkpointId 3
+ List<FlinkWriterState<String>> states =
flinkSinkWriter.snapshotState();
+
+ Assertions.assertEquals(Collections.singletonList(3L),
delegate.snapshotCalls);
+ Assertions.assertEquals(1, states.size());
+ Assertions.assertEquals(3L, states.get(0).getCheckpointId());
+ Assertions.assertEquals("state-3", states.get(0).getState());
+ }
+
+ private static class RecordingSinkWriter implements
SinkWriter<SeaTunnelRow, String, String> {
+
+ private final List<Long> prepareCommitCalls = new ArrayList<>();
+ private final List<Long> snapshotCalls = new ArrayList<>();
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {}
+
+ @Override
+ public Optional<String> prepareCommit() {
+ // not used in these tests
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<String> prepareCommit(long checkpointId) {
+ prepareCommitCalls.add(checkpointId);
+ return Optional.of("commit-" + checkpointId);
+ }
+
+ @Override
+ public List<String> snapshotState(long checkpointId) {
+ snapshotCalls.add(checkpointId);
+ return Collections.singletonList("state-" + checkpointId);
+ }
+
+ @Override
+ public void abortPrepare() {}
+
+ @Override
+ public void close() throws IOException {}
+ }
+
+ private static class RecordingContext implements SinkWriter.Context {
+
+ @Override
+ public int getIndexOfSubtask() {
+ return 0;
+ }
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ return new NoopMetricsContext();
+ }
+
+ @Override
+ public EventListener getEventListener() {
+ return event -> {};
+ }
+ }
+
+ private static class NoopMetricsContext implements MetricsContext {
+
+ @Override
+ public org.apache.seatunnel.api.common.metrics.Counter counter(String
name) {
+ return new org.apache.seatunnel.api.common.metrics.Counter() {
+ @Override
+ public void inc() {}
+
+ @Override
+ public void inc(long n) {}
+
+ @Override
+ public void dec() {}
+
+ @Override
+ public void dec(long n) {}
+
+ @Override
+ public void set(long n) {}
+
+ @Override
+ public long getCount() {
+ return 0;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public org.apache.seatunnel.api.common.metrics.Unit unit() {
+ return org.apache.seatunnel.api.common.metrics.Unit.COUNT;
+ }
+ };
+ }
+
+ @Override
+ public <C extends org.apache.seatunnel.api.common.metrics.Counter> C
counter(
+ String name, C counter) {
+ return counter;
+ }
+
+ @Override
+ public org.apache.seatunnel.api.common.metrics.Meter meter(String
name) {
+ return new org.apache.seatunnel.api.common.metrics.Meter() {
+ @Override
+ public void markEvent() {}
+
+ @Override
+ public void markEvent(long n) {}
+
+ @Override
+ public double getRate() {
+ return 0;
+ }
+
+ @Override
+ public long getCount() {
+ return 0;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public org.apache.seatunnel.api.common.metrics.Unit unit() {
+ return org.apache.seatunnel.api.common.metrics.Unit.COUNT;
+ }
+ };
+ }
+
+ @Override
+ public <M extends org.apache.seatunnel.api.common.metrics.Meter> M
meter(
+ String name, M meter) {
+ return meter;
+ }
+ }
+}