This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-0.9 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 4bdc5d61d2cfa0cdac0c2cabc035260b116a0029 Author: Jark Wu <[email protected]> AuthorDate: Wed Feb 11 17:09:44 2026 +0800 [kv] Introduce TabletState to support persistence of auto-inc buffer and row count (#2651) --- .../fluss/client/table/FlussTableITCase.java | 49 ++++- .../apache/fluss/server/kv/KvRecoverHelper.java | 242 ++++++++++++++++++++- .../java/org/apache/fluss/server/kv/KvTablet.java | 42 +++- .../fluss/server/kv/autoinc/AutoIncIDRange.java | 84 +++++++ .../server/kv/autoinc/AutoIncrementManager.java | 28 ++- .../autoinc/BoundedSegmentSequenceGenerator.java | 21 +- .../fluss/server/kv/autoinc/SequenceGenerator.java | 14 ++ .../kv/autoinc/ZkSequenceGeneratorFactory.java | 33 +-- .../fluss/server/kv/prewrite/KvPreWriteBuffer.java | 8 +- .../server/kv/snapshot/CompletedSnapshot.java | 52 ++++- .../kv/snapshot/CompletedSnapshotJsonSerde.java | 60 ++++- .../server/kv/snapshot/KvTabletSnapshotTarget.java | 26 ++- .../kv/snapshot/RocksIncrementalSnapshot.java | 12 +- .../fluss/server/kv/snapshot/SnapshotResult.java | 12 +- .../fluss/server/kv/snapshot/SnapshotRunner.java | 4 +- .../fluss/server/kv/snapshot/TabletState.java | 74 +++++++ .../org/apache/fluss/server/replica/Replica.java | 28 ++- .../kv/autoinc/SegmentSequenceGeneratorTest.java | 38 ++++ .../autoinc/TestingSequenceGeneratorFactory.java | 1 + .../snapshot/CompletedSnapshotJsonSerdeTest.java | 12 +- .../kv/snapshot/KvTabletSnapshotTargetTest.java | 13 +- .../kv/snapshot/PeriodicSnapshotManagerTest.java | 4 +- .../kv/snapshot/RocksIncrementalSnapshotTest.java | 6 +- .../apache/fluss/server/testutils/KvTestUtils.java | 4 +- 24 files changed, 788 insertions(+), 79 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java index 1fbd78ff7..8fd0c138d 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java +++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java @@ -498,7 +498,13 @@ class FlussTableITCase extends ClientToServerITCaseBase { .primaryKey("col1") .build(); TableDescriptor tableDescriptor = - TableDescriptor.builder().schema(schema).distributedBy(1, "col1").build(); + TableDescriptor.builder() + .schema(schema) + .distributedBy(1, "col1") + // set a small cache size to test the auto-increment buffer rollover and + // recovery logic in case of server failure and schema change. + .property(ConfigOptions.TABLE_AUTO_INCREMENT_CACHE_SIZE, 5L) + .build(); // create the table TablePath tablePath = TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc"); @@ -547,11 +553,15 @@ class FlussTableITCase extends ClientToServerITCaseBase { }; verifyRecords(expectedRecordsWithOldSchema, autoIncTable, schema); + // trigger snapshot to make sure the auto-inc buffer is snapshotted + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath); + // schema change case2: update new data with new schema. 
Object[][] recordsWithNewSchema = { {"a", null, "batch3", 10}, - {"b", null, "batch3", 11} + {"b", null, "batch3", 11}, + {"f", null, "batch3", 12} }; partialUpdateRecords( new String[] {"col1", "col3", "col4"}, recordsWithNewSchema, newSchemaTable); @@ -567,7 +577,8 @@ class FlussTableITCase extends ClientToServerITCaseBase { {"b", 2L, "batch3", 11}, {"c", 3L, "batch1", null}, {"d", 4L, "batch2", null}, - {"e", 5L, "batch2", null} + {"e", 5L, "batch2", null}, + {"f", 6L, "batch3", 12} }; verifyRecords(expectedRecordsWithNewSchema, newSchemaTable, newSchema); @@ -582,14 +593,42 @@ class FlussTableITCase extends ClientToServerITCaseBase { newSchemaTable = conn.getTable(tablePath); verifyRecords(expectedRecordsWithNewSchema, newSchemaTable, newSchema); - Object[][] restartWriteRecords = {{"f", null, "batch4", 12}}; + Object[][] restartWriteRecords = {{"g", null, "batch4", 13}}; partialUpdateRecords( new String[] {"col1", "col3", "col4"}, restartWriteRecords, newSchemaTable); // The auto-increment column should start from a new segment for now, and local cached // IDs have been discarded. 
- Object[][] expectedRestartWriteRecords = {{"f", 100001L, "batch4", 12}}; + Object[][] expectedRestartWriteRecords = {{"g", 7L, "batch4", 13}}; verifyRecords(expectedRestartWriteRecords, newSchemaTable, newSchema); + + // trigger snapshot to make sure the auto-inc buffer is snapshotted again + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath); + + Object[][] writeRecordsAgain = { + {"h", null, "batch5", 14}, + {"i", null, "batch5", 15}, + {"j", null, "batch5", 16} + }; + partialUpdateRecords( + new String[] {"col1", "col3", "col4"}, writeRecordsAgain, newSchemaTable); + + // kill and restart all tablet server, the recovered id range will hit 10 cache limit + for (int i = 0; i < 3; i++) { + FLUSS_CLUSTER_EXTENSION.stopTabletServer(i); + FLUSS_CLUSTER_EXTENSION.startTabletServer(i); + } + + Object[][] finalWriteRecords = {{"k", null, "batch6", 17}}; + partialUpdateRecords( + new String[] {"col1", "col3", "col4"}, finalWriteRecords, newSchemaTable); + Object[][] expectedFinalRecords = { + {"h", 8L, "batch5", 14}, + {"i", 9L, "batch5", 15}, + {"j", 10L, "batch5", 16}, + {"k", 11L, "batch6", 17} + }; + verifyRecords(expectedFinalRecords, newSchemaTable, newSchema); } @Test diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java index 437eb49fd..5d32e413e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java @@ -19,8 +19,8 @@ package org.apache.fluss.server.kv; import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.LogFormat; +import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.SchemaGetter; -import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.record.ChangeType; @@ -35,24 +35,34 @@ import 
org.apache.fluss.row.encode.KeyEncoder; import org.apache.fluss.row.encode.RowEncoder; import org.apache.fluss.row.encode.ValueEncoder; import org.apache.fluss.row.indexed.IndexedRow; +import org.apache.fluss.server.kv.autoinc.AutoIncIDRange; import org.apache.fluss.server.log.FetchIsolation; import org.apache.fluss.server.log.LogTablet; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypeRoot; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.CloseableIterator; import org.apache.fluss.utils.function.ThrowingConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; +import java.util.List; + import static org.apache.fluss.server.TabletManagerBase.getTableInfo; /** A helper for recovering Kv from log. */ public class KvRecoverHelper { + private static final Logger LOG = LoggerFactory.getLogger(KvRecoverHelper.class); private final KvTablet kvTablet; private final LogTablet logTablet; private final long recoverPointOffset; + @Nullable private final Long recoverPointRowCount; + @Nullable private final AutoIncIDRange autoIncRange; private final KvRecoverContext recoverContext; private final KvFormat kvFormat; private final LogFormat logFormat; @@ -71,6 +81,8 @@ public class KvRecoverHelper { KvTablet kvTablet, LogTablet logTablet, long recoverPointOffset, + @Nullable Long recoverPointRowCount, + @Nullable AutoIncIDRange autoIncRange, KvRecoverContext recoverContext, KvFormat kvFormat, LogFormat logFormat, @@ -78,6 +90,8 @@ public class KvRecoverHelper { this.kvTablet = kvTablet; this.logTablet = logTablet; this.recoverPointOffset = recoverPointOffset; + this.recoverPointRowCount = recoverPointRowCount; + this.autoIncRange = autoIncRange; this.recoverContext = recoverContext; this.kvFormat = kvFormat; this.logFormat = logFormat; @@ -97,7 +111,18 @@ public class KvRecoverHelper { 
initSchema(schemaGetter.getLatestSchemaInfo().getSchemaId()); + Schema schema = schemaGetter.getLatestSchemaInfo().getSchema(); + long nextLogOffset = recoverPointOffset; + RowCountUpdater rowCountUpdater = + recoverPointRowCount != null + ? new RowCountUpdaterImpl(recoverPointRowCount) + : new NoOpRowCountUpdater(); + AutoIncIDRangeUpdater autoIncIdRangeUpdater = + autoIncRange != null + ? new AutoIncIDRangeUpdaterImpl( + schema, autoIncRange, kvTablet.getAutoIncrementCacheSize()) + : new NoOpAutoIncIDRangeUpdater(); // read to high watermark try (KvBatchWriter kvBatchWriter = kvTablet.createKvBatchWriter()) { ThrowingConsumer<KeyValueAndLogOffset, Exception> resumeRecordApplier = @@ -111,22 +136,59 @@ public class KvRecoverHelper { nextLogOffset = readLogRecordsAndApply( - nextLogOffset, FetchIsolation.HIGH_WATERMARK, resumeRecordApplier); + nextLogOffset, + rowCountUpdater, + autoIncIdRangeUpdater, + FetchIsolation.HIGH_WATERMARK, + resumeRecordApplier); } // the all data up to nextLogOffset has been flush into kv kvTablet.setFlushedLogOffset(nextLogOffset); + // if we have valid row count (means this table supports row count), + // update the row count in kv tablet + if (recoverPointRowCount != null) { + kvTablet.setRowCount(rowCountUpdater.getRowCount()); + LOG.info( + "Updated row count to {} for table '{}' after recovering from log", + rowCountUpdater.getRowCount(), + kvTablet.getTablePath()); + } else { + LOG.info( + "Skipping row count update after recovering from log, because this table '{}' doesn't support row count.", + kvTablet.getTablePath()); + } // read to log end offset ThrowingConsumer<KeyValueAndLogOffset, Exception> resumeRecordApplier = (resumeRecord) -> kvTablet.putToPreWriteBuffer( resumeRecord.key, resumeRecord.value, resumeRecord.logOffset); - readLogRecordsAndApply(nextLogOffset, FetchIsolation.LOG_END, resumeRecordApplier); + readLogRecordsAndApply( + nextLogOffset, + // records in pre-write-buffer shouldn't affect the row count, the 
high-watermark + // of pre-write-buffer records will update the row-count async + new NoOpRowCountUpdater(), + // we should update auto-inc-id for pre-write-buffer, because id has been used. + autoIncIdRangeUpdater, + FetchIsolation.LOG_END, + resumeRecordApplier); + + if (autoIncRange != null) { + AutoIncIDRange newRange = autoIncIdRangeUpdater.getNewRange(); + kvTablet.updateAutoIncrementIDRange(newRange); + LOG.info( + "Updated auto inc id range to [{}, {}] for table '{}' after recovering from log", + newRange.getStart(), + newRange.getEnd(), + kvTablet.getTablePath()); + } } private long readLogRecordsAndApply( long startFetchOffset, + RowCountUpdater rowCountUpdater, + AutoIncIDRangeUpdater autoIncIdRangeUpdater, FetchIsolation fetchIsolation, ThrowingConsumer<KeyValueAndLogOffset, Exception> resumeRecordConsumer) throws Exception { @@ -151,14 +213,17 @@ public class KvRecoverHelper { logRecordBatch.records(readContext)) { while (logRecordIter.hasNext()) { LogRecord logRecord = logRecordIter.next(); - if (logRecord.getChangeType() != ChangeType.UPDATE_BEFORE) { + ChangeType changeType = logRecord.getChangeType(); + rowCountUpdater.applyChange(changeType); + + if (changeType != ChangeType.UPDATE_BEFORE) { InternalRow logRow = logRecord.getRow(); byte[] key = keyEncoder.encodeKey(logRow); byte[] value = null; - if (logRecord.getChangeType() != ChangeType.DELETE) { + if (changeType != ChangeType.DELETE) { // the log row format may not compatible with kv row format, // e.g, arrow vs. compacted, thus needs a conversion here. - BinaryRow row = toKvRow(logRecord.getRow()); + BinaryRow row = toKvRow(logRow); value = ValueEncoder.encodeValue( currentSchemaId.shortValue(), row); @@ -166,6 +231,10 @@ public class KvRecoverHelper { resumeRecordConsumer.accept( new KeyValueAndLogOffset( key, value, logRecord.logOffset())); + + // reuse the logRow instance which is usually a CompactedRow which + // has been deserialized during toKvRow(..) 
+ autoIncIdRangeUpdater.applyRecord(changeType, logRow); } } } @@ -247,20 +316,169 @@ public class KvRecoverHelper { public static class KvRecoverContext { private final TablePath tablePath; - private final TableBucket tableBucket; private final ZooKeeperClient zkClient; private final int maxFetchLogSizeInRecoverKv; public KvRecoverContext( - TablePath tablePath, - TableBucket tableBucket, - ZooKeeperClient zkClient, - int maxFetchLogSizeInRecoverKv) { + TablePath tablePath, ZooKeeperClient zkClient, int maxFetchLogSizeInRecoverKv) { this.tablePath = tablePath; - this.tableBucket = tableBucket; this.zkClient = zkClient; this.maxFetchLogSizeInRecoverKv = maxFetchLogSizeInRecoverKv; } } + + // ------------------------------------------------------------------------------------------ + // Below are some helpers for recovering tablet state from log + // ------------------------------------------------------------------------------------------ + + /** A helper to update the latest row count during recovering from log. */ + private interface RowCountUpdater { + + /** Apply the change to the row count according to the change type. */ + void applyChange(ChangeType changeType); + + /** Get the latest row count. Returns -1 if this table doesn't support row count. */ + long getRowCount(); + } + + /** + * A simple implementation of {@link RowCountUpdater} which maintains a row count by applying + * the change type of each log record. 
+ */ + private static class RowCountUpdaterImpl implements RowCountUpdater { + private long rowCount; + + public RowCountUpdaterImpl(long initialRowCount) { + this.rowCount = initialRowCount; + } + + @Override + public void applyChange(ChangeType changeType) { + if (changeType == ChangeType.INSERT || changeType == ChangeType.UPDATE_AFTER) { + rowCount++; + } else if (changeType == ChangeType.DELETE || changeType == ChangeType.UPDATE_BEFORE) { + rowCount--; + } + } + + @Override + public long getRowCount() { + return rowCount; + } + } + + /** + * A no-op implementation of {@link RowCountUpdater} for the table which doesn't support row + * count. + */ + private static class NoOpRowCountUpdater implements RowCountUpdater { + + @Override + public void applyChange(ChangeType changeType) { + // do nothing + } + + @Override + public long getRowCount() { + return -1; + } + } + + /** A helper to update the auto inc id range during recovering from log. */ + private interface AutoIncIDRangeUpdater { + /** + * Apply the change to the auto inc id range according to the change type and log record. + */ + void applyRecord(ChangeType changeType, InternalRow changelog); + + /** + * Get a new auto inc id range for the next pre-write buffer. Returns null if this table + * doesn't support auto inc id. + */ + AutoIncIDRange getNewRange(); + } + + /** + * A simple implementation of {@link AutoIncIDRangeUpdater} which maintains a auto inc id range + * by applying the change type and log record of each log record. It assumes the auto inc column + * is always increasing when new record inserted, and the auto inc column's value won't be + * updated once it's inserted. 
+ */ + private static class AutoIncIDRangeUpdaterImpl implements AutoIncIDRangeUpdater { + private final long autoIncrementCacheSize; + private final boolean isLongType; + private final int columnIndex; + private final int columnId; + private final long end; + + private long current; + + private AutoIncIDRangeUpdaterImpl( + Schema schema, AutoIncIDRange autoIncRange, long autoIncrementCacheSize) { + this.autoIncrementCacheSize = autoIncrementCacheSize; + this.end = autoIncRange.getEnd(); + this.columnId = autoIncRange.getColumnId(); + this.columnIndex = autoIncColumnIndex(schema, autoIncRange.getColumnId()); + this.isLongType = + schema.getColumns().get(columnIndex).getDataType().is(DataTypeRoot.BIGINT); + // initialize current to be one less than the start of the range + this.current = autoIncRange.getStart() - 1; + } + + private int autoIncColumnIndex(Schema schema, int columnId) { + List<Schema.Column> columns = schema.getColumns(); + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).getColumnId() == columnId) { + return i; + } + } + throw new IllegalArgumentException( + "Auto inc column with column id " + columnId + " not found in schema"); + } + + @Override + public void applyRecord(ChangeType changeType, InternalRow changelog) { + // only INSERT records will have new sequence id, and it must be not null + if (changeType == ChangeType.INSERT) { + if (isLongType) { + current = changelog.getLong(columnIndex); + } else { + // currently, we only support long and int type for auto inc column, and they + // must be not null + current = changelog.getInt(columnIndex); + } + } + } + + @Override + public AutoIncIDRange getNewRange() { + long newStart = current + 1; + // id range is starting from [1, end], so if newStart is greater than end, it means we + // have exhausted the id range, and the kv tablet has allocate a new range after the + // snapshot, so we calculate the new range based on the current new start + long newEnd = end; + while (newStart > 
newEnd) { + newEnd = newEnd + autoIncrementCacheSize; + } + return new AutoIncIDRange(columnId, newStart, newEnd); + } + } + + /** + * A no-op implementation of {@link AutoIncIDRangeUpdater} for the table which doesn't support + * auto inc column. + */ + private static class NoOpAutoIncIDRangeUpdater implements AutoIncIDRangeUpdater { + + @Override + public void applyRecord(ChangeType changeType, InternalRow changelog) { + // do nothing + } + + @Override + public AutoIncIDRange getNewRange() { + return null; + } + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index 8d279555a..ca89fe829 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -46,6 +46,7 @@ import org.apache.fluss.row.arrow.ArrowWriterPool; import org.apache.fluss.row.arrow.ArrowWriterProvider; import org.apache.fluss.row.encode.ValueDecoder; import org.apache.fluss.rpc.protocol.MergeMode; +import org.apache.fluss.server.kv.autoinc.AutoIncIDRange; import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; import org.apache.fluss.server.kv.autoinc.AutoIncrementUpdater; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer; @@ -59,6 +60,7 @@ import org.apache.fluss.server.kv.rowmerger.RowMerger; import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath; import org.apache.fluss.server.kv.snapshot.KvSnapshotDataUploader; import org.apache.fluss.server.kv.snapshot.RocksIncrementalSnapshot; +import org.apache.fluss.server.kv.snapshot.TabletState; import org.apache.fluss.server.kv.wal.ArrowWalBuilder; import org.apache.fluss.server.kv.wal.CompactedWalBuilder; import org.apache.fluss.server.kv.wal.IndexWalBuilder; @@ -96,6 +98,7 @@ import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; @ThreadSafe public final class KvTablet { private static final Logger LOG = 
LoggerFactory.getLogger(KvTablet.class); + private static final long ROW_COUNT_DISABLED = -1; private final PhysicalTablePath physicalPath; private final TableBucket tableBucket; @@ -136,6 +139,8 @@ public final class KvTablet { */ private volatile long flushedLogOffset = 0; + private volatile long rowCount; + @GuardedBy("kvLock") private volatile boolean isClosed = false; @@ -178,6 +183,8 @@ public final class KvTablet { this.changelogImage = changelogImage; this.rocksDBStatistics = rocksDBStatistics; this.autoIncrementManager = autoIncrementManager; + // disable row count for WAL image mode. + this.rowCount = changelogImage == ChangelogImage.WAL ? ROW_COUNT_DISABLED : 0L; } public static KvTablet create( @@ -253,6 +260,14 @@ public final class KvTablet { return physicalPath.getTablePath(); } + public long getAutoIncrementCacheSize() { + return autoIncrementManager.getAutoIncrementCacheSize(); + } + + public void updateAutoIncrementIDRange(AutoIncIDRange newRange) { + autoIncrementManager.updateIDRange(newRange); + } + @Nullable public String getPartitionName() { return physicalPath.getPartitionName(); @@ -276,8 +291,24 @@ public final class KvTablet { this.flushedLogOffset = flushedLogOffset; } - public long getFlushedLogOffset() { - return flushedLogOffset; + void setRowCount(long rowCount) { + this.rowCount = rowCount; + } + + /** + * Get the current state of the tablet, including the log offset, row count and auto-increment + * ID range. This is used for snapshot and recovery to capture the state of the tablet at a + * specific log offset. + * + * <p>Note: this method must be called under the kvLock to ensure the consistency between the + * returned state and the log offset. + */ + @GuardedBy("kvLock") + public TabletState getTabletState() { + return new TabletState( + flushedLogOffset, + rowCount == ROW_COUNT_DISABLED ? 
null : rowCount, + autoIncrementManager.getCurrentIDRanges()); } /** @@ -632,8 +663,13 @@ public final class KvTablet { tableBucket); } else { try { - kvPreWriteBuffer.flush(exclusiveUpToLogOffset); + int rowCountDiff = kvPreWriteBuffer.flush(exclusiveUpToLogOffset); flushedLogOffset = exclusiveUpToLogOffset; + if (rowCount != ROW_COUNT_DISABLED) { + // row count is enabled, we update the row count after flush. + long currentRowCount = rowCount; + rowCount = currentRowCount + rowCountDiff; + } } catch (Throwable t) { fatalErrorHandler.onFatalError( new KvStorageException("Failed to flush kv pre-write buffer.")); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncIDRange.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncIDRange.java new file mode 100644 index 000000000..3c1297c67 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncIDRange.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.server.kv.autoinc; + +import java.util.Objects; + +/** + * Represents a range of IDs allocated for auto-increment purposes. 
This class encapsulates the + * auto-increment column id, the start and end of the ID range. The range is [start, end]. There is + * possible that the start > end which means the range is empty. + */ +public class AutoIncIDRange { + + private final int columnId; + private final long start; + private final long end; + + public AutoIncIDRange(int columnId, long start, long end) { + this.columnId = columnId; + this.start = start; + this.end = end; + } + + /** + * Returns the column ID of the auto-increment column that associated with this auto-increment + * ID range. + */ + public int getColumnId() { + return columnId; + } + + /** Returns the starting ID of the range (inclusive). */ + public long getStart() { + return start; + } + + /** Returns the ending ID of the range (inclusive). */ + public long getEnd() { + return end; + } + + /** Checks if the ID range is empty (i.e., start is greater than end). */ + public boolean isEmpty() { + return start > end; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + AutoIncIDRange autoIncIdRange = (AutoIncIDRange) o; + return columnId == autoIncIdRange.getColumnId() + && start == autoIncIdRange.start + && end == autoIncIdRange.end; + } + + @Override + public int hashCode() { + return Objects.hash(columnId, start, end); + } + + @Override + public String toString() { + return "IDRange{" + "columnId=" + columnId + ", start=" + start + ", end=" + end + '}'; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java index fca383c8f..8a69e1f1b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java @@ -27,9 +27,11 @@ import org.apache.fluss.metadata.TablePath; import 
com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import java.time.Duration; +import java.util.Collections; import java.util.List; import static org.apache.fluss.utils.Preconditions.checkState; @@ -43,6 +45,7 @@ public class AutoIncrementManager { // No-op implementation that returns the input unchanged. public static final AutoIncrementUpdater NO_OP_UPDATER = rowValue -> rowValue; + private final long autoIncrementCacheSize; private final SchemaGetter schemaGetter; private final Cache<Integer, AutoIncrementUpdater> autoIncrementUpdaterCache; private final int autoIncrementColumnId; @@ -54,6 +57,7 @@ public class AutoIncrementManager { TableConfig tableConf, SequenceGeneratorFactory seqGeneratorFactory) { this.schemaGetter = schemaGetter; + this.autoIncrementCacheSize = tableConf.getAutoIncrementCacheSize(); int schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId(); Schema schema = schemaGetter.getSchema(schemaId); List<String> autoIncrementColumnNames = schema.getAutoIncrementColumnNames(); @@ -68,7 +72,7 @@ public class AutoIncrementManager { autoIncrementColumnId = autoIncrementColumn.getColumnId(); sequenceGenerator = seqGeneratorFactory.createSequenceGenerator( - tablePath, autoIncrementColumn, tableConf.getAutoIncrementCacheSize()); + tablePath, autoIncrementColumn, autoIncrementCacheSize); autoIncrementUpdaterCache = Caffeine.newBuilder() .maximumSize(5) @@ -93,6 +97,28 @@ public class AutoIncrementManager { } } + @Nullable + public List<AutoIncIDRange> getCurrentIDRanges() { + if (autoIncrementColumnId == -1) { + return null; + } else { + return Collections.singletonList(sequenceGenerator.currentSequenceRange()); + } + } + + public void updateIDRange(AutoIncIDRange newRange) { + if (autoIncrementColumnId != -1) { + sequenceGenerator.updateSequenceRange(newRange); + } else { + throw new IllegalStateException( + "Cannot update 
ID range for a table without auto-increment column."); + } + } + + public long getAutoIncrementCacheSize() { + return autoIncrementCacheSize; + } + private AutoIncrementUpdater createAutoIncrementUpdater(KvFormat kvFormat, int schemaId) { Schema schema = schemaGetter.getSchema(schemaId); int[] autoIncrementColumnIds = schema.getAutoIncrementColumnIds(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java index 44afe717e..1198f6b85 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java @@ -36,6 +36,7 @@ public class BoundedSegmentSequenceGenerator implements SequenceGenerator { private final SequenceIDCounter sequenceIDCounter; private final TablePath tablePath; + private final int columnId; private final String columnName; private final long cacheSize; private final long maxAllowedValue; @@ -44,16 +45,18 @@ public class BoundedSegmentSequenceGenerator implements SequenceGenerator { public BoundedSegmentSequenceGenerator( TablePath tablePath, + int columnId, String columnName, SequenceIDCounter sequenceIDCounter, long idCacheSize, long maxAllowedValue) { this.cacheSize = idCacheSize; + this.columnId = columnId; this.columnName = columnName; this.tablePath = tablePath; this.sequenceIDCounter = sequenceIDCounter; - this.segment = IdSegment.EMPTY; this.maxAllowedValue = maxAllowedValue; + this.segment = IdSegment.EMPTY; } private void fetchSegment() { @@ -93,6 +96,22 @@ public class BoundedSegmentSequenceGenerator implements SequenceGenerator { return id; } + @Override + public AutoIncIDRange currentSequenceRange() { + return new AutoIncIDRange(columnId, segment.current, segment.end); + } + + @Override + public void updateSequenceRange(AutoIncIDRange 
newRange) { + if (newRange.getColumnId() != columnId) { + throw new IllegalArgumentException( + String.format( + "Column ID mismatch when updating sequence range. Expected column ID: %d, but got: %d.", + columnId, newRange.getColumnId())); + } + this.segment = new IdSegment(newRange.getStart(), newRange.getEnd()); + } + private static class IdSegment { private static final IdSegment EMPTY = new IdSegment(0, -1); final long end; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java index d448ffa4c..648479376 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java @@ -29,4 +29,18 @@ public interface SequenceGenerator { * @return the next sequential value of the auto-increment column */ long nextVal(); + + /** + * Returns the current upper bound of the sequence range that has been allocated. This is useful + * for monitoring the current state of the sequence generator. + */ + AutoIncIDRange currentSequenceRange(); + + /** + * Updates the sequence range with a new range. This method is typically called when the kv + * tablet is restored from snapshot. 
+ * + * @param newRange the new sequence range to be set + */ + void updateSequenceRange(AutoIncIDRange newRange); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/ZkSequenceGeneratorFactory.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/ZkSequenceGeneratorFactory.java index 1842f47f6..998893aaf 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/ZkSequenceGeneratorFactory.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/ZkSequenceGeneratorFactory.java @@ -21,6 +21,7 @@ package org.apache.fluss.server.kv.autoinc; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.SequenceIDCounter; import org.apache.fluss.server.zk.ZkSequenceIDCounter; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.ZkData; @@ -38,24 +39,30 @@ public class ZkSequenceGeneratorFactory implements SequenceGeneratorFactory { @Override public SequenceGenerator createSequenceGenerator( TablePath tablePath, Schema.Column autoIncrementColumn, long idCacheSize) { - DataTypeRoot typeRoot = autoIncrementColumn.getDataType().getTypeRoot(); - final long maxAllowedValue; + return new BoundedSegmentSequenceGenerator( + tablePath, + autoIncrementColumn.getColumnId(), + autoIncrementColumn.getName(), + createSequenceIDCounter(tablePath, autoIncrementColumn.getColumnId()), + idCacheSize, + checkMaxAllowedValue(autoIncrementColumn)); + } + + private SequenceIDCounter createSequenceIDCounter(TablePath tablePath, int columnId) { + return new ZkSequenceIDCounter( + zkClient.getCuratorClient(), + ZkData.AutoIncrementColumnZNode.path(tablePath, columnId)); + } + + private static long checkMaxAllowedValue(Schema.Column column) { + DataTypeRoot typeRoot = column.getDataType().getTypeRoot(); if (typeRoot == DataTypeRoot.INTEGER) { - maxAllowedValue = Integer.MAX_VALUE; + return Integer.MAX_VALUE; } else if (typeRoot == 
DataTypeRoot.BIGINT) { - maxAllowedValue = Long.MAX_VALUE; + return Long.MAX_VALUE; } else { throw new IllegalArgumentException( "Auto-increment column must be of type INTEGER or BIGINT"); } - return new BoundedSegmentSequenceGenerator( - tablePath, - autoIncrementColumn.getName(), - new ZkSequenceIDCounter( - zkClient.getCuratorClient(), - ZkData.AutoIncrementColumnZNode.path( - tablePath, autoIncrementColumn.getColumnId())), - idCacheSize, - maxAllowedValue); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java index d040967f3..5908ba4b1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java @@ -200,8 +200,10 @@ public class KvPreWriteBuffer implements AutoCloseable { * * @param exclusiveUpToLogSequenceNumber the exclusive upper bound of the log sequence number to * be flushed + * @return the row count difference of the kv entries in the buffer after flushing. 
*/ - public void flush(long exclusiveUpToLogSequenceNumber) throws IOException { + public int flush(long exclusiveUpToLogSequenceNumber) throws IOException { + int rowCountDiff = 0; int flushedCount = 0; for (Iterator<KvEntry> it = allKvEntries.iterator(); it.hasNext(); ) { KvEntry entry = it.next(); @@ -218,9 +220,11 @@ public class KvPreWriteBuffer implements AutoCloseable { Value value = entry.getValue(); if (value.value != null) { flushedCount += 1; + rowCountDiff += 1; kvBatchWriter.put(entry.getKey().key, value.value); } else { flushedCount += 1; + rowCountDiff -= 1; kvBatchWriter.delete(entry.getKey().key); } @@ -233,6 +237,8 @@ public class KvPreWriteBuffer implements AutoCloseable { if (flushedCount > 0) { kvBatchWriter.flush(); } + + return rowCountDiff; } @VisibleForTesting diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java index b58e1728c..86cd25858 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java @@ -21,12 +21,15 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.fs.FileSystem; import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.kv.autoinc.AutoIncIDRange; import org.apache.fluss.utils.concurrent.FutureUtils; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -68,6 +71,18 @@ public class CompletedSnapshot { /** The next log offset when the snapshot is triggered. 
*/ private final long logOffset; + /** + * The row count of the snapshot, null for legacy tables that don't support row count + * statistics. + */ + @Nullable private final Long rowCount; + + /** + * The auto-increment ID ranges of the snapshot, null for legacy tables that don't support or + * don't have auto-increment columns. + */ + @Nullable private final List<AutoIncIDRange> autoIncIDRanges; + /** The location where the snapshot is stored. */ private final FsPath snapshotLocation; @@ -78,12 +93,16 @@ public class CompletedSnapshot { long snapshotID, FsPath snapshotLocation, KvSnapshotHandle kvSnapshotHandle, - long logOffset) { + long logOffset, + @Nullable Long rowCount, + @Nullable List<AutoIncIDRange> autoIncIDRanges) { this.tableBucket = tableBucket; this.snapshotID = snapshotID; this.snapshotLocation = snapshotLocation; this.kvSnapshotHandle = kvSnapshotHandle; this.logOffset = logOffset; + this.rowCount = rowCount; + this.autoIncIDRanges = autoIncIDRanges; } @VisibleForTesting @@ -92,7 +111,7 @@ public class CompletedSnapshot { long snapshotID, FsPath snapshotLocation, KvSnapshotHandle kvSnapshotHandle) { - this(tableBucket, snapshotID, snapshotLocation, kvSnapshotHandle, 0); + this(tableBucket, snapshotID, snapshotLocation, kvSnapshotHandle, 0, null, null); } public long getSnapshotID() { @@ -111,6 +130,24 @@ public class CompletedSnapshot { return logOffset; } + @Nullable + public Long getRowCount() { + return rowCount; + } + + @Nullable + public List<AutoIncIDRange> getAutoIncIDRanges() { + return autoIncIDRanges; + } + + @Nullable + public AutoIncIDRange getFirstAutoIncIDRange() { + if (autoIncIDRanges == null || autoIncIDRanges.isEmpty()) { + return null; + } + return autoIncIDRanges.get(0); + } + public long getSnapshotSize() { return kvSnapshotHandle.getSnapshotSize(); } @@ -188,11 +225,20 @@ public class CompletedSnapshot { && logOffset == that.logOffset && Objects.equals(tableBucket, that.tableBucket) && Objects.equals(kvSnapshotHandle, 
that.kvSnapshotHandle) + && Objects.equals(rowCount, that.rowCount) + && Objects.equals(autoIncIDRanges, that.autoIncIDRanges) && Objects.equals(snapshotLocation, that.snapshotLocation); } @Override public int hashCode() { - return Objects.hash(tableBucket, snapshotID, kvSnapshotHandle, logOffset, snapshotLocation); + return Objects.hash( + tableBucket, + snapshotID, + kvSnapshotHandle, + logOffset, + rowCount, + autoIncIDRanges, + snapshotLocation); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerde.java index 6b79966da..99a7c3d23 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerde.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerde.java @@ -19,6 +19,7 @@ package org.apache.fluss.server.kv.snapshot; import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.kv.autoinc.AutoIncIDRange; import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.fluss.utils.json.JsonDeserializer; @@ -55,8 +56,17 @@ public class CompletedSnapshotJsonSerde private static final String KV_FILE_LOCAL_PATH = "local_path"; private static final String SNAPSHOT_INCREMENTAL_SIZE = "snapshot_incremental_size"; - // -- for the next log offset when the snapshot is triggered; + // --------------------------------------------------------------------------------- + // kv tablet state for the snapshot + // --------------------------------------------------------------------------------- + + // for the next log offset when the snapshot is triggered; private static final String LOG_OFFSET = "log_offset"; + private static final String ROW_COUNT = "row_count"; + private static final 
String AUTO_INC_ID_RANGE = "auto_inc_id_range"; + private static final String AUTO_INC_COLUMN_ID = "column_id"; + private static final String AUTO_INC_ID_START = "start"; + private static final String AUTO_INC_ID_END = "end"; @Override public void serialize(CompletedSnapshot completedSnapshot, JsonGenerator generator) @@ -103,6 +113,29 @@ public class CompletedSnapshotJsonSerde // serialize log offset generator.writeNumberField(LOG_OFFSET, completedSnapshot.getLogOffset()); + // ROW_COUNT and AUTO_INC_ID_RANGE are added in v0.9, but they are nullable and optional, so + // we don't bump JSON version here to guarantee the RPC protocol compatibility between + // TabletServer and CoordinatorServer. See CoordinatorGateway#commitKvSnapshot RPC. + + // serialize row count if exists + if (completedSnapshot.getRowCount() != null) { + generator.writeNumberField(ROW_COUNT, completedSnapshot.getRowCount()); + } + + // serialize auto-increment id range for each auto-increment column + if (completedSnapshot.getAutoIncIDRanges() != null + && !completedSnapshot.getAutoIncIDRanges().isEmpty()) { + generator.writeArrayFieldStart(AUTO_INC_ID_RANGE); + for (AutoIncIDRange autoIncIDRange : completedSnapshot.getAutoIncIDRanges()) { + generator.writeStartObject(); + generator.writeNumberField(AUTO_INC_COLUMN_ID, autoIncIDRange.getColumnId()); + generator.writeNumberField(AUTO_INC_ID_START, autoIncIDRange.getStart()); + generator.writeNumberField(AUTO_INC_ID_END, autoIncIDRange.getEnd()); + generator.writeEndObject(); + } + generator.writeEndArray(); + } + generator.writeEndObject(); } @@ -161,8 +194,31 @@ public class CompletedSnapshotJsonSerde // construct CompletedSnapshot KvSnapshotHandle kvSnapshotHandle = new KvSnapshotHandle(sharedFileHandles, privateFileHandles, incrementalSize); + + Long rowCount = null; + if (node.has(ROW_COUNT)) { + rowCount = node.get(ROW_COUNT).asLong(); + } + + List<AutoIncIDRange> autoIncIDRanges = null; + if (node.has(AUTO_INC_ID_RANGE)) { + 
autoIncIDRanges = new ArrayList<>(); + for (JsonNode autoIncIDRangeNode : node.get(AUTO_INC_ID_RANGE)) { + int columnId = autoIncIDRangeNode.get(AUTO_INC_COLUMN_ID).asInt(); + long start = autoIncIDRangeNode.get(AUTO_INC_ID_START).asLong(); + long end = autoIncIDRangeNode.get(AUTO_INC_ID_END).asLong(); + autoIncIDRanges.add(new AutoIncIDRange(columnId, start, end)); + } + } + return new CompletedSnapshot( - tableBucket, snapshotId, new FsPath(snapshotLocation), kvSnapshotHandle, logOffset); + tableBucket, + snapshotId, + new FsPath(snapshotLocation), + kvSnapshotHandle, + logOffset, + rowCount, + autoIncIDRanges); } private List<KvFileHandleAndLocalPath> deserializeKvFileHandles( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java index e61a4999a..569d75c6e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java @@ -67,7 +67,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT private final int snapshotWriteBufferSize; private final FileSystem remoteFileSystem; private final SequenceIDCounter snapshotIdCounter; - private final Supplier<Long> logOffsetSupplier; + private final Supplier<TabletState> tabletStateSupplier; private final Consumer<Long> updateMinRetainOffset; private final Supplier<Integer> bucketLeaderEpochSupplier; private final Supplier<Integer> coordinatorEpochSupplier; @@ -94,7 +94,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT Executor ioExecutor, CloseableRegistry cancelStreamRegistry, SequenceIDCounter snapshotIdCounter, - Supplier<Long> logOffsetSupplier, + Supplier<TabletState> tabletStateSupplier, Consumer<Long> updateMinRetainOffset, Supplier<Integer> bucketLeaderEpochSupplier, 
Supplier<Integer> coordinatorEpochSupplier, @@ -111,7 +111,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT ioExecutor, cancelStreamRegistry, snapshotIdCounter, - logOffsetSupplier, + tabletStateSupplier, updateMinRetainOffset, bucketLeaderEpochSupplier, coordinatorEpochSupplier, @@ -129,7 +129,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT Executor ioExecutor, CloseableRegistry cancelStreamRegistry, SequenceIDCounter snapshotIdCounter, - Supplier<Long> logOffsetSupplier, + Supplier<TabletState> tabletStateSupplier, Consumer<Long> updateMinRetainOffset, Supplier<Integer> bucketLeaderEpochSupplier, Supplier<Integer> coordinatorEpochSupplier, @@ -145,7 +145,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT this.snapshotWriteBufferSize = snapshotWriteBufferSize; this.remoteFileSystem = remoteKvTabletDir.getFileSystem(); this.snapshotIdCounter = snapshotIdCounter; - this.logOffsetSupplier = logOffsetSupplier; + this.tabletStateSupplier = tabletStateSupplier; this.updateMinRetainOffset = updateMinRetainOffset; this.bucketLeaderEpochSupplier = bucketLeaderEpochSupplier; this.coordinatorEpochSupplier = coordinatorEpochSupplier; @@ -168,7 +168,8 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT @Override public Optional<PeriodicSnapshotManager.SnapshotRunnable> initSnapshot() throws Exception { - long logOffset = logOffsetSupplier.get(); + TabletState tabletState = tabletStateSupplier.get(); + long logOffset = tabletState.getFlushedLogOffset(); if (logOffset <= logOffsetOfLatestSnapshot) { LOG.debug( "The current offset for the log whose kv data is flushed is {}, " @@ -187,7 +188,8 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT try { PeriodicSnapshotManager.SnapshotRunnable snapshotRunnable = new PeriodicSnapshotManager.SnapshotRunnable( - snapshotRunner.snapshot(currentSnapshotId, logOffset, 
snapshotLocation), + snapshotRunner.snapshot( + currentSnapshotId, tabletState, snapshotLocation), currentSnapshotId, coordinatorEpoch, bucketLeaderEpoch, @@ -220,13 +222,16 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT SnapshotLocation snapshotLocation, SnapshotResult snapshotResult) throws Throwable { + TabletState tabletState = snapshotResult.getTabletState(); CompletedSnapshot completedSnapshot = new CompletedSnapshot( tableBucket, snapshotId, snapshotResult.getSnapshotPath(), snapshotResult.getKvSnapshotHandle(), - snapshotResult.getLogOffset()); + tabletState.getFlushedLogOffset(), + tabletState.getRowCount(), + tabletState.getAutoIncIDRanges()); try { // commit the completed snapshot completedKvSnapshotCommitter.commitKvSnapshot( @@ -270,12 +275,13 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT * minimum offset to retain. */ private void updateStateOnCommitSuccess(long snapshotId, SnapshotResult snapshotResult) { + long flushedLogOffset = snapshotResult.getTabletState().getFlushedLogOffset(); // notify the snapshot complete rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId); - logOffsetOfLatestSnapshot = snapshotResult.getLogOffset(); + logOffsetOfLatestSnapshot = flushedLogOffset; snapshotSize = snapshotResult.getSnapshotSize(); // update LogTablet to notify the lowest offset that should be retained - updateMinRetainOffset.accept(snapshotResult.getLogOffset()); + updateMinRetainOffset.accept(flushedLogOffset); } /** diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java index fbfa42e1f..3ae7318f6 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java @@ -98,11 +98,11 @@ public class 
RocksIncrementalSnapshot implements AutoCloseable { public SnapshotResultSupplier asyncSnapshot( NativeRocksDBSnapshotResources snapshotResources, long snapshotId, - long logOffset, + TabletState tabletState, @Nonnull SnapshotLocation snapshotLocation) { return new RocksDBIncrementalSnapshotOperation( snapshotId, - logOffset, + tabletState, snapshotLocation, snapshotResources.previousSnapshot, snapshotResources.snapshotDirectory); @@ -185,7 +185,7 @@ public class RocksIncrementalSnapshot implements AutoCloseable { private final File localSnapshotDirectory; private final long snapshotId; - private final long logOffset; + private final TabletState tabletState; /** The target snapshot location and factory that creates the output streams to DFS. */ @Nonnull private final SnapshotLocation snapshotLocation; @@ -194,12 +194,12 @@ public class RocksIncrementalSnapshot implements AutoCloseable { public RocksDBIncrementalSnapshotOperation( long snapshotId, - long logOffset, + TabletState tabletState, @Nonnull SnapshotLocation snapshotLocation, PreviousSnapshot previousSnapshot, File localSnapshotDirectory) { this.snapshotId = snapshotId; - this.logOffset = logOffset; + this.tabletState = tabletState; this.snapshotLocation = snapshotLocation; this.previousSnapshot = previousSnapshot; this.localSnapshotDirectory = localSnapshotDirectory; @@ -223,7 +223,7 @@ public class RocksIncrementalSnapshot implements AutoCloseable { final KvSnapshotHandle kvSnapshotHandle = new KvSnapshotHandle(sstFiles, miscFiles, snapshotIncrementalSize); return new SnapshotResult( - kvSnapshotHandle, snapshotLocation.getSnapshotDirectory(), logOffset); + kvSnapshotHandle, snapshotLocation.getSnapshotDirectory(), tabletState); } finally { if (!completed) { cleanupIncompleteSnapshot(tmpResourcesRegistry); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotResult.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotResult.java index 
13d1e75aa..b8bdafddb 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotResult.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotResult.java @@ -33,12 +33,14 @@ public class SnapshotResult implements Serializable { private final KvSnapshotHandle kvSnapshotHandle; private final FsPath snapshotPath; - private final long logOffset; - public SnapshotResult(KvSnapshotHandle kvSnapshotHandle, FsPath snapshotPath, long logOffset) { + private final TabletState tabletState; + + public SnapshotResult( + KvSnapshotHandle kvSnapshotHandle, FsPath snapshotPath, TabletState tabletState) { this.kvSnapshotHandle = kvSnapshotHandle; this.snapshotPath = snapshotPath; - this.logOffset = logOffset; + this.tabletState = tabletState; } public KvSnapshotHandle getKvSnapshotHandle() { @@ -49,8 +51,8 @@ public class SnapshotResult implements Serializable { return snapshotPath; } - public long getLogOffset() { - return logOffset; + public TabletState getTabletState() { + return tabletState; } public long getSnapshotSize() { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotRunner.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotRunner.java index 82a8cb291..830dd944a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotRunner.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotRunner.java @@ -50,7 +50,7 @@ public class SnapshotRunner { } public RunnableFuture<SnapshotResult> snapshot( - long snapshotId, long logOffset, @Nonnull SnapshotLocation snapshotLocation) + long snapshotId, TabletState tabletState, @Nonnull SnapshotLocation snapshotLocation) throws Exception { long startTime = System.currentTimeMillis(); RocksIncrementalSnapshot.NativeRocksDBSnapshotResources snapshotResources = @@ -59,7 +59,7 @@ public class SnapshotRunner { SnapshotResultSupplier asyncSnapshot = 
rocksIncrementalSnapshot.asyncSnapshot( - snapshotResources, snapshotId, logOffset, snapshotLocation); + snapshotResources, snapshotId, tabletState, snapshotLocation); return new AsyncSnapshotCallable<SnapshotResult>() { @Override diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/TabletState.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/TabletState.java new file mode 100644 index 000000000..a5f19cee7 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/TabletState.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.server.kv.snapshot; + +import org.apache.fluss.server.kv.autoinc.AutoIncIDRange; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * TabletState represents the state of a kv tablet at a certain log offset. It contains the flushed + * log offset, the row count of the tablet at that log offset, and the auto-increment ID ranges of + * the tablet at that log offset. The row count and auto-increment ID ranges are optional, and may + * be null if the information is not available or not needed for a particular use case. 
+ */ +public class TabletState { + + private final long flushedLogOffset; + @Nullable private final Long rowCount; + @Nullable private final List<AutoIncIDRange> autoIncIDRanges; + + public TabletState( + long flushedLogOffset, + @Nullable Long rowCount, + @Nullable List<AutoIncIDRange> autoIncIDRanges) { + this.flushedLogOffset = flushedLogOffset; + this.rowCount = rowCount; + this.autoIncIDRanges = autoIncIDRanges; + } + + public long getFlushedLogOffset() { + return flushedLogOffset; + } + + @Nullable + public Long getRowCount() { + return rowCount; + } + + @Nullable + public List<AutoIncIDRange> getAutoIncIDRanges() { + return autoIncIDRanges; + } + + @Override + public String toString() { + return "TabletState{" + + "flushedLogOffset=" + + flushedLogOffset + + ", rowCount=" + + rowCount + + ", autoIncIDRanges=" + + autoIncIDRanges + + '}'; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index 536f8b2be..8a1621a84 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -31,6 +31,7 @@ import org.apache.fluss.exception.NonPrimaryKeyTableException; import org.apache.fluss.exception.NotEnoughReplicasException; import org.apache.fluss.exception.NotLeaderOrFollowerException; import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.ChangelogImage; import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.SchemaGetter; @@ -52,6 +53,7 @@ import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; import org.apache.fluss.server.kv.KvManager; import org.apache.fluss.server.kv.KvRecoverHelper; import org.apache.fluss.server.kv.KvTablet; +import org.apache.fluss.server.kv.autoinc.AutoIncIDRange; import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder; import 
org.apache.fluss.server.kv.snapshot.CompletedKvSnapshotCommitter; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; @@ -697,6 +699,8 @@ public final class Replica { long restoreStartOffset = 0; Optional<CompletedSnapshot> optCompletedSnapshot = getLatestSnapshot(tableBucket); try { + Long rowCount; + AutoIncIDRange autoIncIDRange; if (optCompletedSnapshot.isPresent()) { LOG.info( "Use snapshot {} to restore kv tablet for {} of table {}.", @@ -714,6 +718,9 @@ public final class Replica { checkNotNull(kvTablet, "kv tablet should not be null."); restoreStartOffset = completedSnapshot.getLogOffset(); + rowCount = completedSnapshot.getRowCount(); + // currently, we only support one auto-increment column. + autoIncIDRange = completedSnapshot.getFirstAutoIncIDRange(); } else { LOG.info( "No snapshot found for {} of {}, restore from log.", @@ -731,10 +738,19 @@ public final class Replica { schemaGetter, tableConfig, arrowCompressionInfo); + + // rowCount is not supported (null) for WAL changelog image; otherwise it starts from 0 + rowCount = tableConfig.getChangelogImage() == ChangelogImage.WAL ? null : 0L; + // TODO: it is possible that this is a recovered kv tablet without kv snapshot but + // with changelogs, in this case, the kv tablet should also have the + // autoIncIDRange, we may need to get it from the changelog in the future. + // but currently, we set it to null for simplification, as the auto-inc id + // will be fetched from zk if it does not exist locally. 
+ autoIncIDRange = null; } logTablet.updateMinRetainOffset(restoreStartOffset); - recoverKvTablet(restoreStartOffset); + recoverKvTablet(restoreStartOffset, rowCount, autoIncIDRange); } catch (Exception e) { throw new KvStorageException( String.format( @@ -800,14 +816,16 @@ public final class Replica { return Optional.empty(); } - private void recoverKvTablet(long startRecoverLogOffset) { + private void recoverKvTablet( + long startRecoverLogOffset, + @Nullable Long rowCount, + @Nullable AutoIncIDRange autoIncIDRange) { long start = clock.milliseconds(); checkNotNull(kvTablet, "kv tablet should not be null."); try { KvRecoverHelper.KvRecoverContext recoverContext = new KvRecoverHelper.KvRecoverContext( getTablePath(), - tableBucket, snapshotContext.getZooKeeperClient(), snapshotContext.maxFetchLogSizeInRecoverKv()); KvRecoverHelper kvRecoverHelper = @@ -815,6 +833,8 @@ public final class Replica { kvTablet, logTablet, startRecoverLogOffset, + rowCount, + autoIncIDRange, recoverContext, tableConfig.getKvFormat(), tableConfig.getLogFormat(), @@ -900,7 +920,7 @@ public final class Replica { snapshotContext.getAsyncOperationsThreadPool(), closeableRegistryForKv, snapshotIDCounter, - kvTablet::getFlushedLogOffset, + kvTablet::getTabletState, logTablet::updateMinRetainOffset, bucketLeaderEpochSupplier, coordinatorEpochSupplier, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java index c7c90ab6c..0070f1563 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java @@ -37,6 +37,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; class SegmentSequenceGeneratorTest { private static final TablePath TABLE_PATH = new TablePath("test_db", "test_table"); + 
private static final int COLUMN_ID = 1; private static final String COLUMN_NAME = "id"; private static final long CACHE_SIZE = 100; @@ -52,6 +53,7 @@ class SegmentSequenceGeneratorTest { BoundedSegmentSequenceGenerator generator = new BoundedSegmentSequenceGenerator( TABLE_PATH, + COLUMN_ID, COLUMN_NAME, new TestingSequenceIDCounter(snapshotIdGenerator), CACHE_SIZE, @@ -77,6 +79,7 @@ class SegmentSequenceGeneratorTest { BoundedSegmentSequenceGenerator generator = new BoundedSegmentSequenceGenerator( new TablePath("test_db", "table1"), + COLUMN_ID, COLUMN_NAME, new TestingSequenceIDCounter(snapshotIdGenerator), CACHE_SIZE, @@ -103,6 +106,7 @@ class SegmentSequenceGeneratorTest { BoundedSegmentSequenceGenerator generator = new BoundedSegmentSequenceGenerator( new TablePath("test_db", "table1"), + COLUMN_ID, COLUMN_NAME, new TestingSequenceIDCounter(snapshotIdGenerator, 2), CACHE_SIZE, @@ -125,6 +129,7 @@ class SegmentSequenceGeneratorTest { BoundedSegmentSequenceGenerator generator = new BoundedSegmentSequenceGenerator( new TablePath("test_db", "table1"), + COLUMN_ID, COLUMN_NAME, new TestingSequenceIDCounter(snapshotIdGenerator), CACHE_SIZE, @@ -144,4 +149,37 @@ class SegmentSequenceGeneratorTest { "Reached maximum value of sequence \"<%s>\" (%d).", COLUMN_NAME, Integer.MAX_VALUE)); } + + @Test + void testUpdateIDRange() { + BoundedSegmentSequenceGenerator generator = + new BoundedSegmentSequenceGenerator( + TABLE_PATH, + COLUMN_ID, + COLUMN_NAME, + new TestingSequenceIDCounter(snapshotIdGenerator), + CACHE_SIZE, + Long.MAX_VALUE); + for (long i = 1; i <= CACHE_SIZE; i++) { + assertThat(generator.nextVal()).isEqualTo(i); + } + + assertThat(generator.currentSequenceRange()) + .isEqualTo(new AutoIncIDRange(COLUMN_ID, 101, CACHE_SIZE)); + + // update the ID range to [1000, 1100] + generator.updateSequenceRange(new AutoIncIDRange(COLUMN_ID, 1000, 1100)); + + for (long i = 1000; i <= 1050; i++) { + assertThat(generator.nextVal()).isEqualTo(i); + } + 
assertThat(generator.currentSequenceRange()) + .isEqualTo(new AutoIncIDRange(COLUMN_ID, 1051, 1100)); + + assertThatThrownBy( + () -> generator.updateSequenceRange(new AutoIncIDRange(9999, 1000, 1100))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Column ID mismatch when updating sequence range. Expected column ID: 1, but got: 9999."); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceGeneratorFactory.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceGeneratorFactory.java index 2bafcd670..bd2d764a7 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceGeneratorFactory.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceGeneratorFactory.java @@ -32,6 +32,7 @@ public class TestingSequenceGeneratorFactory implements SequenceGeneratorFactory TablePath tablePath, Schema.Column autoIncrementColumn, long idCacheSize) { return new BoundedSegmentSequenceGenerator( tablePath, + autoIncrementColumn.getColumnId(), autoIncrementColumn.getName(), new TestingSequenceIDCounter(new AtomicLong(0)), idCacheSize, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerdeTest.java index d89087ab2..2bd933977 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerdeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerdeTest.java @@ -19,9 +19,11 @@ package org.apache.fluss.server.kv.snapshot; import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.kv.autoinc.AutoIncIDRange; import org.apache.fluss.utils.json.JsonSerdeTestBase; import java.util.Arrays; +import java.util.Collections; import java.util.List; /** Test for {@link 
org.apache.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde}. */ @@ -55,14 +57,18 @@ class CompletedSnapshotJsonSerdeTest extends JsonSerdeTestBase<CompletedSnapshot 1, new FsPath("oss://bucket/snapshot"), new KvSnapshotHandle(sharedFileHandles, privateFileHandles, 5), - 10); + 10, + null, + null); CompletedSnapshot completedSnapshot2 = new CompletedSnapshot( new TableBucket(1, 10L, 1), 1, new FsPath("oss://bucket/snapshot"), new KvSnapshotHandle(sharedFileHandles, privateFileHandles, 5), - 10); + 10, + 1234L, + Collections.singletonList(new AutoIncIDRange(2, 10000, 20000))); return new CompletedSnapshot[] {completedSnapshot1, completedSnapshot2}; } @@ -88,7 +94,7 @@ class CompletedSnapshotJsonSerdeTest extends JsonSerdeTestBase<CompletedSnapshot + "{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/shared/t2.sst\",\"size\":2},\"local_path\":\"localPath2\"}]," + "\"private_file_handles\":[{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/snapshot1/t3\",\"size\":3},\"local_path\":\"localPath3\"}," + "{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/snapshot1/t4\",\"size\":4},\"local_path\":\"localPath4\"}]," - + "\"snapshot_incremental_size\":5},\"log_offset\":10}" + + "\"snapshot_incremental_size\":5},\"log_offset\":10,\"row_count\":1234,\"auto_inc_id_range\":[{\"column_id\":2,\"start\":10000,\"end\":20000}]}" }; } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java index 98ca31605..714835ce5 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java @@ -501,7 +501,7 @@ class KvTabletSnapshotTargetTest { executor, cancelStreamRegistry, testingSnapshotIdCounter, - logOffsetGenerator::get, + this::getCurrentTabletState, 
updateMinRetainOffsetConsumer::set, bucketLeaderEpochSupplier, coordinatorEpochSupplier, @@ -539,7 +539,7 @@ class KvTabletSnapshotTargetTest { executor, cancelStreamRegistry, testingSnapshotIdCounter, - logOffsetGenerator::get, + this::getCurrentTabletState, updateMinRetainOffsetConsumer::set, bucketLeaderEpochSupplier, coordinatorEpochSupplier, @@ -547,6 +547,10 @@ class KvTabletSnapshotTargetTest { 0L); } + private TabletState getCurrentTabletState() { + return new TabletState(logOffsetGenerator.get(), null, null); + } + private RocksIncrementalSnapshot createIncrementalSnapshot(SnapshotFailType snapshotFailType) throws IOException { long lastCompletedSnapshotId = -1L; @@ -626,10 +630,11 @@ class KvTabletSnapshotTargetTest { this.snapshotFailType = snapshotFailType; } + @Override public SnapshotResultSupplier asyncSnapshot( NativeRocksDBSnapshotResources snapshotResources, long snapshotId, - long logOffset, + TabletState tabletState, @Nonnull SnapshotLocation snapshotLocation) { if (snapshotFailType == SnapshotFailType.SYNC_PHASE) { throw new FlussRuntimeException("Fail in snapshot sync phase."); @@ -639,7 +644,7 @@ class KvTabletSnapshotTargetTest { }; } else { return super.asyncSnapshot( - snapshotResources, snapshotId, logOffset, snapshotLocation); + snapshotResources, snapshotId, tabletState, snapshotLocation); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java index 31e53490d..c719c9683 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java @@ -223,8 +223,8 @@ class PeriodicSnapshotManagerTest { if (exceptionMessage != null) { throw new FlussRuntimeException(exceptionMessage); } else { - final long logOffset = 0; - return new SnapshotResult(null, 
snapshotPath, logOffset); + return new SnapshotResult( + null, snapshotPath, new TabletState(0, null, null)); } }); int snapshotId = 1; diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshotTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshotTest.java index 2e4b4fbca..64398516a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshotTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshotTest.java @@ -190,7 +190,11 @@ class RocksIncrementalSnapshotTest { incrementalSnapshot.syncPrepareResources(snapshotId); return incrementalSnapshot - .asyncSnapshot(nativeRocksDBSnapshotResources, snapshotId, 0L, snapshotLocation) + .asyncSnapshot( + nativeRocksDBSnapshotResources, + snapshotId, + new TabletState(0L, null, null), + snapshotLocation) .get(closeableRegistry) .getKvSnapshotHandle(); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/KvTestUtils.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/KvTestUtils.java index 273858d31..1ff32c7e7 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/KvTestUtils.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/KvTestUtils.java @@ -129,7 +129,9 @@ public class KvTestUtils { tableBucket.getBucket(), snapshotId)), new KvSnapshotHandle(Collections.emptyList(), Collections.emptyList(), 0), - 0); + 0, + null, + null); } public static int getKeyCounts(RocksDB rocksDB) {
