This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-0.9
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 6bfc45d96072ee3ede1845a8a3e3ee4368a75c8a
Author: Yang Wang <[email protected]>
AuthorDate: Tue Feb 10 20:13:00 2026 +0800

    [kv] Add zero-offset bucket keys verification in undo recovery test (#2623)
---
 .../writer/undo/UndoRecoveryManagerITCase.java     | 51 ++++++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManagerITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManagerITCase.java
index 9e7762c30..3ea6ec289 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManagerITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManagerITCase.java
@@ -464,6 +464,57 @@ public class UndoRecoveryManagerITCase {
                                             }));
                 }
             }
+
+            // Verify zero-offset bucket keys after undo recovery.
+            //
+            // Since checkpoint offset is 0, ALL writes to this bucket are 
undone.
+            // The undo writer uses partial update (targetColumns = {id, 
price}) with
+            // OVERWRITE mode, so undo only clears target columns — non-target 
columns
+            // written by other writers or earlier full writes are unaffected.
+            //
+            // - id < 80: These keys were first written by fullWriter in Phase 
1, which
+            //   populated ALL columns (including non-target columns like 
name, stock).
+            //   Undo clears the target column (price) to null, but the row 
still exists
+            //   because non-target columns retain their values.
+            //
+            // - id >= 100: These keys were ONLY written by partialWriter in 
Phase 2,
+            //   which only populated target columns (id, price). Undo clears 
those
+            //   target columns, leaving no column with data, so the row is 
effectively
+            //   deleted.
+            for (int id : zeroOffsetBucketKeysAfterCheckpoint) {
+                final int keyId = id;
+                verifyFutures.add(
+                        lookuper.lookup(row(keyId))
+                                .thenAccept(
+                                        r -> {
+                                            if (keyId < 80) {
+                                                // Key has non-target column 
data from
+                                                // fullWriter (Phase 1), so 
the row survives
+                                                // undo — only target column 
(price) is cleared.
+                                                InternalRow result = 
r.getSingletonRow();
+                                                assertThat(result)
+                                                        .as(
+                                                                "Zero-offset 
bucket key %d should still exist (non-target columns retain data)",
+                                                                keyId)
+                                                        .isNotNull();
+                                                assertThat(result.isNullAt(2))
+                                                        .as(
+                                                                "Zero-offset 
bucket key %d price (target column) should be null after undo",
+                                                                keyId)
+                                                        .isTrue();
+                                            } else {
+                                                // Key was only written via 
partialWriter
+                                                // (target columns only) — 
undo clears all
+                                                // its data, so the row no 
longer exists.
+                                                assertThat(r.getSingletonRow())
+                                                        .as(
+                                                                "Zero-offset 
bucket key %d should not exist (no non-target column data)",
+                                                                keyId)
+                                                        .isNull();
+                                            }
+                                        }));
+            }
+
             CompletableFuture.allOf(verifyFutures.toArray(new 
CompletableFuture[0])).get();
         }
     }

Reply via email to