This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-0.9 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 6bfc45d96072ee3ede1845a8a3e3ee4368a75c8a Author: Yang Wang <[email protected]> AuthorDate: Tue Feb 10 20:13:00 2026 +0800 [kv] Add zero-offset bucket keys verification in undo recovery test (#2623) --- .../writer/undo/UndoRecoveryManagerITCase.java | 51 ++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManagerITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManagerITCase.java index 9e7762c30..3ea6ec289 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManagerITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManagerITCase.java @@ -464,6 +464,57 @@ public class UndoRecoveryManagerITCase { })); } } + + // Verify zero-offset bucket keys after undo recovery. + // + // Since checkpoint offset is 0, ALL writes to this bucket are undone. + // The undo writer uses partial update (targetColumns = {id, price}) with + // OVERWRITE mode, so undo only clears target columns — non-target columns + // written by other writers or earlier full writes are unaffected. + // + // - id < 80: These keys were first written by fullWriter in Phase 1, which + // populated ALL columns (including non-target columns like name, stock). + // Undo clears the target column (price) to null, but the row still exists + // because non-target columns retain their values. + // + // - id >= 100: These keys were ONLY written by partialWriter in Phase 2, + // which only populated target columns (id, price). Undo clears those + // target columns, leaving no column with data, so the row is effectively + // deleted. + for (int id : zeroOffsetBucketKeysAfterCheckpoint) { + final int keyId = id; + verifyFutures.add( + lookuper.lookup(row(keyId)) + .thenAccept( + r -> { + if (keyId < 80) { + // Key has non-target column data from + // fullWriter (Phase 1), so the row survives + // undo — only target column (price) is cleared. + InternalRow result = r.getSingletonRow(); + assertThat(result) + .as( + "Zero-offset bucket key %d should still exist (non-target columns retain data)", + keyId) + .isNotNull(); + assertThat(result.isNullAt(2)) + .as( + "Zero-offset bucket key %d price (target column) should be null after undo", + keyId) + .isTrue(); + } else { + // Key was only written via partialWriter + // (target columns only) — undo clears all + // its data, so the row no longer exists. + assertThat(r.getSingletonRow()) + .as( + "Zero-offset bucket key %d should not exist (no non-target column data)", + keyId) + .isNull(); + } + })); + } + CompletableFuture.allOf(verifyFutures.toArray(new CompletableFuture[0])).get(); } }
