This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 2df2d6b1a [kv] Add undo recovery support for aggregation tables (#2545)
2df2d6b1a is described below
commit 2df2d6b1a10e5f95d10bb149a6ca34d4c8a7f2f0
Author: Yang Wang <[email protected]>
AuthorDate: Mon Feb 9 20:05:44 2026 +0800
[kv] Add undo recovery support for aggregation tables (#2545)
---
.../fluss/client/table/scanner/log/LogScanner.java | 10 +
.../client/table/scanner/log/LogScannerImpl.java | 17 +
.../org/apache/fluss/utils/ByteArrayWrapper.java | 65 ++
.../apache/fluss/utils/ByteArrayWrapperTest.java | 99 ++
.../sink/writer/undo/BucketRecoveryContext.java | 137 +++
.../fluss/flink/sink/writer/undo/UndoComputer.java | 150 ++++
.../sink/writer/undo/UndoRecoveryExecutor.java | 287 ++++++
.../sink/writer/undo/UndoRecoveryManager.java | 248 +++++
.../flink/sink/writer/undo/UndoComputerTest.java | 199 ++++
.../sink/writer/undo/UndoRecoveryExecutorTest.java | 234 +++++
.../writer/undo/UndoRecoveryManagerITCase.java | 999 +++++++++++++++++++++
.../apache/fluss/flink/utils/TestLogScanner.java | 159 ++++
.../apache/fluss/flink/utils/TestUpsertWriter.java | 118 +++
13 files changed, 2722 insertions(+)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
index 04357f598..d255e340a 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
@@ -106,6 +106,16 @@ public interface LogScanner extends AutoCloseable {
*/
void unsubscribe(long partitionId, int bucket);
    /**
     * Unsubscribe from the given bucket of a non-partitioned table dynamically.
     *
     * <p>Please use {@link #unsubscribe(long, int)} to unsubscribe a bucket of a partitioned
     * table instead.
     *
     * @param bucket the table bucket to unsubscribe.
     * @throws java.lang.IllegalStateException if the table is a partitioned table.
     */
    void unsubscribe(int bucket);
+
/**
* Subscribe to the given partitioned table bucket from beginning
dynamically. If the table
* bucket is already subscribed, the start offset will be updated.
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
index ebcae05c5..33181bd7a 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
@@ -225,6 +225,23 @@ public class LogScannerImpl implements LogScanner {
}
}
+ @Override
+ public void unsubscribe(int bucket) {
+ if (isPartitionedTable) {
+ throw new IllegalStateException(
+ "The table is a partitioned table, please use "
+ + "\"unsubscribe(long partitionId, int bucket)\"
to "
+ + "unsubscribe a partitioned bucket instead.");
+ }
+ acquireAndEnsureOpen();
+ try {
+ TableBucket tableBucket = new TableBucket(tableId, bucket);
+
this.logScannerStatus.unassignScanBuckets(Collections.singletonList(tableBucket));
+ } finally {
+ release();
+ }
+ }
+
@Override
public void wakeup() {
logFetcher.wakeup();
diff --git
a/fluss-common/src/main/java/org/apache/fluss/utils/ByteArrayWrapper.java
b/fluss-common/src/main/java/org/apache/fluss/utils/ByteArrayWrapper.java
new file mode 100644
index 000000000..da547d36b
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/ByteArrayWrapper.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.utils;
+
+import java.util.Arrays;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * A wrapper for byte[] that provides proper equals() and hashCode()
implementations for use as Map
+ * keys.
+ *
+ * <p>The hashCode is pre-computed at construction time for better performance
when used in
+ * hash-based collections.
+ */
+public final class ByteArrayWrapper {
+
+ private final byte[] data;
+ private final int hashCode;
+
+ public ByteArrayWrapper(byte[] data) {
+ this.data = checkNotNull(data, "data cannot be null");
+ this.hashCode = Arrays.hashCode(data);
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ByteArrayWrapper)) {
+ return false;
+ }
+ return Arrays.equals(data, ((ByteArrayWrapper) o).data);
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+
+ @Override
+ public String toString() {
+ return "ByteArrayWrapper{length=" + data.length + "}";
+ }
+}
diff --git
a/fluss-common/src/test/java/org/apache/fluss/utils/ByteArrayWrapperTest.java
b/fluss-common/src/test/java/org/apache/fluss/utils/ByteArrayWrapperTest.java
new file mode 100644
index 000000000..f0f69d112
--- /dev/null
+++
b/fluss-common/src/test/java/org/apache/fluss/utils/ByteArrayWrapperTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link ByteArrayWrapper}. */
+class ByteArrayWrapperTest {
+
+ @Test
+ void testEqualsAndHashCode() {
+ byte[] data1 = new byte[] {1, 2, 3};
+ byte[] data2 = new byte[] {1, 2, 3};
+ byte[] data3 = new byte[] {1, 2, 4};
+
+ ByteArrayWrapper wrapper1 = new ByteArrayWrapper(data1);
+ ByteArrayWrapper wrapper2 = new ByteArrayWrapper(data2);
+ ByteArrayWrapper wrapper3 = new ByteArrayWrapper(data3);
+
+ // Same content should be equal
+ assertThat(wrapper1).isEqualTo(wrapper2);
+ assertThat(wrapper1.hashCode()).isEqualTo(wrapper2.hashCode());
+
+ // Different content should not be equal
+ assertThat(wrapper1).isNotEqualTo(wrapper3);
+ }
+
+ @Test
+ void testNullDataThrowsException() {
+ assertThatThrownBy(() -> new ByteArrayWrapper(null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessageContaining("data cannot be null");
+ }
+
+ @Test
+ void testAsMapKey() {
+ byte[] key1 = new byte[] {1, 2, 3};
+ byte[] key2 = new byte[] {1, 2, 3}; // Same content, different array
+ byte[] key3 = new byte[] {4, 5, 6};
+
+ Map<ByteArrayWrapper, String> map = new HashMap<>();
+ map.put(new ByteArrayWrapper(key1), "value1");
+
+ // Should find with same content
+ assertThat(map.get(new ByteArrayWrapper(key2))).isEqualTo("value1");
+
+ // Should not find with different content
+ assertThat(map.get(new ByteArrayWrapper(key3))).isNull();
+
+ // Should overwrite with same key
+ map.put(new ByteArrayWrapper(key2), "value2");
+ assertThat(map).hasSize(1);
+ assertThat(map.get(new ByteArrayWrapper(key1))).isEqualTo("value2");
+ }
+
+ @Test
+ void testGetData() {
+ byte[] data = new byte[] {1, 2, 3};
+ ByteArrayWrapper wrapper = new ByteArrayWrapper(data);
+
+ assertThat(wrapper.getData()).isSameAs(data);
+ }
+
+ @Test
+ void testEmptyArray() {
+ ByteArrayWrapper wrapper1 = new ByteArrayWrapper(new byte[0]);
+ ByteArrayWrapper wrapper2 = new ByteArrayWrapper(new byte[0]);
+
+ assertThat(wrapper1).isEqualTo(wrapper2);
+ assertThat(wrapper1.hashCode()).isEqualTo(wrapper2.hashCode());
+ }
+
+ @Test
+ void testToString() {
+ ByteArrayWrapper wrapper = new ByteArrayWrapper(new byte[] {1, 2, 3});
+ assertThat(wrapper.toString()).contains("length=3");
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/BucketRecoveryContext.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/BucketRecoveryContext.java
new file mode 100644
index 000000000..47797d2c4
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/BucketRecoveryContext.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.writer.undo;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.ByteArrayWrapper;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Encapsulates the recovery state for a single bucket.
+ *
+ * <p>This class tracks:
+ *
+ * <ul>
+ * <li>The bucket being recovered
+ * <li>The checkpoint offset (start point for reading changelog)
+ * <li>The log end offset (end point for reading changelog)
+ * <li>Processed primary keys for deduplication (streaming execution)
+ * <li>Progress tracking during changelog scanning
+ * </ul>
+ */
+public class BucketRecoveryContext {
+
+ private final TableBucket bucket;
+ private final long checkpointOffset;
+ private final long logEndOffset;
+
+ private final Set<ByteArrayWrapper> processedKeys;
+ private long lastProcessedOffset;
+ private int totalRecordsProcessed;
+
+ public BucketRecoveryContext(TableBucket bucket, long checkpointOffset,
long logEndOffset) {
+ this.bucket = bucket;
+ this.checkpointOffset = checkpointOffset;
+ this.logEndOffset = logEndOffset;
+ this.processedKeys = new HashSet<>();
+ this.lastProcessedOffset = checkpointOffset;
+ this.totalRecordsProcessed = 0;
+ }
+
+ public TableBucket getBucket() {
+ return bucket;
+ }
+
+ public long getCheckpointOffset() {
+ return checkpointOffset;
+ }
+
+ public long getLogEndOffset() {
+ return logEndOffset;
+ }
+
+ public Set<ByteArrayWrapper> getProcessedKeys() {
+ return processedKeys;
+ }
+
+ /**
+ * Checks if this bucket needs recovery.
+ *
+ * @return true if checkpoint offset is less than log end offset
+ */
+ public boolean needsRecovery() {
+ return checkpointOffset < logEndOffset;
+ }
+
+ /**
+ * Checks if changelog scanning is complete for this bucket.
+ *
+ * <p>Complete means either:
+ *
+ * <ul>
+ * <li>No recovery is needed (checkpointOffset >= logEndOffset), or
+ * <li>The last processed offset has reached or passed logEndOffset - 1
(lastProcessedOffset
+ * >= logEndOffset - 1)
+ * </ul>
+ *
+ * @return true if changelog scanning is complete
+ */
+ public boolean isComplete() {
+ // If no recovery is needed, we're already complete
+ if (!needsRecovery()) {
+ return true;
+ }
+ return lastProcessedOffset >= logEndOffset - 1;
+ }
+
+ /**
+ * Records that a changelog record has been processed.
+ *
+ * @param offset the offset of the processed record
+ */
+ public void recordProcessed(long offset) {
+ lastProcessedOffset = offset;
+ totalRecordsProcessed++;
+ }
+
+ public int getTotalRecordsProcessed() {
+ return totalRecordsProcessed;
+ }
+
+ public long getLastProcessedOffset() {
+ return lastProcessedOffset;
+ }
+
+ @Override
+ public String toString() {
+ return "BucketRecoveryContext{"
+ + "bucket="
+ + bucket
+ + ", checkpointOffset="
+ + checkpointOffset
+ + ", logEndOffset="
+ + logEndOffset
+ + ", processedKeys="
+ + processedKeys.size()
+ + ", complete="
+ + isComplete()
+ + '}';
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoComputer.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoComputer.java
new file mode 100644
index 000000000..80d9baac8
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoComputer.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.writer.undo;
+
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.KeyEncoder;
+import org.apache.fluss.utils.ByteArrayWrapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Computes and executes undo operations from changelog records using
streaming execution.
+ *
+ * <p>This class uses a {@link KeyEncoder} to encode primary keys as byte
arrays for efficient
+ * deduplication, and executes undo operations immediately via {@link
UpsertWriter}. The undo logic
+ * is:
+ *
+ * <ul>
+ * <li>{@code INSERT} → Delete the row (it didn't exist at checkpoint)
+ * <li>{@code UPDATE_BEFORE} → Restore the old value (this was the state at
checkpoint)
+ * <li>{@code UPDATE_AFTER} → Ignored (UPDATE_BEFORE already handled the
undo)
+ * <li>{@code DELETE} → Re-insert the deleted row (it existed at checkpoint)
+ * </ul>
+ *
+ * <p>For each primary key, only the first change after checkpoint determines
the undo action. The
+ * original row from {@link ScanRecord} is directly used without copying, as
each ScanRecord
+ * contains an independent row instance.
+ */
+public class UndoComputer {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(UndoComputer.class);
+
+ private final KeyEncoder keyEncoder;
+ private final UpsertWriter writer;
+
+ /**
+ * Creates an UndoComputer.
+ *
+ * @param keyEncoder the key encoder for primary key deduplication
+ * @param writer the writer for all undo operations
+ */
+ public UndoComputer(KeyEncoder keyEncoder, UpsertWriter writer) {
+ this.keyEncoder = keyEncoder;
+ this.writer = writer;
+ }
+
+ /**
+ * Processes a changelog record and executes the undo operation
immediately.
+ *
+ * <p>Only the first change for each primary key triggers an undo
operation.
+ *
+ * @param record the changelog record
+ * @param processedKeys set of already processed primary keys for
deduplication
+ * @return CompletableFuture for the async write, or null if skipped
+ */
+ @Nullable
+ public CompletableFuture<?> processRecord(
+ ScanRecord record, Set<ByteArrayWrapper> processedKeys) {
+ // Skip UPDATE_AFTER before key encoding — UPDATE_BEFORE already
handled the undo
+ if (record.getChangeType() == ChangeType.UPDATE_AFTER) {
+ return null;
+ }
+
+ byte[] encodedKey = keyEncoder.encodeKey(record.getRow());
+ ByteArrayWrapper keyWrapper = new ByteArrayWrapper(encodedKey);
+
+ // Skip if we already processed this key
+ if (processedKeys.contains(keyWrapper)) {
+ return null;
+ }
+
+ CompletableFuture<?> future = computeAndExecuteUndo(record);
+ if (future != null) {
+ processedKeys.add(keyWrapper);
+ }
+ return future;
+ }
+
+ /**
+ * Computes and executes the undo operation for a changelog record.
+ *
+ * <p>UPDATE_AFTER is ignored because UPDATE_BEFORE and UPDATE_AFTER
always come in pairs, with
+ * UPDATE_BEFORE appearing first. The undo logic is determined by
UPDATE_BEFORE which contains
+ * the old value needed for restoration.
+ *
+ * <p>The original row is directly used without copying because:
+ *
+ * <ul>
+ * <li>Each ScanRecord contains an independent GenericRow instance
(created in
+ * CompletedFetch.toScanRecord)
+ * <li>For DELETE operations, UpsertWriter.delete() only needs the
primary key fields
+ * <li>For UPSERT operations, the original row contains the exact data
to restore
+ * </ul>
+ *
+ * @param record the changelog record
+ * @return CompletableFuture for the async write, or null if no action
needed (e.g.,
+ * UPDATE_AFTER)
+ */
+ @Nullable
+ private CompletableFuture<?> computeAndExecuteUndo(ScanRecord record) {
+ ChangeType changeType = record.getChangeType();
+ InternalRow row = record.getRow();
+
+ switch (changeType) {
+ case INSERT:
+ // Row was inserted after checkpoint → delete it
+ return writer.delete(row);
+
+ case UPDATE_BEFORE:
+ // Row was updated after checkpoint → restore old value
+ return writer.upsert(row);
+
+ case UPDATE_AFTER:
+ // Ignored: UPDATE_BEFORE already handled the undo logic for
this key.
+ return null;
+
+ case DELETE:
+ // Row was deleted after checkpoint → re-insert it
+ return writer.upsert(row);
+
+ default:
+ LOG.warn("Unexpected change type for undo: {}", changeType);
+ return null;
+ }
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutor.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutor.java
new file mode 100644
index 000000000..a1f3f6637
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutor.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.writer.undo;
+
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.ByteArrayWrapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
/**
 * Executes undo recovery for multiple buckets using streaming execution.
 *
 * <p>This executor manages:
 *
 * <ul>
 *   <li>Single LogScanner with multiple bucket subscriptions
 *   <li>Changelog reading with immediate undo operation execution
 *   <li>Async write operations via UpsertWriter
 * </ul>
 *
 * <p>The execution flow (streaming):
 *
 * <ol>
 *   <li>Subscribe all buckets to the LogScanner
 *   <li>Poll changelog records and execute undo operations immediately
 *   <li>Collect CompletableFutures for completion tracking
 *   <li>Flush and wait for all futures to ensure all writes complete
 * </ol>
 */
public class UndoRecoveryExecutor {

    private static final Logger LOG = LoggerFactory.getLogger(UndoRecoveryExecutor.class);

    /** Fixed poll timeout in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 10_000;

    /** Default maximum total wait time in milliseconds before failing (1 hour). */
    private static final long DEFAULT_MAX_TOTAL_WAIT_TIME_MS = 60 * 60 * 1000; // 1 hour

    /** Interval for logging progress during long waits (5 minutes). */
    private static final long PROGRESS_LOG_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes

    private final LogScanner scanner;
    private final UpsertWriter writer;
    private final UndoComputer undoComputer;

    // Configurable timeout parameter
    private final long maxTotalWaitTimeMs;

    /** Creates an executor with the default maximum wait time (1 hour). */
    public UndoRecoveryExecutor(
            LogScanner scanner, UpsertWriter writer, UndoComputer undoComputer) {
        this(scanner, writer, undoComputer, DEFAULT_MAX_TOTAL_WAIT_TIME_MS);
    }

    /**
     * Creates an executor with custom timeout configuration (for testing).
     *
     * @param scanner the log scanner
     * @param writer the upsert writer
     * @param undoComputer the undo computer
     * @param maxTotalWaitTimeMs maximum total wait time before failing
     */
    public UndoRecoveryExecutor(
            LogScanner scanner,
            UpsertWriter writer,
            UndoComputer undoComputer,
            long maxTotalWaitTimeMs) {
        this.scanner = scanner;
        this.writer = writer;
        this.undoComputer = undoComputer;
        this.maxTotalWaitTimeMs = maxTotalWaitTimeMs;
    }

    /**
     * Executes undo recovery for the given bucket contexts.
     *
     * <p>Blocks until every bucket has been scanned to its log end offset and every issued undo
     * write has completed.
     *
     * @param contexts the bucket recovery contexts (must have target offsets set)
     * @throws Exception if recovery fails
     */
    public void execute(List<BucketRecoveryContext> contexts) throws Exception {
        // Filter contexts that need recovery
        List<BucketRecoveryContext> toRecover = filterContextsNeedingRecovery(contexts);
        if (toRecover.isEmpty()) {
            LOG.debug("No buckets need recovery after filtering");
            return;
        }

        LOG.debug("Executing undo recovery for {} bucket(s)", toRecover.size());

        // Subscribe and read changelog with streaming execution
        subscribeAll(toRecover);
        List<CompletableFuture<?>> allFutures = readChangelogAndExecute(toRecover);

        // Flush first, then wait for all async writes to complete. The futures are the source of
        // truth for per-record completion; flush alone does not guarantee it.
        writer.flush();
        if (!allFutures.isEmpty()) {
            CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0])).get();
        }

        // Log summary at INFO level for visibility. One undo operation was issued per distinct
        // primary key, so the processed-key count equals the undo-op count.
        int totalUndoOps = 0;
        for (BucketRecoveryContext ctx : toRecover) {
            totalUndoOps += ctx.getProcessedKeys().size();
        }

        LOG.info(
                "Undo recovery execution completed: {} bucket(s), {} total undo operation(s)",
                toRecover.size(),
                totalUndoOps);
    }

    /** Returns only the contexts whose checkpoint offset lags behind the log end offset. */
    private List<BucketRecoveryContext> filterContextsNeedingRecovery(
            List<BucketRecoveryContext> contexts) {
        List<BucketRecoveryContext> result = new ArrayList<>();
        for (BucketRecoveryContext ctx : contexts) {
            if (ctx.needsRecovery()) {
                LOG.debug(
                        "Bucket {} needs recovery: checkpoint={}, logEndOffset={}",
                        ctx.getBucket(),
                        ctx.getCheckpointOffset(),
                        ctx.getLogEndOffset());
                result.add(ctx);
            } else {
                LOG.debug("Bucket {} already up-to-date, no recovery needed", ctx.getBucket());
            }
        }
        return result;
    }

    /** Subscribes every bucket starting from its checkpoint offset (partitioned or not). */
    private void subscribeAll(List<BucketRecoveryContext> contexts) {
        for (BucketRecoveryContext ctx : contexts) {
            TableBucket bucket = ctx.getBucket();
            // A non-null partition id marks a partitioned table, which uses the two-arg overload.
            if (bucket.getPartitionId() != null) {
                scanner.subscribe(
                        bucket.getPartitionId(), bucket.getBucket(), ctx.getCheckpointOffset());
            } else {
                scanner.subscribe(bucket.getBucket(), ctx.getCheckpointOffset());
            }
        }
    }

    /**
     * Polls the changelog until every context is complete, executing undo operations as records
     * arrive.
     *
     * <p>The timeout is evaluated only on empty polls: any poll that returns records resets
     * nothing but also does not trigger the deadline check, so a slow trickle of records never
     * fails with the timeout even past {@code maxTotalWaitTimeMs}.
     *
     * @return the futures of all issued undo writes, for the caller to await
     */
    private List<CompletableFuture<?>> readChangelogAndExecute(List<BucketRecoveryContext> contexts)
            throws Exception {
        List<CompletableFuture<?>> allFutures = new ArrayList<>();

        // Early exit if all contexts are already complete (no records to read)
        if (allComplete(contexts)) {
            LOG.debug("All buckets already complete, no changelog reading needed");
            return allFutures;
        }

        long startTimeMs = System.currentTimeMillis();
        long lastProgressLogTime = System.currentTimeMillis();
        // Tracks buckets already unsubscribed so each is unsubscribed at most once.
        Set<TableBucket> unsubscribedBuckets = new HashSet<>();

        while (!allComplete(contexts)) {
            ScanRecords records = scanner.poll(Duration.ofMillis(POLL_TIMEOUT_MS));

            if (records.isEmpty()) {
                // Check if we've exceeded the maximum total wait time
                long elapsedMs = System.currentTimeMillis() - startTimeMs;
                if (elapsedMs >= maxTotalWaitTimeMs) {
                    throw new RuntimeException(
                            String.format(
                                    "Undo recovery timed out: unable to read all changelog records after "
                                            + "%.1f minutes of waiting. %d bucket(s) still incomplete. "
                                            + "The job will restart and retry the recovery.",
                                    elapsedMs / 60000.0,
                                    countIncomplete(contexts)));
                }

                LOG.debug("Empty poll (total elapsed: {}ms)", elapsedMs);
            } else {
                // Process records for each bucket
                for (BucketRecoveryContext ctx : contexts) {
                    if (ctx.isComplete()) {
                        continue;
                    }
                    List<ScanRecord> bucketRecords = records.records(ctx.getBucket());
                    if (!bucketRecords.isEmpty()) {
                        processRecords(ctx, bucketRecords, allFutures);
                    }
                    // Unsubscribe completed buckets to stop fetching data from them.
                    // Set.add returning true means this is the first time we see it complete.
                    if (ctx.isComplete() && unsubscribedBuckets.add(ctx.getBucket())) {
                        unsubscribeBucket(ctx.getBucket());
                    }
                }
            }

            // Log progress periodically (at the end of each loop iteration)
            long now = System.currentTimeMillis();
            if (now - lastProgressLogTime >= PROGRESS_LOG_INTERVAL_MS) {
                long elapsedMs = now - startTimeMs;
                LOG.info(
                        "Undo recovery waiting for changelog records: {} bucket(s) incomplete, "
                                + "waited {} minutes so far (max: {} minutes)",
                        countIncomplete(contexts),
                        String.format("%.1f", elapsedMs / 60000.0),
                        String.format("%.1f", maxTotalWaitTimeMs / 60000.0));
                lastProgressLogTime = now;
            }
        }

        // Log summary
        if (LOG.isDebugEnabled()) {
            for (BucketRecoveryContext ctx : contexts) {
                LOG.debug(
                        "Bucket {} read {} records, executed {} undo operations",
                        ctx.getBucket(),
                        ctx.getTotalRecordsProcessed(),
                        ctx.getProcessedKeys().size());
            }
        }

        return allFutures;
    }

    /** Unsubscribes a single bucket, using the partitioned overload when applicable. */
    private void unsubscribeBucket(TableBucket bucket) {
        if (bucket.getPartitionId() != null) {
            scanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
        } else {
            scanner.unsubscribe(bucket.getBucket());
        }
    }

    /**
     * Feeds a batch of records for one bucket into the UndoComputer, collecting write futures.
     *
     * <p>Stops early once the context reports complete, so records beyond the log end offset in
     * the same batch are not processed.
     *
     * <p>NOTE(review): progress tracking trusts {@code record.logOffset()} to be monotonically
     * increasing within a bucket's batch — confirm against the scanner's ordering guarantee.
     */
    private void processRecords(
            BucketRecoveryContext ctx,
            List<ScanRecord> records,
            List<CompletableFuture<?>> futures) {
        Set<ByteArrayWrapper> processedKeys = ctx.getProcessedKeys();
        for (ScanRecord record : records) {
            CompletableFuture<?> future = undoComputer.processRecord(record, processedKeys);
            if (future != null) {
                futures.add(future);
            }
            ctx.recordProcessed(record.logOffset());
            if (ctx.isComplete()) {
                break;
            }
        }
    }

    /** Returns true if every context has finished scanning its changelog range. */
    private boolean allComplete(List<BucketRecoveryContext> contexts) {
        for (BucketRecoveryContext ctx : contexts) {
            if (!ctx.isComplete()) {
                return false;
            }
        }
        return true;
    }

    /** Counts contexts that have not yet finished scanning (for logging/error messages). */
    private int countIncomplete(List<BucketRecoveryContext> contexts) {
        int count = 0;
        for (BucketRecoveryContext ctx : contexts) {
            if (!ctx.isComplete()) {
                count++;
            }
        }
        return count;
    }
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManager.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManager.java
new file mode 100644
index 000000000..fa0adc10f
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManager.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.writer.undo;
+
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.writer.Upsert;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.row.encode.CompactedKeyEncoder;
+import org.apache.fluss.row.encode.KeyEncoder;
+import org.apache.fluss.rpc.protocol.MergeMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Manages undo recovery operations during Flink sink writer initialization.
+ *
+ * <p>This manager ensures exactly-once semantics by reversing any writes that
occurred after the
+ * last successful checkpoint but before a failure. The recovery process
involves:
+ *
+ * <ol>
+ * <li>Reading changelog records from checkpoint offset to current latest
offset
+ * <li>Computing inverse operations for each affected primary key
+ * <li>Applying undo operations using OVERWRITE mode to restore bucket state
+ * </ol>
+ *
+ * <p><b>Undo Logic:</b> For each primary key, only the first change after
checkpoint determines the
+ * undo action:
+ *
+ * <ul>
+ * <li>{@code INSERT} → Delete the row (it didn't exist at checkpoint)
+ * <li>{@code UPDATE_BEFORE} → Restore the old value (this was the state at
checkpoint)
+ * <li>{@code UPDATE_AFTER} → Ignored (UPDATE_BEFORE already handled the
undo)
+ * <li>{@code DELETE} → Re-insert the deleted row (it existed at checkpoint)
+ * </ul>
+ *
+ * <p>This class delegates to specialized components:
+ *
+ * <ul>
+ * <li>{@link UndoComputer} - Undo operation computation using {@link
KeyEncoder} for primary key
+ * encoding
+ * <li>{@link UndoRecoveryExecutor} - Changelog reading and write execution
+ * </ul>
+ *
+ * @see MergeMode#OVERWRITE
+ */
+public class UndoRecoveryManager {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(UndoRecoveryManager.class);
+
+ private final Table table;
+ @Nullable private final int[] targetColumnIndexes;
+
    /**
     * Creates a new UndoRecoveryManager.
     *
     * @param table the Fluss table to perform recovery on
     * @param targetColumnIndexes optional target columns for partial update (null for full row)
     */
    public UndoRecoveryManager(Table table, @Nullable int[] targetColumnIndexes) {
        this.table = table;
        this.targetColumnIndexes = targetColumnIndexes;

        // Surface the partial-update configuration once at construction for diagnosability.
        logPartialUpdateConfig(targetColumnIndexes);
    }
+
+ private void logPartialUpdateConfig(@Nullable int[] targetColumnIndexes) {
+ if (targetColumnIndexes != null) {
+ LOG.info(
+ "Undo recovery configured with partial update columns: {}",
+ Arrays.toString(targetColumnIndexes));
+ }
+ }
+
+ // ==================== Public API ====================
+
    /**
     * Performs undo recovery for buckets with known undo offsets.
     *
     * <p>The caller is responsible for providing both the checkpoint offset (start) and the log end
     * offset (end) for each bucket. This avoids redundant listOffset calls since the caller
     * typically already has this information from determining which buckets need recovery.
     *
     * <p>This is a blocking call: it reads the changelog and applies all undo operations before
     * returning, which may take significant time for a large backlog.
     *
     * @param bucketUndoOffsets map of bucket to its undo offsets (checkpointOffset, logEndOffset)
     * @param subtaskIndex the Flink subtask index (for logging)
     * @param parallelism the total parallelism (for logging)
     * @throws Exception if recovery fails
     */
    public void performUndoRecovery(
            Map<TableBucket, UndoOffsets> bucketUndoOffsets, int subtaskIndex, int parallelism)
            throws Exception {

        if (bucketUndoOffsets.isEmpty()) {
            LOG.debug("No buckets to recover on subtask {}/{}", subtaskIndex, parallelism);
            return;
        }

        LOG.info(
                "Starting undo recovery for {} bucket(s) on subtask {}/{}",
                bucketUndoOffsets.size(),
                subtaskIndex,
                parallelism);

        List<BucketRecoveryContext> contexts = buildRecoveryContexts(bucketUndoOffsets);

        // The scanner lives only for the duration of the recovery; try-with-resources closes it
        // before the completion summary below.
        try (LogScanner scanner = createLogScanner()) {
            // OVERWRITE mode makes undo writes restore the pre-checkpoint value verbatim rather
            // than being merged/aggregated again by the table's merge engine.
            Upsert recoveryUpsert = table.newUpsert().mergeMode(MergeMode.OVERWRITE);
            if (targetColumnIndexes != null) {
                recoveryUpsert = recoveryUpsert.partialUpdate(targetColumnIndexes);
            }
            UpsertWriter writer = recoveryUpsert.createWriter();

            // Create UndoComputer with writer for streaming execution
            Schema schema = table.getTableInfo().getSchema();
            // Use CompactedKeyEncoder directly instead of the deprecated KeyEncoder.of(),
            // which requires a lake format parameter that is not applicable here.
            // NOTE(review): getPrimaryKey().get() is an unchecked Optional — assumes undo
            // recovery only runs for primary-key tables; confirm the caller enforces this.
            KeyEncoder keyEncoder =
                    CompactedKeyEncoder.createKeyEncoder(
                            schema.getRowType(), schema.getPrimaryKey().get().getColumnNames());
            UndoComputer undoComputer = new UndoComputer(keyEncoder, writer);

            UndoRecoveryExecutor executor = new UndoRecoveryExecutor(scanner, writer, undoComputer);
            // This is a blocking operation that reads changelog and executes undo operations.
            // It may take a significant amount of time depending on the volume of records to
            // process.
            executor.execute(contexts);
        }

        // Calculate total undo operations for summary (one undo per distinct primary key).
        int totalUndoOps = 0;
        for (BucketRecoveryContext ctx : contexts) {
            totalUndoOps += ctx.getProcessedKeys().size();
        }

        LOG.info(
                "Completed undo recovery for {} bucket(s) with {} undo operation(s) on subtask {}/{}",
                bucketUndoOffsets.size(),
                totalUndoOps,
                subtaskIndex,
                parallelism);
    }
+
+ /**
+ * Encapsulates the offset information needed for undo recovery of a
bucket.
+ *
+ * <p>This class holds the checkpoint offset (recovery start point) and
log end offset (recovery
+ * end point) for a bucket's undo recovery operation.
+ */
+ public static class UndoOffsets {
+ private final long checkpointOffset;
+ private final long logEndOffset;
+
+ /**
+ * Creates a new UndoOffsets.
+ *
+ * @param checkpointOffset the checkpoint offset (start point,
inclusive)
+ * @param logEndOffset the log end offset (end point, exclusive)
+ * @throws IllegalArgumentException if offsets are negative or
checkpointOffset >
+ * logEndOffset
+ */
+ public UndoOffsets(long checkpointOffset, long logEndOffset) {
+ if (checkpointOffset < 0) {
+ throw new IllegalArgumentException(
+ "checkpointOffset must be non-negative: " +
checkpointOffset);
+ }
+ if (logEndOffset < 0) {
+ throw new IllegalArgumentException(
+ "logEndOffset must be non-negative: " + logEndOffset);
+ }
+ if (checkpointOffset > logEndOffset) {
+ throw new IllegalArgumentException(
+ String.format(
+ "checkpointOffset (%d) must not be greater
than logEndOffset (%d)",
+ checkpointOffset, logEndOffset));
+ }
+ // Note: checkpointOffset == logEndOffset is allowed, it simply
means no recovery needed
+ this.checkpointOffset = checkpointOffset;
+ this.logEndOffset = logEndOffset;
+ }
+
+ public long getCheckpointOffset() {
+ return checkpointOffset;
+ }
+
+ public long getLogEndOffset() {
+ return logEndOffset;
+ }
+ }
+
+ // ==================== Scanner Factory ====================
+
+ /**
+ * Creates a LogScanner for reading changelog records.
+ *
+ * <p>This method is protected to allow test subclasses to inject custom
scanners for fault
+ * injection testing.
+ *
+ * @return a new LogScanner instance
+ */
+ protected LogScanner createLogScanner() {
+ return table.newScan().createLogScanner();
+ }
+
+ // ==================== Recovery Context Building ====================
+
+ private List<BucketRecoveryContext> buildRecoveryContexts(
+ Map<TableBucket, UndoOffsets> bucketUndoOffsets) {
+
+ List<BucketRecoveryContext> contexts = new ArrayList<>();
+
+ for (Map.Entry<TableBucket, UndoOffsets> entry :
bucketUndoOffsets.entrySet()) {
+ TableBucket bucket = entry.getKey();
+ UndoOffsets offsets = entry.getValue();
+
+ BucketRecoveryContext ctx =
+ new BucketRecoveryContext(
+ bucket, offsets.getCheckpointOffset(),
offsets.getLogEndOffset());
+ contexts.add(ctx);
+ }
+
+ return contexts;
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoComputerTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoComputerTest.java
new file mode 100644
index 000000000..d41bb84b4
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoComputerTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.writer.undo;
+
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.flink.utils.TestUpsertWriter;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.encode.KeyEncoder;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.ByteArrayWrapper;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link UndoComputer}.
+ *
+ * <p>Tests verify: (1) ChangeType to undo operation mapping, (2) primary key
deduplication.
+ */
+class UndoComputerTest {
+
+ private static final RowType ROW_TYPE =
+ RowType.of(DataTypes.INT(), DataTypes.STRING(), DataTypes.INT());
+ private static final List<String> PRIMARY_KEY_COLUMNS =
Collections.singletonList("f0");
+
+ private KeyEncoder keyEncoder;
+ private TestUpsertWriter mockWriter;
+ private UndoComputer undoComputer;
+
+ @BeforeEach
+ void setUp() {
+ keyEncoder = KeyEncoder.of(ROW_TYPE, PRIMARY_KEY_COLUMNS, null);
+ mockWriter = new TestUpsertWriter();
+ undoComputer = new UndoComputer(keyEncoder, mockWriter);
+ }
+
+ // ==================== Undo Logic Mapping (Parameterized)
====================
+
+ /**
+ * Parameterized test for ChangeType to undo operation mapping.
+ *
+ * <p>Validates: Requirements 4.3 - Correct undo logic mapping.
+ *
+ * <p>Mapping rules:
+ *
+ * <ul>
+ * <li>INSERT → delete (undo insert by deleting)
+ * <li>UPDATE_BEFORE → upsert (restore old value)
+ * <li>UPDATE_AFTER → skip (no action needed)
+ * <li>DELETE → upsert (restore deleted row)
+ * </ul>
+ */
+ @ParameterizedTest(name = "{0} → expectDelete={1}, expectUpsert={2},
expectNull={3}")
+ @MethodSource("changeTypeUndoMappingProvider")
+ void testChangeTypeToUndoMapping(
+ ChangeType changeType,
+ boolean expectDelete,
+ boolean expectUpsert,
+ boolean expectNullFuture) {
+ Set<ByteArrayWrapper> processedKeys = new HashSet<>();
+ GenericRow testRow = row(1, "test", 100);
+ ScanRecord record = new ScanRecord(0L, 0L, changeType, testRow);
+
+ CompletableFuture<?> future = undoComputer.processRecord(record,
processedKeys);
+
+ if (expectNullFuture) {
+ assertThat(future).isNull();
+ assertThat(processedKeys).isEmpty();
+ } else {
+ assertThat(future).isNotNull();
+ assertThat(processedKeys).hasSize(1);
+ }
+ assertThat(mockWriter.getDeleteCount()).isEqualTo(expectDelete ? 1 :
0);
+ assertThat(mockWriter.getUpsertCount()).isEqualTo(expectUpsert ? 1 :
0);
+
+ if (expectDelete) {
+ assertThat(mockWriter.getLastDeletedRow()).isSameAs(testRow);
+ }
+ if (expectUpsert) {
+ assertThat(mockWriter.getLastUpsertedRow()).isSameAs(testRow);
+ }
+ }
+
+ static Stream<Arguments> changeTypeUndoMappingProvider() {
+ return Stream.of(
+ // ChangeType, expectDelete, expectUpsert, expectNullFuture
+ Arguments.of(ChangeType.INSERT, true, false, false),
+ Arguments.of(ChangeType.UPDATE_BEFORE, false, true, false),
+ Arguments.of(ChangeType.UPDATE_AFTER, false, false, true),
+ Arguments.of(ChangeType.DELETE, false, true, false));
+ }
+
+ // ==================== Primary Key Deduplication ====================
+
+ /**
+ * Test primary key deduplication: only first occurrence of each key is
processed.
+ *
+ * <p>Validates: Requirements 2.2, 2.3 - Primary key deduplication and key
tracking.
+ */
+ @Test
+ void testPrimaryKeyDeduplication() {
+ Set<ByteArrayWrapper> processedKeys = new HashSet<>();
+
+ // First record for key=1 (INSERT → delete)
+ GenericRow row1 = row(1, "first", 100);
+ CompletableFuture<?> f1 =
+ undoComputer.processRecord(
+ new ScanRecord(0L, 0L, ChangeType.INSERT, row1),
processedKeys);
+
+ // Second record for same key=1 (should be skipped)
+ GenericRow row2 = row(1, "second", 200);
+ CompletableFuture<?> f2 =
+ undoComputer.processRecord(
+ new ScanRecord(1L, 0L, ChangeType.UPDATE_BEFORE,
row2), processedKeys);
+
+ // Third record for different key=2 (INSERT → delete)
+ GenericRow row3 = row(2, "third", 300);
+ CompletableFuture<?> f3 =
+ undoComputer.processRecord(
+ new ScanRecord(2L, 0L, ChangeType.INSERT, row3),
processedKeys);
+
+ // Fourth record for key=2 (should be skipped)
+ GenericRow row4 = row(2, "fourth", 400);
+ CompletableFuture<?> f4 =
+ undoComputer.processRecord(
+ new ScanRecord(3L, 0L, ChangeType.DELETE, row4),
processedKeys);
+
+ // Verify: only first occurrence of each key processed
+ assertThat(f1).isNotNull();
+ assertThat(f2).as("Duplicate key=1 should be skipped").isNull();
+ assertThat(f3).isNotNull();
+ assertThat(f4).as("Duplicate key=2 should be skipped").isNull();
+
+ // Verify: 2 unique keys tracked
+ assertThat(processedKeys).hasSize(2);
+
+ // Verify: 2 deletes (one per unique key, both were INSERTs)
+ assertThat(mockWriter.getDeleteCount()).isEqualTo(2);
+ assertThat(mockWriter.getUpsertCount()).isEqualTo(0);
+
+ // Verify: correct rows were processed
+ assertThat(mockWriter.getAllDeletedRows()).containsExactly(row1, row3);
+ }
+
+ /**
+ * Test that UPDATE_AFTER records don't add keys to processed set.
+ *
+ * <p>This ensures subsequent records for the same key can still be
processed.
+ */
+ @Test
+ void testUpdateAfterDoesNotBlockSubsequentRecords() {
+ Set<ByteArrayWrapper> processedKeys = new HashSet<>();
+
+ // UPDATE_AFTER for key=1 (skipped, key NOT added to set)
+ undoComputer.processRecord(
+ new ScanRecord(0L, 0L, ChangeType.UPDATE_AFTER, row(1,
"after", 100)),
+ processedKeys);
+
+ // INSERT for same key=1 (should be processed since UPDATE_AFTER
didn't add key)
+ CompletableFuture<?> f =
+ undoComputer.processRecord(
+ new ScanRecord(1L, 0L, ChangeType.INSERT, row(1,
"insert", 200)),
+ processedKeys);
+
+ assertThat(f).isNotNull();
+ assertThat(mockWriter.getDeleteCount()).isEqualTo(1);
+ assertThat(processedKeys).hasSize(1);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutorTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutorTest.java
new file mode 100644
index 000000000..d60c10d9a
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutorTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.writer.undo;
+
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.flink.utils.TestLogScanner;
+import org.apache.fluss.flink.utils.TestUpsertWriter;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.encode.KeyEncoder;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link UndoRecoveryExecutor}.
+ *
+ * <p>Tests verify: (1) streaming execution with futures, (2) multi-bucket
recovery, (3) exception
+ * propagation.
+ */
+class UndoRecoveryExecutorTest {
+
+ private static final RowType ROW_TYPE =
+ RowType.of(DataTypes.INT(), DataTypes.STRING(), DataTypes.INT());
+ private static final List<String> PRIMARY_KEY_COLUMNS =
Collections.singletonList("f0");
+ private static final long TABLE_ID = 1L;
+
+ private KeyEncoder keyEncoder;
+ private TestUpsertWriter mockWriter;
+ private TestLogScanner mockScanner;
+ private UndoComputer undoComputer;
+ private UndoRecoveryExecutor executor;
+
+ // Short timeout value for testing (total ~300ms instead of 1 hour)
+ private static final long TEST_MAX_TOTAL_WAIT_TIME_MS = 300;
+
+ @BeforeEach
+ void setUp() {
+ keyEncoder = KeyEncoder.of(ROW_TYPE, PRIMARY_KEY_COLUMNS, null);
+ mockWriter = new TestUpsertWriter();
+ mockScanner = new TestLogScanner();
+ undoComputer = new UndoComputer(keyEncoder, mockWriter);
+ // Use short timeout for testing
+ executor =
+ new UndoRecoveryExecutor(
+ mockScanner, mockWriter, undoComputer,
TEST_MAX_TOTAL_WAIT_TIME_MS);
+ }
+
+ /**
+ * Test multi-bucket recovery with mixed ChangeTypes and key deduplication.
+ *
+ * <p>Validates: Requirements 3.3 - All futures complete after execute.
+ */
+ @Test
+ void testMultiBucketRecoveryWithDeduplication() throws Exception {
+ TableBucket bucket0 = new TableBucket(TABLE_ID, 0);
+ TableBucket bucket1 = new TableBucket(TABLE_ID, 1);
+
+ BucketRecoveryContext ctx0 = new BucketRecoveryContext(bucket0, 0L,
4L);
+
+ BucketRecoveryContext ctx1 = new BucketRecoveryContext(bucket1, 0L,
3L);
+
+ // Bucket0: INSERT(key=1), UPDATE_BEFORE(key=1, dup), INSERT(key=2),
DELETE(key=3)
+ mockScanner.setRecordsForBucket(
+ bucket0,
+ Arrays.asList(
+ new ScanRecord(0L, 0L, ChangeType.INSERT, row(1, "a",
100)),
+ new ScanRecord(1L, 0L, ChangeType.UPDATE_BEFORE,
row(1, "b", 200)),
+ new ScanRecord(2L, 0L, ChangeType.INSERT, row(2, "c",
300)),
+ new ScanRecord(3L, 0L, ChangeType.DELETE, row(3, "d",
400))));
+
+ // Bucket1: DELETE(key=10), UPDATE_AFTER(key=11, skip), INSERT(key=12)
+ mockScanner.setRecordsForBucket(
+ bucket1,
+ Arrays.asList(
+ new ScanRecord(0L, 0L, ChangeType.DELETE, row(10, "e",
500)),
+ new ScanRecord(1L, 0L, ChangeType.UPDATE_AFTER,
row(11, "f", 600)),
+ new ScanRecord(2L, 0L, ChangeType.INSERT, row(12, "g",
700))));
+
+ executor.execute(Arrays.asList(ctx0, ctx1));
+
+ // Bucket0: 3 unique keys (key=1 deduplicated)
+ // - key=1: INSERT → delete
+ // - key=2: INSERT → delete
+ // - key=3: DELETE → upsert
+ assertThat(ctx0.getProcessedKeys()).hasSize(3);
+
+ // Bucket1: 2 unique keys (UPDATE_AFTER skipped)
+ // - key=10: DELETE → upsert
+ // - key=12: INSERT → delete
+ assertThat(ctx1.getProcessedKeys()).hasSize(2);
+
+ // Total: 3 deletes (key=1,2,12), 2 upserts (key=3,10)
+ assertThat(mockWriter.getDeleteCount()).isEqualTo(3);
+ assertThat(mockWriter.getUpsertCount()).isEqualTo(2);
+ assertThat(mockWriter.isFlushCalled()).isTrue();
+
+ assertThat(ctx0.isComplete()).isTrue();
+ assertThat(ctx1.isComplete()).isTrue();
+ }
+
+ /**
+ * Test that recovery is skipped when checkpoint offset >= target offset.
+ *
+ * <p>Validates: No unnecessary work when no recovery needed.
+ */
+ @Test
+ void testNoRecoveryNeededSkipsExecution() throws Exception {
+ TableBucket bucket = new TableBucket(TABLE_ID, 0);
+
+ // Checkpoint offset (5) >= log end offset (5) → no recovery needed
+ BucketRecoveryContext ctx = new BucketRecoveryContext(bucket, 5L, 5L);
+
+ executor.execute(Collections.singletonList(ctx));
+
+ assertThat(mockWriter.getDeleteCount()).isEqualTo(0);
+ assertThat(mockWriter.getUpsertCount()).isEqualTo(0);
+ assertThat(mockWriter.isFlushCalled()).isFalse();
+ }
+
+ /**
+ * Test exception propagation from writer failures.
+ *
+ * <p>Validates: Requirements 7.1, 7.2 - Exception propagation.
+ */
+ @Test
+ void testExceptionPropagationFromWriter() {
+ TableBucket bucket = new TableBucket(TABLE_ID, 0);
+
+ BucketRecoveryContext ctx = new BucketRecoveryContext(bucket, 0L, 2L);
+
+ mockScanner.setRecordsForBucket(
+ bucket,
+ Arrays.asList(
+ new ScanRecord(0L, 0L, ChangeType.INSERT, row(1, "a",
100)),
+ new ScanRecord(1L, 0L, ChangeType.INSERT, row(2, "b",
200))));
+
+ mockWriter.setShouldFail(true);
+
+ assertThatThrownBy(() ->
executor.execute(Collections.singletonList(ctx)))
+ .hasCauseInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Simulated write failure");
+ }
+
+ /**
+ * Test that exception is thrown after max total wait time.
+ *
+ * <p>Validates: Undo recovery timeout triggers a retryable exception.
Note: This test uses a
+ * mock scanner that always returns empty, so it will hit the timeout
quickly in test
+ * environment. In production, the timeout is 1 hour.
+ */
+ @Test
+ void testFatalExceptionOnMaxEmptyPolls() {
+ TableBucket bucket = new TableBucket(TABLE_ID, 0);
+
+ BucketRecoveryContext ctx = new BucketRecoveryContext(bucket, 0L, 2L);
+
+ // Configure scanner to always return empty (simulating network issues
or server problems)
+ mockScanner.setAlwaysReturnEmpty(true);
+
+ // The test will timeout based on total wait time, but since we're
using a mock
+ // scanner with no actual delay, it will fail quickly
+ assertThatThrownBy(() ->
executor.execute(Collections.singletonList(ctx)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Undo recovery timed out")
+ .hasMessageContaining("minutes of waiting")
+ .hasMessageContaining("still incomplete")
+ .hasMessageContaining("restart and retry");
+ }
+
+ /**
+ * Test multi-poll scenario where records are returned in batches.
+ *
+ * <p>This tests realistic LogScanner behavior where records are returned
incrementally across
+ * multiple poll() calls, ensuring the executor correctly handles partial
results.
+ */
+ @Test
+ void testMultiPollBatchProcessing() throws Exception {
+ TableBucket bucket = new TableBucket(TABLE_ID, 0);
+
+ BucketRecoveryContext ctx = new BucketRecoveryContext(bucket, 0L, 6L);
+
+ // Configure 6 records but return only 2 per poll
+ mockScanner.setRecordsForBucket(
+ bucket,
+ Arrays.asList(
+ new ScanRecord(0L, 0L, ChangeType.INSERT, row(1, "a",
100)),
+ new ScanRecord(1L, 0L, ChangeType.INSERT, row(2, "b",
200)),
+ new ScanRecord(2L, 0L, ChangeType.DELETE, row(3, "c",
300)),
+ new ScanRecord(3L, 0L, ChangeType.UPDATE_BEFORE,
row(4, "d", 400)),
+ new ScanRecord(4L, 0L, ChangeType.UPDATE_AFTER, row(4,
"e", 500)),
+ new ScanRecord(5L, 0L, ChangeType.INSERT, row(5, "f",
600))));
+ mockScanner.setBatchSize(2);
+
+ executor.execute(Collections.singletonList(ctx));
+
+ // Should process all 6 records across 3 polls
+ // key=1: INSERT → delete
+ // key=2: INSERT → delete
+ // key=3: DELETE → upsert
+ // key=4: UPDATE_BEFORE → upsert (UPDATE_AFTER skipped)
+ // key=5: INSERT → delete
+ assertThat(ctx.getProcessedKeys()).hasSize(5);
+ assertThat(ctx.getTotalRecordsProcessed()).isEqualTo(6);
+ assertThat(mockWriter.getDeleteCount()).isEqualTo(3); // keys 1, 2, 5
+ assertThat(mockWriter.getUpsertCount()).isEqualTo(2); // keys 3, 4
+ assertThat(ctx.isComplete()).isTrue();
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManagerITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManagerITCase.java
new file mode 100644
index 000000000..9e7762c30
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManagerITCase.java
@@ -0,0 +1,999 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.writer.undo;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.ListOffsetsResult;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.lookup.Lookuper;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.UpsertResult;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.sink.writer.undo.UndoRecoveryManager.UndoOffsets;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.MergeEngineType;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.PartitionSpec;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Integration tests for {@link UndoRecoveryManager}.
+ *
+ * <p>Tests use complex multi-bucket, multi-key scenarios to ensure only
correct implementations
+ * pass. Each test covers multiple aspects to maximize value while minimizing
redundancy.
+ */
+public class UndoRecoveryManagerITCase {
+
+ @RegisterExtension
+ public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+ FlussClusterExtension.builder()
+ .setClusterConf(
+ new Configuration()
+ // not to clean snapshots for test purpose
+ .set(
+
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
+ Integer.MAX_VALUE))
+ .setNumOfTabletServers(3)
+ .build();
+
+ private static final String DEFAULT_DB = "test-flink-db";
+
+ protected static Connection conn;
+ protected static Admin admin;
+
+ @BeforeAll
+ static void beforeAll() {
+ Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+ conn = ConnectionFactory.createConnection(clientConf);
+ admin = conn.getAdmin();
+ }
+
    @BeforeEach
    void beforeEach() throws Exception {
        // Create the shared test database before every test, blocking until done.
        // The trailing boolean presumably means "ignore if exists", making this safe
        // to call repeatedly — TODO confirm against the Admin#createDatabase contract.
        admin.createDatabase(DEFAULT_DB, DatabaseDescriptor.EMPTY, true).get();
    }
+
    @AfterAll
    static void afterAll() throws Exception {
        // Close the admin before the connection that produced it, and null the static
        // fields so no stale handles survive for a subsequent run of this class.
        if (admin != null) {
            admin.close();
            admin = null;
        }
        if (conn != null) {
            conn.close();
            conn = null;
        }
    }
+
    /**
     * Creates the given table and returns its table id.
     *
     * <p>The trailing {@code true} flag presumably makes creation a no-op when the table already
     * exists — confirm against the Admin#createTable contract.
     *
     * @param tablePath path of the table to create
     * @param tableDescriptor descriptor (schema, distribution, properties) of the table
     * @return the table id reported by the cluster for {@code tablePath}
     * @throws Exception if creation or the table-info lookup fails
     */
    protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor)
            throws Exception {
        admin.createTable(tablePath, tableDescriptor, true).get();
        return admin.getTableInfo(tablePath).get().getTableId();
    }
+
+ private static final int NUM_BUCKETS = 4;
+
+ private static final Schema TEST_SCHEMA =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("price", DataTypes.INT())
+ .column("stock", DataTypes.INT())
+ .primaryKey("id")
+ .build();
+
+ private static final Schema PARTITIONED_SCHEMA =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("price", DataTypes.INT())
+ .column("pt", DataTypes.STRING())
+ .primaryKey("id", "pt")
+ .build();
+
+ // ==================== Full Update Tests ====================
+
    /**
     * Comprehensive test for Full Update mode with 4 buckets and 100+ keys.
     *
     * <p>Covers: zero-offset recovery, DELETE undo, UPDATE undo with deduplication, mixed ops.
     *
     * <p>Flow: phase 1 writes 100 baseline rows and records which bucket each key landed in;
     * phase 2 simulates post-checkpoint traffic (new keys 200-299 plus updates/deletes/multi-
     * version writes on existing keys); phase 3 runs the undo recovery under test; phase 4 looks
     * every key up and asserts the table is back at its checkpoint state.
     */
    @Test
    void testFullUpdateMultiBucketComprehensiveRecovery() throws Exception {
        TablePath tablePath = createTestTablePath("full_update_test");
        long tableId =
                createTable(
                        tablePath,
                        TableDescriptor.builder()
                                .schema(TEST_SCHEMA)
                                .distributedBy(NUM_BUCKETS, "id")
                                .property(
                                        ConfigOptions.TABLE_MERGE_ENGINE,
                                        MergeEngineType.AGGREGATION)
                                .build());

        try (Table table = conn.getTable(tablePath)) {
            UpsertWriter writer = table.newUpsert().createWriter();
            Lookuper lookuper = table.newLookup().createLookuper();
            BucketTracker tracker = new BucketTracker(NUM_BUCKETS, tableId);

            // Phase 1: Write initial 100 keys (batch async)
            List<CompletableFuture<UpsertResult>> writeFutures = new ArrayList<>();
            for (int id = 0; id < 100; id++) {
                writeFutures.add(writer.upsert(row(id, "pre_" + id, id * 10, id)));
            }
            // Futures resolve in write order, so index i corresponds to key i.
            for (int i = 0; i < writeFutures.size(); i++) {
                tracker.recordWrite(i, writeFutures.get(i).get());
            }
            writer.flush();

            // One bucket recovers from offset 0 (i.e. its entire changelog must be undone).
            int zeroOffsetBucket = tracker.selectBucketForZeroOffsetRecovery();
            Map<TableBucket, Long> checkpointOffsets =
                    tracker.buildCheckpointOffsetsWithZeroOffset(zeroOffsetBucket);

            // Tracks, per bucket, which keys were touched after the simulated checkpoint.
            Map<Integer, Set<Integer>> keysAfterCheckpoint = new HashMap<>();
            for (int i = 0; i < NUM_BUCKETS; i++) {
                keysAfterCheckpoint.put(i, new HashSet<>());
            }

            // Phase 2: Operations after checkpoint (batch async)
            List<CompletableFuture<UpsertResult>> newKeyFutures = new ArrayList<>();
            for (int id = 200; id < 300; id++) {
                newKeyFutures.add(writer.upsert(row(id, "new_" + id, id * 10, id)));
            }
            writer.flush();
            for (int i = 0; i < newKeyFutures.size(); i++) {
                UpsertResult result = newKeyFutures.get(i).get();
                keysAfterCheckpoint.get(result.getBucket().getBucket()).add(200 + i);
            }

            // Per non-zero-offset bucket: up to 3 updates, up to 2 deletes, and one key with
            // three consecutive versions (exercises undo deduplication on the first change).
            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
                if (bucket != zeroOffsetBucket && !tracker.keysByBucket.get(bucket).isEmpty()) {
                    List<Integer> keys = tracker.keysByBucket.get(bucket);
                    for (int i = 0; i < Math.min(3, keys.size()); i++) {
                        int id = keys.get(i);
                        writer.upsert(row(id, "updated_" + id, 9999, 9999));
                        keysAfterCheckpoint.get(bucket).add(id);
                    }
                    if (keys.size() > 5) {
                        for (int i = 3; i < Math.min(5, keys.size()); i++) {
                            int id = keys.get(i);
                            writer.delete(row(id, "pre_" + id, id * 10, id));
                            keysAfterCheckpoint.get(bucket).add(id);
                        }
                    }
                    if (keys.size() > 6) {
                        int id = keys.get(5);
                        writer.upsert(row(id, "multi_v1", 111, 111));
                        writer.upsert(row(id, "multi_v2", 222, 222));
                        writer.upsert(row(id, "multi_v3", 333, 333));
                        keysAfterCheckpoint.get(bucket).add(id);
                    }
                }
            }
            writer.flush();

            // Phase 3: Perform undo recovery
            Map<TableBucket, UndoOffsets> offsetRanges =
                    buildUndoOffsets(checkpointOffsets, tablePath, admin);
            new UndoRecoveryManager(table, null).performUndoRecovery(offsetRanges, 0, 1);

            // Phase 4: Verify results (parallel lookup)
            List<CompletableFuture<Void>> verifyFutures = new ArrayList<>();

            // Zero-offset bucket: everything was written after "checkpoint 0", so every key
            // must be gone after recovery.
            for (int id : tracker.keysByBucket.get(zeroOffsetBucket)) {
                final int keyId = id;
                verifyFutures.add(
                        lookuper.lookup(row(keyId))
                                .thenAccept(
                                        r ->
                                                assertThat(r.getSingletonRow())
                                                        .as(
                                                                "Zero-offset bucket key %d should be deleted",
                                                                keyId)
                                                        .isNull()));
            }

            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
                if (bucket == zeroOffsetBucket) {
                    continue;
                }
                // Pre-checkpoint keys must be restored to their original "pre_" values.
                for (int id : tracker.keysByBucket.get(bucket)) {
                    final int keyId = id;
                    final int b = bucket;
                    verifyFutures.add(
                            lookuper.lookup(row(keyId))
                                    .thenAccept(
                                            r -> {
                                                InternalRow result = r.getSingletonRow();
                                                assertThat(result)
                                                        .as(
                                                                "Bucket %d key %d should exist",
                                                                b, keyId)
                                                        .isNotNull();
                                                assertRowEquals(
                                                        result,
                                                        keyId,
                                                        "pre_" + keyId,
                                                        keyId * 10,
                                                        keyId);
                                            }));
                }
                // Keys first inserted after the checkpoint (ids >= 200) must be removed.
                for (int id : keysAfterCheckpoint.get(bucket)) {
                    if (id >= 200) {
                        final int keyId = id;
                        final int b = bucket;
                        verifyFutures.add(
                                lookuper.lookup(row(keyId))
                                        .thenAccept(
                                                r ->
                                                        assertThat(r.getSingletonRow())
                                                                .as(
                                                                        "Bucket %d new key %d should be deleted",
                                                                        b, keyId)
                                                                .isNull()));
                    }
                }
            }
            CompletableFuture.allOf(verifyFutures.toArray(new CompletableFuture[0])).get();
        }
    }
+
+ // ==================== Partial Update Tests ====================
+
    /**
     * Comprehensive test for Partial Update mode with 4 buckets.
     *
     * <p>Covers: target columns rollback, non-target columns preserved, other writer's changes
     * preserved. One bucket is recovered from offset 0 ("zero-offset bucket") to exercise the
     * recover-from-scratch path.
     */
    @Test
    void testPartialUpdateMultiBucketComprehensiveRecovery() throws Exception {
        TablePath tablePath = createTestTablePath("partial_update_test");
        long tableId =
                createTable(
                        tablePath,
                        TableDescriptor.builder()
                                .schema(TEST_SCHEMA)
                                .distributedBy(NUM_BUCKETS, "id")
                                .property(
                                        ConfigOptions.TABLE_MERGE_ENGINE,
                                        MergeEngineType.AGGREGATION)
                                .build());

        int[] targetColumns = new int[] {0, 2}; // id, price

        try (Table table = conn.getTable(tablePath)) {
            UpsertWriter fullWriter = table.newUpsert().createWriter();
            Lookuper lookuper = table.newLookup().createLookuper();

            // Discover bucket assignments efficiently (batch write + delete): the bucket of each
            // key is only known from the UpsertResult, so write throwaway rows first.
            Map<Integer, Integer> keyToBucket = new HashMap<>();
            List<CompletableFuture<UpsertResult>> discoverFutures = new ArrayList<>();
            for (int id = 0; id < 80; id++) {
                discoverFutures.add(fullWriter.upsert(row(id, "temp", 0, 0)));
            }
            for (int i = 0; i < discoverFutures.size(); i++) {
                keyToBucket.put(i, discoverFutures.get(i).get().getBucket().getBucket());
            }
            fullWriter.flush();
            for (int id = 0; id < 80; id++) {
                fullWriter.delete(row(id, "temp", 0, 0));
            }
            fullWriter.flush();

            Map<Integer, List<Integer>> initialKeysByBucket = new HashMap<>();
            for (int i = 0; i < NUM_BUCKETS; i++) {
                initialKeysByBucket.put(i, new ArrayList<>());
            }
            for (int id = 0; id < 80; id++) {
                initialKeysByBucket.get(keyToBucket.get(id)).add(id);
            }
            // The bucket with the most keys is recovered from offset 0 below.
            int zeroOffsetBucket =
                    selectBucketForZeroOffsetRecovery(initialKeysByBucket, NUM_BUCKETS);

            // Phase 1: Write initial data (except zeroOffsetBucket)
            Map<Integer, List<Integer>> keysByBucket = new HashMap<>();
            Map<Integer, Long> bucketOffsets = new HashMap<>();
            for (int i = 0; i < NUM_BUCKETS; i++) {
                keysByBucket.put(i, new ArrayList<>());
            }

            List<CompletableFuture<UpsertResult>> initFutures = new ArrayList<>();
            List<Integer> initIds = new ArrayList<>();
            for (int id = 0; id < 80; id++) {
                int bucket = keyToBucket.get(id);
                if (bucket != zeroOffsetBucket) {
                    initFutures.add(fullWriter.upsert(row(id, "name_" + id, id * 10, id * 100)));
                    initIds.add(id);
                }
            }
            for (int i = 0; i < initFutures.size(); i++) {
                UpsertResult result = initFutures.get(i).get();
                int id = initIds.get(i);
                int bucket = result.getBucket().getBucket();
                keysByBucket.get(bucket).add(id);
                // Later writes to the same bucket overwrite earlier entries, so this ends up
                // holding the log end offset after the last initial write per bucket.
                bucketOffsets.put(bucket, result.getLogEndOffset());
            }
            fullWriter.flush();

            // Checkpoint offsets: zeroOffsetBucket restarts from 0, others from post-phase-1 end.
            Map<TableBucket, Long> checkpointOffsets = new HashMap<>();
            checkpointOffsets.put(new TableBucket(tableId, zeroOffsetBucket), 0L);
            for (int i = 0; i < NUM_BUCKETS; i++) {
                if (i != zeroOffsetBucket) {
                    checkpointOffsets.put(
                            new TableBucket(tableId, i), bucketOffsets.getOrDefault(i, 0L));
                }
            }

            Set<Integer> zeroOffsetBucketKeysAfterCheckpoint = new HashSet<>();
            Map<Integer, Set<Integer>> newKeysAfterCheckpointByBucket = new HashMap<>();
            for (int i = 0; i < NUM_BUCKETS; i++) {
                newKeysAfterCheckpointByBucket.put(i, new HashSet<>());
            }

            // Phase 2: Partial updates after checkpoint (these must all be undone by recovery)
            UpsertWriter partialWriter =
                    table.newUpsert().partialUpdate(targetColumns).createWriter();

            for (int id = 0; id < 80; id++) {
                if (keyToBucket.get(id) == zeroOffsetBucket) {
                    partialWriter.upsert(row(id, null, 9999, null));
                    zeroOffsetBucketKeysAfterCheckpoint.add(id);
                }
            }

            List<CompletableFuture<UpsertResult>> newPartialFutures = new ArrayList<>();
            for (int id = 100; id < 130; id++) {
                newPartialFutures.add(partialWriter.upsert(row(id, null, id * 10, null)));
            }
            for (int i = 0; i < newPartialFutures.size(); i++) {
                UpsertResult result = newPartialFutures.get(i).get();
                int id = 100 + i;
                int bucket = result.getBucket().getBucket();
                if (bucket == zeroOffsetBucket) {
                    zeroOffsetBucketKeysAfterCheckpoint.add(id);
                } else {
                    newKeysAfterCheckpointByBucket.get(bucket).add(id);
                }
            }

            // Overwrite the price of the first few pre-checkpoint keys per bucket.
            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
                if (bucket != zeroOffsetBucket) {
                    List<Integer> keys = keysByBucket.get(bucket);
                    for (int i = 0; i < Math.min(5, keys.size()); i++) {
                        partialWriter.upsert(row(keys.get(i), null, 8888, null));
                    }
                }
            }
            partialWriter.flush();

            // Other Writer updates stock (should NOT be undone): it targets columns {0, 3},
            // disjoint (apart from the key) from this writer's targetColumns {0, 2}.
            UpsertWriter otherWriter =
                    table.newUpsert().partialUpdate(new int[] {0, 3}).createWriter();
            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
                if (bucket != zeroOffsetBucket) {
                    for (int id : keysByBucket.get(bucket)) {
                        otherWriter.upsert(row(id, null, null, 5555));
                    }
                }
            }
            otherWriter.flush();

            // Phase 3: Perform undo recovery
            Map<TableBucket, UndoOffsets> offsetRanges =
                    buildUndoOffsets(checkpointOffsets, tablePath, admin);
            new UndoRecoveryManager(table, targetColumns).performUndoRecovery(offsetRanges, 0, 1);

            // Phase 4: Verify results (parallel lookup).
            // Expected: name unchanged, price rolled back to keyId * 10, stock keeps the other
            // writer's value 5555.
            List<CompletableFuture<Void>> verifyFutures = new ArrayList<>();

            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
                if (bucket == zeroOffsetBucket) {
                    continue;
                }

                for (int id : keysByBucket.get(bucket)) {
                    final int keyId = id;
                    final int b = bucket;
                    verifyFutures.add(
                            lookuper.lookup(row(keyId))
                                    .thenAccept(
                                            r -> {
                                                InternalRow result = r.getSingletonRow();
                                                assertThat(result)
                                                        .as(
                                                                "Bucket %d key %d should exist",
                                                                b, keyId)
                                                        .isNotNull();
                                                assertThat(result.getString(1).toString())
                                                        .isEqualTo("name_" + keyId);
                                                assertThat(result.getInt(2))
                                                        .as("price restored")
                                                        .isEqualTo(keyId * 10);
                                                assertThat(result.getInt(3))
                                                        .as("stock kept")
                                                        .isEqualTo(5555);
                                            }));
                }
            }
            CompletableFuture.allOf(verifyFutures.toArray(new CompletableFuture[0])).get();
        }
    }
+
+ // ==================== Partitioned Table Tests ====================
+
    /**
     * Test undo recovery for partitioned table with multiple partitions.
     *
     * <p>Covers: multi-partition recovery, INSERT/UPDATE/DELETE undo across partitions. After
     * recovery every partition must look exactly as it did at checkpoint time.
     */
    @Test
    void testPartitionedTableRecovery() throws Exception {
        TablePath tablePath = createTestTablePath("partitioned_test");
        long tableId =
                createTable(
                        tablePath,
                        TableDescriptor.builder()
                                .schema(PARTITIONED_SCHEMA)
                                .partitionedBy("pt")
                                .distributedBy(2, "id")
                                .property(
                                        ConfigOptions.TABLE_MERGE_ENGINE,
                                        MergeEngineType.AGGREGATION)
                                .build());

        String partition1 = "p1", partition2 = "p2";
        admin.createPartition(tablePath, newPartitionSpec("pt", partition1), false).get();
        admin.createPartition(tablePath, newPartitionSpec("pt", partition2), false).get();

        // Resolve partition names to internal partition ids (needed to build TableBuckets).
        List<PartitionInfo> partitionInfos = admin.listPartitionInfos(tablePath).get();
        Map<String, Long> partitionNameToId = new HashMap<>();
        for (PartitionInfo info : partitionInfos) {
            partitionNameToId.put(info.getPartitionName(), info.getPartitionId());
        }
        long partition1Id = partitionNameToId.get(partition1);
        long partition2Id = partitionNameToId.get(partition2);

        try (Table table = conn.getTable(tablePath)) {
            UpsertWriter writer = table.newUpsert().createWriter();
            Lookuper lookuper = table.newLookup().createLookuper();
            Map<TableBucket, Long> checkpointOffsets = new HashMap<>();

            // Phase 1: Write initial data (batch). Later results per bucket overwrite earlier
            // ones, so checkpointOffsets ends up with the last log end offset per bucket.
            List<CompletableFuture<UpsertResult>> p1Futures = new ArrayList<>();
            List<CompletableFuture<UpsertResult>> p2Futures = new ArrayList<>();
            for (int id = 0; id < 10; id++) {
                p1Futures.add(writer.upsert(row(id, "p1_name_" + id, id * 10, partition1)));
                p2Futures.add(writer.upsert(row(id, "p2_name_" + id, id * 20, partition2)));
            }
            for (int i = 0; i < 10; i++) {
                UpsertResult r1 = p1Futures.get(i).get();
                checkpointOffsets.put(
                        new TableBucket(tableId, partition1Id, r1.getBucket().getBucket()),
                        r1.getLogEndOffset());
                UpsertResult r2 = p2Futures.get(i).get();
                checkpointOffsets.put(
                        new TableBucket(tableId, partition2Id, r2.getBucket().getBucket()),
                        r2.getLogEndOffset());
            }
            writer.flush();

            // Phase 2: Operations after checkpoint — inserts in both partitions, updates in p1,
            // deletes in p2. All of these must be undone.
            Set<Integer> newKeysInP1 = new HashSet<>(), newKeysInP2 = new HashSet<>();
            for (int id = 100; id < 105; id++) {
                writer.upsert(row(id, "p1_new_" + id, id * 10, partition1));
                newKeysInP1.add(id);
            }
            for (int id = 0; id < 3; id++) {
                writer.upsert(row(id, "p1_updated_" + id, 9999, partition1));
            }
            for (int id = 5; id < 8; id++) {
                writer.delete(row(id, "p2_name_" + id, id * 20, partition2));
            }
            for (int id = 200; id < 205; id++) {
                writer.upsert(row(id, "p2_new_" + id, id * 20, partition2));
                newKeysInP2.add(id);
            }
            writer.flush();

            // Phase 3: Perform undo recovery
            Map<TableBucket, UndoOffsets> offsetRanges =
                    buildUndoOffsets(checkpointOffsets, tablePath, admin);
            new UndoRecoveryManager(table, null).performUndoRecovery(offsetRanges, 0, 1);

            // Phase 4: Verify results (parallel lookup) — original rows restored in both
            // partitions, including the p2 rows deleted after the checkpoint.
            List<CompletableFuture<Void>> verifyFutures = new ArrayList<>();

            for (int id = 0; id < 10; id++) {
                final int keyId = id;
                verifyFutures.add(
                        lookuper.lookup(row(keyId, partition1))
                                .thenAccept(
                                        r -> {
                                            InternalRow result = r.getSingletonRow();
                                            assertThat(result)
                                                    .as("P1 key %d should exist", keyId)
                                                    .isNotNull();
                                            assertThat(result.getString(1).toString())
                                                    .isEqualTo("p1_name_" + keyId);
                                            assertThat(result.getInt(2)).isEqualTo(keyId * 10);
                                        }));
                verifyFutures.add(
                        lookuper.lookup(row(keyId, partition2))
                                .thenAccept(
                                        r -> {
                                            InternalRow result = r.getSingletonRow();
                                            assertThat(result)
                                                    .as("P2 key %d should exist", keyId)
                                                    .isNotNull();
                                            assertThat(result.getString(1).toString())
                                                    .isEqualTo("p2_name_" + keyId);
                                            assertThat(result.getInt(2)).isEqualTo(keyId * 20);
                                        }));
            }

            CompletableFuture.allOf(verifyFutures.toArray(new CompletableFuture[0])).get();
        }
    }
+
+ // ==================== Idempotency and Exception Tests
====================
+
    /**
     * Test that undo recovery is idempotent - multiple executions produce the same result.
     *
     * <p>Covers: 4 buckets, 200 keys, mixed operations, 2 recovery rounds. After each round the
     * table must contain exactly the pre-checkpoint state.
     */
    @Test
    void testRecoveryIdempotency() throws Exception {
        final int numBuckets = 4;
        final int keysPerBucket = 50;

        TablePath tablePath = createTestTablePath("idempotency_test");
        long tableId =
                createTable(
                        tablePath,
                        TableDescriptor.builder()
                                .schema(TEST_SCHEMA)
                                .distributedBy(numBuckets, "id")
                                .property(
                                        ConfigOptions.TABLE_MERGE_ENGINE,
                                        MergeEngineType.AGGREGATION)
                                .build());

        try (Table table = conn.getTable(tablePath)) {
            UpsertWriter writer = table.newUpsert().createWriter();
            Lookuper lookuper = table.newLookup().createLookuper();
            BucketTracker tracker = new BucketTracker(numBuckets, tableId);

            int totalKeys = numBuckets * keysPerBucket;

            // Batch write initial keys
            List<CompletableFuture<UpsertResult>> initFutures = new ArrayList<>();
            for (int id = 0; id < totalKeys; id++) {
                initFutures.add(writer.upsert(row(id, "original_" + id, id * 10, id * 100)));
            }
            for (int i = 0; i < initFutures.size(); i++) {
                tracker.recordWrite(i, initFutures.get(i).get());
            }
            writer.flush();

            Map<TableBucket, Long> checkpointOffsets = tracker.buildCheckpointOffsetsAll();

            // Batch write new keys (post-checkpoint inserts; recovery must delete them)
            Set<Integer> newKeys = new HashSet<>();
            for (int id = totalKeys; id < totalKeys + 50; id++) {
                writer.upsert(row(id, "new_" + id, id * 10, id * 100));
                newKeys.add(id);
            }

            // Post-checkpoint updates and one delete per bucket; recovery must restore originals.
            for (int bucket = 0; bucket < numBuckets; bucket++) {
                List<Integer> keys = tracker.keysByBucket.get(bucket);
                for (int i = 0; i < Math.min(3, keys.size()); i++) {
                    writer.upsert(row(keys.get(i), "updated_" + keys.get(i), 9999, 9999));
                }
                if (keys.size() > 5) {
                    int id = keys.get(5);
                    writer.delete(row(id, "original_" + id, id * 10, id * 100));
                }
            }
            writer.flush();

            // Execute recovery 2 times - results should be identical
            // For idempotency, we need to update checkpoint offsets after first recovery
            // because recovery writes undo operations which advance the log end offset
            Map<TableBucket, Long> currentCheckpointOffsets = new HashMap<>(checkpointOffsets);

            for (int round = 1; round <= 2; round++) {
                Map<TableBucket, UndoOffsets> offsetRanges =
                        buildUndoOffsets(currentCheckpointOffsets, tablePath, admin);
                new UndoRecoveryManager(table, null).performUndoRecovery(offsetRanges, 0, 1);

                // After first recovery, update checkpoint offsets to current log end offsets
                // This simulates what would happen in a real scenario where checkpoint is taken
                // after recovery completes
                if (round == 1) {
                    for (Map.Entry<TableBucket, UndoOffsets> entry : offsetRanges.entrySet()) {
                        // Query the new log end offset after recovery
                        TableBucket tb = entry.getKey();
                        List<Integer> bucketIds = new ArrayList<>();
                        bucketIds.add(tb.getBucket());
                        ListOffsetsResult offsetsResult =
                                admin.listOffsets(
                                        tablePath, bucketIds, new OffsetSpec.LatestSpec());
                        Long newOffset = offsetsResult.bucketResult(tb.getBucket()).get();
                        if (newOffset != null) {
                            currentCheckpointOffsets.put(tb, newOffset);
                        }
                    }
                }

                // Batch verify (parallel lookup): every round must show the pre-checkpoint state.
                List<CompletableFuture<Void>> verifyFutures = new ArrayList<>();
                final int r = round;

                for (int id : newKeys) {
                    final int keyId = id;
                    verifyFutures.add(
                            lookuper.lookup(row(keyId))
                                    .thenAccept(
                                            result ->
                                                    assertThat(result.getSingletonRow())
                                                            .as(
                                                                    "Round %d: New key %d should be deleted",
                                                                    r, keyId)
                                                            .isNull()));
                }

                for (int bucket = 0; bucket < numBuckets; bucket++) {
                    for (int id : tracker.keysByBucket.get(bucket)) {
                        final int keyId = id;
                        verifyFutures.add(
                                lookuper.lookup(row(keyId))
                                        .thenAccept(
                                                lookupResult -> {
                                                    InternalRow result =
                                                            lookupResult.getSingletonRow();
                                                    assertThat(result)
                                                            .as("Round %d: Key %d exists", r, keyId)
                                                            .isNotNull();
                                                    assertRowEquals(
                                                            result,
                                                            keyId,
                                                            "original_" + keyId,
                                                            keyId * 10,
                                                            keyId * 100);
                                                }));
                    }
                }
                CompletableFuture.allOf(verifyFutures.toArray(new CompletableFuture[0])).get();
            }
        }
    }
+
+ /** Test that scanner exceptions are properly propagated. */
+ @Test
+ void testRecoveryWithScannerException() throws Exception {
+ TablePath tablePath = createTestTablePath("scanner_exception_test");
+ long tableId =
+ createTable(
+ tablePath,
+ TableDescriptor.builder()
+ .schema(TEST_SCHEMA)
+ .distributedBy(2, "id")
+ .property(
+ ConfigOptions.TABLE_MERGE_ENGINE,
+ MergeEngineType.AGGREGATION)
+ .build());
+
+ try (Table table = conn.getTable(tablePath)) {
+ UpsertWriter writer = table.newUpsert().createWriter();
+ Map<TableBucket, Long> checkpointOffsets = new HashMap<>();
+
+ List<CompletableFuture<UpsertResult>> futures = new ArrayList<>();
+ for (int id = 0; id < 10; id++) {
+ futures.add(writer.upsert(row(id, "name_" + id, id * 10, id)));
+ }
+ for (int i = 0; i < futures.size(); i++) {
+ UpsertResult result = futures.get(i).get();
+ checkpointOffsets.put(
+ new TableBucket(tableId,
result.getBucket().getBucket()),
+ result.getLogEndOffset());
+ }
+ writer.flush();
+
+ for (int id = 100; id < 110; id++) {
+ writer.upsert(row(id, "new_" + id, id * 10, id));
+ }
+ writer.flush();
+
+ final String errorMessage = "Simulated scanner failure for
testing";
+ FaultInjectingUndoRecoveryManager faultHandler =
+ new FaultInjectingUndoRecoveryManager(table, null);
+ faultHandler.setExceptionToThrow(new
RuntimeException(errorMessage));
+
+ Map<TableBucket, UndoOffsets> offsetRanges =
+ buildUndoOffsets(checkpointOffsets, tablePath, admin);
+ assertThatThrownBy(() ->
faultHandler.performUndoRecovery(offsetRanges, 0, 1))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining(errorMessage);
+ }
+ }
+
+ // ==================== Helper Classes ====================
+
+ /** Tracks bucket assignments and offsets during test setup. */
+ private static class BucketTracker {
+ final int numBuckets;
+ final long tableId;
+ final Map<Integer, List<Integer>> keysByBucket = new HashMap<>();
+ final Map<Integer, Long> bucketOffsets = new HashMap<>();
+
+ BucketTracker(int numBuckets, long tableId) {
+ this.numBuckets = numBuckets;
+ this.tableId = tableId;
+ for (int i = 0; i < numBuckets; i++) {
+ keysByBucket.put(i, new ArrayList<>());
+ }
+ }
+
+ void recordWrite(int key, UpsertResult result) {
+ int bucketId = result.getBucket().getBucket();
+ keysByBucket.get(bucketId).add(key);
+ bucketOffsets.put(bucketId, result.getLogEndOffset());
+ }
+
+ int selectBucketForZeroOffsetRecovery() {
+ int maxBucket = 0, maxKeys = 0;
+ for (int i = 0; i < numBuckets; i++) {
+ if (keysByBucket.get(i).size() > maxKeys) {
+ maxKeys = keysByBucket.get(i).size();
+ maxBucket = i;
+ }
+ }
+ return maxBucket;
+ }
+
+ Map<TableBucket, Long> buildCheckpointOffsetsWithZeroOffset(int
zeroOffsetBucket) {
+ Map<TableBucket, Long> offsets = new HashMap<>();
+ offsets.put(new TableBucket(tableId, zeroOffsetBucket), 0L);
+ for (int i = 0; i < numBuckets; i++) {
+ if (i != zeroOffsetBucket) {
+ offsets.put(new TableBucket(tableId, i),
bucketOffsets.getOrDefault(i, 0L));
+ }
+ }
+ return offsets;
+ }
+
+ Map<TableBucket, Long> buildCheckpointOffsetsAll() {
+ Map<TableBucket, Long> offsets = new HashMap<>();
+ for (int i = 0; i < numBuckets; i++) {
+ offsets.put(new TableBucket(tableId, i),
bucketOffsets.getOrDefault(i, 0L));
+ }
+ return offsets;
+ }
+ }
+
    /**
     * {@link UndoRecoveryManager} whose log scanner throws a pre-configured exception from
     * {@code poll()}, used to verify that scanner failures propagate out of recovery.
     */
    private static class FaultInjectingUndoRecoveryManager extends UndoRecoveryManager {
        // Exception the wrapped scanner throws on poll(); null disables fault injection.
        private RuntimeException exceptionToThrow;

        FaultInjectingUndoRecoveryManager(Table table, int[] targetColumnIndexes) {
            super(table, targetColumnIndexes);
        }

        void setExceptionToThrow(RuntimeException exception) {
            this.exceptionToThrow = exception;
        }

        @Override
        protected LogScanner createLogScanner() {
            // Wrap the real scanner so subscribe/unsubscribe still work; only poll() faults.
            return new FaultInjectingLogScanner(super.createLogScanner(), exceptionToThrow);
        }
    }
+
+ private static class FaultInjectingLogScanner implements LogScanner {
+ private final LogScanner delegate;
+ private final RuntimeException exceptionToThrow;
+
+ FaultInjectingLogScanner(LogScanner delegate, RuntimeException
exceptionToThrow) {
+ this.delegate = delegate;
+ this.exceptionToThrow = exceptionToThrow;
+ }
+
+ @Override
+ public ScanRecords poll(Duration timeout) {
+ if (exceptionToThrow != null) {
+ throw exceptionToThrow;
+ }
+ return delegate.poll(timeout);
+ }
+
+ @Override
+ public void subscribe(int bucket, long offset) {
+ delegate.subscribe(bucket, offset);
+ }
+
+ @Override
+ public void subscribe(long partitionId, int bucket, long offset) {
+ delegate.subscribe(partitionId, bucket, offset);
+ }
+
+ @Override
+ public void unsubscribe(long partitionId, int bucket) {
+ delegate.unsubscribe(partitionId, bucket);
+ }
+
+ @Override
+ public void unsubscribe(int bucket) {
+ delegate.unsubscribe(bucket);
+ }
+
+ @Override
+ public void wakeup() {
+ delegate.wakeup();
+ }
+
+ @Override
+ public void close() throws Exception {
+ delegate.close();
+ }
+ }
+
+ // ==================== Helper Methods ====================
+
+ private TablePath createTestTablePath(String prefix) {
+ return TablePath.of(DEFAULT_DB, prefix + "_" +
System.currentTimeMillis());
+ }
+
    /** Builds a single-field {@link PartitionSpec}, e.g. {@code pt=p1}. */
    private PartitionSpec newPartitionSpec(String key, String value) {
        return new PartitionSpec(Collections.singletonMap(key, value));
    }
+
+ private int selectBucketForZeroOffsetRecovery(
+ Map<Integer, List<Integer>> keysByBucket, int numBuckets) {
+ int maxBucket = 0, maxKeys = 0;
+ for (int i = 0; i < numBuckets; i++) {
+ if (keysByBucket.get(i).size() > maxKeys) {
+ maxKeys = keysByBucket.get(i).size();
+ maxBucket = i;
+ }
+ }
+ return maxBucket;
+ }
+
    /** Asserts that {@code row} matches the expected (id, name, price, stock) column values. */
    private void assertRowEquals(
            InternalRow row,
            int expectedId,
            String expectedName,
            int expectedPrice,
            int expectedStock) {
        assertThat(row.getInt(0)).as("id").isEqualTo(expectedId);
        assertThat(row.getString(1).toString()).as("name").isEqualTo(expectedName);
        assertThat(row.getInt(2)).as("price").isEqualTo(expectedPrice);
        assertThat(row.getInt(3)).as("stock").isEqualTo(expectedStock);
    }
+
    /**
     * Builds offset ranges by querying the latest offset for each bucket.
     *
     * <p>This simulates what the upper layer would do: it already has checkpoint offsets and needs
     * to query latest offsets to determine which buckets need recovery.
     *
     * @param checkpointOffsets per-bucket offsets recorded at "checkpoint" time
     * @param tablePath the table whose latest offsets are queried
     * @param admin admin client used for the offset queries
     * @return per-bucket {@link UndoOffsets} spanning checkpoint offset to current latest offset
     */
    private Map<TableBucket, UndoOffsets> buildUndoOffsets(
            Map<TableBucket, Long> checkpointOffsets, TablePath tablePath, Admin admin)
            throws Exception {
        Map<TableBucket, UndoOffsets> result = new HashMap<>();

        // Group buckets by partition for efficient batch queries
        Map<Long, List<TableBucket>> bucketsByPartition = new HashMap<>();
        List<TableBucket> nonPartitionedBuckets = new ArrayList<>();

        for (TableBucket bucket : checkpointOffsets.keySet()) {
            if (bucket.getPartitionId() != null) {
                bucketsByPartition
                        .computeIfAbsent(bucket.getPartitionId(), k -> new ArrayList<>())
                        .add(bucket);
            } else {
                nonPartitionedBuckets.add(bucket);
            }
        }

        // Query non-partitioned buckets
        if (!nonPartitionedBuckets.isEmpty()) {
            List<Integer> bucketIds = new ArrayList<>();
            for (TableBucket tb : nonPartitionedBuckets) {
                bucketIds.add(tb.getBucket());
            }
            ListOffsetsResult offsetsResult =
                    admin.listOffsets(tablePath, bucketIds, new OffsetSpec.LatestSpec());
            for (TableBucket tb : nonPartitionedBuckets) {
                Long latestOffset = offsetsResult.bucketResult(tb.getBucket()).get();
                if (latestOffset == null) {
                    // Never-written bucket: treat its log as empty.
                    latestOffset = 0L;
                }
                result.put(tb, new UndoOffsets(checkpointOffsets.get(tb), latestOffset));
            }
        }

        // Query partitioned buckets
        if (!bucketsByPartition.isEmpty()) {
            // Get partition name mapping (listOffsets takes the partition name, not its id)
            List<PartitionInfo> partitions = admin.listPartitionInfos(tablePath).get();
            Map<Long, String> partitionIdToName = new HashMap<>();
            for (PartitionInfo info : partitions) {
                partitionIdToName.put(info.getPartitionId(), info.getPartitionName());
            }

            for (Map.Entry<Long, List<TableBucket>> entry : bucketsByPartition.entrySet()) {
                Long partitionId = entry.getKey();
                List<TableBucket> buckets = entry.getValue();
                String partitionName = partitionIdToName.get(partitionId);

                List<Integer> bucketIds = new ArrayList<>();
                for (TableBucket tb : buckets) {
                    bucketIds.add(tb.getBucket());
                }

                ListOffsetsResult offsetsResult =
                        admin.listOffsets(
                                tablePath, partitionName, bucketIds, new OffsetSpec.LatestSpec());
                for (TableBucket tb : buckets) {
                    Long latestOffset = offsetsResult.bucketResult(tb.getBucket()).get();
                    if (latestOffset == null) {
                        // Never-written bucket: treat its log as empty.
                        latestOffset = 0L;
                    }
                    result.put(tb, new UndoOffsets(checkpointOffsets.get(tb), latestOffset));
                }
            }
        }

        return result;
    }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestLogScanner.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestLogScanner.java
new file mode 100644
index 000000000..44e9a6239
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestLogScanner.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.metadata.TableBucket;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test implementation of {@link LogScanner} for testing.
+ *
+ * <p>Allows pre-configuring records per bucket and returns them on poll().
Supports:
+ *
+ * <ul>
+ * <li>Simulating empty polls for testing exponential backoff behavior
+ * <li>Batch size control for realistic multi-poll scenarios
+ * <li>Always-empty mode for testing fatal exception on max empty polls
+ * </ul>
+ */
+public class TestLogScanner implements LogScanner {
+ private final Map<TableBucket, List<ScanRecord>> recordsByBucket = new
HashMap<>();
+ private final Map<TableBucket, AtomicInteger> pollIndexByBucket = new
HashMap<>();
+
+ /** Number of empty polls to return before returning actual records. */
+ private int emptyPollsBeforeData = 0;
+
+ private int currentEmptyPollCount = 0;
+
+ /** If true, always return empty records (for testing fatal exception on
max empty polls). */
+ private boolean alwaysReturnEmpty = false;
+
+ /** Maximum number of records to return per bucket per poll (0 =
unlimited). */
+ private int batchSize = 0;
+
+ public void setRecordsForBucket(TableBucket bucket, List<ScanRecord>
records) {
+ recordsByBucket.put(bucket, new ArrayList<>(records));
+ pollIndexByBucket.put(bucket, new AtomicInteger(0));
+ }
+
+ /**
+ * Sets the number of empty polls to return before returning actual
records.
+ *
+ * @param count number of empty polls
+ */
+ public void setEmptyPollsBeforeData(int count) {
+ this.emptyPollsBeforeData = count;
+ this.currentEmptyPollCount = 0;
+ }
+
+ /**
+ * Sets whether to always return empty records (for testing fatal
exception).
+ *
+ * @param alwaysEmpty if true, poll() always returns empty
+ */
+ public void setAlwaysReturnEmpty(boolean alwaysEmpty) {
+ this.alwaysReturnEmpty = alwaysEmpty;
+ }
+
+ /**
+ * Sets the maximum number of records to return per bucket per poll.
+ *
+ * <p>This simulates realistic behavior where LogScanner returns records
in batches. Use this to
+ * test multi-poll scenarios and ensure the executor handles partial
results correctly.
+ *
+ * @param batchSize max records per bucket per poll (0 = unlimited,
returns all remaining)
+ */
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public ScanRecords poll(Duration timeout) {
+ // If configured to always return empty, do so
+ if (alwaysReturnEmpty) {
+ return ScanRecords.EMPTY;
+ }
+
+ // Return empty polls if configured
+ if (currentEmptyPollCount < emptyPollsBeforeData) {
+ currentEmptyPollCount++;
+ return ScanRecords.EMPTY;
+ }
+
+ Map<TableBucket, List<ScanRecord>> result = new HashMap<>();
+
+ for (Map.Entry<TableBucket, List<ScanRecord>> entry :
recordsByBucket.entrySet()) {
+ TableBucket bucket = entry.getKey();
+ List<ScanRecord> allRecords = entry.getValue();
+ AtomicInteger index = pollIndexByBucket.get(bucket);
+
+ if (index.get() < allRecords.size()) {
+ int startIndex = index.get();
+ int endIndex;
+ if (batchSize > 0) {
+ // Return at most batchSize records
+ endIndex = Math.min(startIndex + batchSize,
allRecords.size());
+ } else {
+ // Return all remaining records
+ endIndex = allRecords.size();
+ }
+ List<ScanRecord> batch = allRecords.subList(startIndex,
endIndex);
+ result.put(bucket, new ArrayList<>(batch));
+ index.set(endIndex);
+ }
+ }
+
+ return result.isEmpty() ? ScanRecords.EMPTY : new ScanRecords(result);
+ }
+
+ @Override
+ public void subscribe(int bucket, long offset) {}
+
+ @Override
+ public void subscribe(long partitionId, int bucket, long offset) {}
+
+ @Override
+ public void unsubscribe(long partitionId, int bucket) {}
+
+ @Override
+ public void unsubscribe(int bucket) {}
+
+ @Override
+ public void wakeup() {}
+
+ @Override
+ public void close() {}
+
+ public void reset() {
+ recordsByBucket.clear();
+ pollIndexByBucket.clear();
+ emptyPollsBeforeData = 0;
+ currentEmptyPollCount = 0;
+ alwaysReturnEmpty = false;
+ batchSize = 0;
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestUpsertWriter.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestUpsertWriter.java
new file mode 100644
index 000000000..fceaa027f
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestUpsertWriter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.client.table.writer.DeleteResult;
+import org.apache.fluss.client.table.writer.UpsertResult;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.row.InternalRow;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Test implementation of {@link UpsertWriter} for testing.
+ *
+ * <p>Tracks all upsert/delete operations and supports failure injection.
+ */
+public class TestUpsertWriter implements UpsertWriter {
+ private int upsertCount = 0;
+ private int deleteCount = 0;
+ private boolean flushCalled = false;
+ private boolean shouldFail = false;
+ private InternalRow lastUpsertedRow;
+ private InternalRow lastDeletedRow;
+ private final List<InternalRow> allUpsertedRows = new ArrayList<>();
+ private final List<InternalRow> allDeletedRows = new ArrayList<>();
+
+ @Override
+ public CompletableFuture<UpsertResult> upsert(InternalRow record) {
+ if (shouldFail) {
+ CompletableFuture<UpsertResult> future = new CompletableFuture<>();
+ future.completeExceptionally(new RuntimeException("Simulated write
failure"));
+ return future;
+ }
+ upsertCount++;
+ lastUpsertedRow = record;
+ allUpsertedRows.add(record);
+ return CompletableFuture.completedFuture(new UpsertResult(new
TableBucket(1L, 0), 0L));
+ }
+
+ @Override
+ public CompletableFuture<DeleteResult> delete(InternalRow record) {
+ if (shouldFail) {
+ CompletableFuture<DeleteResult> future = new CompletableFuture<>();
+ future.completeExceptionally(new RuntimeException("Simulated write
failure"));
+ return future;
+ }
+ deleteCount++;
+ lastDeletedRow = record;
+ allDeletedRows.add(record);
+ return CompletableFuture.completedFuture(new DeleteResult(new
TableBucket(1L, 0), 0L));
+ }
+
+ @Override
+ public void flush() {
+ flushCalled = true;
+ }
+
+ public int getUpsertCount() {
+ return upsertCount;
+ }
+
+ public int getDeleteCount() {
+ return deleteCount;
+ }
+
+ public boolean isFlushCalled() {
+ return flushCalled;
+ }
+
+ public InternalRow getLastUpsertedRow() {
+ return lastUpsertedRow;
+ }
+
+ public InternalRow getLastDeletedRow() {
+ return lastDeletedRow;
+ }
+
+ public List<InternalRow> getAllUpsertedRows() {
+ return allUpsertedRows;
+ }
+
+ public List<InternalRow> getAllDeletedRows() {
+ return allDeletedRows;
+ }
+
+ public void setShouldFail(boolean shouldFail) {
+ this.shouldFail = shouldFail;
+ }
+
+ public void reset() {
+ upsertCount = 0;
+ deleteCount = 0;
+ flushCalled = false;
+ shouldFail = false;
+ lastUpsertedRow = null;
+ lastDeletedRow = null;
+ allUpsertedRows.clear();
+ allDeletedRows.clear();
+ }
+}