This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-0.9 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 1fd153034db7c87e8b897cee43539615df3c55f8 Author: Yang Wang <[email protected]> AuthorDate: Mon Feb 9 20:05:44 2026 +0800 [kv] Add undo recovery support for aggregation tables (#2545) --- .../fluss/client/table/scanner/log/LogScanner.java | 10 + .../client/table/scanner/log/LogScannerImpl.java | 17 + .../org/apache/fluss/utils/ByteArrayWrapper.java | 65 ++ .../apache/fluss/utils/ByteArrayWrapperTest.java | 99 ++ .../sink/writer/undo/BucketRecoveryContext.java | 137 +++ .../fluss/flink/sink/writer/undo/UndoComputer.java | 150 ++++ .../sink/writer/undo/UndoRecoveryExecutor.java | 287 ++++++ .../sink/writer/undo/UndoRecoveryManager.java | 248 +++++ .../flink/sink/writer/undo/UndoComputerTest.java | 199 ++++ .../sink/writer/undo/UndoRecoveryExecutorTest.java | 234 +++++ .../writer/undo/UndoRecoveryManagerITCase.java | 999 +++++++++++++++++++++ .../apache/fluss/flink/utils/TestLogScanner.java | 159 ++++ .../apache/fluss/flink/utils/TestUpsertWriter.java | 118 +++ 13 files changed, 2722 insertions(+) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java index 04357f598..d255e340a 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java @@ -106,6 +106,16 @@ public interface LogScanner extends AutoCloseable { */ void unsubscribe(long partitionId, int bucket); + /** + * Unsubscribe from the given bucket of a non-partitioned table dynamically. + * + * <p>Please use {@link #unsubscribe(long, int)} to unsubscribe a partitioned table. + * + * @param bucket the table bucket to unsubscribe. + * @throws java.lang.IllegalStateException if the table is a partitioned table. + */ + void unsubscribe(int bucket); + /** * Subscribe to the given partitioned table bucket from beginning dynamically. 
If the table * bucket is already subscribed, the start offset will be updated. diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java index ebcae05c5..33181bd7a 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java @@ -225,6 +225,23 @@ public class LogScannerImpl implements LogScanner { } } + @Override + public void unsubscribe(int bucket) { + if (isPartitionedTable) { + throw new IllegalStateException( + "The table is a partitioned table, please use " + + "\"unsubscribe(long partitionId, int bucket)\" to " + + "unsubscribe a partitioned bucket instead."); + } + acquireAndEnsureOpen(); + try { + TableBucket tableBucket = new TableBucket(tableId, bucket); + this.logScannerStatus.unassignScanBuckets(Collections.singletonList(tableBucket)); + } finally { + release(); + } + } + @Override public void wakeup() { logFetcher.wakeup(); diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/ByteArrayWrapper.java b/fluss-common/src/main/java/org/apache/fluss/utils/ByteArrayWrapper.java new file mode 100644 index 000000000..da547d36b --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/utils/ByteArrayWrapper.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.utils; + +import java.util.Arrays; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * A wrapper for byte[] that provides proper equals() and hashCode() implementations for use as Map + * keys. + * + * <p>The hashCode is pre-computed at construction time for better performance when used in + * hash-based collections. + */ +public final class ByteArrayWrapper { + + private final byte[] data; + private final int hashCode; + + public ByteArrayWrapper(byte[] data) { + this.data = checkNotNull(data, "data cannot be null"); + this.hashCode = Arrays.hashCode(data); + } + + public byte[] getData() { + return data; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ByteArrayWrapper)) { + return false; + } + return Arrays.equals(data, ((ByteArrayWrapper) o).data); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public String toString() { + return "ByteArrayWrapper{length=" + data.length + "}"; + } +} diff --git a/fluss-common/src/test/java/org/apache/fluss/utils/ByteArrayWrapperTest.java b/fluss-common/src/test/java/org/apache/fluss/utils/ByteArrayWrapperTest.java new file mode 100644 index 000000000..f0f69d112 --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/utils/ByteArrayWrapperTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.utils; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link ByteArrayWrapper}. */ +class ByteArrayWrapperTest { + + @Test + void testEqualsAndHashCode() { + byte[] data1 = new byte[] {1, 2, 3}; + byte[] data2 = new byte[] {1, 2, 3}; + byte[] data3 = new byte[] {1, 2, 4}; + + ByteArrayWrapper wrapper1 = new ByteArrayWrapper(data1); + ByteArrayWrapper wrapper2 = new ByteArrayWrapper(data2); + ByteArrayWrapper wrapper3 = new ByteArrayWrapper(data3); + + // Same content should be equal + assertThat(wrapper1).isEqualTo(wrapper2); + assertThat(wrapper1.hashCode()).isEqualTo(wrapper2.hashCode()); + + // Different content should not be equal + assertThat(wrapper1).isNotEqualTo(wrapper3); + } + + @Test + void testNullDataThrowsException() { + assertThatThrownBy(() -> new ByteArrayWrapper(null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("data cannot be null"); + } + + @Test + void testAsMapKey() { + byte[] key1 = new byte[] {1, 2, 3}; + byte[] key2 = new byte[] {1, 2, 3}; // Same content, different array + byte[] key3 = new byte[] {4, 5, 6}; + + 
Map<ByteArrayWrapper, String> map = new HashMap<>(); + map.put(new ByteArrayWrapper(key1), "value1"); + + // Should find with same content + assertThat(map.get(new ByteArrayWrapper(key2))).isEqualTo("value1"); + + // Should not find with different content + assertThat(map.get(new ByteArrayWrapper(key3))).isNull(); + + // Should overwrite with same key + map.put(new ByteArrayWrapper(key2), "value2"); + assertThat(map).hasSize(1); + assertThat(map.get(new ByteArrayWrapper(key1))).isEqualTo("value2"); + } + + @Test + void testGetData() { + byte[] data = new byte[] {1, 2, 3}; + ByteArrayWrapper wrapper = new ByteArrayWrapper(data); + + assertThat(wrapper.getData()).isSameAs(data); + } + + @Test + void testEmptyArray() { + ByteArrayWrapper wrapper1 = new ByteArrayWrapper(new byte[0]); + ByteArrayWrapper wrapper2 = new ByteArrayWrapper(new byte[0]); + + assertThat(wrapper1).isEqualTo(wrapper2); + assertThat(wrapper1.hashCode()).isEqualTo(wrapper2.hashCode()); + } + + @Test + void testToString() { + ByteArrayWrapper wrapper = new ByteArrayWrapper(new byte[] {1, 2, 3}); + assertThat(wrapper.toString()).contains("length=3"); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/BucketRecoveryContext.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/BucketRecoveryContext.java new file mode 100644 index 000000000..47797d2c4 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/BucketRecoveryContext.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.writer.undo; + +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.utils.ByteArrayWrapper; + +import java.util.HashSet; +import java.util.Set; + +/** + * Encapsulates the recovery state for a single bucket. + * + * <p>This class tracks: + * + * <ul> + * <li>The bucket being recovered + * <li>The checkpoint offset (start point for reading changelog) + * <li>The log end offset (end point for reading changelog) + * <li>Processed primary keys for deduplication (streaming execution) + * <li>Progress tracking during changelog scanning + * </ul> + */ +public class BucketRecoveryContext { + + private final TableBucket bucket; + private final long checkpointOffset; + private final long logEndOffset; + + private final Set<ByteArrayWrapper> processedKeys; + private long lastProcessedOffset; + private int totalRecordsProcessed; + + public BucketRecoveryContext(TableBucket bucket, long checkpointOffset, long logEndOffset) { + this.bucket = bucket; + this.checkpointOffset = checkpointOffset; + this.logEndOffset = logEndOffset; + this.processedKeys = new HashSet<>(); + this.lastProcessedOffset = checkpointOffset; + this.totalRecordsProcessed = 0; + } + + public TableBucket getBucket() { + return bucket; + } + + public long getCheckpointOffset() { + return checkpointOffset; + } + + public long getLogEndOffset() { + return logEndOffset; + } + + public Set<ByteArrayWrapper> getProcessedKeys() { + return processedKeys; + } + + /** + * Checks if this bucket needs recovery. 
+ * + * @return true if checkpoint offset is less than log end offset + */ + public boolean needsRecovery() { + return checkpointOffset < logEndOffset; + } + + /** + * Checks if changelog scanning is complete for this bucket. + * + * <p>Complete means either: + * + * <ul> + * <li>No recovery is needed (checkpointOffset >= logEndOffset), or + * <li>The last processed offset has reached or passed logEndOffset - 1 (lastProcessedOffset + * >= logEndOffset - 1) + * </ul> + * + * @return true if changelog scanning is complete + */ + public boolean isComplete() { + // If no recovery is needed, we're already complete + if (!needsRecovery()) { + return true; + } + return lastProcessedOffset >= logEndOffset - 1; + } + + /** + * Records that a changelog record has been processed. + * + * @param offset the offset of the processed record + */ + public void recordProcessed(long offset) { + lastProcessedOffset = offset; + totalRecordsProcessed++; + } + + public int getTotalRecordsProcessed() { + return totalRecordsProcessed; + } + + public long getLastProcessedOffset() { + return lastProcessedOffset; + } + + @Override + public String toString() { + return "BucketRecoveryContext{" + + "bucket=" + + bucket + + ", checkpointOffset=" + + checkpointOffset + + ", logEndOffset=" + + logEndOffset + + ", processedKeys=" + + processedKeys.size() + + ", complete=" + + isComplete() + + '}'; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoComputer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoComputer.java new file mode 100644 index 000000000..80d9baac8 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoComputer.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.writer.undo; + +import org.apache.fluss.client.table.scanner.ScanRecord; +import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.encode.KeyEncoder; +import org.apache.fluss.utils.ByteArrayWrapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Computes and executes undo operations from changelog records using streaming execution. + * + * <p>This class uses a {@link KeyEncoder} to encode primary keys as byte arrays for efficient + * deduplication, and executes undo operations immediately via {@link UpsertWriter}. The undo logic + * is: + * + * <ul> + * <li>{@code INSERT} → Delete the row (it didn't exist at checkpoint) + * <li>{@code UPDATE_BEFORE} → Restore the old value (this was the state at checkpoint) + * <li>{@code UPDATE_AFTER} → Ignored (UPDATE_BEFORE already handled the undo) + * <li>{@code DELETE} → Re-insert the deleted row (it existed at checkpoint) + * </ul> + * + * <p>For each primary key, only the first change after checkpoint determines the undo action. 
The + * original row from {@link ScanRecord} is directly used without copying, as each ScanRecord + * contains an independent row instance. + */ +public class UndoComputer { + + private static final Logger LOG = LoggerFactory.getLogger(UndoComputer.class); + + private final KeyEncoder keyEncoder; + private final UpsertWriter writer; + + /** + * Creates an UndoComputer. + * + * @param keyEncoder the key encoder for primary key deduplication + * @param writer the writer for all undo operations + */ + public UndoComputer(KeyEncoder keyEncoder, UpsertWriter writer) { + this.keyEncoder = keyEncoder; + this.writer = writer; + } + + /** + * Processes a changelog record and executes the undo operation immediately. + * + * <p>Only the first change for each primary key triggers an undo operation. + * + * @param record the changelog record + * @param processedKeys set of already processed primary keys for deduplication + * @return CompletableFuture for the async write, or null if skipped + */ + @Nullable + public CompletableFuture<?> processRecord( + ScanRecord record, Set<ByteArrayWrapper> processedKeys) { + // Skip UPDATE_AFTER before key encoding — UPDATE_BEFORE already handled the undo + if (record.getChangeType() == ChangeType.UPDATE_AFTER) { + return null; + } + + byte[] encodedKey = keyEncoder.encodeKey(record.getRow()); + ByteArrayWrapper keyWrapper = new ByteArrayWrapper(encodedKey); + + // Skip if we already processed this key + if (processedKeys.contains(keyWrapper)) { + return null; + } + + CompletableFuture<?> future = computeAndExecuteUndo(record); + if (future != null) { + processedKeys.add(keyWrapper); + } + return future; + } + + /** + * Computes and executes the undo operation for a changelog record. + * + * <p>UPDATE_AFTER is ignored because UPDATE_BEFORE and UPDATE_AFTER always come in pairs, with + * UPDATE_BEFORE appearing first. The undo logic is determined by UPDATE_BEFORE which contains + * the old value needed for restoration. 
+ * + * <p>The original row is directly used without copying because: + * + * <ul> + * <li>Each ScanRecord contains an independent GenericRow instance (created in + * CompletedFetch.toScanRecord) + * <li>For DELETE operations, UpsertWriter.delete() only needs the primary key fields + * <li>For UPSERT operations, the original row contains the exact data to restore + * </ul> + * + * @param record the changelog record + * @return CompletableFuture for the async write, or null if no action needed (e.g., + * UPDATE_AFTER) + */ + @Nullable + private CompletableFuture<?> computeAndExecuteUndo(ScanRecord record) { + ChangeType changeType = record.getChangeType(); + InternalRow row = record.getRow(); + + switch (changeType) { + case INSERT: + // Row was inserted after checkpoint → delete it + return writer.delete(row); + + case UPDATE_BEFORE: + // Row was updated after checkpoint → restore old value + return writer.upsert(row); + + case UPDATE_AFTER: + // Ignored: UPDATE_BEFORE already handled the undo logic for this key. + return null; + + case DELETE: + // Row was deleted after checkpoint → re-insert it + return writer.upsert(row); + + default: + LOG.warn("Unexpected change type for undo: {}", changeType); + return null; + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutor.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutor.java new file mode 100644 index 000000000..a1f3f6637 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutor.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.writer.undo; + +import org.apache.fluss.client.table.scanner.ScanRecord; +import org.apache.fluss.client.table.scanner.log.LogScanner; +import org.apache.fluss.client.table.scanner.log.ScanRecords; +import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.utils.ByteArrayWrapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Executes undo recovery for multiple buckets using streaming execution. 
+ * + * <p>This executor manages: + * + * <ul> + * <li>Single LogScanner with multiple bucket subscriptions + * <li>Changelog reading with immediate undo operation execution + * <li>Async write operations via UpsertWriter + * </ul> + * + * <p>The execution flow (streaming): + * + * <ol> + * <li>Subscribe all buckets to the LogScanner + * <li>Poll changelog records and execute undo operations immediately + * <li>Collect CompletableFutures for completion tracking + * <li>Flush and wait for all futures to ensure all writes complete + * </ol> + */ +public class UndoRecoveryExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(UndoRecoveryExecutor.class); + + /** Fixed poll timeout in milliseconds. */ + private static final long POLL_TIMEOUT_MS = 10_000; + + /** Default maximum total wait time in milliseconds before failing (1 hour). */ + private static final long DEFAULT_MAX_TOTAL_WAIT_TIME_MS = 60 * 60 * 1000; // 1 hour + + /** Interval for logging progress during long waits (5 minutes). */ + private static final long PROGRESS_LOG_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes + + private final LogScanner scanner; + private final UpsertWriter writer; + private final UndoComputer undoComputer; + + // Configurable timeout parameter + private final long maxTotalWaitTimeMs; + + public UndoRecoveryExecutor( + LogScanner scanner, UpsertWriter writer, UndoComputer undoComputer) { + this(scanner, writer, undoComputer, DEFAULT_MAX_TOTAL_WAIT_TIME_MS); + } + + /** + * Creates an executor with custom timeout configuration (for testing). 
+ * + * @param scanner the log scanner + * @param writer the upsert writer + * @param undoComputer the undo computer + * @param maxTotalWaitTimeMs maximum total wait time before failing + */ + public UndoRecoveryExecutor( + LogScanner scanner, + UpsertWriter writer, + UndoComputer undoComputer, + long maxTotalWaitTimeMs) { + this.scanner = scanner; + this.writer = writer; + this.undoComputer = undoComputer; + this.maxTotalWaitTimeMs = maxTotalWaitTimeMs; + } + + /** + * Executes undo recovery for the given bucket contexts. + * + * @param contexts the bucket recovery contexts (must have target offsets set) + * @throws Exception if recovery fails + */ + public void execute(List<BucketRecoveryContext> contexts) throws Exception { + // Filter contexts that need recovery + List<BucketRecoveryContext> toRecover = filterContextsNeedingRecovery(contexts); + if (toRecover.isEmpty()) { + LOG.debug("No buckets need recovery after filtering"); + return; + } + + LOG.debug("Executing undo recovery for {} bucket(s)", toRecover.size()); + + // Subscribe and read changelog with streaming execution + subscribeAll(toRecover); + List<CompletableFuture<?>> allFutures = readChangelogAndExecute(toRecover); + + // Flush first, then wait for all async writes to complete + writer.flush(); + if (!allFutures.isEmpty()) { + CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0])).get(); + } + + // Log summary at INFO level for visibility + int totalUndoOps = 0; + for (BucketRecoveryContext ctx : toRecover) { + totalUndoOps += ctx.getProcessedKeys().size(); + } + + LOG.info( + "Undo recovery execution completed: {} bucket(s), {} total undo operation(s)", + toRecover.size(), + totalUndoOps); + } + + private List<BucketRecoveryContext> filterContextsNeedingRecovery( + List<BucketRecoveryContext> contexts) { + List<BucketRecoveryContext> result = new ArrayList<>(); + for (BucketRecoveryContext ctx : contexts) { + if (ctx.needsRecovery()) { + LOG.debug( + "Bucket {} needs recovery: 
checkpoint={}, logEndOffset={}", + ctx.getBucket(), + ctx.getCheckpointOffset(), + ctx.getLogEndOffset()); + result.add(ctx); + } else { + LOG.debug("Bucket {} already up-to-date, no recovery needed", ctx.getBucket()); + } + } + return result; + } + + private void subscribeAll(List<BucketRecoveryContext> contexts) { + for (BucketRecoveryContext ctx : contexts) { + TableBucket bucket = ctx.getBucket(); + if (bucket.getPartitionId() != null) { + scanner.subscribe( + bucket.getPartitionId(), bucket.getBucket(), ctx.getCheckpointOffset()); + } else { + scanner.subscribe(bucket.getBucket(), ctx.getCheckpointOffset()); + } + } + } + + private List<CompletableFuture<?>> readChangelogAndExecute(List<BucketRecoveryContext> contexts) + throws Exception { + List<CompletableFuture<?>> allFutures = new ArrayList<>(); + + // Early exit if all contexts are already complete (no records to read) + if (allComplete(contexts)) { + LOG.debug("All buckets already complete, no changelog reading needed"); + return allFutures; + } + + long startTimeMs = System.currentTimeMillis(); + long lastProgressLogTime = System.currentTimeMillis(); + Set<TableBucket> unsubscribedBuckets = new HashSet<>(); + + while (!allComplete(contexts)) { + ScanRecords records = scanner.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); + + if (records.isEmpty()) { + // Check if we've exceeded the maximum total wait time + long elapsedMs = System.currentTimeMillis() - startTimeMs; + if (elapsedMs >= maxTotalWaitTimeMs) { + throw new RuntimeException( + String.format( + "Undo recovery timed out: unable to read all changelog records after " + + "%.1f minutes of waiting. %d bucket(s) still incomplete. 
" + + "The job will restart and retry the recovery.", + elapsedMs / 60000.0, countIncomplete(contexts))); + } + + LOG.debug("Empty poll (total elapsed: {}ms)", elapsedMs); + } else { + // Process records for each bucket + for (BucketRecoveryContext ctx : contexts) { + if (ctx.isComplete()) { + continue; + } + List<ScanRecord> bucketRecords = records.records(ctx.getBucket()); + if (!bucketRecords.isEmpty()) { + processRecords(ctx, bucketRecords, allFutures); + } + // Unsubscribe completed buckets to stop fetching data from them + if (ctx.isComplete() && unsubscribedBuckets.add(ctx.getBucket())) { + unsubscribeBucket(ctx.getBucket()); + } + } + } + + // Log progress periodically (at the end of each loop iteration) + long now = System.currentTimeMillis(); + if (now - lastProgressLogTime >= PROGRESS_LOG_INTERVAL_MS) { + long elapsedMs = now - startTimeMs; + LOG.info( + "Undo recovery waiting for changelog records: {} bucket(s) incomplete, " + + "waited {} minutes so far (max: {} minutes)", + countIncomplete(contexts), + String.format("%.1f", elapsedMs / 60000.0), + String.format("%.1f", maxTotalWaitTimeMs / 60000.0)); + lastProgressLogTime = now; + } + } + + // Log summary + if (LOG.isDebugEnabled()) { + for (BucketRecoveryContext ctx : contexts) { + LOG.debug( + "Bucket {} read {} records, executed {} undo operations", + ctx.getBucket(), + ctx.getTotalRecordsProcessed(), + ctx.getProcessedKeys().size()); + } + } + + return allFutures; + } + + private void unsubscribeBucket(TableBucket bucket) { + if (bucket.getPartitionId() != null) { + scanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket()); + } else { + scanner.unsubscribe(bucket.getBucket()); + } + } + + private void processRecords( + BucketRecoveryContext ctx, + List<ScanRecord> records, + List<CompletableFuture<?>> futures) { + Set<ByteArrayWrapper> processedKeys = ctx.getProcessedKeys(); + for (ScanRecord record : records) { + CompletableFuture<?> future = undoComputer.processRecord(record, 
processedKeys); + if (future != null) { + futures.add(future); + } + ctx.recordProcessed(record.logOffset()); + if (ctx.isComplete()) { + break; + } + } + } + + private boolean allComplete(List<BucketRecoveryContext> contexts) { + for (BucketRecoveryContext ctx : contexts) { + if (!ctx.isComplete()) { + return false; + } + } + return true; + } + + private int countIncomplete(List<BucketRecoveryContext> contexts) { + int count = 0; + for (BucketRecoveryContext ctx : contexts) { + if (!ctx.isComplete()) { + count++; + } + } + return count; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManager.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManager.java new file mode 100644 index 000000000..fa0adc10f --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryManager.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.sink.writer.undo; + +import org.apache.fluss.client.table.Table; +import org.apache.fluss.client.table.scanner.log.LogScanner; +import org.apache.fluss.client.table.writer.Upsert; +import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.row.encode.CompactedKeyEncoder; +import org.apache.fluss.row.encode.KeyEncoder; +import org.apache.fluss.rpc.protocol.MergeMode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * Manages undo recovery operations during Flink sink writer initialization. + * + * <p>This manager ensures exactly-once semantics by reversing any writes that occurred after the + * last successful checkpoint but before a failure. The recovery process involves: + * + * <ol> + * <li>Reading changelog records from checkpoint offset to current latest offset + * <li>Computing inverse operations for each affected primary key + * <li>Applying undo operations using OVERWRITE mode to restore bucket state + * </ol> + * + * <p><b>Undo Logic:</b> For each primary key, only the first change after checkpoint determines the + * undo action: + * + * <ul> + * <li>{@code INSERT} → Delete the row (it didn't exist at checkpoint) + * <li>{@code UPDATE_BEFORE} → Restore the old value (this was the state at checkpoint) + * <li>{@code UPDATE_AFTER} → Ignored (UPDATE_BEFORE already handled the undo) + * <li>{@code DELETE} → Re-insert the deleted row (it existed at checkpoint) + * </ul> + * + * <p>This class delegates to specialized components: + * + * <ul> + * <li>{@link UndoComputer} - Undo operation computation using {@link KeyEncoder} for primary key + * encoding + * <li>{@link UndoRecoveryExecutor} - Changelog reading and write execution + * </ul> + * + * @see 
MergeMode#OVERWRITE + */ +public class UndoRecoveryManager { + + private static final Logger LOG = LoggerFactory.getLogger(UndoRecoveryManager.class); + + private final Table table; + @Nullable private final int[] targetColumnIndexes; + + /** + * Creates a new UndoRecoveryManager. + * + * @param table the Fluss table to perform recovery on + * @param targetColumnIndexes optional target columns for partial update (null for full row) + */ + public UndoRecoveryManager(Table table, @Nullable int[] targetColumnIndexes) { + this.table = table; + this.targetColumnIndexes = targetColumnIndexes; + + logPartialUpdateConfig(targetColumnIndexes); + } + + private void logPartialUpdateConfig(@Nullable int[] targetColumnIndexes) { + if (targetColumnIndexes != null) { + LOG.info( + "Undo recovery configured with partial update columns: {}", + Arrays.toString(targetColumnIndexes)); + } + } + + // ==================== Public API ==================== + + /** + * Performs undo recovery for buckets with known undo offsets. + * + * <p>The caller is responsible for providing both the checkpoint offset (start) and the log end + * offset (end) for each bucket. This avoids redundant listOffset calls since the caller + * typically already has this information from determining which buckets need recovery. 
+ * + * @param bucketUndoOffsets map of bucket to its undo offsets (checkpointOffset, logEndOffset) + * @param subtaskIndex the Flink subtask index (for logging) + * @param parallelism the total parallelism (for logging) + * @throws Exception if recovery fails + */ + public void performUndoRecovery( + Map<TableBucket, UndoOffsets> bucketUndoOffsets, int subtaskIndex, int parallelism) + throws Exception { + + if (bucketUndoOffsets.isEmpty()) { + LOG.debug("No buckets to recover on subtask {}/{}", subtaskIndex, parallelism); + return; + } + + LOG.info( + "Starting undo recovery for {} bucket(s) on subtask {}/{}", + bucketUndoOffsets.size(), + subtaskIndex, + parallelism); + + List<BucketRecoveryContext> contexts = buildRecoveryContexts(bucketUndoOffsets); + + try (LogScanner scanner = createLogScanner()) { + Upsert recoveryUpsert = table.newUpsert().mergeMode(MergeMode.OVERWRITE); + if (targetColumnIndexes != null) { + recoveryUpsert = recoveryUpsert.partialUpdate(targetColumnIndexes); + } + UpsertWriter writer = recoveryUpsert.createWriter(); + + // Create UndoComputer with writer for streaming execution + Schema schema = table.getTableInfo().getSchema(); + // Use CompactedKeyEncoder directly instead of the deprecated KeyEncoder.of(), + // which requires a lake format parameter that is not applicable here. + KeyEncoder keyEncoder = + CompactedKeyEncoder.createKeyEncoder( + schema.getRowType(), schema.getPrimaryKey().get().getColumnNames()); + UndoComputer undoComputer = new UndoComputer(keyEncoder, writer); + + UndoRecoveryExecutor executor = new UndoRecoveryExecutor(scanner, writer, undoComputer); + // This is a blocking operation that reads changelog and executes undo operations. + // It may take a significant amount of time depending on the volume of records to + // process. 
+ executor.execute(contexts); + } + + // Calculate total undo operations for summary + int totalUndoOps = 0; + for (BucketRecoveryContext ctx : contexts) { + totalUndoOps += ctx.getProcessedKeys().size(); + } + + LOG.info( + "Completed undo recovery for {} bucket(s) with {} undo operation(s) on subtask {}/{}", + bucketUndoOffsets.size(), + totalUndoOps, + subtaskIndex, + parallelism); + } + + /** + * Encapsulates the offset information needed for undo recovery of a bucket. + * + * <p>This class holds the checkpoint offset (recovery start point) and log end offset (recovery + * end point) for a bucket's undo recovery operation. + */ + public static class UndoOffsets { + private final long checkpointOffset; + private final long logEndOffset; + + /** + * Creates a new UndoOffsets. + * + * @param checkpointOffset the checkpoint offset (start point, inclusive) + * @param logEndOffset the log end offset (end point, exclusive) + * @throws IllegalArgumentException if offsets are negative or checkpointOffset > + * logEndOffset + */ + public UndoOffsets(long checkpointOffset, long logEndOffset) { + if (checkpointOffset < 0) { + throw new IllegalArgumentException( + "checkpointOffset must be non-negative: " + checkpointOffset); + } + if (logEndOffset < 0) { + throw new IllegalArgumentException( + "logEndOffset must be non-negative: " + logEndOffset); + } + if (checkpointOffset > logEndOffset) { + throw new IllegalArgumentException( + String.format( + "checkpointOffset (%d) must not be greater than logEndOffset (%d)", + checkpointOffset, logEndOffset)); + } + // Note: checkpointOffset == logEndOffset is allowed, it simply means no recovery needed + this.checkpointOffset = checkpointOffset; + this.logEndOffset = logEndOffset; + } + + public long getCheckpointOffset() { + return checkpointOffset; + } + + public long getLogEndOffset() { + return logEndOffset; + } + } + + // ==================== Scanner Factory ==================== + + /** + * Creates a LogScanner for reading 
changelog records. + * + * <p>This method is protected to allow test subclasses to inject custom scanners for fault + * injection testing. + * + * @return a new LogScanner instance + */ + protected LogScanner createLogScanner() { + return table.newScan().createLogScanner(); + } + + // ==================== Recovery Context Building ==================== + + private List<BucketRecoveryContext> buildRecoveryContexts( + Map<TableBucket, UndoOffsets> bucketUndoOffsets) { + + List<BucketRecoveryContext> contexts = new ArrayList<>(); + + for (Map.Entry<TableBucket, UndoOffsets> entry : bucketUndoOffsets.entrySet()) { + TableBucket bucket = entry.getKey(); + UndoOffsets offsets = entry.getValue(); + + BucketRecoveryContext ctx = + new BucketRecoveryContext( + bucket, offsets.getCheckpointOffset(), offsets.getLogEndOffset()); + contexts.add(ctx); + } + + return contexts; + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoComputerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoComputerTest.java new file mode 100644 index 000000000..d41bb84b4 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/undo/UndoComputerTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.writer.undo; + +import org.apache.fluss.client.table.scanner.ScanRecord; +import org.apache.fluss.flink.utils.TestUpsertWriter; +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.encode.KeyEncoder; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; +import org.apache.fluss.utils.ByteArrayWrapper; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; + +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for {@link UndoComputer}. + * + * <p>Tests verify: (1) ChangeType to undo operation mapping, (2) primary key deduplication. 
 */
class UndoComputerTest {

    // Row layout used by all tests; the primary key is the single column "f0".
    private static final RowType ROW_TYPE =
            RowType.of(DataTypes.INT(), DataTypes.STRING(), DataTypes.INT());
    private static final List<String> PRIMARY_KEY_COLUMNS = Collections.singletonList("f0");

    private KeyEncoder keyEncoder;
    private TestUpsertWriter mockWriter;
    private UndoComputer undoComputer;

    @BeforeEach
    void setUp() {
        // NOTE(review): the production code (UndoRecoveryManager) avoids KeyEncoder.of()
        // because it is deprecated and needs a lake-format parameter; consider migrating
        // this setup to CompactedKeyEncoder.createKeyEncoder as well — TODO confirm.
        keyEncoder = KeyEncoder.of(ROW_TYPE, PRIMARY_KEY_COLUMNS, null);
        mockWriter = new TestUpsertWriter();
        undoComputer = new UndoComputer(keyEncoder, mockWriter);
    }

    // ==================== Undo Logic Mapping (Parameterized) ====================

    /**
     * Parameterized test for ChangeType to undo operation mapping.
     *
     * <p>Validates: Requirements 4.3 - Correct undo logic mapping.
     *
     * <p>Mapping rules:
     *
     * <ul>
     *   <li>INSERT → delete (undo insert by deleting)
     *   <li>UPDATE_BEFORE → upsert (restore old value)
     *   <li>UPDATE_AFTER → skip (no action needed)
     *   <li>DELETE → upsert (restore deleted row)
     * </ul>
     */
    @ParameterizedTest(name = "{0} → expectDelete={1}, expectUpsert={2}, expectNull={3}")
    @MethodSource("changeTypeUndoMappingProvider")
    void testChangeTypeToUndoMapping(
            ChangeType changeType,
            boolean expectDelete,
            boolean expectUpsert,
            boolean expectNullFuture) {
        Set<ByteArrayWrapper> processedKeys = new HashSet<>();
        GenericRow testRow = row(1, "test", 100);
        ScanRecord record = new ScanRecord(0L, 0L, changeType, testRow);

        CompletableFuture<?> future = undoComputer.processRecord(record, processedKeys);

        // A null future signals the record was skipped (no undo write issued) and, in that
        // case, the key must not have been recorded as processed.
        if (expectNullFuture) {
            assertThat(future).isNull();
            assertThat(processedKeys).isEmpty();
        } else {
            assertThat(future).isNotNull();
            assertThat(processedKeys).hasSize(1);
        }
        assertThat(mockWriter.getDeleteCount()).isEqualTo(expectDelete ? 1 : 0);
        assertThat(mockWriter.getUpsertCount()).isEqualTo(expectUpsert ? 1 : 0);

        // isSameAs: the undo write must forward the scanned row instance, not a copy.
        if (expectDelete) {
            assertThat(mockWriter.getLastDeletedRow()).isSameAs(testRow);
        }
        if (expectUpsert) {
            assertThat(mockWriter.getLastUpsertedRow()).isSameAs(testRow);
        }
    }

    static Stream<Arguments> changeTypeUndoMappingProvider() {
        return Stream.of(
                // ChangeType, expectDelete, expectUpsert, expectNullFuture
                Arguments.of(ChangeType.INSERT, true, false, false),
                Arguments.of(ChangeType.UPDATE_BEFORE, false, true, false),
                Arguments.of(ChangeType.UPDATE_AFTER, false, false, true),
                Arguments.of(ChangeType.DELETE, false, true, false));
    }

    // ==================== Primary Key Deduplication ====================

    /**
     * Test primary key deduplication: only first occurrence of each key is processed.
     *
     * <p>Validates: Requirements 2.2, 2.3 - Primary key deduplication and key tracking.
     */
    @Test
    void testPrimaryKeyDeduplication() {
        Set<ByteArrayWrapper> processedKeys = new HashSet<>();

        // First record for key=1 (INSERT → delete)
        GenericRow row1 = row(1, "first", 100);
        CompletableFuture<?> f1 =
                undoComputer.processRecord(
                        new ScanRecord(0L, 0L, ChangeType.INSERT, row1), processedKeys);

        // Second record for same key=1 (should be skipped)
        GenericRow row2 = row(1, "second", 200);
        CompletableFuture<?> f2 =
                undoComputer.processRecord(
                        new ScanRecord(1L, 0L, ChangeType.UPDATE_BEFORE, row2), processedKeys);

        // Third record for different key=2 (INSERT → delete)
        GenericRow row3 = row(2, "third", 300);
        CompletableFuture<?> f3 =
                undoComputer.processRecord(
                        new ScanRecord(2L, 0L, ChangeType.INSERT, row3), processedKeys);

        // Fourth record for key=2 (should be skipped)
        GenericRow row4 = row(2, "fourth", 400);
        CompletableFuture<?> f4 =
                undoComputer.processRecord(
                        new ScanRecord(3L, 0L, ChangeType.DELETE, row4), processedKeys);

        // Verify: only first occurrence of each key processed
        assertThat(f1).isNotNull();
        assertThat(f2).as("Duplicate key=1 should be skipped").isNull();
        assertThat(f3).isNotNull();
        assertThat(f4).as("Duplicate key=2 should be skipped").isNull();

        // Verify: 2 unique keys tracked
        assertThat(processedKeys).hasSize(2);

        // Verify: 2 deletes (one per unique key, both were INSERTs)
        assertThat(mockWriter.getDeleteCount()).isEqualTo(2);
        assertThat(mockWriter.getUpsertCount()).isEqualTo(0);

        // Verify: correct rows were processed
        assertThat(mockWriter.getAllDeletedRows()).containsExactly(row1, row3);
    }

    /**
     * Test that UPDATE_AFTER records don't add keys to processed set.
     *
     * <p>This ensures subsequent records for the same key can still be processed.
     */
    @Test
    void testUpdateAfterDoesNotBlockSubsequentRecords() {
        Set<ByteArrayWrapper> processedKeys = new HashSet<>();

        // UPDATE_AFTER for key=1 (skipped, key NOT added to set)
        undoComputer.processRecord(
                new ScanRecord(0L, 0L, ChangeType.UPDATE_AFTER, row(1, "after", 100)),
                processedKeys);

        // INSERT for same key=1 (should be processed since UPDATE_AFTER didn't add key)
        CompletableFuture<?> f =
                undoComputer.processRecord(
                        new ScanRecord(1L, 0L, ChangeType.INSERT, row(1, "insert", 200)),
                        processedKeys);

        assertThat(f).isNotNull();
        assertThat(mockWriter.getDeleteCount()).isEqualTo(1);
        assertThat(processedKeys).hasSize(1);
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.writer.undo; + +import org.apache.fluss.client.table.scanner.ScanRecord; +import org.apache.fluss.flink.utils.TestLogScanner; +import org.apache.fluss.flink.utils.TestUpsertWriter; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.row.encode.KeyEncoder; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Unit tests for {@link UndoRecoveryExecutor}. + * + * <p>Tests verify: (1) streaming execution with futures, (2) multi-bucket recovery, (3) exception + * propagation. 
 */
class UndoRecoveryExecutorTest {

    // Row layout used by all tests; the primary key is the single column "f0".
    private static final RowType ROW_TYPE =
            RowType.of(DataTypes.INT(), DataTypes.STRING(), DataTypes.INT());
    private static final List<String> PRIMARY_KEY_COLUMNS = Collections.singletonList("f0");
    private static final long TABLE_ID = 1L;

    private KeyEncoder keyEncoder;
    private TestUpsertWriter mockWriter;
    private TestLogScanner mockScanner;
    private UndoComputer undoComputer;
    private UndoRecoveryExecutor executor;

    // Short timeout value for testing (total ~300ms instead of 1 hour)
    private static final long TEST_MAX_TOTAL_WAIT_TIME_MS = 300;

    @BeforeEach
    void setUp() {
        keyEncoder = KeyEncoder.of(ROW_TYPE, PRIMARY_KEY_COLUMNS, null);
        mockWriter = new TestUpsertWriter();
        mockScanner = new TestLogScanner();
        undoComputer = new UndoComputer(keyEncoder, mockWriter);
        // Use short timeout for testing
        executor =
                new UndoRecoveryExecutor(
                        mockScanner, mockWriter, undoComputer, TEST_MAX_TOTAL_WAIT_TIME_MS);
    }

    /**
     * Test multi-bucket recovery with mixed ChangeTypes and key deduplication.
     *
     * <p>Validates: Requirements 3.3 - All futures complete after execute.
     */
    @Test
    void testMultiBucketRecoveryWithDeduplication() throws Exception {
        TableBucket bucket0 = new TableBucket(TABLE_ID, 0);
        TableBucket bucket1 = new TableBucket(TABLE_ID, 1);

        BucketRecoveryContext ctx0 = new BucketRecoveryContext(bucket0, 0L, 4L);

        BucketRecoveryContext ctx1 = new BucketRecoveryContext(bucket1, 0L, 3L);

        // Bucket0: INSERT(key=1), UPDATE_BEFORE(key=1, dup), INSERT(key=2), DELETE(key=3)
        mockScanner.setRecordsForBucket(
                bucket0,
                Arrays.asList(
                        new ScanRecord(0L, 0L, ChangeType.INSERT, row(1, "a", 100)),
                        new ScanRecord(1L, 0L, ChangeType.UPDATE_BEFORE, row(1, "b", 200)),
                        new ScanRecord(2L, 0L, ChangeType.INSERT, row(2, "c", 300)),
                        new ScanRecord(3L, 0L, ChangeType.DELETE, row(3, "d", 400))));

        // Bucket1: DELETE(key=10), UPDATE_AFTER(key=11, skip), INSERT(key=12)
        mockScanner.setRecordsForBucket(
                bucket1,
                Arrays.asList(
                        new ScanRecord(0L, 0L, ChangeType.DELETE, row(10, "e", 500)),
                        new ScanRecord(1L, 0L, ChangeType.UPDATE_AFTER, row(11, "f", 600)),
                        new ScanRecord(2L, 0L, ChangeType.INSERT, row(12, "g", 700))));

        executor.execute(Arrays.asList(ctx0, ctx1));

        // Bucket0: 3 unique keys (key=1 deduplicated)
        // - key=1: INSERT → delete
        // - key=2: INSERT → delete
        // - key=3: DELETE → upsert
        assertThat(ctx0.getProcessedKeys()).hasSize(3);

        // Bucket1: 2 unique keys (UPDATE_AFTER skipped)
        // - key=10: DELETE → upsert
        // - key=12: INSERT → delete
        assertThat(ctx1.getProcessedKeys()).hasSize(2);

        // Total: 3 deletes (key=1,2,12), 2 upserts (key=3,10)
        assertThat(mockWriter.getDeleteCount()).isEqualTo(3);
        assertThat(mockWriter.getUpsertCount()).isEqualTo(2);
        assertThat(mockWriter.isFlushCalled()).isTrue();

        assertThat(ctx0.isComplete()).isTrue();
        assertThat(ctx1.isComplete()).isTrue();
    }

    /**
     * Test that recovery is skipped when checkpoint offset >= target offset.
     *
     * <p>Validates: No unnecessary work when no recovery needed.
     */
    @Test
    void testNoRecoveryNeededSkipsExecution() throws Exception {
        TableBucket bucket = new TableBucket(TABLE_ID, 0);

        // Checkpoint offset (5) >= log end offset (5) → no recovery needed
        BucketRecoveryContext ctx = new BucketRecoveryContext(bucket, 5L, 5L);

        executor.execute(Collections.singletonList(ctx));

        // No reads, no writes, and notably no flush — the executor short-circuits entirely.
        assertThat(mockWriter.getDeleteCount()).isEqualTo(0);
        assertThat(mockWriter.getUpsertCount()).isEqualTo(0);
        assertThat(mockWriter.isFlushCalled()).isFalse();
    }

    /**
     * Test exception propagation from writer failures.
     *
     * <p>Validates: Requirements 7.1, 7.2 - Exception propagation.
     */
    @Test
    void testExceptionPropagationFromWriter() {
        TableBucket bucket = new TableBucket(TABLE_ID, 0);

        BucketRecoveryContext ctx = new BucketRecoveryContext(bucket, 0L, 2L);

        mockScanner.setRecordsForBucket(
                bucket,
                Arrays.asList(
                        new ScanRecord(0L, 0L, ChangeType.INSERT, row(1, "a", 100)),
                        new ScanRecord(1L, 0L, ChangeType.INSERT, row(2, "b", 200))));

        mockWriter.setShouldFail(true);

        // The writer failure must surface as the *cause* of the thrown exception, so the
        // original error is preserved for callers.
        assertThatThrownBy(() -> executor.execute(Collections.singletonList(ctx)))
                .hasCauseInstanceOf(RuntimeException.class)
                .hasMessageContaining("Simulated write failure");
    }

    /**
     * Test that exception is thrown after max total wait time.
     *
     * <p>Validates: Undo recovery timeout triggers a retryable exception. Note: This test uses a
     * mock scanner that always returns empty, so it will hit the timeout quickly in test
     * environment. In production, the timeout is 1 hour.
     */
    @Test
    void testFatalExceptionOnMaxEmptyPolls() {
        // NOTE(review): the method name still says "MaxEmptyPolls" but the javadoc and the
        // assertions below describe a total-wait-time timeout — consider renaming to match.
        TableBucket bucket = new TableBucket(TABLE_ID, 0);

        BucketRecoveryContext ctx = new BucketRecoveryContext(bucket, 0L, 2L);

        // Configure scanner to always return empty (simulating network issues or server problems)
        mockScanner.setAlwaysReturnEmpty(true);

        // The test will timeout based on total wait time, but since we're using a mock
        // scanner with no actual delay, it will fail quickly
        assertThatThrownBy(() -> executor.execute(Collections.singletonList(ctx)))
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining("Undo recovery timed out")
                .hasMessageContaining("minutes of waiting")
                .hasMessageContaining("still incomplete")
                .hasMessageContaining("restart and retry");
    }

    /**
     * Test multi-poll scenario where records are returned in batches.
     *
     * <p>This tests realistic LogScanner behavior where records are returned incrementally across
     * multiple poll() calls, ensuring the executor correctly handles partial results.
     */
    @Test
    void testMultiPollBatchProcessing() throws Exception {
        TableBucket bucket = new TableBucket(TABLE_ID, 0);

        BucketRecoveryContext ctx = new BucketRecoveryContext(bucket, 0L, 6L);

        // Configure 6 records but return only 2 per poll
        mockScanner.setRecordsForBucket(
                bucket,
                Arrays.asList(
                        new ScanRecord(0L, 0L, ChangeType.INSERT, row(1, "a", 100)),
                        new ScanRecord(1L, 0L, ChangeType.INSERT, row(2, "b", 200)),
                        new ScanRecord(2L, 0L, ChangeType.DELETE, row(3, "c", 300)),
                        new ScanRecord(3L, 0L, ChangeType.UPDATE_BEFORE, row(4, "d", 400)),
                        new ScanRecord(4L, 0L, ChangeType.UPDATE_AFTER, row(4, "e", 500)),
                        new ScanRecord(5L, 0L, ChangeType.INSERT, row(5, "f", 600))));
        mockScanner.setBatchSize(2);

        executor.execute(Collections.singletonList(ctx));

        // Should process all 6 records across 3 polls
        // key=1: INSERT → delete
        // key=2: INSERT → delete
        // key=3: DELETE → upsert
        // key=4: UPDATE_BEFORE → upsert (UPDATE_AFTER skipped)
        // key=5: INSERT → delete
        assertThat(ctx.getProcessedKeys()).hasSize(5);
        assertThat(ctx.getTotalRecordsProcessed()).isEqualTo(6);
        assertThat(mockWriter.getDeleteCount()).isEqualTo(3); // keys 1, 2, 5
        assertThat(mockWriter.getUpsertCount()).isEqualTo(2); // keys 3, 4
        assertThat(ctx.isComplete()).isTrue();
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.writer.undo; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.client.admin.ListOffsetsResult; +import org.apache.fluss.client.admin.OffsetSpec; +import org.apache.fluss.client.lookup.Lookuper; +import org.apache.fluss.client.table.Table; +import org.apache.fluss.client.table.scanner.log.LogScanner; +import org.apache.fluss.client.table.scanner.log.ScanRecords; +import org.apache.fluss.client.table.writer.UpsertResult; +import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.sink.writer.undo.UndoRecoveryManager.UndoOffsets; +import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.MergeEngineType; +import org.apache.fluss.metadata.PartitionInfo; +import org.apache.fluss.metadata.PartitionSpec; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.types.DataTypes; + +import org.junit.jupiter.api.AfterAll; +import 
org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Integration tests for {@link UndoRecoveryManager}. + * + * <p>Tests use complex multi-bucket, multi-key scenarios to ensure only correct implementations + * pass. Each test covers multiple aspects to maximize value while minimizing redundancy. + */ +public class UndoRecoveryManagerITCase { + + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder() + .setClusterConf( + new Configuration() + // not to clean snapshots for test purpose + .set( + ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, + Integer.MAX_VALUE)) + .setNumOfTabletServers(3) + .build(); + + private static final String DEFAULT_DB = "test-flink-db"; + + protected static Connection conn; + protected static Admin admin; + + @BeforeAll + static void beforeAll() { + Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); + conn = ConnectionFactory.createConnection(clientConf); + admin = conn.getAdmin(); + } + + @BeforeEach + void beforeEach() throws Exception { + admin.createDatabase(DEFAULT_DB, DatabaseDescriptor.EMPTY, true).get(); + } + + @AfterAll + static void afterAll() throws Exception { + if (admin != null) { + admin.close(); + admin = null; + } + if (conn != null) { + conn.close(); + conn = null; + } + } + + protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor) + throws Exception 
{ + admin.createTable(tablePath, tableDescriptor, true).get(); + return admin.getTableInfo(tablePath).get().getTableId(); + } + + private static final int NUM_BUCKETS = 4; + + private static final Schema TEST_SCHEMA = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("price", DataTypes.INT()) + .column("stock", DataTypes.INT()) + .primaryKey("id") + .build(); + + private static final Schema PARTITIONED_SCHEMA = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("price", DataTypes.INT()) + .column("pt", DataTypes.STRING()) + .primaryKey("id", "pt") + .build(); + + // ==================== Full Update Tests ==================== + + /** + * Comprehensive test for Full Update mode with 4 buckets and 100+ keys. + * + * <p>Covers: zero-offset recovery, DELETE undo, UPDATE undo with deduplication, mixed ops. + */ + @Test + void testFullUpdateMultiBucketComprehensiveRecovery() throws Exception { + TablePath tablePath = createTestTablePath("full_update_test"); + long tableId = + createTable( + tablePath, + TableDescriptor.builder() + .schema(TEST_SCHEMA) + .distributedBy(NUM_BUCKETS, "id") + .property( + ConfigOptions.TABLE_MERGE_ENGINE, + MergeEngineType.AGGREGATION) + .build()); + + try (Table table = conn.getTable(tablePath)) { + UpsertWriter writer = table.newUpsert().createWriter(); + Lookuper lookuper = table.newLookup().createLookuper(); + BucketTracker tracker = new BucketTracker(NUM_BUCKETS, tableId); + + // Phase 1: Write initial 100 keys (batch async) + List<CompletableFuture<UpsertResult>> writeFutures = new ArrayList<>(); + for (int id = 0; id < 100; id++) { + writeFutures.add(writer.upsert(row(id, "pre_" + id, id * 10, id))); + } + for (int i = 0; i < writeFutures.size(); i++) { + tracker.recordWrite(i, writeFutures.get(i).get()); + } + writer.flush(); + + int zeroOffsetBucket = tracker.selectBucketForZeroOffsetRecovery(); + Map<TableBucket, Long> 
checkpointOffsets = + tracker.buildCheckpointOffsetsWithZeroOffset(zeroOffsetBucket); + + Map<Integer, Set<Integer>> keysAfterCheckpoint = new HashMap<>(); + for (int i = 0; i < NUM_BUCKETS; i++) { + keysAfterCheckpoint.put(i, new HashSet<>()); + } + + // Phase 2: Operations after checkpoint (batch async) + List<CompletableFuture<UpsertResult>> newKeyFutures = new ArrayList<>(); + for (int id = 200; id < 300; id++) { + newKeyFutures.add(writer.upsert(row(id, "new_" + id, id * 10, id))); + } + writer.flush(); + for (int i = 0; i < newKeyFutures.size(); i++) { + UpsertResult result = newKeyFutures.get(i).get(); + keysAfterCheckpoint.get(result.getBucket().getBucket()).add(200 + i); + } + + for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) { + if (bucket != zeroOffsetBucket && !tracker.keysByBucket.get(bucket).isEmpty()) { + List<Integer> keys = tracker.keysByBucket.get(bucket); + for (int i = 0; i < Math.min(3, keys.size()); i++) { + int id = keys.get(i); + writer.upsert(row(id, "updated_" + id, 9999, 9999)); + keysAfterCheckpoint.get(bucket).add(id); + } + if (keys.size() > 5) { + for (int i = 3; i < Math.min(5, keys.size()); i++) { + int id = keys.get(i); + writer.delete(row(id, "pre_" + id, id * 10, id)); + keysAfterCheckpoint.get(bucket).add(id); + } + } + if (keys.size() > 6) { + int id = keys.get(5); + writer.upsert(row(id, "multi_v1", 111, 111)); + writer.upsert(row(id, "multi_v2", 222, 222)); + writer.upsert(row(id, "multi_v3", 333, 333)); + keysAfterCheckpoint.get(bucket).add(id); + } + } + } + writer.flush(); + + // Phase 3: Perform undo recovery + Map<TableBucket, UndoOffsets> offsetRanges = + buildUndoOffsets(checkpointOffsets, tablePath, admin); + new UndoRecoveryManager(table, null).performUndoRecovery(offsetRanges, 0, 1); + + // Phase 4: Verify results (parallel lookup) + List<CompletableFuture<Void>> verifyFutures = new ArrayList<>(); + + for (int id : tracker.keysByBucket.get(zeroOffsetBucket)) { + final int keyId = id; + verifyFutures.add( + 
lookuper.lookup(row(keyId)) + .thenAccept( + r -> + assertThat(r.getSingletonRow()) + .as( + "Zero-offset bucket key %d should be deleted", + keyId) + .isNull())); + } + + for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) { + if (bucket == zeroOffsetBucket) { + continue; + } + for (int id : tracker.keysByBucket.get(bucket)) { + final int keyId = id; + final int b = bucket; + verifyFutures.add( + lookuper.lookup(row(keyId)) + .thenAccept( + r -> { + InternalRow result = r.getSingletonRow(); + assertThat(result) + .as( + "Bucket %d key %d should exist", + b, keyId) + .isNotNull(); + assertRowEquals( + result, + keyId, + "pre_" + keyId, + keyId * 10, + keyId); + })); + } + for (int id : keysAfterCheckpoint.get(bucket)) { + if (id >= 200) { + final int keyId = id; + final int b = bucket; + verifyFutures.add( + lookuper.lookup(row(keyId)) + .thenAccept( + r -> + assertThat(r.getSingletonRow()) + .as( + "Bucket %d new key %d should be deleted", + b, keyId) + .isNull())); + } + } + } + CompletableFuture.allOf(verifyFutures.toArray(new CompletableFuture[0])).get(); + } + } + + // ==================== Partial Update Tests ==================== + + /** + * Comprehensive test for Partial Update mode with 4 buckets. + * + * <p>Covers: target columns rollback, non-target columns preserved, other writer's changes + * preserved. 
     */
    @Test
    void testPartialUpdateMultiBucketComprehensiveRecovery() throws Exception {
        TablePath tablePath = createTestTablePath("partial_update_test");
        long tableId =
                createTable(
                        tablePath,
                        TableDescriptor.builder()
                                .schema(TEST_SCHEMA)
                                .distributedBy(NUM_BUCKETS, "id")
                                .property(
                                        ConfigOptions.TABLE_MERGE_ENGINE,
                                        MergeEngineType.AGGREGATION)
                                .build());

        // The partial-update writer under test only writes id (key) and price.
        int[] targetColumns = new int[] {0, 2}; // id, price

        try (Table table = conn.getTable(tablePath)) {
            UpsertWriter fullWriter = table.newUpsert().createWriter();
            Lookuper lookuper = table.newLookup().createLookuper();

            // Discover bucket assignments efficiently (batch write + delete): a throwaway
            // upsert per key reveals which bucket the key hashes to, then the rows are removed.
            Map<Integer, Integer> keyToBucket = new HashMap<>();
            List<CompletableFuture<UpsertResult>> discoverFutures = new ArrayList<>();
            for (int id = 0; id < 80; id++) {
                discoverFutures.add(fullWriter.upsert(row(id, "temp", 0, 0)));
            }
            for (int i = 0; i < discoverFutures.size(); i++) {
                keyToBucket.put(i, discoverFutures.get(i).get().getBucket().getBucket());
            }
            fullWriter.flush();
            for (int id = 0; id < 80; id++) {
                fullWriter.delete(row(id, "temp", 0, 0));
            }
            fullWriter.flush();

            Map<Integer, List<Integer>> initialKeysByBucket = new HashMap<>();
            for (int i = 0; i < NUM_BUCKETS; i++) {
                initialKeysByBucket.put(i, new ArrayList<>());
            }
            for (int id = 0; id < 80; id++) {
                initialKeysByBucket.get(keyToBucket.get(id)).add(id);
            }
            // The bucket with the most keys is recovered from offset 0 (full replay).
            int zeroOffsetBucket =
                    selectBucketForZeroOffsetRecovery(initialKeysByBucket, NUM_BUCKETS);

            // Phase 1: Write initial data (except zeroOffsetBucket)
            Map<Integer, List<Integer>> keysByBucket = new HashMap<>();
            Map<Integer, Long> bucketOffsets = new HashMap<>();
            for (int i = 0; i < NUM_BUCKETS; i++) {
                keysByBucket.put(i, new ArrayList<>());
            }

            List<CompletableFuture<UpsertResult>> initFutures = new ArrayList<>();
            List<Integer> initIds = new ArrayList<>();
            for (int id = 0; id < 80; id++) {
                int bucket = keyToBucket.get(id);
                if (bucket != zeroOffsetBucket) {
                    initFutures.add(fullWriter.upsert(row(id, "name_" + id, id * 10, id * 100)));
                    initIds.add(id);
                }
            }
            for (int i = 0; i < initFutures.size(); i++) {
                UpsertResult result = initFutures.get(i).get();
                int id = initIds.get(i);
                int bucket = result.getBucket().getBucket();
                keysByBucket.get(bucket).add(id);
                // Last write per bucket wins: records the bucket's log end offset at checkpoint.
                bucketOffsets.put(bucket, result.getLogEndOffset());
            }
            fullWriter.flush();

            Map<TableBucket, Long> checkpointOffsets = new HashMap<>();
            checkpointOffsets.put(new TableBucket(tableId, zeroOffsetBucket), 0L);
            for (int i = 0; i < NUM_BUCKETS; i++) {
                if (i != zeroOffsetBucket) {
                    checkpointOffsets.put(
                            new TableBucket(tableId, i), bucketOffsets.getOrDefault(i, 0L));
                }
            }

            // NOTE(review): the next two collections are populated below but never asserted
            // against in Phase 4 — presumably kept for debugging; confirm or drop.
            Set<Integer> zeroOffsetBucketKeysAfterCheckpoint = new HashSet<>();
            Map<Integer, Set<Integer>> newKeysAfterCheckpointByBucket = new HashMap<>();
            for (int i = 0; i < NUM_BUCKETS; i++) {
                newKeysAfterCheckpointByBucket.put(i, new HashSet<>());
            }

            // Phase 2: Partial updates after checkpoint
            UpsertWriter partialWriter =
                    table.newUpsert().partialUpdate(targetColumns).createWriter();

            for (int id = 0; id < 80; id++) {
                if (keyToBucket.get(id) == zeroOffsetBucket) {
                    partialWriter.upsert(row(id, null, 9999, null));
                    zeroOffsetBucketKeysAfterCheckpoint.add(id);
                }
            }

            List<CompletableFuture<UpsertResult>> newPartialFutures = new ArrayList<>();
            for (int id = 100; id < 130; id++) {
                newPartialFutures.add(partialWriter.upsert(row(id, null, id * 10, null)));
            }
            for (int i = 0; i < newPartialFutures.size(); i++) {
                UpsertResult result = newPartialFutures.get(i).get();
                int id = 100 + i;
                int bucket = result.getBucket().getBucket();
                if (bucket == zeroOffsetBucket) {
                    zeroOffsetBucketKeysAfterCheckpoint.add(id);
                } else {
                    newKeysAfterCheckpointByBucket.get(bucket).add(id);
                }
            }

            // Update the first few pre-checkpoint keys in every non-zero-offset bucket.
            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
                if (bucket != zeroOffsetBucket) {
                    List<Integer> keys = keysByBucket.get(bucket);
                    for (int i = 0; i < Math.min(5, keys.size()); i++) {
                        partialWriter.upsert(row(keys.get(i), null, 8888, null));
                    }
                }
            }
            partialWriter.flush();

            // Other Writer updates stock (should NOT be undone)
            UpsertWriter otherWriter =
                    table.newUpsert().partialUpdate(new int[] {0, 3}).createWriter();
            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
                if (bucket != zeroOffsetBucket) {
                    for (int id : keysByBucket.get(bucket)) {
                        otherWriter.upsert(row(id, null, null, 5555));
                    }
                }
            }
            otherWriter.flush();

            // Phase 3: Perform undo recovery
            Map<TableBucket, UndoOffsets> offsetRanges =
                    buildUndoOffsets(checkpointOffsets, tablePath, admin);
            new UndoRecoveryManager(table, targetColumns).performUndoRecovery(offsetRanges, 0, 1);

            // Phase 4: Verify results (parallel lookup). Expected state per surviving key:
            // name unchanged, price rolled back to its checkpoint value, stock kept at the
            // other writer's value (that writer targeted different columns).
            List<CompletableFuture<Void>> verifyFutures = new ArrayList<>();

            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
                if (bucket == zeroOffsetBucket) {
                    continue;
                }

                for (int id : keysByBucket.get(bucket)) {
                    final int keyId = id;
                    final int b = bucket;
                    verifyFutures.add(
                            lookuper.lookup(row(keyId))
                                    .thenAccept(
                                            r -> {
                                                InternalRow result = r.getSingletonRow();
                                                assertThat(result)
                                                        .as(
                                                                "Bucket %d key %d should exist",
                                                                b, keyId)
                                                        .isNotNull();
                                                assertThat(result.getString(1).toString())
                                                        .isEqualTo("name_" + keyId);
                                                assertThat(result.getInt(2))
                                                        .as("price restored")
                                                        .isEqualTo(keyId * 10);
                                                assertThat(result.getInt(3))
                                                        .as("stock kept")
                                                        .isEqualTo(5555);
                                            }));
                }
            }
            CompletableFuture.allOf(verifyFutures.toArray(new CompletableFuture[0])).get();
        }
    }

    // ==================== Partitioned Table Tests ====================

    /**
     * Test undo recovery for partitioned table with multiple partitions.
     *
     * <p>Covers: multi-partition recovery, INSERT/UPDATE/DELETE undo across partitions.
     */
    @Test
    void testPartitionedTableRecovery() throws Exception {
        TablePath tablePath = createTestTablePath("partitioned_test");
        long tableId =
                createTable(
                        tablePath,
                        TableDescriptor.builder()
                                .schema(PARTITIONED_SCHEMA)
                                .partitionedBy("pt")
                                .distributedBy(2, "id")
                                .property(
                                        ConfigOptions.TABLE_MERGE_ENGINE,
                                        MergeEngineType.AGGREGATION)
                                .build());

        String partition1 = "p1", partition2 = "p2";
        admin.createPartition(tablePath, newPartitionSpec("pt", partition1), false).get();
        admin.createPartition(tablePath, newPartitionSpec("pt", partition2), false).get();

        // Resolve partition name -> id so checkpoint offsets can address (table, partition,
        // bucket) triples.
        List<PartitionInfo> partitionInfos = admin.listPartitionInfos(tablePath).get();
        Map<String, Long> partitionNameToId = new HashMap<>();
        for (PartitionInfo info : partitionInfos) {
            partitionNameToId.put(info.getPartitionName(), info.getPartitionId());
        }
        long partition1Id = partitionNameToId.get(partition1);
        long partition2Id = partitionNameToId.get(partition2);

        try (Table table = conn.getTable(tablePath)) {
            UpsertWriter writer = table.newUpsert().createWriter();
            Lookuper lookuper = table.newLookup().createLookuper();
            Map<TableBucket, Long> checkpointOffsets = new HashMap<>();

            // Phase 1: Write initial data (batch); the last completed write per bucket leaves
            // its log end offset as the checkpoint offset.
            List<CompletableFuture<UpsertResult>> p1Futures = new ArrayList<>();
            List<CompletableFuture<UpsertResult>> p2Futures = new ArrayList<>();
            for (int id = 0; id < 10; id++) {
                p1Futures.add(writer.upsert(row(id, "p1_name_" + id, id * 10, partition1)));
                p2Futures.add(writer.upsert(row(id, "p2_name_" + id, id * 20, partition2)));
            }
            for (int i = 0; i < 10; i++) {
                UpsertResult r1 = p1Futures.get(i).get();
                checkpointOffsets.put(
                        new TableBucket(tableId, partition1Id, r1.getBucket().getBucket()),
                        r1.getLogEndOffset());
                UpsertResult r2 = p2Futures.get(i).get();
                checkpointOffsets.put(
                        new TableBucket(tableId, partition2Id, r2.getBucket().getBucket()),
                        r2.getLogEndOffset());
            }
            writer.flush();

            // Phase 2: Operations after checkpoint — inserts in both partitions, updates in
            // p1, deletes in p2. All of these should be rolled back by recovery.
            // NOTE(review): newKeysInP1/newKeysInP2 are collected but never asserted against
            // in Phase 4 (e.g. that the new keys were deleted) — confirm or drop.
            Set<Integer> newKeysInP1 = new HashSet<>(), newKeysInP2 = new HashSet<>();
            for (int id = 100; id < 105; id++) {
                writer.upsert(row(id, "p1_new_" + id, id * 10, partition1));
                newKeysInP1.add(id);
            }
            for (int id = 0; id < 3; id++) {
                writer.upsert(row(id, "p1_updated_" + id, 9999, partition1));
            }
            for (int id = 5; id < 8; id++) {
                writer.delete(row(id, "p2_name_" + id, id * 20, partition2));
            }
            for (int id = 200; id < 205; id++) {
                writer.upsert(row(id, "p2_new_" + id, id * 20, partition2));
                newKeysInP2.add(id);
            }
            writer.flush();

            // Phase 3: Perform undo recovery
            Map<TableBucket, UndoOffsets> offsetRanges =
                    buildUndoOffsets(checkpointOffsets, tablePath, admin);
            new UndoRecoveryManager(table, null).performUndoRecovery(offsetRanges, 0, 1);

            // Phase 4: Verify results (parallel lookup) — every pre-checkpoint row in both
            // partitions must be back to its original value.
            List<CompletableFuture<Void>> verifyFutures = new ArrayList<>();

            for (int id = 0; id < 10; id++) {
                final int keyId = id;
                verifyFutures.add(
                        lookuper.lookup(row(keyId, partition1))
                                .thenAccept(
                                        r -> {
                                            InternalRow result = r.getSingletonRow();
                                            assertThat(result)
                                                    .as("P1 key %d should exist", keyId)
                                                    .isNotNull();
                                            assertThat(result.getString(1).toString())
                                                    .isEqualTo("p1_name_" + keyId);
                                            assertThat(result.getInt(2)).isEqualTo(keyId * 10);
                                        }));
                verifyFutures.add(
                        lookuper.lookup(row(keyId, partition2))
                                .thenAccept(
                                        r -> {
                                            InternalRow result = r.getSingletonRow();
                                            assertThat(result)
                                                    .as("P2 key %d should exist", keyId)
                                                    .isNotNull();
                                            assertThat(result.getString(1).toString())
                                                    .isEqualTo("p2_name_" + keyId);
                                            assertThat(result.getInt(2)).isEqualTo(keyId * 20);
                                        }));
            }

            CompletableFuture.allOf(verifyFutures.toArray(new CompletableFuture[0])).get();
        }
    }

    // ==================== Idempotency and Exception Tests ====================

    /**
     * Test that undo recovery is idempotent - multiple executions produce the same result.
     *
     * <p>Covers: 4 buckets, 200 keys, mixed operations, 2 recovery rounds.
     */
    @Test
    void testRecoveryIdempotency() throws Exception {
        final int numBuckets = 4;
        final int keysPerBucket = 50;

        TablePath tablePath = createTestTablePath("idempotency_test");
        long tableId =
                createTable(
                        tablePath,
                        TableDescriptor.builder()
                                .schema(TEST_SCHEMA)
                                .distributedBy(numBuckets, "id")
                                .property(
                                        ConfigOptions.TABLE_MERGE_ENGINE,
                                        MergeEngineType.AGGREGATION)
                                .build());

        try (Table table = conn.getTable(tablePath)) {
            UpsertWriter writer = table.newUpsert().createWriter();
            Lookuper lookuper = table.newLookup().createLookuper();
            BucketTracker tracker = new BucketTracker(numBuckets, tableId);

            int totalKeys = numBuckets * keysPerBucket;

            // Batch write initial keys
            List<CompletableFuture<UpsertResult>> initFutures = new ArrayList<>();
            for (int id = 0; id < totalKeys; id++) {
                initFutures.add(writer.upsert(row(id, "original_" + id, id * 10, id * 100)));
            }
            for (int i = 0; i < initFutures.size(); i++) {
                tracker.recordWrite(i, initFutures.get(i).get());
            }
            writer.flush();

            Map<TableBucket, Long> checkpointOffsets = tracker.buildCheckpointOffsetsAll();

            // Batch write new keys
            Set<Integer> newKeys = new HashSet<>();
            for (int id = totalKeys; id < totalKeys + 50; id++) {
                writer.upsert(row(id, "new_" + id, id * 10, id * 100));
                newKeys.add(id);
            }

            // Post-checkpoint mutations per bucket: update the first 3 keys, delete the 6th.
            for (int bucket = 0; bucket < numBuckets; bucket++) {
                List<Integer> keys = tracker.keysByBucket.get(bucket);
                for (int i = 0; i < Math.min(3, keys.size()); i++) {
                    writer.upsert(row(keys.get(i), "updated_" + keys.get(i), 9999, 9999));
                }
                if (keys.size() > 5) {
                    int id = keys.get(5);
                    writer.delete(row(id, "original_" + id, id * 10, id * 100));
                }
            }
            writer.flush();

            // Execute recovery 2 times - results should be identical.
            // For idempotency, we need to update checkpoint offsets after first recovery
            // because recovery writes undo operations which advance the log end offset.
            Map<TableBucket, Long> currentCheckpointOffsets = new HashMap<>(checkpointOffsets);

            for (int round = 1; round <= 2; round++) {
                Map<TableBucket, UndoOffsets> offsetRanges =
                        buildUndoOffsets(currentCheckpointOffsets, tablePath, admin);
                new UndoRecoveryManager(table, null).performUndoRecovery(offsetRanges, 0, 1);

                // After first recovery, update checkpoint offsets to current log end offsets.
                // This simulates what would happen in a real scenario where checkpoint is taken
                // after recovery completes.
                if (round == 1) {
                    for (Map.Entry<TableBucket, UndoOffsets> entry : offsetRanges.entrySet()) {
                        // Query the new log end offset after recovery
                        TableBucket tb = entry.getKey();
                        List<Integer> bucketIds = new ArrayList<>();
                        bucketIds.add(tb.getBucket());
                        ListOffsetsResult offsetsResult =
                                admin.listOffsets(
                                        tablePath, bucketIds, new OffsetSpec.LatestSpec());
                        Long newOffset = offsetsResult.bucketResult(tb.getBucket()).get();
                        if (newOffset != null) {
                            currentCheckpointOffsets.put(tb, newOffset);
                        }
                    }
                }

                // Batch verify (parallel lookup): new keys deleted, original keys restored.
                List<CompletableFuture<Void>> verifyFutures = new ArrayList<>();
                final int r = round;

                for (int id : newKeys) {
                    final int keyId = id;
                    verifyFutures.add(
                            lookuper.lookup(row(keyId))
                                    .thenAccept(
                                            result ->
                                                    assertThat(result.getSingletonRow())
                                                            .as(
                                                                    "Round %d: New key %d should be deleted",
                                                                    r, keyId)
                                                            .isNull()));
                }

                for (int bucket = 0; bucket < numBuckets; bucket++) {
                    for (int id : tracker.keysByBucket.get(bucket)) {
                        final int keyId = id;
                        verifyFutures.add(
                                lookuper.lookup(row(keyId))
                                        .thenAccept(
                                                lookupResult -> {
                                                    InternalRow result =
                                                            lookupResult.getSingletonRow();
                                                    assertThat(result)
                                                            .as("Round %d: Key %d exists", r, keyId)
                                                            .isNotNull();
                                                    assertRowEquals(
                                                            result,
                                                            keyId,
                                                            "original_" + keyId,
                                                            keyId * 10,
                                                            keyId * 100);
                                                }));
                    }
                }
                CompletableFuture.allOf(verifyFutures.toArray(new CompletableFuture[0])).get();
            }
        }
    }

    /** Test that scanner exceptions are properly propagated. */
    @Test
    void testRecoveryWithScannerException() throws Exception {
        TablePath tablePath = createTestTablePath("scanner_exception_test");
        long tableId =
                createTable(
                        tablePath,
                        TableDescriptor.builder()
                                .schema(TEST_SCHEMA)
                                .distributedBy(2, "id")
                                .property(
                                        ConfigOptions.TABLE_MERGE_ENGINE,
                                        MergeEngineType.AGGREGATION)
                                .build());

        try (Table table = conn.getTable(tablePath)) {
            UpsertWriter writer = table.newUpsert().createWriter();
            Map<TableBucket, Long> checkpointOffsets = new HashMap<>();

            List<CompletableFuture<UpsertResult>> futures = new ArrayList<>();
            for (int id = 0; id < 10; id++) {
                futures.add(writer.upsert(row(id, "name_" + id, id * 10, id)));
            }
            for (int i = 0; i < futures.size(); i++) {
                UpsertResult result = futures.get(i).get();
                checkpointOffsets.put(
                        new TableBucket(tableId, result.getBucket().getBucket()),
                        result.getLogEndOffset());
            }
            writer.flush();

            // Post-checkpoint writes so that recovery actually needs to scan the log.
            for (int id = 100; id < 110; id++) {
                writer.upsert(row(id, "new_" + id, id * 10, id));
            }
            writer.flush();

            final String errorMessage = "Simulated scanner failure for testing";
            FaultInjectingUndoRecoveryManager faultHandler =
                    new FaultInjectingUndoRecoveryManager(table, null);
            faultHandler.setExceptionToThrow(new RuntimeException(errorMessage));

            Map<TableBucket, UndoOffsets> offsetRanges =
                    buildUndoOffsets(checkpointOffsets, tablePath, admin);
            assertThatThrownBy(() -> faultHandler.performUndoRecovery(offsetRanges, 0, 1))
                    .isInstanceOf(RuntimeException.class)
                    .hasMessageContaining(errorMessage);
        }
    }

    // ==================== Helper Classes ====================

    /** Tracks bucket assignments and offsets during test setup.
*/ + private static class BucketTracker { + final int numBuckets; + final long tableId; + final Map<Integer, List<Integer>> keysByBucket = new HashMap<>(); + final Map<Integer, Long> bucketOffsets = new HashMap<>(); + + BucketTracker(int numBuckets, long tableId) { + this.numBuckets = numBuckets; + this.tableId = tableId; + for (int i = 0; i < numBuckets; i++) { + keysByBucket.put(i, new ArrayList<>()); + } + } + + void recordWrite(int key, UpsertResult result) { + int bucketId = result.getBucket().getBucket(); + keysByBucket.get(bucketId).add(key); + bucketOffsets.put(bucketId, result.getLogEndOffset()); + } + + int selectBucketForZeroOffsetRecovery() { + int maxBucket = 0, maxKeys = 0; + for (int i = 0; i < numBuckets; i++) { + if (keysByBucket.get(i).size() > maxKeys) { + maxKeys = keysByBucket.get(i).size(); + maxBucket = i; + } + } + return maxBucket; + } + + Map<TableBucket, Long> buildCheckpointOffsetsWithZeroOffset(int zeroOffsetBucket) { + Map<TableBucket, Long> offsets = new HashMap<>(); + offsets.put(new TableBucket(tableId, zeroOffsetBucket), 0L); + for (int i = 0; i < numBuckets; i++) { + if (i != zeroOffsetBucket) { + offsets.put(new TableBucket(tableId, i), bucketOffsets.getOrDefault(i, 0L)); + } + } + return offsets; + } + + Map<TableBucket, Long> buildCheckpointOffsetsAll() { + Map<TableBucket, Long> offsets = new HashMap<>(); + for (int i = 0; i < numBuckets; i++) { + offsets.put(new TableBucket(tableId, i), bucketOffsets.getOrDefault(i, 0L)); + } + return offsets; + } + } + + private static class FaultInjectingUndoRecoveryManager extends UndoRecoveryManager { + private RuntimeException exceptionToThrow; + + FaultInjectingUndoRecoveryManager(Table table, int[] targetColumnIndexes) { + super(table, targetColumnIndexes); + } + + void setExceptionToThrow(RuntimeException exception) { + this.exceptionToThrow = exception; + } + + @Override + protected LogScanner createLogScanner() { + return new FaultInjectingLogScanner(super.createLogScanner(), 
exceptionToThrow); + } + } + + private static class FaultInjectingLogScanner implements LogScanner { + private final LogScanner delegate; + private final RuntimeException exceptionToThrow; + + FaultInjectingLogScanner(LogScanner delegate, RuntimeException exceptionToThrow) { + this.delegate = delegate; + this.exceptionToThrow = exceptionToThrow; + } + + @Override + public ScanRecords poll(Duration timeout) { + if (exceptionToThrow != null) { + throw exceptionToThrow; + } + return delegate.poll(timeout); + } + + @Override + public void subscribe(int bucket, long offset) { + delegate.subscribe(bucket, offset); + } + + @Override + public void subscribe(long partitionId, int bucket, long offset) { + delegate.subscribe(partitionId, bucket, offset); + } + + @Override + public void unsubscribe(long partitionId, int bucket) { + delegate.unsubscribe(partitionId, bucket); + } + + @Override + public void unsubscribe(int bucket) { + delegate.unsubscribe(bucket); + } + + @Override + public void wakeup() { + delegate.wakeup(); + } + + @Override + public void close() throws Exception { + delegate.close(); + } + } + + // ==================== Helper Methods ==================== + + private TablePath createTestTablePath(String prefix) { + return TablePath.of(DEFAULT_DB, prefix + "_" + System.currentTimeMillis()); + } + + private PartitionSpec newPartitionSpec(String key, String value) { + return new PartitionSpec(Collections.singletonMap(key, value)); + } + + private int selectBucketForZeroOffsetRecovery( + Map<Integer, List<Integer>> keysByBucket, int numBuckets) { + int maxBucket = 0, maxKeys = 0; + for (int i = 0; i < numBuckets; i++) { + if (keysByBucket.get(i).size() > maxKeys) { + maxKeys = keysByBucket.get(i).size(); + maxBucket = i; + } + } + return maxBucket; + } + + private void assertRowEquals( + InternalRow row, + int expectedId, + String expectedName, + int expectedPrice, + int expectedStock) { + assertThat(row.getInt(0)).as("id").isEqualTo(expectedId); + 
assertThat(row.getString(1).toString()).as("name").isEqualTo(expectedName); + assertThat(row.getInt(2)).as("price").isEqualTo(expectedPrice); + assertThat(row.getInt(3)).as("stock").isEqualTo(expectedStock); + } + + /** + * Builds offset ranges by querying the latest offset for each bucket. + * + * <p>This simulates what the upper layer would do: it already has checkpoint offsets and needs + * to query latest offsets to determine which buckets need recovery. + */ + private Map<TableBucket, UndoOffsets> buildUndoOffsets( + Map<TableBucket, Long> checkpointOffsets, TablePath tablePath, Admin admin) + throws Exception { + Map<TableBucket, UndoOffsets> result = new HashMap<>(); + + // Group buckets by partition for efficient batch queries + Map<Long, List<TableBucket>> bucketsByPartition = new HashMap<>(); + List<TableBucket> nonPartitionedBuckets = new ArrayList<>(); + + for (TableBucket bucket : checkpointOffsets.keySet()) { + if (bucket.getPartitionId() != null) { + bucketsByPartition + .computeIfAbsent(bucket.getPartitionId(), k -> new ArrayList<>()) + .add(bucket); + } else { + nonPartitionedBuckets.add(bucket); + } + } + + // Query non-partitioned buckets + if (!nonPartitionedBuckets.isEmpty()) { + List<Integer> bucketIds = new ArrayList<>(); + for (TableBucket tb : nonPartitionedBuckets) { + bucketIds.add(tb.getBucket()); + } + ListOffsetsResult offsetsResult = + admin.listOffsets(tablePath, bucketIds, new OffsetSpec.LatestSpec()); + for (TableBucket tb : nonPartitionedBuckets) { + Long latestOffset = offsetsResult.bucketResult(tb.getBucket()).get(); + if (latestOffset == null) { + latestOffset = 0L; + } + result.put(tb, new UndoOffsets(checkpointOffsets.get(tb), latestOffset)); + } + } + + // Query partitioned buckets + if (!bucketsByPartition.isEmpty()) { + // Get partition name mapping + List<PartitionInfo> partitions = admin.listPartitionInfos(tablePath).get(); + Map<Long, String> partitionIdToName = new HashMap<>(); + for (PartitionInfo info : partitions) { 
+ partitionIdToName.put(info.getPartitionId(), info.getPartitionName()); + } + + for (Map.Entry<Long, List<TableBucket>> entry : bucketsByPartition.entrySet()) { + Long partitionId = entry.getKey(); + List<TableBucket> buckets = entry.getValue(); + String partitionName = partitionIdToName.get(partitionId); + + List<Integer> bucketIds = new ArrayList<>(); + for (TableBucket tb : buckets) { + bucketIds.add(tb.getBucket()); + } + + ListOffsetsResult offsetsResult = + admin.listOffsets( + tablePath, partitionName, bucketIds, new OffsetSpec.LatestSpec()); + for (TableBucket tb : buckets) { + Long latestOffset = offsetsResult.bucketResult(tb.getBucket()).get(); + if (latestOffset == null) { + latestOffset = 0L; + } + result.put(tb, new UndoOffsets(checkpointOffsets.get(tb), latestOffset)); + } + } + } + + return result; + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestLogScanner.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestLogScanner.java new file mode 100644 index 000000000..44e9a6239 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestLogScanner.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.utils; + +import org.apache.fluss.client.table.scanner.ScanRecord; +import org.apache.fluss.client.table.scanner.log.LogScanner; +import org.apache.fluss.client.table.scanner.log.ScanRecords; +import org.apache.fluss.metadata.TableBucket; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Test implementation of {@link LogScanner} for testing. + * + * <p>Allows pre-configuring records per bucket and returns them on poll(). Supports: + * + * <ul> + * <li>Simulating empty polls for testing exponential backoff behavior + * <li>Batch size control for realistic multi-poll scenarios + * <li>Always-empty mode for testing fatal exception on max empty polls + * </ul> + */ +public class TestLogScanner implements LogScanner { + private final Map<TableBucket, List<ScanRecord>> recordsByBucket = new HashMap<>(); + private final Map<TableBucket, AtomicInteger> pollIndexByBucket = new HashMap<>(); + + /** Number of empty polls to return before returning actual records. */ + private int emptyPollsBeforeData = 0; + + private int currentEmptyPollCount = 0; + + /** If true, always return empty records (for testing fatal exception on max empty polls). */ + private boolean alwaysReturnEmpty = false; + + /** Maximum number of records to return per bucket per poll (0 = unlimited). */ + private int batchSize = 0; + + public void setRecordsForBucket(TableBucket bucket, List<ScanRecord> records) { + recordsByBucket.put(bucket, new ArrayList<>(records)); + pollIndexByBucket.put(bucket, new AtomicInteger(0)); + } + + /** + * Sets the number of empty polls to return before returning actual records. 
+ * + * @param count number of empty polls + */ + public void setEmptyPollsBeforeData(int count) { + this.emptyPollsBeforeData = count; + this.currentEmptyPollCount = 0; + } + + /** + * Sets whether to always return empty records (for testing fatal exception). + * + * @param alwaysEmpty if true, poll() always returns empty + */ + public void setAlwaysReturnEmpty(boolean alwaysEmpty) { + this.alwaysReturnEmpty = alwaysEmpty; + } + + /** + * Sets the maximum number of records to return per bucket per poll. + * + * <p>This simulates realistic behavior where LogScanner returns records in batches. Use this to + * test multi-poll scenarios and ensure the executor handles partial results correctly. + * + * @param batchSize max records per bucket per poll (0 = unlimited, returns all remaining) + */ + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + @Override + public ScanRecords poll(Duration timeout) { + // If configured to always return empty, do so + if (alwaysReturnEmpty) { + return ScanRecords.EMPTY; + } + + // Return empty polls if configured + if (currentEmptyPollCount < emptyPollsBeforeData) { + currentEmptyPollCount++; + return ScanRecords.EMPTY; + } + + Map<TableBucket, List<ScanRecord>> result = new HashMap<>(); + + for (Map.Entry<TableBucket, List<ScanRecord>> entry : recordsByBucket.entrySet()) { + TableBucket bucket = entry.getKey(); + List<ScanRecord> allRecords = entry.getValue(); + AtomicInteger index = pollIndexByBucket.get(bucket); + + if (index.get() < allRecords.size()) { + int startIndex = index.get(); + int endIndex; + if (batchSize > 0) { + // Return at most batchSize records + endIndex = Math.min(startIndex + batchSize, allRecords.size()); + } else { + // Return all remaining records + endIndex = allRecords.size(); + } + List<ScanRecord> batch = allRecords.subList(startIndex, endIndex); + result.put(bucket, new ArrayList<>(batch)); + index.set(endIndex); + } + } + + return result.isEmpty() ? 
ScanRecords.EMPTY : new ScanRecords(result); + } + + @Override + public void subscribe(int bucket, long offset) {} + + @Override + public void subscribe(long partitionId, int bucket, long offset) {} + + @Override + public void unsubscribe(long partitionId, int bucket) {} + + @Override + public void unsubscribe(int bucket) {} + + @Override + public void wakeup() {} + + @Override + public void close() {} + + public void reset() { + recordsByBucket.clear(); + pollIndexByBucket.clear(); + emptyPollsBeforeData = 0; + currentEmptyPollCount = 0; + alwaysReturnEmpty = false; + batchSize = 0; + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestUpsertWriter.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestUpsertWriter.java new file mode 100644 index 000000000..fceaa027f --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/TestUpsertWriter.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.utils; + +import org.apache.fluss.client.table.writer.DeleteResult; +import org.apache.fluss.client.table.writer.UpsertResult; +import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.row.InternalRow; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Test implementation of {@link UpsertWriter} for testing. + * + * <p>Tracks all upsert/delete operations and supports failure injection. + */ +public class TestUpsertWriter implements UpsertWriter { + private int upsertCount = 0; + private int deleteCount = 0; + private boolean flushCalled = false; + private boolean shouldFail = false; + private InternalRow lastUpsertedRow; + private InternalRow lastDeletedRow; + private final List<InternalRow> allUpsertedRows = new ArrayList<>(); + private final List<InternalRow> allDeletedRows = new ArrayList<>(); + + @Override + public CompletableFuture<UpsertResult> upsert(InternalRow record) { + if (shouldFail) { + CompletableFuture<UpsertResult> future = new CompletableFuture<>(); + future.completeExceptionally(new RuntimeException("Simulated write failure")); + return future; + } + upsertCount++; + lastUpsertedRow = record; + allUpsertedRows.add(record); + return CompletableFuture.completedFuture(new UpsertResult(new TableBucket(1L, 0), 0L)); + } + + @Override + public CompletableFuture<DeleteResult> delete(InternalRow record) { + if (shouldFail) { + CompletableFuture<DeleteResult> future = new CompletableFuture<>(); + future.completeExceptionally(new RuntimeException("Simulated write failure")); + return future; + } + deleteCount++; + lastDeletedRow = record; + allDeletedRows.add(record); + return CompletableFuture.completedFuture(new DeleteResult(new TableBucket(1L, 0), 0L)); + } + + @Override + public void flush() { + flushCalled = true; + } + + public int getUpsertCount() { + return upsertCount; + } + 
+ public int getDeleteCount() { + return deleteCount; + } + + public boolean isFlushCalled() { + return flushCalled; + } + + public InternalRow getLastUpsertedRow() { + return lastUpsertedRow; + } + + public InternalRow getLastDeletedRow() { + return lastDeletedRow; + } + + public List<InternalRow> getAllUpsertedRows() { + return allUpsertedRows; + } + + public List<InternalRow> getAllDeletedRows() { + return allDeletedRows; + } + + public void setShouldFail(boolean shouldFail) { + this.shouldFail = shouldFail; + } + + public void reset() { + upsertCount = 0; + deleteCount = 0; + flushCalled = false; + shouldFail = false; + lastUpsertedRow = null; + lastDeletedRow = null; + allUpsertedRows.clear(); + allDeletedRows.clear(); + } +}
