This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 2df2d6b1a [kv] Add undo recovery support for aggregation tables (#2545)
2df2d6b1a is described below

commit 2df2d6b1a10e5f95d10bb149a6ca34d4c8a7f2f0
Author: Yang Wang <[email protected]>
AuthorDate: Mon Feb 9 20:05:44 2026 +0800

    [kv] Add undo recovery support for aggregation tables (#2545)
---
 .../fluss/client/table/scanner/log/LogScanner.java |  10 +
 .../client/table/scanner/log/LogScannerImpl.java   |  17 +
 .../org/apache/fluss/utils/ByteArrayWrapper.java   |  65 ++
 .../apache/fluss/utils/ByteArrayWrapperTest.java   |  99 ++
 .../sink/writer/undo/BucketRecoveryContext.java    | 137 +++
 .../fluss/flink/sink/writer/undo/UndoComputer.java | 150 ++++
 .../sink/writer/undo/UndoRecoveryExecutor.java     | 287 ++++++
 .../sink/writer/undo/UndoRecoveryManager.java      | 248 +++++
 .../flink/sink/writer/undo/UndoComputerTest.java   | 199 ++++
 .../sink/writer/undo/UndoRecoveryExecutorTest.java | 234 +++++
 .../writer/undo/UndoRecoveryManagerITCase.java     | 999 +++++++++++++++++++++
 .../apache/fluss/flink/utils/TestLogScanner.java   | 159 ++++
 .../apache/fluss/flink/utils/TestUpsertWriter.java | 118 +++
 13 files changed, 2722 insertions(+)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
index 04357f598..d255e340a 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java
@@ -106,6 +106,16 @@ public interface LogScanner extends AutoCloseable {
      */
     void unsubscribe(long partitionId, int bucket);
 
+    /**
+     * Unsubscribe from the given bucket of a non-partitioned table 
dynamically.
+     *
+     * <p>Please use {@link #unsubscribe(long, int)} to unsubscribe a 
partitioned table.
+     *
+     * @param bucket the table bucket to unsubscribe.
+     * @throws java.lang.IllegalStateException if the table is a partitioned 
table.
+     */
+    void unsubscribe(int bucket);
+
     /**
      * Subscribe to the given partitioned table bucket from beginning 
dynamically. If the table
      * bucket is already subscribed, the start offset will be updated.
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
index ebcae05c5..33181bd7a 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
@@ -225,6 +225,23 @@ public class LogScannerImpl implements LogScanner {
         }
     }
 
+    @Override
+    public void unsubscribe(int bucket) {
+        if (isPartitionedTable) {
+            throw new IllegalStateException(
+                    "The table is a partitioned table, please use "
+                            + "\"unsubscribe(long partitionId, int bucket)\" 
to "
+                            + "unsubscribe a partitioned bucket instead.");
+        }
+        acquireAndEnsureOpen();
+        try {
+            TableBucket tableBucket = new TableBucket(tableId, bucket);
+            
this.logScannerStatus.unassignScanBuckets(Collections.singletonList(tableBucket));
+        } finally {
+            release();
+        }
+    }
+
     @Override
     public void wakeup() {
         logFetcher.wakeup();
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/utils/ByteArrayWrapper.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/ByteArrayWrapper.java
new file mode 100644
index 000000000..da547d36b
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/ByteArrayWrapper.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.utils;
+
+import java.util.Arrays;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * A wrapper for byte[] that provides proper equals() and hashCode() 
implementations for use as Map
+ * keys.
+ *
+ * <p>The hashCode is pre-computed at construction time for better performance 
when used in
+ * hash-based collections.
+ */
+public final class ByteArrayWrapper {
+
+    private final byte[] data;
+    private final int hashCode;
+
+    public ByteArrayWrapper(byte[] data) {
+        this.data = checkNotNull(data, "data cannot be null");
+        this.hashCode = Arrays.hashCode(data);
+    }
+
+    public byte[] getData() {
+        return data;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ByteArrayWrapper)) {
+            return false;
+        }
+        return Arrays.equals(data, ((ByteArrayWrapper) o).data);
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public String toString() {
+        return "ByteArrayWrapper{length=" + data.length + "}";
+    }
+}
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/utils/ByteArrayWrapperTest.java 
b/fluss-common/src/test/java/org/apache/fluss/utils/ByteArrayWrapperTest.java
new file mode 100644
index 000000000..f0f69d112
--- /dev/null
+++ 
b/fluss-common/src/test/java/org/apache/fluss/utils/ByteArrayWrapperTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link ByteArrayWrapper}. */
+class ByteArrayWrapperTest {
+
+    @Test
+    void testEqualsAndHashCode() {
+        byte[] data1 = new byte[] {1, 2, 3};
+        byte[] data2 = new byte[] {1, 2, 3};
+        byte[] data3 = new byte[] {1, 2, 4};
+
+        ByteArrayWrapper wrapper1 = new ByteArrayWrapper(data1);
+        ByteArrayWrapper wrapper2 = new ByteArrayWrapper(data2);
+        ByteArrayWrapper wrapper3 = new ByteArrayWrapper(data3);
+
+        // Same content should be equal
+        assertThat(wrapper1).isEqualTo(wrapper2);
+        assertThat(wrapper1.hashCode()).isEqualTo(wrapper2.hashCode());
+
+        // Different content should not be equal
+        assertThat(wrapper1).isNotEqualTo(wrapper3);
+    }
+
+    @Test
+    void testNullDataThrowsException() {
+        assertThatThrownBy(() -> new ByteArrayWrapper(null))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("data cannot be null");
+    }
+
+    @Test
+    void testAsMapKey() {
+        byte[] key1 = new byte[] {1, 2, 3};
+        byte[] key2 = new byte[] {1, 2, 3}; // Same content, different array
+        byte[] key3 = new byte[] {4, 5, 6};
+
+        Map<ByteArrayWrapper, String> map = new HashMap<>();
+        map.put(new ByteArrayWrapper(key1), "value1");
+
+        // Should find with same content
+        assertThat(map.get(new ByteArrayWrapper(key2))).isEqualTo("value1");
+
+        // Should not find with different content
+        assertThat(map.get(new ByteArrayWrapper(key3))).isNull();
+
+        // Should overwrite with same key
+        map.put(new ByteArrayWrapper(key2), "value2");
+        assertThat(map).hasSize(1);
+        assertThat(map.get(new ByteArrayWrapper(key1))).isEqualTo("value2");
+    }
+
+    @Test
+    void testGetData() {
+        byte[] data = new byte[] {1, 2, 3};
+        ByteArrayWrapper wrapper = new ByteArrayWrapper(data);
+
+        assertThat(wrapper.getData()).isSameAs(data);
+    }
+
+    @Test
+    void testEmptyArray() {
+        ByteArrayWrapper wrapper1 = new ByteArrayWrapper(new byte[0]);
+        ByteArrayWrapper wrapper2 = new ByteArrayWrapper(new byte[0]);
+
+        assertThat(wrapper1).isEqualTo(wrapper2);
+        assertThat(wrapper1.hashCode()).isEqualTo(wrapper2.hashCode());
+    }
+
+    @Test
+    void testToString() {
+        ByteArrayWrapper wrapper = new ByteArrayWrapper(new byte[] {1, 2, 3});
+        assertThat(wrapper.toString()).contains("length=3");
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/BucketRecoveryContext.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/BucketRecoveryContext.java
new file mode 100644
index 000000000..47797d2c4
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/BucketRecoveryContext.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.writer.undo;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.ByteArrayWrapper;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Encapsulates the recovery state for a single bucket.
+ *
+ * <p>This class tracks:
+ *
+ * <ul>
+ *   <li>The bucket being recovered
+ *   <li>The checkpoint offset (start point for reading changelog)
+ *   <li>The log end offset (end point for reading changelog)
+ *   <li>Processed primary keys for deduplication (streaming execution)
+ *   <li>Progress tracking during changelog scanning
+ * </ul>
+ */
+public class BucketRecoveryContext {
+
+    private final TableBucket bucket;
+    private final long checkpointOffset;
+    private final long logEndOffset;
+
+    private final Set<ByteArrayWrapper> processedKeys;
+    private long lastProcessedOffset;
+    private int totalRecordsProcessed;
+
+    public BucketRecoveryContext(TableBucket bucket, long checkpointOffset, 
long logEndOffset) {
+        this.bucket = bucket;
+        this.checkpointOffset = checkpointOffset;
+        this.logEndOffset = logEndOffset;
+        this.processedKeys = new HashSet<>();
+        this.lastProcessedOffset = checkpointOffset;
+        this.totalRecordsProcessed = 0;
+    }
+
+    public TableBucket getBucket() {
+        return bucket;
+    }
+
+    public long getCheckpointOffset() {
+        return checkpointOffset;
+    }
+
+    public long getLogEndOffset() {
+        return logEndOffset;
+    }
+
+    public Set<ByteArrayWrapper> getProcessedKeys() {
+        return processedKeys;
+    }
+
+    /**
+     * Checks if this bucket needs recovery.
+     *
+     * @return true if checkpoint offset is less than log end offset
+     */
+    public boolean needsRecovery() {
+        return checkpointOffset < logEndOffset;
+    }
+
+    /**
+     * Checks if changelog scanning is complete for this bucket.
+     *
+     * <p>Complete means either:
+     *
+     * <ul>
+     *   <li>No recovery is needed (checkpointOffset >= logEndOffset), or
+     *   <li>The last processed offset has reached or passed logEndOffset - 1 
(lastProcessedOffset
+     *       >= logEndOffset - 1)
+     * </ul>
+     *
+     * @return true if changelog scanning is complete
+     */
+    public boolean isComplete() {
+        // If no recovery is needed, we're already complete
+        if (!needsRecovery()) {
+            return true;
+        }
+        return lastProcessedOffset >= logEndOffset - 1;
+    }
+
+    /**
+     * Records that a changelog record has been processed.
+     *
+     * @param offset the offset of the processed record
+     */
+    public void recordProcessed(long offset) {
+        lastProcessedOffset = offset;
+        totalRecordsProcessed++;
+    }
+
+    public int getTotalRecordsProcessed() {
+        return totalRecordsProcessed;
+    }
+
+    public long getLastProcessedOffset() {
+        return lastProcessedOffset;
+    }
+
+    @Override
+    public String toString() {
+        return "BucketRecoveryContext{"
+                + "bucket="
+                + bucket
+                + ", checkpointOffset="
+                + checkpointOffset
+                + ", logEndOffset="
+                + logEndOffset
+                + ", processedKeys="
+                + processedKeys.size()
+                + ", complete="
+                + isComplete()
+                + '}';
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoComputer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoComputer.java
new file mode 100644
index 000000000..80d9baac8
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoComputer.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.writer.undo;
+
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.KeyEncoder;
+import org.apache.fluss.utils.ByteArrayWrapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Computes and executes undo operations from changelog records using 
streaming execution.
+ *
+ * <p>This class uses a {@link KeyEncoder} to encode primary keys as byte 
arrays for efficient
+ * deduplication, and executes undo operations immediately via {@link 
UpsertWriter}. The undo logic
+ * is:
+ *
+ * <ul>
+ *   <li>{@code INSERT} → Delete the row (it didn't exist at checkpoint)
+ *   <li>{@code UPDATE_BEFORE} → Restore the old value (this was the state at 
checkpoint)
+ *   <li>{@code UPDATE_AFTER} → Ignored (UPDATE_BEFORE already handled the 
undo)
+ *   <li>{@code DELETE} → Re-insert the deleted row (it existed at checkpoint)
+ * </ul>
+ *
+ * <p>For each primary key, only the first change after checkpoint determines 
the undo action. The
+ * original row from {@link ScanRecord} is directly used without copying, as 
each ScanRecord
+ * contains an independent row instance.
+ */
+public class UndoComputer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(UndoComputer.class);
+
+    private final KeyEncoder keyEncoder;
+    private final UpsertWriter writer;
+
+    /**
+     * Creates an UndoComputer.
+     *
+     * @param keyEncoder the key encoder for primary key deduplication
+     * @param writer the writer for all undo operations
+     */
+    public UndoComputer(KeyEncoder keyEncoder, UpsertWriter writer) {
+        this.keyEncoder = keyEncoder;
+        this.writer = writer;
+    }
+
+    /**
+     * Processes a changelog record and executes the undo operation 
immediately.
+     *
+     * <p>Only the first change for each primary key triggers an undo 
operation.
+     *
+     * @param record the changelog record
+     * @param processedKeys set of already processed primary keys for 
deduplication
+     * @return CompletableFuture for the async write, or null if skipped
+     */
+    @Nullable
+    public CompletableFuture<?> processRecord(
+            ScanRecord record, Set<ByteArrayWrapper> processedKeys) {
+        // Skip UPDATE_AFTER before key encoding — UPDATE_BEFORE already 
handled the undo
+        if (record.getChangeType() == ChangeType.UPDATE_AFTER) {
+            return null;
+        }
+
+        byte[] encodedKey = keyEncoder.encodeKey(record.getRow());
+        ByteArrayWrapper keyWrapper = new ByteArrayWrapper(encodedKey);
+
+        // Skip if we already processed this key
+        if (processedKeys.contains(keyWrapper)) {
+            return null;
+        }
+
+        CompletableFuture<?> future = computeAndExecuteUndo(record);
+        if (future != null) {
+            processedKeys.add(keyWrapper);
+        }
+        return future;
+    }
+
+    /**
+     * Computes and executes the undo operation for a changelog record.
+     *
+     * <p>UPDATE_AFTER is ignored because UPDATE_BEFORE and UPDATE_AFTER 
always come in pairs, with
+     * UPDATE_BEFORE appearing first. The undo logic is determined by 
UPDATE_BEFORE which contains
+     * the old value needed for restoration.
+     *
+     * <p>The original row is directly used without copying because:
+     *
+     * <ul>
+     *   <li>Each ScanRecord contains an independent GenericRow instance 
(created in
+     *       CompletedFetch.toScanRecord)
+     *   <li>For DELETE operations, UpsertWriter.delete() only needs the 
primary key fields
+     *   <li>For UPSERT operations, the original row contains the exact data 
to restore
+     * </ul>
+     *
+     * @param record the changelog record
+     * @return CompletableFuture for the async write, or null if no action 
needed (e.g.,
+     *     UPDATE_AFTER)
+     */
+    @Nullable
+    private CompletableFuture<?> computeAndExecuteUndo(ScanRecord record) {
+        ChangeType changeType = record.getChangeType();
+        InternalRow row = record.getRow();
+
+        switch (changeType) {
+            case INSERT:
+                // Row was inserted after checkpoint → delete it
+                return writer.delete(row);
+
+            case UPDATE_BEFORE:
+                // Row was updated after checkpoint → restore old value
+                return writer.upsert(row);
+
+            case UPDATE_AFTER:
+                // Ignored: UPDATE_BEFORE already handled the undo logic for 
this key.
+                return null;
+
+            case DELETE:
+                // Row was deleted after checkpoint → re-insert it
+                return writer.upsert(row);
+
+            default:
+                LOG.warn("Unexpected change type for undo: {}", changeType);
+                return null;
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutor.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutor.java
new file mode 100644
index 000000000..a1f3f6637
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutor.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.writer.undo;
+
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.ByteArrayWrapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Executes undo recovery for multiple buckets using streaming execution.
+ *
+ * <p>This executor manages:
+ *
+ * <ul>
+ *   <li>Single LogScanner with multiple bucket subscriptions
+ *   <li>Changelog reading with immediate undo operation execution
+ *   <li>Async write operations via UpsertWriter
+ * </ul>
+ *
+ * <p>The execution flow (streaming):
+ *
+ * <ol>
+ *   <li>Subscribe all buckets to the LogScanner
+ *   <li>Poll changelog records and execute undo operations immediately
+ *   <li>Collect CompletableFutures for completion tracking
+ *   <li>Flush and wait for all futures to ensure all writes complete
+ * </ol>
+ */
+public class UndoRecoveryExecutor {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(UndoRecoveryExecutor.class);
+
+    /** Fixed poll timeout in milliseconds. */
+    private static final long POLL_TIMEOUT_MS = 10_000;
+
+    /** Default maximum total wait time in milliseconds before failing (1 
hour). */
+    private static final long DEFAULT_MAX_TOTAL_WAIT_TIME_MS = 60 * 60 * 1000; 
// 1 hour
+
+    /** Interval for logging progress during long waits (5 minutes). */
+    private static final long PROGRESS_LOG_INTERVAL_MS = 5 * 60 * 1000; // 5 
minutes
+
+    private final LogScanner scanner;
+    private final UpsertWriter writer;
+    private final UndoComputer undoComputer;
+
+    // Configurable timeout parameter
+    private final long maxTotalWaitTimeMs;
+
+    public UndoRecoveryExecutor(
+            LogScanner scanner, UpsertWriter writer, UndoComputer 
undoComputer) {
+        this(scanner, writer, undoComputer, DEFAULT_MAX_TOTAL_WAIT_TIME_MS);
+    }
+
+    /**
+     * Creates an executor with custom timeout configuration (for testing).
+     *
+     * @param scanner the log scanner
+     * @param writer the upsert writer
+     * @param undoComputer the undo computer
+     * @param maxTotalWaitTimeMs maximum total wait time before failing
+     */
+    public UndoRecoveryExecutor(
+            LogScanner scanner,
+            UpsertWriter writer,
+            UndoComputer undoComputer,
+            long maxTotalWaitTimeMs) {
+        this.scanner = scanner;
+        this.writer = writer;
+        this.undoComputer = undoComputer;
+        this.maxTotalWaitTimeMs = maxTotalWaitTimeMs;
+    }
+
+    /**
+     * Executes undo recovery for the given bucket contexts.
+     *
+     * @param contexts the bucket recovery contexts (must have target offsets 
set)
+     * @throws Exception if recovery fails
+     */
+    public void execute(List<BucketRecoveryContext> contexts) throws Exception 
{
+        // Filter contexts that need recovery
+        List<BucketRecoveryContext> toRecover = 
filterContextsNeedingRecovery(contexts);
+        if (toRecover.isEmpty()) {
+            LOG.debug("No buckets need recovery after filtering");
+            return;
+        }
+
+        LOG.debug("Executing undo recovery for {} bucket(s)", 
toRecover.size());
+
+        // Subscribe and read changelog with streaming execution
+        subscribeAll(toRecover);
+        List<CompletableFuture<?>> allFutures = 
readChangelogAndExecute(toRecover);
+
+        // Flush first, then wait for all async writes to complete
+        writer.flush();
+        if (!allFutures.isEmpty()) {
+            CompletableFuture.allOf(allFutures.toArray(new 
CompletableFuture[0])).get();
+        }
+
+        // Log summary at INFO level for visibility
+        int totalUndoOps = 0;
+        for (BucketRecoveryContext ctx : toRecover) {
+            totalUndoOps += ctx.getProcessedKeys().size();
+        }
+
+        LOG.info(
+                "Undo recovery execution completed: {} bucket(s), {} total 
undo operation(s)",
+                toRecover.size(),
+                totalUndoOps);
+    }
+
+    private List<BucketRecoveryContext> filterContextsNeedingRecovery(
+            List<BucketRecoveryContext> contexts) {
+        List<BucketRecoveryContext> result = new ArrayList<>();
+        for (BucketRecoveryContext ctx : contexts) {
+            if (ctx.needsRecovery()) {
+                LOG.debug(
+                        "Bucket {} needs recovery: checkpoint={}, 
logEndOffset={}",
+                        ctx.getBucket(),
+                        ctx.getCheckpointOffset(),
+                        ctx.getLogEndOffset());
+                result.add(ctx);
+            } else {
+                LOG.debug("Bucket {} already up-to-date, no recovery needed", 
ctx.getBucket());
+            }
+        }
+        return result;
+    }
+
+    private void subscribeAll(List<BucketRecoveryContext> contexts) {
+        for (BucketRecoveryContext ctx : contexts) {
+            TableBucket bucket = ctx.getBucket();
+            if (bucket.getPartitionId() != null) {
+                scanner.subscribe(
+                        bucket.getPartitionId(), bucket.getBucket(), 
ctx.getCheckpointOffset());
+            } else {
+                scanner.subscribe(bucket.getBucket(), 
ctx.getCheckpointOffset());
+            }
+        }
+    }
+
+    private List<CompletableFuture<?>> 
readChangelogAndExecute(List<BucketRecoveryContext> contexts)
+            throws Exception {
+        List<CompletableFuture<?>> allFutures = new ArrayList<>();
+
+        // Early exit if all contexts are already complete (no records to read)
+        if (allComplete(contexts)) {
+            LOG.debug("All buckets already complete, no changelog reading 
needed");
+            return allFutures;
+        }
+
+        long startTimeMs = System.currentTimeMillis();
+        long lastProgressLogTime = System.currentTimeMillis();
+        Set<TableBucket> unsubscribedBuckets = new HashSet<>();
+
+        while (!allComplete(contexts)) {
+            ScanRecords records = 
scanner.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+
+            if (records.isEmpty()) {
+                // Check if we've exceeded the maximum total wait time
+                long elapsedMs = System.currentTimeMillis() - startTimeMs;
+                if (elapsedMs >= maxTotalWaitTimeMs) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "Undo recovery timed out: unable to read 
all changelog records after "
+                                            + "%.1f minutes of waiting. %d 
bucket(s) still incomplete. "
+                                            + "The job will restart and retry 
the recovery.",
+                                    elapsedMs / 60000.0, 
countIncomplete(contexts)));
+                }
+
+                LOG.debug("Empty poll (total elapsed: {}ms)", elapsedMs);
+            } else {
+                // Process records for each bucket
+                for (BucketRecoveryContext ctx : contexts) {
+                    if (ctx.isComplete()) {
+                        continue;
+                    }
+                    List<ScanRecord> bucketRecords = 
records.records(ctx.getBucket());
+                    if (!bucketRecords.isEmpty()) {
+                        processRecords(ctx, bucketRecords, allFutures);
+                    }
+                    // Unsubscribe completed buckets to stop fetching data 
from them
+                    if (ctx.isComplete() && 
unsubscribedBuckets.add(ctx.getBucket())) {
+                        unsubscribeBucket(ctx.getBucket());
+                    }
+                }
+            }
+
+            // Log progress periodically (at the end of each loop iteration)
+            long now = System.currentTimeMillis();
+            if (now - lastProgressLogTime >= PROGRESS_LOG_INTERVAL_MS) {
+                long elapsedMs = now - startTimeMs;
+                LOG.info(
+                        "Undo recovery waiting for changelog records: {} 
bucket(s) incomplete, "
+                                + "waited {} minutes so far (max: {} minutes)",
+                        countIncomplete(contexts),
+                        String.format("%.1f", elapsedMs / 60000.0),
+                        String.format("%.1f", maxTotalWaitTimeMs / 60000.0));
+                lastProgressLogTime = now;
+            }
+        }
+
+        // Log summary
+        if (LOG.isDebugEnabled()) {
+            for (BucketRecoveryContext ctx : contexts) {
+                LOG.debug(
+                        "Bucket {} read {} records, executed {} undo 
operations",
+                        ctx.getBucket(),
+                        ctx.getTotalRecordsProcessed(),
+                        ctx.getProcessedKeys().size());
+            }
+        }
+
+        return allFutures;
+    }
+
+    private void unsubscribeBucket(TableBucket bucket) {
+        if (bucket.getPartitionId() != null) {
+            scanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
+        } else {
+            scanner.unsubscribe(bucket.getBucket());
+        }
+    }
+
+    private void processRecords(
+            BucketRecoveryContext ctx,
+            List<ScanRecord> records,
+            List<CompletableFuture<?>> futures) {
+        Set<ByteArrayWrapper> processedKeys = ctx.getProcessedKeys();
+        for (ScanRecord record : records) {
+            CompletableFuture<?> future = undoComputer.processRecord(record, 
processedKeys);
+            if (future != null) {
+                futures.add(future);
+            }
+            ctx.recordProcessed(record.logOffset());
+            if (ctx.isComplete()) {
+                break;
+            }
+        }
+    }
+
+    private boolean allComplete(List<BucketRecoveryContext> contexts) {
+        for (BucketRecoveryContext ctx : contexts) {
+            if (!ctx.isComplete()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private int countIncomplete(List<BucketRecoveryContext> contexts) {
+        int count = 0;
+        for (BucketRecoveryContext ctx : contexts) {
+            if (!ctx.isComplete()) {
+                count++;
+            }
+        }
+        return count;
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManager.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManager.java
new file mode 100644
index 000000000..fa0adc10f
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManager.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.writer.undo;
+
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.writer.Upsert;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.row.encode.CompactedKeyEncoder;
+import org.apache.fluss.row.encode.KeyEncoder;
+import org.apache.fluss.rpc.protocol.MergeMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Manages undo recovery operations during Flink sink writer initialization.
+ *
+ * <p>This manager ensures exactly-once semantics by reversing any writes that 
occurred after the
+ * last successful checkpoint but before a failure. The recovery process 
involves:
+ *
+ * <ol>
+ *   <li>Reading changelog records from checkpoint offset to current latest 
offset
+ *   <li>Computing inverse operations for each affected primary key
+ *   <li>Applying undo operations using OVERWRITE mode to restore bucket state
+ * </ol>
+ *
+ * <p><b>Undo Logic:</b> For each primary key, only the first change after 
checkpoint determines the
+ * undo action:
+ *
+ * <ul>
+ *   <li>{@code INSERT} → Delete the row (it didn't exist at checkpoint)
+ *   <li>{@code UPDATE_BEFORE} → Restore the old value (this was the state at 
checkpoint)
+ *   <li>{@code UPDATE_AFTER} → Ignored (UPDATE_BEFORE already handled the 
undo)
+ *   <li>{@code DELETE} → Re-insert the deleted row (it existed at checkpoint)
+ * </ul>
+ *
+ * <p>This class delegates to specialized components:
+ *
+ * <ul>
+ *   <li>{@link UndoComputer} - Undo operation computation using {@link 
KeyEncoder} for primary key
+ *       encoding
+ *   <li>{@link UndoRecoveryExecutor} - Changelog reading and write execution
+ * </ul>
+ *
+ * @see MergeMode#OVERWRITE
+ */
+public class UndoRecoveryManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(UndoRecoveryManager.class);
+
+    private final Table table;
+    @Nullable private final int[] targetColumnIndexes;
+
+    /**
+     * Creates a new UndoRecoveryManager.
+     *
+     * @param table the Fluss table to perform recovery on
+     * @param targetColumnIndexes optional target columns for partial update 
(null for full row)
+     */
+    public UndoRecoveryManager(Table table, @Nullable int[] 
targetColumnIndexes) {
+        this.table = table;
+        this.targetColumnIndexes = targetColumnIndexes;
+
+        logPartialUpdateConfig(targetColumnIndexes);
+    }
+
+    private void logPartialUpdateConfig(@Nullable int[] targetColumnIndexes) {
+        if (targetColumnIndexes != null) {
+            LOG.info(
+                    "Undo recovery configured with partial update columns: {}",
+                    Arrays.toString(targetColumnIndexes));
+        }
+    }
+
+    // ==================== Public API ====================
+
+    /**
+     * Performs undo recovery for buckets with known undo offsets.
+     *
+     * <p>The caller is responsible for providing both the checkpoint offset 
(start) and the log end
+     * offset (end) for each bucket. This avoids redundant listOffset calls 
since the caller
+     * typically already has this information from determining which buckets 
need recovery.
+     *
+     * @param bucketUndoOffsets map of bucket to its undo offsets 
(checkpointOffset, logEndOffset)
+     * @param subtaskIndex the Flink subtask index (for logging)
+     * @param parallelism the total parallelism (for logging)
+     * @throws Exception if recovery fails
+     */
+    public void performUndoRecovery(
+            Map<TableBucket, UndoOffsets> bucketUndoOffsets, int subtaskIndex, 
int parallelism)
+            throws Exception {
+
+        if (bucketUndoOffsets.isEmpty()) {
+            LOG.debug("No buckets to recover on subtask {}/{}", subtaskIndex, 
parallelism);
+            return;
+        }
+
+        LOG.info(
+                "Starting undo recovery for {} bucket(s) on subtask {}/{}",
+                bucketUndoOffsets.size(),
+                subtaskIndex,
+                parallelism);
+
+        List<BucketRecoveryContext> contexts = 
buildRecoveryContexts(bucketUndoOffsets);
+
+        try (LogScanner scanner = createLogScanner()) {
+            Upsert recoveryUpsert = 
table.newUpsert().mergeMode(MergeMode.OVERWRITE);
+            if (targetColumnIndexes != null) {
+                recoveryUpsert = 
recoveryUpsert.partialUpdate(targetColumnIndexes);
+            }
+            UpsertWriter writer = recoveryUpsert.createWriter();
+
+            // Create UndoComputer with writer for streaming execution
+            Schema schema = table.getTableInfo().getSchema();
+            // Use CompactedKeyEncoder directly instead of the deprecated 
KeyEncoder.of(),
+            // which requires a lake format parameter that is not applicable 
here.
+            KeyEncoder keyEncoder =
+                    CompactedKeyEncoder.createKeyEncoder(
+                            schema.getRowType(), 
schema.getPrimaryKey().get().getColumnNames());
+            UndoComputer undoComputer = new UndoComputer(keyEncoder, writer);
+
+            UndoRecoveryExecutor executor = new UndoRecoveryExecutor(scanner, 
writer, undoComputer);
+            // This is a blocking operation that reads changelog and executes 
undo operations.
+            // It may take a significant amount of time depending on the 
volume of records to
+            // process.
+            executor.execute(contexts);
+        }
+
+        // Calculate total undo operations for summary
+        int totalUndoOps = 0;
+        for (BucketRecoveryContext ctx : contexts) {
+            totalUndoOps += ctx.getProcessedKeys().size();
+        }
+
+        LOG.info(
+                "Completed undo recovery for {} bucket(s) with {} undo 
operation(s) on subtask {}/{}",
+                bucketUndoOffsets.size(),
+                totalUndoOps,
+                subtaskIndex,
+                parallelism);
+    }
+
+    /**
+     * Encapsulates the offset information needed for undo recovery of a 
bucket.
+     *
+     * <p>This class holds the checkpoint offset (recovery start point) and 
log end offset (recovery
+     * end point) for a bucket's undo recovery operation.
+     */
+    public static class UndoOffsets {
+        private final long checkpointOffset;
+        private final long logEndOffset;
+
+        /**
+         * Creates a new UndoOffsets.
+         *
+         * @param checkpointOffset the checkpoint offset (start point, 
inclusive)
+         * @param logEndOffset the log end offset (end point, exclusive)
+         * @throws IllegalArgumentException if offsets are negative or 
checkpointOffset >
+         *     logEndOffset
+         */
+        public UndoOffsets(long checkpointOffset, long logEndOffset) {
+            if (checkpointOffset < 0) {
+                throw new IllegalArgumentException(
+                        "checkpointOffset must be non-negative: " + 
checkpointOffset);
+            }
+            if (logEndOffset < 0) {
+                throw new IllegalArgumentException(
+                        "logEndOffset must be non-negative: " + logEndOffset);
+            }
+            if (checkpointOffset > logEndOffset) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "checkpointOffset (%d) must not be greater 
than logEndOffset (%d)",
+                                checkpointOffset, logEndOffset));
+            }
+            // Note: checkpointOffset == logEndOffset is allowed, it simply 
means no recovery needed
+            this.checkpointOffset = checkpointOffset;
+            this.logEndOffset = logEndOffset;
+        }
+
+        public long getCheckpointOffset() {
+            return checkpointOffset;
+        }
+
+        public long getLogEndOffset() {
+            return logEndOffset;
+        }
+    }
+
+    // ==================== Scanner Factory ====================
+
+    /**
+     * Creates a LogScanner for reading changelog records.
+     *
+     * <p>This method is protected to allow test subclasses to inject custom 
scanners for fault
+     * injection testing.
+     *
+     * @return a new LogScanner instance
+     */
+    protected LogScanner createLogScanner() {
+        return table.newScan().createLogScanner();
+    }
+
+    // ==================== Recovery Context Building ====================
+
+    private List<BucketRecoveryContext> buildRecoveryContexts(
+            Map<TableBucket, UndoOffsets> bucketUndoOffsets) {
+
+        List<BucketRecoveryContext> contexts = new ArrayList<>();
+
+        for (Map.Entry<TableBucket, UndoOffsets> entry : 
bucketUndoOffsets.entrySet()) {
+            TableBucket bucket = entry.getKey();
+            UndoOffsets offsets = entry.getValue();
+
+            BucketRecoveryContext ctx =
+                    new BucketRecoveryContext(
+                            bucket, offsets.getCheckpointOffset(), 
offsets.getLogEndOffset());
+            contexts.add(ctx);
+        }
+
+        return contexts;
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoComputerTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoComputerTest.java
new file mode 100644
index 000000000..d41bb84b4
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoComputerTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.writer.undo;
+
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.flink.utils.TestUpsertWriter;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.encode.KeyEncoder;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.ByteArrayWrapper;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link UndoComputer}.
+ *
+ * <p>Tests verify: (1) ChangeType to undo operation mapping, (2) primary key 
deduplication.
+ */
+class UndoComputerTest {
+
+    private static final RowType ROW_TYPE =
+            RowType.of(DataTypes.INT(), DataTypes.STRING(), DataTypes.INT());
+    private static final List<String> PRIMARY_KEY_COLUMNS = 
Collections.singletonList("f0");
+
+    private KeyEncoder keyEncoder;
+    private TestUpsertWriter mockWriter;
+    private UndoComputer undoComputer;
+
+    @BeforeEach
+    void setUp() {
+        keyEncoder = KeyEncoder.of(ROW_TYPE, PRIMARY_KEY_COLUMNS, null);
+        mockWriter = new TestUpsertWriter();
+        undoComputer = new UndoComputer(keyEncoder, mockWriter);
+    }
+
+    // ==================== Undo Logic Mapping (Parameterized) 
====================
+
+    /**
+     * Parameterized test for ChangeType to undo operation mapping.
+     *
+     * <p>Validates: Requirements 4.3 - Correct undo logic mapping.
+     *
+     * <p>Mapping rules:
+     *
+     * <ul>
+     *   <li>INSERT → delete (undo insert by deleting)
+     *   <li>UPDATE_BEFORE → upsert (restore old value)
+     *   <li>UPDATE_AFTER → skip (no action needed)
+     *   <li>DELETE → upsert (restore deleted row)
+     * </ul>
+     */
+    @ParameterizedTest(name = "{0} → expectDelete={1}, expectUpsert={2}, 
expectNull={3}")
+    @MethodSource("changeTypeUndoMappingProvider")
+    void testChangeTypeToUndoMapping(
+            ChangeType changeType,
+            boolean expectDelete,
+            boolean expectUpsert,
+            boolean expectNullFuture) {
+        Set<ByteArrayWrapper> processedKeys = new HashSet<>();
+        GenericRow testRow = row(1, "test", 100);
+        ScanRecord record = new ScanRecord(0L, 0L, changeType, testRow);
+
+        CompletableFuture<?> future = undoComputer.processRecord(record, 
processedKeys);
+
+        if (expectNullFuture) {
+            assertThat(future).isNull();
+            assertThat(processedKeys).isEmpty();
+        } else {
+            assertThat(future).isNotNull();
+            assertThat(processedKeys).hasSize(1);
+        }
+        assertThat(mockWriter.getDeleteCount()).isEqualTo(expectDelete ? 1 : 
0);
+        assertThat(mockWriter.getUpsertCount()).isEqualTo(expectUpsert ? 1 : 
0);
+
+        if (expectDelete) {
+            assertThat(mockWriter.getLastDeletedRow()).isSameAs(testRow);
+        }
+        if (expectUpsert) {
+            assertThat(mockWriter.getLastUpsertedRow()).isSameAs(testRow);
+        }
+    }
+
+    static Stream<Arguments> changeTypeUndoMappingProvider() {
+        return Stream.of(
+                // ChangeType, expectDelete, expectUpsert, expectNullFuture
+                Arguments.of(ChangeType.INSERT, true, false, false),
+                Arguments.of(ChangeType.UPDATE_BEFORE, false, true, false),
+                Arguments.of(ChangeType.UPDATE_AFTER, false, false, true),
+                Arguments.of(ChangeType.DELETE, false, true, false));
+    }
+
+    // ==================== Primary Key Deduplication ====================
+
+    /**
+     * Test primary key deduplication: only first occurrence of each key is 
processed.
+     *
+     * <p>Validates: Requirements 2.2, 2.3 - Primary key deduplication and key 
tracking.
+     */
+    @Test
+    void testPrimaryKeyDeduplication() {
+        Set<ByteArrayWrapper> processedKeys = new HashSet<>();
+
+        // First record for key=1 (INSERT → delete)
+        GenericRow row1 = row(1, "first", 100);
+        CompletableFuture<?> f1 =
+                undoComputer.processRecord(
+                        new ScanRecord(0L, 0L, ChangeType.INSERT, row1), 
processedKeys);
+
+        // Second record for same key=1 (should be skipped)
+        GenericRow row2 = row(1, "second", 200);
+        CompletableFuture<?> f2 =
+                undoComputer.processRecord(
+                        new ScanRecord(1L, 0L, ChangeType.UPDATE_BEFORE, 
row2), processedKeys);
+
+        // Third record for different key=2 (INSERT → delete)
+        GenericRow row3 = row(2, "third", 300);
+        CompletableFuture<?> f3 =
+                undoComputer.processRecord(
+                        new ScanRecord(2L, 0L, ChangeType.INSERT, row3), 
processedKeys);
+
+        // Fourth record for key=2 (should be skipped)
+        GenericRow row4 = row(2, "fourth", 400);
+        CompletableFuture<?> f4 =
+                undoComputer.processRecord(
+                        new ScanRecord(3L, 0L, ChangeType.DELETE, row4), 
processedKeys);
+
+        // Verify: only first occurrence of each key processed
+        assertThat(f1).isNotNull();
+        assertThat(f2).as("Duplicate key=1 should be skipped").isNull();
+        assertThat(f3).isNotNull();
+        assertThat(f4).as("Duplicate key=2 should be skipped").isNull();
+
+        // Verify: 2 unique keys tracked
+        assertThat(processedKeys).hasSize(2);
+
+        // Verify: 2 deletes (one per unique key, both were INSERTs)
+        assertThat(mockWriter.getDeleteCount()).isEqualTo(2);
+        assertThat(mockWriter.getUpsertCount()).isEqualTo(0);
+
+        // Verify: correct rows were processed
+        assertThat(mockWriter.getAllDeletedRows()).containsExactly(row1, row3);
+    }
+
+    /**
+     * Test that UPDATE_AFTER records don't add keys to processed set.
+     *
+     * <p>This ensures subsequent records for the same key can still be 
processed.
+     */
+    @Test
+    void testUpdateAfterDoesNotBlockSubsequentRecords() {
+        Set<ByteArrayWrapper> processedKeys = new HashSet<>();
+
+        // UPDATE_AFTER for key=1 (skipped, key NOT added to set)
+        undoComputer.processRecord(
+                new ScanRecord(0L, 0L, ChangeType.UPDATE_AFTER, row(1, 
"after", 100)),
+                processedKeys);
+
+        // INSERT for same key=1 (should be processed since UPDATE_AFTER 
didn't add key)
+        CompletableFuture<?> f =
+                undoComputer.processRecord(
+                        new ScanRecord(1L, 0L, ChangeType.INSERT, row(1, 
"insert", 200)),
+                        processedKeys);
+
+        assertThat(f).isNotNull();
+        assertThat(mockWriter.getDeleteCount()).isEqualTo(1);
+        assertThat(processedKeys).hasSize(1);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutorTest.java
new file mode 100644
index 000000000..d60c10d9a
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutorTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.writer.undo;
+
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.flink.utils.TestLogScanner;
+import org.apache.fluss.flink.utils.TestUpsertWriter;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.encode.KeyEncoder;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link UndoRecoveryExecutor}.
+ *
+ * <p>Tests verify: (1) streaming execution with futures, (2) multi-bucket 
recovery, (3) exception
+ * propagation.
+ */
+class UndoRecoveryExecutorTest {
+
+    private static final RowType ROW_TYPE =
+            RowType.of(DataTypes.INT(), DataTypes.STRING(), DataTypes.INT());
+    private static final List<String> PRIMARY_KEY_COLUMNS = 
Collections.singletonList("f0");
+    private static final long TABLE_ID = 1L;
+
+    private KeyEncoder keyEncoder;
+    private TestUpsertWriter mockWriter;
+    private TestLogScanner mockScanner;
+    private UndoComputer undoComputer;
+    private UndoRecoveryExecutor executor;
+
+    // Short timeout value for testing (total ~300ms instead of 1 hour)
+    private static final long TEST_MAX_TOTAL_WAIT_TIME_MS = 300;
+
+    @BeforeEach
+    void setUp() {
+        keyEncoder = KeyEncoder.of(ROW_TYPE, PRIMARY_KEY_COLUMNS, null);
+        mockWriter = new TestUpsertWriter();
+        mockScanner = new TestLogScanner();
+        undoComputer = new UndoComputer(keyEncoder, mockWriter);
+        // Use short timeout for testing
+        executor =
+                new UndoRecoveryExecutor(
+                        mockScanner, mockWriter, undoComputer, 
TEST_MAX_TOTAL_WAIT_TIME_MS);
+    }
+
+    /**
+     * Test multi-bucket recovery with mixed ChangeTypes and key deduplication.
+     *
+     * <p>Validates: Requirements 3.3 - All futures complete after execute.
+     */
+    @Test
+    void testMultiBucketRecoveryWithDeduplication() throws Exception {
+        TableBucket bucket0 = new TableBucket(TABLE_ID, 0);
+        TableBucket bucket1 = new TableBucket(TABLE_ID, 1);
+
+        BucketRecoveryContext ctx0 = new BucketRecoveryContext(bucket0, 0L, 
4L);
+
+        BucketRecoveryContext ctx1 = new BucketRecoveryContext(bucket1, 0L, 
3L);
+
+        // Bucket0: INSERT(key=1), UPDATE_BEFORE(key=1, dup), INSERT(key=2), 
DELETE(key=3)
+        mockScanner.setRecordsForBucket(
+                bucket0,
+                Arrays.asList(
+                        new ScanRecord(0L, 0L, ChangeType.INSERT, row(1, "a", 
100)),
+                        new ScanRecord(1L, 0L, ChangeType.UPDATE_BEFORE, 
row(1, "b", 200)),
+                        new ScanRecord(2L, 0L, ChangeType.INSERT, row(2, "c", 
300)),
+                        new ScanRecord(3L, 0L, ChangeType.DELETE, row(3, "d", 
400))));
+
+        // Bucket1: DELETE(key=10), UPDATE_AFTER(key=11, skip), INSERT(key=12)
+        mockScanner.setRecordsForBucket(
+                bucket1,
+                Arrays.asList(
+                        new ScanRecord(0L, 0L, ChangeType.DELETE, row(10, "e", 
500)),
+                        new ScanRecord(1L, 0L, ChangeType.UPDATE_AFTER, 
row(11, "f", 600)),
+                        new ScanRecord(2L, 0L, ChangeType.INSERT, row(12, "g", 
700))));
+
+        executor.execute(Arrays.asList(ctx0, ctx1));
+
+        // Bucket0: 3 unique keys (key=1 deduplicated)
+        // - key=1: INSERT → delete
+        // - key=2: INSERT → delete
+        // - key=3: DELETE → upsert
+        assertThat(ctx0.getProcessedKeys()).hasSize(3);
+
+        // Bucket1: 2 unique keys (UPDATE_AFTER skipped)
+        // - key=10: DELETE → upsert
+        // - key=12: INSERT → delete
+        assertThat(ctx1.getProcessedKeys()).hasSize(2);
+
+        // Total: 3 deletes (key=1,2,12), 2 upserts (key=3,10)
+        assertThat(mockWriter.getDeleteCount()).isEqualTo(3);
+        assertThat(mockWriter.getUpsertCount()).isEqualTo(2);
+        assertThat(mockWriter.isFlushCalled()).isTrue();
+
+        assertThat(ctx0.isComplete()).isTrue();
+        assertThat(ctx1.isComplete()).isTrue();
+    }
+
+    /**
+     * Test that recovery is skipped when checkpoint offset >= target offset.
+     *
+     * <p>Validates: No unnecessary work when no recovery needed.
+     */
+    @Test
+    void testNoRecoveryNeededSkipsExecution() throws Exception {
+        TableBucket bucket = new TableBucket(TABLE_ID, 0);
+
+        // Checkpoint offset (5) >= log end offset (5) → no recovery needed
+        BucketRecoveryContext ctx = new BucketRecoveryContext(bucket, 5L, 5L);
+
+        executor.execute(Collections.singletonList(ctx));
+
+        assertThat(mockWriter.getDeleteCount()).isEqualTo(0);
+        assertThat(mockWriter.getUpsertCount()).isEqualTo(0);
+        assertThat(mockWriter.isFlushCalled()).isFalse();
+    }
+
+    /**
+     * Test exception propagation from writer failures.
+     *
+     * <p>Validates: Requirements 7.1, 7.2 - Exception propagation.
+     */
+    @Test
+    void testExceptionPropagationFromWriter() {
+        TableBucket bucket = new TableBucket(TABLE_ID, 0);
+
+        BucketRecoveryContext ctx = new BucketRecoveryContext(bucket, 0L, 2L);
+
+        mockScanner.setRecordsForBucket(
+                bucket,
+                Arrays.asList(
+                        new ScanRecord(0L, 0L, ChangeType.INSERT, row(1, "a", 
100)),
+                        new ScanRecord(1L, 0L, ChangeType.INSERT, row(2, "b", 
200))));
+
+        mockWriter.setShouldFail(true);
+
+        assertThatThrownBy(() -> 
executor.execute(Collections.singletonList(ctx)))
+                .hasCauseInstanceOf(RuntimeException.class)
+                .hasMessageContaining("Simulated write failure");
+    }
+
+    /**
+     * Test that exception is thrown after max total wait time.
+     *
+     * <p>Validates: Undo recovery timeout triggers a retryable exception. 
Note: This test uses a
+     * mock scanner that always returns empty, so it will hit the timeout 
quickly in test
+     * environment. In production, the timeout is 1 hour.
+     */
+    @Test
+    void testFatalExceptionOnMaxEmptyPolls() {
+        TableBucket bucket = new TableBucket(TABLE_ID, 0);
+
+        BucketRecoveryContext ctx = new BucketRecoveryContext(bucket, 0L, 2L);
+
+        // Configure scanner to always return empty (simulating network issues 
or server problems)
+        mockScanner.setAlwaysReturnEmpty(true);
+
+        // The test will timeout based on total wait time, but since we're 
using a mock
+        // scanner with no actual delay, it will fail quickly
+        assertThatThrownBy(() -> 
executor.execute(Collections.singletonList(ctx)))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("Undo recovery timed out")
+                .hasMessageContaining("minutes of waiting")
+                .hasMessageContaining("still incomplete")
+                .hasMessageContaining("restart and retry");
+    }
+
+    /**
+     * Test multi-poll scenario where records are returned in batches.
+     *
+     * <p>This tests realistic LogScanner behavior where records are returned 
incrementally across
+     * multiple poll() calls, ensuring the executor correctly handles partial 
results.
+     */
+    @Test
+    void testMultiPollBatchProcessing() throws Exception {
+        TableBucket bucket = new TableBucket(TABLE_ID, 0);
+
+        BucketRecoveryContext ctx = new BucketRecoveryContext(bucket, 0L, 6L);
+
+        // Configure 6 records but return only 2 per poll
+        mockScanner.setRecordsForBucket(
+                bucket,
+                Arrays.asList(
+                        new ScanRecord(0L, 0L, ChangeType.INSERT, row(1, "a", 
100)),
+                        new ScanRecord(1L, 0L, ChangeType.INSERT, row(2, "b", 
200)),
+                        new ScanRecord(2L, 0L, ChangeType.DELETE, row(3, "c", 
300)),
+                        new ScanRecord(3L, 0L, ChangeType.UPDATE_BEFORE, 
row(4, "d", 400)),
+                        new ScanRecord(4L, 0L, ChangeType.UPDATE_AFTER, row(4, 
"e", 500)),
+                        new ScanRecord(5L, 0L, ChangeType.INSERT, row(5, "f", 
600))));
+        mockScanner.setBatchSize(2);
+
+        executor.execute(Collections.singletonList(ctx));
+
+        // Should process all 6 records across 3 polls
+        // key=1: INSERT → delete
+        // key=2: INSERT → delete
+        // key=3: DELETE → upsert
+        // key=4: UPDATE_BEFORE → upsert (UPDATE_AFTER skipped)
+        // key=5: INSERT → delete
+        assertThat(ctx.getProcessedKeys()).hasSize(5);
+        assertThat(ctx.getTotalRecordsProcessed()).isEqualTo(6);
+        assertThat(mockWriter.getDeleteCount()).isEqualTo(3); // keys 1, 2, 5
+        assertThat(mockWriter.getUpsertCount()).isEqualTo(2); // keys 3, 4
+        assertThat(ctx.isComplete()).isTrue();
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManagerITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManagerITCase.java
new file mode 100644
index 000000000..9e7762c30
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManagerITCase.java
@@ -0,0 +1,999 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.writer.undo;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.ListOffsetsResult;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.lookup.Lookuper;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.UpsertResult;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.sink.writer.undo.UndoRecoveryManager.UndoOffsets;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.MergeEngineType;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.PartitionSpec;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Integration tests for {@link UndoRecoveryManager}.
+ *
+ * <p>Tests use complex multi-bucket, multi-key scenarios to ensure only 
correct implementations
+ * pass. Each test covers multiple aspects to maximize value while minimizing 
redundancy.
+ */
+public class UndoRecoveryManagerITCase {
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(
+                            new Configuration()
+                                    // not to clean snapshots for test purpose
+                                    .set(
+                                            
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
+                                            Integer.MAX_VALUE))
+                    .setNumOfTabletServers(3)
+                    .build();
+
+    private static final String DEFAULT_DB = "test-flink-db";
+
+    protected static Connection conn;
+    protected static Admin admin;
+
+    @BeforeAll
+    static void beforeAll() {
+        Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
+    }
+
+    @BeforeEach
+    void beforeEach() throws Exception {
+        admin.createDatabase(DEFAULT_DB, DatabaseDescriptor.EMPTY, true).get();
+    }
+
+    @AfterAll
+    static void afterAll() throws Exception {
+        if (admin != null) {
+            admin.close();
+            admin = null;
+        }
+        if (conn != null) {
+            conn.close();
+            conn = null;
+        }
+    }
+
+    protected long createTable(TablePath tablePath, TableDescriptor 
tableDescriptor)
+            throws Exception {
+        admin.createTable(tablePath, tableDescriptor, true).get();
+        return admin.getTableInfo(tablePath).get().getTableId();
+    }
+
+    private static final int NUM_BUCKETS = 4;
+
+    private static final Schema TEST_SCHEMA =
+            Schema.newBuilder()
+                    .column("id", DataTypes.INT())
+                    .column("name", DataTypes.STRING())
+                    .column("price", DataTypes.INT())
+                    .column("stock", DataTypes.INT())
+                    .primaryKey("id")
+                    .build();
+
+    private static final Schema PARTITIONED_SCHEMA =
+            Schema.newBuilder()
+                    .column("id", DataTypes.INT())
+                    .column("name", DataTypes.STRING())
+                    .column("price", DataTypes.INT())
+                    .column("pt", DataTypes.STRING())
+                    .primaryKey("id", "pt")
+                    .build();
+
+    // ==================== Full Update Tests ====================
+
+    /**
+     * Comprehensive test for Full Update mode with 4 buckets and 100+ keys.
+     *
+     * <p>Covers: zero-offset recovery, DELETE undo, UPDATE undo with 
deduplication, mixed ops.
+     */
+    @Test
+    void testFullUpdateMultiBucketComprehensiveRecovery() throws Exception {
+        TablePath tablePath = createTestTablePath("full_update_test");
+        long tableId =
+                createTable(
+                        tablePath,
+                        TableDescriptor.builder()
+                                .schema(TEST_SCHEMA)
+                                .distributedBy(NUM_BUCKETS, "id")
+                                .property(
+                                        ConfigOptions.TABLE_MERGE_ENGINE,
+                                        MergeEngineType.AGGREGATION)
+                                .build());
+
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter writer = table.newUpsert().createWriter();
+            Lookuper lookuper = table.newLookup().createLookuper();
+            BucketTracker tracker = new BucketTracker(NUM_BUCKETS, tableId);
+
+            // Phase 1: Write initial 100 keys (batch async)
+            List<CompletableFuture<UpsertResult>> writeFutures = new 
ArrayList<>();
+            for (int id = 0; id < 100; id++) {
+                writeFutures.add(writer.upsert(row(id, "pre_" + id, id * 10, 
id)));
+            }
+            for (int i = 0; i < writeFutures.size(); i++) {
+                tracker.recordWrite(i, writeFutures.get(i).get());
+            }
+            writer.flush();
+
+            int zeroOffsetBucket = tracker.selectBucketForZeroOffsetRecovery();
+            Map<TableBucket, Long> checkpointOffsets =
+                    
tracker.buildCheckpointOffsetsWithZeroOffset(zeroOffsetBucket);
+
+            Map<Integer, Set<Integer>> keysAfterCheckpoint = new HashMap<>();
+            for (int i = 0; i < NUM_BUCKETS; i++) {
+                keysAfterCheckpoint.put(i, new HashSet<>());
+            }
+
+            // Phase 2: Operations after checkpoint (batch async)
+            List<CompletableFuture<UpsertResult>> newKeyFutures = new 
ArrayList<>();
+            for (int id = 200; id < 300; id++) {
+                newKeyFutures.add(writer.upsert(row(id, "new_" + id, id * 10, 
id)));
+            }
+            writer.flush();
+            for (int i = 0; i < newKeyFutures.size(); i++) {
+                UpsertResult result = newKeyFutures.get(i).get();
+                
keysAfterCheckpoint.get(result.getBucket().getBucket()).add(200 + i);
+            }
+
+            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
+                if (bucket != zeroOffsetBucket && 
!tracker.keysByBucket.get(bucket).isEmpty()) {
+                    List<Integer> keys = tracker.keysByBucket.get(bucket);
+                    for (int i = 0; i < Math.min(3, keys.size()); i++) {
+                        int id = keys.get(i);
+                        writer.upsert(row(id, "updated_" + id, 9999, 9999));
+                        keysAfterCheckpoint.get(bucket).add(id);
+                    }
+                    if (keys.size() > 5) {
+                        for (int i = 3; i < Math.min(5, keys.size()); i++) {
+                            int id = keys.get(i);
+                            writer.delete(row(id, "pre_" + id, id * 10, id));
+                            keysAfterCheckpoint.get(bucket).add(id);
+                        }
+                    }
+                    if (keys.size() > 6) {
+                        int id = keys.get(5);
+                        writer.upsert(row(id, "multi_v1", 111, 111));
+                        writer.upsert(row(id, "multi_v2", 222, 222));
+                        writer.upsert(row(id, "multi_v3", 333, 333));
+                        keysAfterCheckpoint.get(bucket).add(id);
+                    }
+                }
+            }
+            writer.flush();
+
+            // Phase 3: Perform undo recovery
+            Map<TableBucket, UndoOffsets> offsetRanges =
+                    buildUndoOffsets(checkpointOffsets, tablePath, admin);
+            new UndoRecoveryManager(table, 
null).performUndoRecovery(offsetRanges, 0, 1);
+
+            // Phase 4: Verify results (parallel lookup)
+            List<CompletableFuture<Void>> verifyFutures = new ArrayList<>();
+
+            for (int id : tracker.keysByBucket.get(zeroOffsetBucket)) {
+                final int keyId = id;
+                verifyFutures.add(
+                        lookuper.lookup(row(keyId))
+                                .thenAccept(
+                                        r ->
+                                                assertThat(r.getSingletonRow())
+                                                        .as(
+                                                                "Zero-offset 
bucket key %d should be deleted",
+                                                                keyId)
+                                                        .isNull()));
+            }
+
+            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
+                if (bucket == zeroOffsetBucket) {
+                    continue;
+                }
+                for (int id : tracker.keysByBucket.get(bucket)) {
+                    final int keyId = id;
+                    final int b = bucket;
+                    verifyFutures.add(
+                            lookuper.lookup(row(keyId))
+                                    .thenAccept(
+                                            r -> {
+                                                InternalRow result = 
r.getSingletonRow();
+                                                assertThat(result)
+                                                        .as(
+                                                                "Bucket %d key 
%d should exist",
+                                                                b, keyId)
+                                                        .isNotNull();
+                                                assertRowEquals(
+                                                        result,
+                                                        keyId,
+                                                        "pre_" + keyId,
+                                                        keyId * 10,
+                                                        keyId);
+                                            }));
+                }
+                for (int id : keysAfterCheckpoint.get(bucket)) {
+                    if (id >= 200) {
+                        final int keyId = id;
+                        final int b = bucket;
+                        verifyFutures.add(
+                                lookuper.lookup(row(keyId))
+                                        .thenAccept(
+                                                r ->
+                                                        
assertThat(r.getSingletonRow())
+                                                                .as(
+                                                                        
"Bucket %d new key %d should be deleted",
+                                                                        b, 
keyId)
+                                                                .isNull()));
+                    }
+                }
+            }
+            CompletableFuture.allOf(verifyFutures.toArray(new 
CompletableFuture[0])).get();
+        }
+    }
+
+    // ==================== Partial Update Tests ====================
+
+    /**
+     * Comprehensive test for Partial Update mode with 4 buckets.
+     *
+     * <p>Covers: target columns rollback, non-target columns preserved, other 
writer's changes
+     * preserved.
+     */
+    @Test
+    void testPartialUpdateMultiBucketComprehensiveRecovery() throws Exception {
+        TablePath tablePath = createTestTablePath("partial_update_test");
+        long tableId =
+                createTable(
+                        tablePath,
+                        TableDescriptor.builder()
+                                .schema(TEST_SCHEMA)
+                                .distributedBy(NUM_BUCKETS, "id")
+                                .property(
+                                        ConfigOptions.TABLE_MERGE_ENGINE,
+                                        MergeEngineType.AGGREGATION)
+                                .build());
+
+        int[] targetColumns = new int[] {0, 2}; // id, price
+
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter fullWriter = table.newUpsert().createWriter();
+            Lookuper lookuper = table.newLookup().createLookuper();
+
+            // Discover bucket assignments efficiently (batch write + delete)
+            Map<Integer, Integer> keyToBucket = new HashMap<>();
+            List<CompletableFuture<UpsertResult>> discoverFutures = new 
ArrayList<>();
+            for (int id = 0; id < 80; id++) {
+                discoverFutures.add(fullWriter.upsert(row(id, "temp", 0, 0)));
+            }
+            for (int i = 0; i < discoverFutures.size(); i++) {
+                keyToBucket.put(i, 
discoverFutures.get(i).get().getBucket().getBucket());
+            }
+            fullWriter.flush();
+            for (int id = 0; id < 80; id++) {
+                fullWriter.delete(row(id, "temp", 0, 0));
+            }
+            fullWriter.flush();
+
+            Map<Integer, List<Integer>> initialKeysByBucket = new HashMap<>();
+            for (int i = 0; i < NUM_BUCKETS; i++) {
+                initialKeysByBucket.put(i, new ArrayList<>());
+            }
+            for (int id = 0; id < 80; id++) {
+                initialKeysByBucket.get(keyToBucket.get(id)).add(id);
+            }
+            int zeroOffsetBucket =
+                    selectBucketForZeroOffsetRecovery(initialKeysByBucket, 
NUM_BUCKETS);
+
+            // Phase 1: Write initial data (except zeroOffsetBucket)
+            Map<Integer, List<Integer>> keysByBucket = new HashMap<>();
+            Map<Integer, Long> bucketOffsets = new HashMap<>();
+            for (int i = 0; i < NUM_BUCKETS; i++) {
+                keysByBucket.put(i, new ArrayList<>());
+            }
+
+            List<CompletableFuture<UpsertResult>> initFutures = new 
ArrayList<>();
+            List<Integer> initIds = new ArrayList<>();
+            for (int id = 0; id < 80; id++) {
+                int bucket = keyToBucket.get(id);
+                if (bucket != zeroOffsetBucket) {
+                    initFutures.add(fullWriter.upsert(row(id, "name_" + id, id 
* 10, id * 100)));
+                    initIds.add(id);
+                }
+            }
+            for (int i = 0; i < initFutures.size(); i++) {
+                UpsertResult result = initFutures.get(i).get();
+                int id = initIds.get(i);
+                int bucket = result.getBucket().getBucket();
+                keysByBucket.get(bucket).add(id);
+                bucketOffsets.put(bucket, result.getLogEndOffset());
+            }
+            fullWriter.flush();
+
+            Map<TableBucket, Long> checkpointOffsets = new HashMap<>();
+            checkpointOffsets.put(new TableBucket(tableId, zeroOffsetBucket), 
0L);
+            for (int i = 0; i < NUM_BUCKETS; i++) {
+                if (i != zeroOffsetBucket) {
+                    checkpointOffsets.put(
+                            new TableBucket(tableId, i), 
bucketOffsets.getOrDefault(i, 0L));
+                }
+            }
+
+            Set<Integer> zeroOffsetBucketKeysAfterCheckpoint = new HashSet<>();
+            Map<Integer, Set<Integer>> newKeysAfterCheckpointByBucket = new 
HashMap<>();
+            for (int i = 0; i < NUM_BUCKETS; i++) {
+                newKeysAfterCheckpointByBucket.put(i, new HashSet<>());
+            }
+
+            // Phase 2: Partial updates after checkpoint
+            UpsertWriter partialWriter =
+                    
table.newUpsert().partialUpdate(targetColumns).createWriter();
+
+            for (int id = 0; id < 80; id++) {
+                if (keyToBucket.get(id) == zeroOffsetBucket) {
+                    partialWriter.upsert(row(id, null, 9999, null));
+                    zeroOffsetBucketKeysAfterCheckpoint.add(id);
+                }
+            }
+
+            List<CompletableFuture<UpsertResult>> newPartialFutures = new 
ArrayList<>();
+            for (int id = 100; id < 130; id++) {
+                newPartialFutures.add(partialWriter.upsert(row(id, null, id * 
10, null)));
+            }
+            for (int i = 0; i < newPartialFutures.size(); i++) {
+                UpsertResult result = newPartialFutures.get(i).get();
+                int id = 100 + i;
+                int bucket = result.getBucket().getBucket();
+                if (bucket == zeroOffsetBucket) {
+                    zeroOffsetBucketKeysAfterCheckpoint.add(id);
+                } else {
+                    newKeysAfterCheckpointByBucket.get(bucket).add(id);
+                }
+            }
+
+            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
+                if (bucket != zeroOffsetBucket) {
+                    List<Integer> keys = keysByBucket.get(bucket);
+                    for (int i = 0; i < Math.min(5, keys.size()); i++) {
+                        partialWriter.upsert(row(keys.get(i), null, 8888, 
null));
+                    }
+                }
+            }
+            partialWriter.flush();
+
+            // Other Writer updates stock (should NOT be undone)
+            UpsertWriter otherWriter =
+                    table.newUpsert().partialUpdate(new int[] {0, 
3}).createWriter();
+            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
+                if (bucket != zeroOffsetBucket) {
+                    for (int id : keysByBucket.get(bucket)) {
+                        otherWriter.upsert(row(id, null, null, 5555));
+                    }
+                }
+            }
+            otherWriter.flush();
+
+            // Phase 3: Perform undo recovery
+            Map<TableBucket, UndoOffsets> offsetRanges =
+                    buildUndoOffsets(checkpointOffsets, tablePath, admin);
+            new UndoRecoveryManager(table, 
targetColumns).performUndoRecovery(offsetRanges, 0, 1);
+
+            // Phase 4: Verify results (parallel lookup)
+            List<CompletableFuture<Void>> verifyFutures = new ArrayList<>();
+
+            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
+                if (bucket == zeroOffsetBucket) {
+                    continue;
+                }
+
+                for (int id : keysByBucket.get(bucket)) {
+                    final int keyId = id;
+                    final int b = bucket;
+                    verifyFutures.add(
+                            lookuper.lookup(row(keyId))
+                                    .thenAccept(
+                                            r -> {
+                                                InternalRow result = 
r.getSingletonRow();
+                                                assertThat(result)
+                                                        .as(
+                                                                "Bucket %d key 
%d should exist",
+                                                                b, keyId)
+                                                        .isNotNull();
+                                                
assertThat(result.getString(1).toString())
+                                                        .isEqualTo("name_" + 
keyId);
+                                                assertThat(result.getInt(2))
+                                                        .as("price restored")
+                                                        .isEqualTo(keyId * 10);
+                                                assertThat(result.getInt(3))
+                                                        .as("stock kept")
+                                                        .isEqualTo(5555);
+                                            }));
+                }
+            }
+            CompletableFuture.allOf(verifyFutures.toArray(new 
CompletableFuture[0])).get();
+        }
+    }
+
+    // ==================== Partitioned Table Tests ====================
+
+    /**
+     * Test undo recovery for partitioned table with multiple partitions.
+     *
+     * <p>Covers: multi-partition recovery, INSERT/UPDATE/DELETE undo across 
partitions.
+     */
+    @Test
+    void testPartitionedTableRecovery() throws Exception {
+        TablePath tablePath = createTestTablePath("partitioned_test");
+        long tableId =
+                createTable(
+                        tablePath,
+                        TableDescriptor.builder()
+                                .schema(PARTITIONED_SCHEMA)
+                                .partitionedBy("pt")
+                                .distributedBy(2, "id")
+                                .property(
+                                        ConfigOptions.TABLE_MERGE_ENGINE,
+                                        MergeEngineType.AGGREGATION)
+                                .build());
+
+        String partition1 = "p1", partition2 = "p2";
+        admin.createPartition(tablePath, newPartitionSpec("pt", partition1), 
false).get();
+        admin.createPartition(tablePath, newPartitionSpec("pt", partition2), 
false).get();
+
+        List<PartitionInfo> partitionInfos = 
admin.listPartitionInfos(tablePath).get();
+        Map<String, Long> partitionNameToId = new HashMap<>();
+        for (PartitionInfo info : partitionInfos) {
+            partitionNameToId.put(info.getPartitionName(), 
info.getPartitionId());
+        }
+        long partition1Id = partitionNameToId.get(partition1);
+        long partition2Id = partitionNameToId.get(partition2);
+
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter writer = table.newUpsert().createWriter();
+            Lookuper lookuper = table.newLookup().createLookuper();
+            Map<TableBucket, Long> checkpointOffsets = new HashMap<>();
+
+            // Phase 1: Write initial data (batch)
+            List<CompletableFuture<UpsertResult>> p1Futures = new 
ArrayList<>();
+            List<CompletableFuture<UpsertResult>> p2Futures = new 
ArrayList<>();
+            for (int id = 0; id < 10; id++) {
+                p1Futures.add(writer.upsert(row(id, "p1_name_" + id, id * 10, 
partition1)));
+                p2Futures.add(writer.upsert(row(id, "p2_name_" + id, id * 20, 
partition2)));
+            }
+            for (int i = 0; i < 10; i++) {
+                UpsertResult r1 = p1Futures.get(i).get();
+                checkpointOffsets.put(
+                        new TableBucket(tableId, partition1Id, 
r1.getBucket().getBucket()),
+                        r1.getLogEndOffset());
+                UpsertResult r2 = p2Futures.get(i).get();
+                checkpointOffsets.put(
+                        new TableBucket(tableId, partition2Id, 
r2.getBucket().getBucket()),
+                        r2.getLogEndOffset());
+            }
+            writer.flush();
+
+            // Phase 2: Operations after checkpoint
+            Set<Integer> newKeysInP1 = new HashSet<>(), newKeysInP2 = new 
HashSet<>();
+            for (int id = 100; id < 105; id++) {
+                writer.upsert(row(id, "p1_new_" + id, id * 10, partition1));
+                newKeysInP1.add(id);
+            }
+            for (int id = 0; id < 3; id++) {
+                writer.upsert(row(id, "p1_updated_" + id, 9999, partition1));
+            }
+            for (int id = 5; id < 8; id++) {
+                writer.delete(row(id, "p2_name_" + id, id * 20, partition2));
+            }
+            for (int id = 200; id < 205; id++) {
+                writer.upsert(row(id, "p2_new_" + id, id * 20, partition2));
+                newKeysInP2.add(id);
+            }
+            writer.flush();
+
+            // Phase 3: Perform undo recovery
+            Map<TableBucket, UndoOffsets> offsetRanges =
+                    buildUndoOffsets(checkpointOffsets, tablePath, admin);
+            new UndoRecoveryManager(table, 
null).performUndoRecovery(offsetRanges, 0, 1);
+
+            // Phase 4: Verify results (parallel lookup)
+            List<CompletableFuture<Void>> verifyFutures = new ArrayList<>();
+
+            for (int id = 0; id < 10; id++) {
+                final int keyId = id;
+                verifyFutures.add(
+                        lookuper.lookup(row(keyId, partition1))
+                                .thenAccept(
+                                        r -> {
+                                            InternalRow result = 
r.getSingletonRow();
+                                            assertThat(result)
+                                                    .as("P1 key %d should 
exist", keyId)
+                                                    .isNotNull();
+                                            
assertThat(result.getString(1).toString())
+                                                    .isEqualTo("p1_name_" + 
keyId);
+                                            
assertThat(result.getInt(2)).isEqualTo(keyId * 10);
+                                        }));
+                verifyFutures.add(
+                        lookuper.lookup(row(keyId, partition2))
+                                .thenAccept(
+                                        r -> {
+                                            InternalRow result = 
r.getSingletonRow();
+                                            assertThat(result)
+                                                    .as("P2 key %d should 
exist", keyId)
+                                                    .isNotNull();
+                                            
assertThat(result.getString(1).toString())
+                                                    .isEqualTo("p2_name_" + 
keyId);
+                                            
assertThat(result.getInt(2)).isEqualTo(keyId * 20);
+                                        }));
+            }
+
+            CompletableFuture.allOf(verifyFutures.toArray(new 
CompletableFuture[0])).get();
+        }
+    }
+
+    // ==================== Idempotency and Exception Tests 
====================
+
+    /**
+     * Test that undo recovery is idempotent - multiple executions produce the 
same result.
+     *
+     * <p>Covers: 4 buckets, 200 keys, mixed operations, 2 recovery rounds.
+     */
+    @Test
+    void testRecoveryIdempotency() throws Exception {
+        final int numBuckets = 4;
+        final int keysPerBucket = 50;
+
+        TablePath tablePath = createTestTablePath("idempotency_test");
+        long tableId =
+                createTable(
+                        tablePath,
+                        TableDescriptor.builder()
+                                .schema(TEST_SCHEMA)
+                                .distributedBy(numBuckets, "id")
+                                .property(
+                                        ConfigOptions.TABLE_MERGE_ENGINE,
+                                        MergeEngineType.AGGREGATION)
+                                .build());
+
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter writer = table.newUpsert().createWriter();
+            Lookuper lookuper = table.newLookup().createLookuper();
+            BucketTracker tracker = new BucketTracker(numBuckets, tableId);
+
+            int totalKeys = numBuckets * keysPerBucket;
+
+            // Batch write initial keys
+            List<CompletableFuture<UpsertResult>> initFutures = new 
ArrayList<>();
+            for (int id = 0; id < totalKeys; id++) {
+                initFutures.add(writer.upsert(row(id, "original_" + id, id * 
10, id * 100)));
+            }
+            for (int i = 0; i < initFutures.size(); i++) {
+                tracker.recordWrite(i, initFutures.get(i).get());
+            }
+            writer.flush();
+
+            Map<TableBucket, Long> checkpointOffsets = 
tracker.buildCheckpointOffsetsAll();
+
+            // Batch write new keys
+            Set<Integer> newKeys = new HashSet<>();
+            for (int id = totalKeys; id < totalKeys + 50; id++) {
+                writer.upsert(row(id, "new_" + id, id * 10, id * 100));
+                newKeys.add(id);
+            }
+
+            for (int bucket = 0; bucket < numBuckets; bucket++) {
+                List<Integer> keys = tracker.keysByBucket.get(bucket);
+                for (int i = 0; i < Math.min(3, keys.size()); i++) {
+                    writer.upsert(row(keys.get(i), "updated_" + keys.get(i), 
9999, 9999));
+                }
+                if (keys.size() > 5) {
+                    int id = keys.get(5);
+                    writer.delete(row(id, "original_" + id, id * 10, id * 
100));
+                }
+            }
+            writer.flush();
+
+            // Execute recovery 2 times - results should be identical
+            // For idempotency, we need to update checkpoint offsets after 
first recovery
+            // because recovery writes undo operations which advance the log 
end offset
+            Map<TableBucket, Long> currentCheckpointOffsets = new 
HashMap<>(checkpointOffsets);
+
+            for (int round = 1; round <= 2; round++) {
+                Map<TableBucket, UndoOffsets> offsetRanges =
+                        buildUndoOffsets(currentCheckpointOffsets, tablePath, 
admin);
+                new UndoRecoveryManager(table, 
null).performUndoRecovery(offsetRanges, 0, 1);
+
+                // After first recovery, update checkpoint offsets to current 
log end offsets
+                // This simulates what would happen in a real scenario where 
checkpoint is taken
+                // after recovery completes
+                if (round == 1) {
+                    for (Map.Entry<TableBucket, UndoOffsets> entry : 
offsetRanges.entrySet()) {
+                        // Query the new log end offset after recovery
+                        TableBucket tb = entry.getKey();
+                        List<Integer> bucketIds = new ArrayList<>();
+                        bucketIds.add(tb.getBucket());
+                        ListOffsetsResult offsetsResult =
+                                admin.listOffsets(
+                                        tablePath, bucketIds, new 
OffsetSpec.LatestSpec());
+                        Long newOffset = 
offsetsResult.bucketResult(tb.getBucket()).get();
+                        if (newOffset != null) {
+                            currentCheckpointOffsets.put(tb, newOffset);
+                        }
+                    }
+                }
+
+                // Batch verify (parallel lookup)
+                List<CompletableFuture<Void>> verifyFutures = new 
ArrayList<>();
+                final int r = round;
+
+                for (int id : newKeys) {
+                    final int keyId = id;
+                    verifyFutures.add(
+                            lookuper.lookup(row(keyId))
+                                    .thenAccept(
+                                            result ->
+                                                    
assertThat(result.getSingletonRow())
+                                                            .as(
+                                                                    "Round %d: 
New key %d should be deleted",
+                                                                    r, keyId)
+                                                            .isNull()));
+                }
+
+                for (int bucket = 0; bucket < numBuckets; bucket++) {
+                    for (int id : tracker.keysByBucket.get(bucket)) {
+                        final int keyId = id;
+                        verifyFutures.add(
+                                lookuper.lookup(row(keyId))
+                                        .thenAccept(
+                                                lookupResult -> {
+                                                    InternalRow result =
+                                                            
lookupResult.getSingletonRow();
+                                                    assertThat(result)
+                                                            .as("Round %d: Key 
%d exists", r, keyId)
+                                                            .isNotNull();
+                                                    assertRowEquals(
+                                                            result,
+                                                            keyId,
+                                                            "original_" + 
keyId,
+                                                            keyId * 10,
+                                                            keyId * 100);
+                                                }));
+                    }
+                }
+                CompletableFuture.allOf(verifyFutures.toArray(new 
CompletableFuture[0])).get();
+            }
+        }
+    }
+
+    /** Test that scanner exceptions are properly propagated. */
+    @Test
+    void testRecoveryWithScannerException() throws Exception {
+        TablePath tablePath = createTestTablePath("scanner_exception_test");
+        long tableId =
+                createTable(
+                        tablePath,
+                        TableDescriptor.builder()
+                                .schema(TEST_SCHEMA)
+                                .distributedBy(2, "id")
+                                .property(
+                                        ConfigOptions.TABLE_MERGE_ENGINE,
+                                        MergeEngineType.AGGREGATION)
+                                .build());
+
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter writer = table.newUpsert().createWriter();
+            Map<TableBucket, Long> checkpointOffsets = new HashMap<>();
+
+            List<CompletableFuture<UpsertResult>> futures = new ArrayList<>();
+            for (int id = 0; id < 10; id++) {
+                futures.add(writer.upsert(row(id, "name_" + id, id * 10, id)));
+            }
+            for (int i = 0; i < futures.size(); i++) {
+                UpsertResult result = futures.get(i).get();
+                checkpointOffsets.put(
+                        new TableBucket(tableId, 
result.getBucket().getBucket()),
+                        result.getLogEndOffset());
+            }
+            writer.flush();
+
+            for (int id = 100; id < 110; id++) {
+                writer.upsert(row(id, "new_" + id, id * 10, id));
+            }
+            writer.flush();
+
+            final String errorMessage = "Simulated scanner failure for 
testing";
+            FaultInjectingUndoRecoveryManager faultHandler =
+                    new FaultInjectingUndoRecoveryManager(table, null);
+            faultHandler.setExceptionToThrow(new 
RuntimeException(errorMessage));
+
+            Map<TableBucket, UndoOffsets> offsetRanges =
+                    buildUndoOffsets(checkpointOffsets, tablePath, admin);
+            assertThatThrownBy(() -> 
faultHandler.performUndoRecovery(offsetRanges, 0, 1))
+                    .isInstanceOf(RuntimeException.class)
+                    .hasMessageContaining(errorMessage);
+        }
+    }
+
+    // ==================== Helper Classes ====================
+
+    /** Tracks bucket assignments and offsets during test setup. */
+    private static class BucketTracker {
+        final int numBuckets;
+        final long tableId;
+        final Map<Integer, List<Integer>> keysByBucket = new HashMap<>();
+        final Map<Integer, Long> bucketOffsets = new HashMap<>();
+
+        BucketTracker(int numBuckets, long tableId) {
+            this.numBuckets = numBuckets;
+            this.tableId = tableId;
+            for (int i = 0; i < numBuckets; i++) {
+                keysByBucket.put(i, new ArrayList<>());
+            }
+        }
+
+        void recordWrite(int key, UpsertResult result) {
+            int bucketId = result.getBucket().getBucket();
+            keysByBucket.get(bucketId).add(key);
+            bucketOffsets.put(bucketId, result.getLogEndOffset());
+        }
+
+        int selectBucketForZeroOffsetRecovery() {
+            int maxBucket = 0, maxKeys = 0;
+            for (int i = 0; i < numBuckets; i++) {
+                if (keysByBucket.get(i).size() > maxKeys) {
+                    maxKeys = keysByBucket.get(i).size();
+                    maxBucket = i;
+                }
+            }
+            return maxBucket;
+        }
+
+        Map<TableBucket, Long> buildCheckpointOffsetsWithZeroOffset(int 
zeroOffsetBucket) {
+            Map<TableBucket, Long> offsets = new HashMap<>();
+            offsets.put(new TableBucket(tableId, zeroOffsetBucket), 0L);
+            for (int i = 0; i < numBuckets; i++) {
+                if (i != zeroOffsetBucket) {
+                    offsets.put(new TableBucket(tableId, i), 
bucketOffsets.getOrDefault(i, 0L));
+                }
+            }
+            return offsets;
+        }
+
+        Map<TableBucket, Long> buildCheckpointOffsetsAll() {
+            Map<TableBucket, Long> offsets = new HashMap<>();
+            for (int i = 0; i < numBuckets; i++) {
+                offsets.put(new TableBucket(tableId, i), 
bucketOffsets.getOrDefault(i, 0L));
+            }
+            return offsets;
+        }
+    }
+
+    private static class FaultInjectingUndoRecoveryManager extends 
UndoRecoveryManager {
+        private RuntimeException exceptionToThrow;
+
+        FaultInjectingUndoRecoveryManager(Table table, int[] 
targetColumnIndexes) {
+            super(table, targetColumnIndexes);
+        }
+
+        void setExceptionToThrow(RuntimeException exception) {
+            this.exceptionToThrow = exception;
+        }
+
+        @Override
+        protected LogScanner createLogScanner() {
+            return new FaultInjectingLogScanner(super.createLogScanner(), 
exceptionToThrow);
+        }
+    }
+
+    private static class FaultInjectingLogScanner implements LogScanner {
+        private final LogScanner delegate;
+        private final RuntimeException exceptionToThrow;
+
+        FaultInjectingLogScanner(LogScanner delegate, RuntimeException 
exceptionToThrow) {
+            this.delegate = delegate;
+            this.exceptionToThrow = exceptionToThrow;
+        }
+
+        @Override
+        public ScanRecords poll(Duration timeout) {
+            if (exceptionToThrow != null) {
+                throw exceptionToThrow;
+            }
+            return delegate.poll(timeout);
+        }
+
+        @Override
+        public void subscribe(int bucket, long offset) {
+            delegate.subscribe(bucket, offset);
+        }
+
+        @Override
+        public void subscribe(long partitionId, int bucket, long offset) {
+            delegate.subscribe(partitionId, bucket, offset);
+        }
+
+        @Override
+        public void unsubscribe(long partitionId, int bucket) {
+            delegate.unsubscribe(partitionId, bucket);
+        }
+
+        @Override
+        public void unsubscribe(int bucket) {
+            delegate.unsubscribe(bucket);
+        }
+
+        @Override
+        public void wakeup() {
+            delegate.wakeup();
+        }
+
+        @Override
+        public void close() throws Exception {
+            delegate.close();
+        }
+    }
+
+    // ==================== Helper Methods ====================
+
+    private TablePath createTestTablePath(String prefix) {
+        return TablePath.of(DEFAULT_DB, prefix + "_" + 
System.currentTimeMillis());
+    }
+
+    private PartitionSpec newPartitionSpec(String key, String value) {
+        return new PartitionSpec(Collections.singletonMap(key, value));
+    }
+
+    private int selectBucketForZeroOffsetRecovery(
+            Map<Integer, List<Integer>> keysByBucket, int numBuckets) {
+        int maxBucket = 0, maxKeys = 0;
+        for (int i = 0; i < numBuckets; i++) {
+            if (keysByBucket.get(i).size() > maxKeys) {
+                maxKeys = keysByBucket.get(i).size();
+                maxBucket = i;
+            }
+        }
+        return maxBucket;
+    }
+
+    private void assertRowEquals(
+            InternalRow row,
+            int expectedId,
+            String expectedName,
+            int expectedPrice,
+            int expectedStock) {
+        assertThat(row.getInt(0)).as("id").isEqualTo(expectedId);
+        
assertThat(row.getString(1).toString()).as("name").isEqualTo(expectedName);
+        assertThat(row.getInt(2)).as("price").isEqualTo(expectedPrice);
+        assertThat(row.getInt(3)).as("stock").isEqualTo(expectedStock);
+    }
+
+    /**
+     * Builds offset ranges by querying the latest offset for each bucket.
+     *
+     * <p>This simulates what the upper layer would do: it already has 
checkpoint offsets and needs
+     * to query latest offsets to determine which buckets need recovery.
+     */
+    private Map<TableBucket, UndoOffsets> buildUndoOffsets(
+            Map<TableBucket, Long> checkpointOffsets, TablePath tablePath, 
Admin admin)
+            throws Exception {
+        Map<TableBucket, UndoOffsets> result = new HashMap<>();
+
+        // Group buckets by partition for efficient batch queries
+        Map<Long, List<TableBucket>> bucketsByPartition = new HashMap<>();
+        List<TableBucket> nonPartitionedBuckets = new ArrayList<>();
+
+        for (TableBucket bucket : checkpointOffsets.keySet()) {
+            if (bucket.getPartitionId() != null) {
+                bucketsByPartition
+                        .computeIfAbsent(bucket.getPartitionId(), k -> new 
ArrayList<>())
+                        .add(bucket);
+            } else {
+                nonPartitionedBuckets.add(bucket);
+            }
+        }
+
+        // Query non-partitioned buckets
+        if (!nonPartitionedBuckets.isEmpty()) {
+            List<Integer> bucketIds = new ArrayList<>();
+            for (TableBucket tb : nonPartitionedBuckets) {
+                bucketIds.add(tb.getBucket());
+            }
+            ListOffsetsResult offsetsResult =
+                    admin.listOffsets(tablePath, bucketIds, new 
OffsetSpec.LatestSpec());
+            for (TableBucket tb : nonPartitionedBuckets) {
+                Long latestOffset = 
offsetsResult.bucketResult(tb.getBucket()).get();
+                if (latestOffset == null) {
+                    latestOffset = 0L;
+                }
+                result.put(tb, new UndoOffsets(checkpointOffsets.get(tb), 
latestOffset));
+            }
+        }
+
+        // Query partitioned buckets
+        if (!bucketsByPartition.isEmpty()) {
+            // Get partition name mapping
+            List<PartitionInfo> partitions = 
admin.listPartitionInfos(tablePath).get();
+            Map<Long, String> partitionIdToName = new HashMap<>();
+            for (PartitionInfo info : partitions) {
+                partitionIdToName.put(info.getPartitionId(), 
info.getPartitionName());
+            }
+
+            for (Map.Entry<Long, List<TableBucket>> entry : 
bucketsByPartition.entrySet()) {
+                Long partitionId = entry.getKey();
+                List<TableBucket> buckets = entry.getValue();
+                String partitionName = partitionIdToName.get(partitionId);
+
+                List<Integer> bucketIds = new ArrayList<>();
+                for (TableBucket tb : buckets) {
+                    bucketIds.add(tb.getBucket());
+                }
+
+                ListOffsetsResult offsetsResult =
+                        admin.listOffsets(
+                                tablePath, partitionName, bucketIds, new 
OffsetSpec.LatestSpec());
+                for (TableBucket tb : buckets) {
+                    Long latestOffset = 
offsetsResult.bucketResult(tb.getBucket()).get();
+                    if (latestOffset == null) {
+                        latestOffset = 0L;
+                    }
+                    result.put(tb, new UndoOffsets(checkpointOffsets.get(tb), 
latestOffset));
+                }
+            }
+        }
+
+        return result;
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestLogScanner.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestLogScanner.java
new file mode 100644
index 000000000..44e9a6239
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestLogScanner.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.metadata.TableBucket;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test implementation of {@link LogScanner} for testing.
+ *
+ * <p>Allows pre-configuring records per bucket and returns them on poll(). 
Supports:
+ *
+ * <ul>
+ *   <li>Simulating empty polls for testing exponential backoff behavior
+ *   <li>Batch size control for realistic multi-poll scenarios
+ *   <li>Always-empty mode for testing fatal exception on max empty polls
+ * </ul>
+ */
+public class TestLogScanner implements LogScanner {
+    private final Map<TableBucket, List<ScanRecord>> recordsByBucket = new 
HashMap<>();
+    private final Map<TableBucket, AtomicInteger> pollIndexByBucket = new 
HashMap<>();
+
+    /** Number of empty polls to return before returning actual records. */
+    private int emptyPollsBeforeData = 0;
+
+    private int currentEmptyPollCount = 0;
+
+    /** If true, always return empty records (for testing fatal exception on 
max empty polls). */
+    private boolean alwaysReturnEmpty = false;
+
+    /** Maximum number of records to return per bucket per poll (0 = 
unlimited). */
+    private int batchSize = 0;
+
+    public void setRecordsForBucket(TableBucket bucket, List<ScanRecord> 
records) {
+        recordsByBucket.put(bucket, new ArrayList<>(records));
+        pollIndexByBucket.put(bucket, new AtomicInteger(0));
+    }
+
+    /**
+     * Sets the number of empty polls to return before returning actual 
records.
+     *
+     * @param count number of empty polls
+     */
+    public void setEmptyPollsBeforeData(int count) {
+        this.emptyPollsBeforeData = count;
+        this.currentEmptyPollCount = 0;
+    }
+
+    /**
+     * Sets whether to always return empty records (for testing fatal 
exception).
+     *
+     * @param alwaysEmpty if true, poll() always returns empty
+     */
+    public void setAlwaysReturnEmpty(boolean alwaysEmpty) {
+        this.alwaysReturnEmpty = alwaysEmpty;
+    }
+
+    /**
+     * Sets the maximum number of records to return per bucket per poll.
+     *
+     * <p>This simulates realistic behavior where LogScanner returns records 
in batches. Use this to
+     * test multi-poll scenarios and ensure the executor handles partial 
results correctly.
+     *
+     * @param batchSize max records per bucket per poll (0 = unlimited, 
returns all remaining)
+     */
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    public ScanRecords poll(Duration timeout) {
+        // If configured to always return empty, do so
+        if (alwaysReturnEmpty) {
+            return ScanRecords.EMPTY;
+        }
+
+        // Return empty polls if configured
+        if (currentEmptyPollCount < emptyPollsBeforeData) {
+            currentEmptyPollCount++;
+            return ScanRecords.EMPTY;
+        }
+
+        Map<TableBucket, List<ScanRecord>> result = new HashMap<>();
+
+        for (Map.Entry<TableBucket, List<ScanRecord>> entry : 
recordsByBucket.entrySet()) {
+            TableBucket bucket = entry.getKey();
+            List<ScanRecord> allRecords = entry.getValue();
+            AtomicInteger index = pollIndexByBucket.get(bucket);
+
+            if (index.get() < allRecords.size()) {
+                int startIndex = index.get();
+                int endIndex;
+                if (batchSize > 0) {
+                    // Return at most batchSize records
+                    endIndex = Math.min(startIndex + batchSize, 
allRecords.size());
+                } else {
+                    // Return all remaining records
+                    endIndex = allRecords.size();
+                }
+                List<ScanRecord> batch = allRecords.subList(startIndex, 
endIndex);
+                result.put(bucket, new ArrayList<>(batch));
+                index.set(endIndex);
+            }
+        }
+
+        return result.isEmpty() ? ScanRecords.EMPTY : new ScanRecords(result);
+    }
+
+    @Override
+    public void subscribe(int bucket, long offset) {}
+
+    @Override
+    public void subscribe(long partitionId, int bucket, long offset) {}
+
+    @Override
+    public void unsubscribe(long partitionId, int bucket) {}
+
+    @Override
+    public void unsubscribe(int bucket) {}
+
+    @Override
+    public void wakeup() {}
+
+    @Override
+    public void close() {}
+
+    public void reset() {
+        recordsByBucket.clear();
+        pollIndexByBucket.clear();
+        emptyPollsBeforeData = 0;
+        currentEmptyPollCount = 0;
+        alwaysReturnEmpty = false;
+        batchSize = 0;
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestUpsertWriter.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestUpsertWriter.java
new file mode 100644
index 000000000..fceaa027f
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestUpsertWriter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.client.table.writer.DeleteResult;
+import org.apache.fluss.client.table.writer.UpsertResult;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.row.InternalRow;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Test implementation of {@link UpsertWriter} for testing.
+ *
+ * <p>Tracks all upsert/delete operations and supports failure injection.
+ */
+public class TestUpsertWriter implements UpsertWriter {
+    private int upsertCount = 0;
+    private int deleteCount = 0;
+    private boolean flushCalled = false;
+    private boolean shouldFail = false;
+    private InternalRow lastUpsertedRow;
+    private InternalRow lastDeletedRow;
+    private final List<InternalRow> allUpsertedRows = new ArrayList<>();
+    private final List<InternalRow> allDeletedRows = new ArrayList<>();
+
+    @Override
+    public CompletableFuture<UpsertResult> upsert(InternalRow record) {
+        if (shouldFail) {
+            CompletableFuture<UpsertResult> future = new CompletableFuture<>();
+            future.completeExceptionally(new RuntimeException("Simulated write 
failure"));
+            return future;
+        }
+        upsertCount++;
+        lastUpsertedRow = record;
+        allUpsertedRows.add(record);
+        return CompletableFuture.completedFuture(new UpsertResult(new 
TableBucket(1L, 0), 0L));
+    }
+
+    @Override
+    public CompletableFuture<DeleteResult> delete(InternalRow record) {
+        if (shouldFail) {
+            CompletableFuture<DeleteResult> future = new CompletableFuture<>();
+            future.completeExceptionally(new RuntimeException("Simulated write 
failure"));
+            return future;
+        }
+        deleteCount++;
+        lastDeletedRow = record;
+        allDeletedRows.add(record);
+        return CompletableFuture.completedFuture(new DeleteResult(new 
TableBucket(1L, 0), 0L));
+    }
+
+    @Override
+    public void flush() {
+        flushCalled = true;
+    }
+
+    public int getUpsertCount() {
+        return upsertCount;
+    }
+
+    public int getDeleteCount() {
+        return deleteCount;
+    }
+
+    public boolean isFlushCalled() {
+        return flushCalled;
+    }
+
+    public InternalRow getLastUpsertedRow() {
+        return lastUpsertedRow;
+    }
+
+    public InternalRow getLastDeletedRow() {
+        return lastDeletedRow;
+    }
+
+    public List<InternalRow> getAllUpsertedRows() {
+        return allUpsertedRows;
+    }
+
+    public List<InternalRow> getAllDeletedRows() {
+        return allDeletedRows;
+    }
+
+    public void setShouldFail(boolean shouldFail) {
+        this.shouldFail = shouldFail;
+    }
+
+    public void reset() {
+        upsertCount = 0;
+        deleteCount = 0;
+        flushCalled = false;
+        shouldFail = false;
+        lastUpsertedRow = null;
+        lastDeletedRow = null;
+        allUpsertedRows.clear();
+        allDeletedRows.clear();
+    }
+}

Reply via email to