This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 0887cd0122136c24117597506abbb6b9ea815ad5
Author: Jark Wu <[email protected]>
AuthorDate: Wed Feb 11 17:09:44 2026 +0800

    [kv] Introduce TabletState to support persistence of auto-inc buffer and 
row count (#2651)
---
 .../fluss/client/table/FlussTableITCase.java       |  49 ++++-
 .../apache/fluss/server/kv/KvRecoverHelper.java    | 242 ++++++++++++++++++++-
 .../java/org/apache/fluss/server/kv/KvTablet.java  |  42 +++-
 .../fluss/server/kv/autoinc/AutoIncIDRange.java    |  84 +++++++
 .../server/kv/autoinc/AutoIncrementManager.java    |  28 ++-
 .../autoinc/BoundedSegmentSequenceGenerator.java   |  21 +-
 .../fluss/server/kv/autoinc/SequenceGenerator.java |  14 ++
 .../kv/autoinc/ZkSequenceGeneratorFactory.java     |  33 +--
 .../fluss/server/kv/prewrite/KvPreWriteBuffer.java |   8 +-
 .../server/kv/snapshot/CompletedSnapshot.java      |  52 ++++-
 .../kv/snapshot/CompletedSnapshotJsonSerde.java    |  60 ++++-
 .../server/kv/snapshot/KvTabletSnapshotTarget.java |  26 ++-
 .../kv/snapshot/RocksIncrementalSnapshot.java      |  12 +-
 .../fluss/server/kv/snapshot/SnapshotResult.java   |  12 +-
 .../fluss/server/kv/snapshot/SnapshotRunner.java   |   4 +-
 .../fluss/server/kv/snapshot/TabletState.java      |  74 +++++++
 .../org/apache/fluss/server/replica/Replica.java   |  28 ++-
 .../kv/autoinc/SegmentSequenceGeneratorTest.java   |  38 ++++
 .../autoinc/TestingSequenceGeneratorFactory.java   |   1 +
 .../snapshot/CompletedSnapshotJsonSerdeTest.java   |  12 +-
 .../kv/snapshot/KvTabletSnapshotTargetTest.java    |  13 +-
 .../kv/snapshot/PeriodicSnapshotManagerTest.java   |   4 +-
 .../kv/snapshot/RocksIncrementalSnapshotTest.java  |   6 +-
 .../apache/fluss/server/testutils/KvTestUtils.java |   4 +-
 24 files changed, 788 insertions(+), 79 deletions(-)

diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index 1fbd78ff7..8fd0c138d 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -498,7 +498,13 @@ class FlussTableITCase extends ClientToServerITCaseBase {
                         .primaryKey("col1")
                         .build();
         TableDescriptor tableDescriptor =
-                TableDescriptor.builder().schema(schema).distributedBy(1, 
"col1").build();
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(1, "col1")
+                        // set a small cache size to test the auto-increment 
buffer rollover and
+                        // recovery logic in case of server failure and schema 
change.
+                        
.property(ConfigOptions.TABLE_AUTO_INCREMENT_CACHE_SIZE, 5L)
+                        .build();
         // create the table
         TablePath tablePath =
                 TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), 
"test_pk_table_auto_inc");
@@ -547,11 +553,15 @@ class FlussTableITCase extends ClientToServerITCaseBase {
         };
         verifyRecords(expectedRecordsWithOldSchema, autoIncTable, schema);
 
+        // trigger snapshot to make sure the auto-inc buffer is snapshotted
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
+
         // schema change case2: update new data with new schema.
 
         Object[][] recordsWithNewSchema = {
             {"a", null, "batch3", 10},
-            {"b", null, "batch3", 11}
+            {"b", null, "batch3", 11},
+            {"f", null, "batch3", 12}
         };
         partialUpdateRecords(
                 new String[] {"col1", "col3", "col4"}, recordsWithNewSchema, 
newSchemaTable);
@@ -567,7 +577,8 @@ class FlussTableITCase extends ClientToServerITCaseBase {
             {"b", 2L, "batch3", 11},
             {"c", 3L, "batch1", null},
             {"d", 4L, "batch2", null},
-            {"e", 5L, "batch2", null}
+            {"e", 5L, "batch2", null},
+            {"f", 6L, "batch3", 12}
         };
         verifyRecords(expectedRecordsWithNewSchema, newSchemaTable, newSchema);
 
@@ -582,14 +593,42 @@ class FlussTableITCase extends ClientToServerITCaseBase {
         newSchemaTable = conn.getTable(tablePath);
         verifyRecords(expectedRecordsWithNewSchema, newSchemaTable, newSchema);
 
-        Object[][] restartWriteRecords = {{"f", null, "batch4", 12}};
+        Object[][] restartWriteRecords = {{"g", null, "batch4", 13}};
         partialUpdateRecords(
                 new String[] {"col1", "col3", "col4"}, restartWriteRecords, 
newSchemaTable);
 
         // The auto-increment column should start from a new segment for now, 
and local cached
         // IDs have been discarded.
-        Object[][] expectedRestartWriteRecords = {{"f", 100001L, "batch4", 
12}};
+        Object[][] expectedRestartWriteRecords = {{"g", 7L, "batch4", 13}};
         verifyRecords(expectedRestartWriteRecords, newSchemaTable, newSchema);
+
+        // trigger snapshot to make sure the auto-inc buffer is snapshotted 
again
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
+
+        Object[][] writeRecordsAgain = {
+            {"h", null, "batch5", 14},
+            {"i", null, "batch5", 15},
+            {"j", null, "batch5", 16}
+        };
+        partialUpdateRecords(
+                new String[] {"col1", "col3", "col4"}, writeRecordsAgain, 
newSchemaTable);
+
+        // kill and restart all tablet servers, the recovered id range will hit 
the 10 cache limit
+        for (int i = 0; i < 3; i++) {
+            FLUSS_CLUSTER_EXTENSION.stopTabletServer(i);
+            FLUSS_CLUSTER_EXTENSION.startTabletServer(i);
+        }
+
+        Object[][] finalWriteRecords = {{"k", null, "batch6", 17}};
+        partialUpdateRecords(
+                new String[] {"col1", "col3", "col4"}, finalWriteRecords, 
newSchemaTable);
+        Object[][] expectedFinalRecords = {
+            {"h", 8L, "batch5", 14},
+            {"i", 9L, "batch5", 15},
+            {"j", 10L, "batch5", 16},
+            {"k", 11L, "batch6", 17}
+        };
+        verifyRecords(expectedFinalRecords, newSchemaTable, newSchema);
     }
 
     @Test
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
index 437eb49fd..5d32e413e 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
@@ -19,8 +19,8 @@ package org.apache.fluss.server.kv;
 
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.SchemaGetter;
-import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
@@ -35,24 +35,34 @@ import org.apache.fluss.row.encode.KeyEncoder;
 import org.apache.fluss.row.encode.RowEncoder;
 import org.apache.fluss.row.encode.ValueEncoder;
 import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.server.kv.autoinc.AutoIncIDRange;
 import org.apache.fluss.server.log.FetchIsolation;
 import org.apache.fluss.server.log.LogTablet;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
 import org.apache.fluss.types.RowType;
 import org.apache.fluss.utils.CloseableIterator;
 import org.apache.fluss.utils.function.ThrowingConsumer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
+import java.util.List;
+
 import static org.apache.fluss.server.TabletManagerBase.getTableInfo;
 
 /** A helper for recovering Kv from log. */
 public class KvRecoverHelper {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KvRecoverHelper.class);
 
     private final KvTablet kvTablet;
     private final LogTablet logTablet;
     private final long recoverPointOffset;
+    @Nullable private final Long recoverPointRowCount;
+    @Nullable private final AutoIncIDRange autoIncRange;
     private final KvRecoverContext recoverContext;
     private final KvFormat kvFormat;
     private final LogFormat logFormat;
@@ -71,6 +81,8 @@ public class KvRecoverHelper {
             KvTablet kvTablet,
             LogTablet logTablet,
             long recoverPointOffset,
+            @Nullable Long recoverPointRowCount,
+            @Nullable AutoIncIDRange autoIncRange,
             KvRecoverContext recoverContext,
             KvFormat kvFormat,
             LogFormat logFormat,
@@ -78,6 +90,8 @@ public class KvRecoverHelper {
         this.kvTablet = kvTablet;
         this.logTablet = logTablet;
         this.recoverPointOffset = recoverPointOffset;
+        this.recoverPointRowCount = recoverPointRowCount;
+        this.autoIncRange = autoIncRange;
         this.recoverContext = recoverContext;
         this.kvFormat = kvFormat;
         this.logFormat = logFormat;
@@ -97,7 +111,18 @@ public class KvRecoverHelper {
 
         initSchema(schemaGetter.getLatestSchemaInfo().getSchemaId());
 
+        Schema schema = schemaGetter.getLatestSchemaInfo().getSchema();
+
         long nextLogOffset = recoverPointOffset;
+        RowCountUpdater rowCountUpdater =
+                recoverPointRowCount != null
+                        ? new RowCountUpdaterImpl(recoverPointRowCount)
+                        : new NoOpRowCountUpdater();
+        AutoIncIDRangeUpdater autoIncIdRangeUpdater =
+                autoIncRange != null
+                        ? new AutoIncIDRangeUpdaterImpl(
+                                schema, autoIncRange, 
kvTablet.getAutoIncrementCacheSize())
+                        : new NoOpAutoIncIDRangeUpdater();
         // read to high watermark
         try (KvBatchWriter kvBatchWriter = kvTablet.createKvBatchWriter()) {
             ThrowingConsumer<KeyValueAndLogOffset, Exception> 
resumeRecordApplier =
@@ -111,22 +136,59 @@ public class KvRecoverHelper {
 
             nextLogOffset =
                     readLogRecordsAndApply(
-                            nextLogOffset, FetchIsolation.HIGH_WATERMARK, 
resumeRecordApplier);
+                            nextLogOffset,
+                            rowCountUpdater,
+                            autoIncIdRangeUpdater,
+                            FetchIsolation.HIGH_WATERMARK,
+                            resumeRecordApplier);
         }
 
         // the all data up to nextLogOffset has been flush into kv
         kvTablet.setFlushedLogOffset(nextLogOffset);
+        // if we have valid row count (means this table supports row count),
+        // update the row count in kv tablet
+        if (recoverPointRowCount != null) {
+            kvTablet.setRowCount(rowCountUpdater.getRowCount());
+            LOG.info(
+                    "Updated row count to {} for table '{}' after recovering 
from log",
+                    rowCountUpdater.getRowCount(),
+                    kvTablet.getTablePath());
+        } else {
+            LOG.info(
+                    "Skipping row count update after recovering from log, 
because this table '{}' doesn't support row count.",
+                    kvTablet.getTablePath());
+        }
 
         // read to log end offset
         ThrowingConsumer<KeyValueAndLogOffset, Exception> resumeRecordApplier =
                 (resumeRecord) ->
                         kvTablet.putToPreWriteBuffer(
                                 resumeRecord.key, resumeRecord.value, 
resumeRecord.logOffset);
-        readLogRecordsAndApply(nextLogOffset, FetchIsolation.LOG_END, 
resumeRecordApplier);
+        readLogRecordsAndApply(
+                nextLogOffset,
+                // records in pre-write-buffer shouldn't affect the row count, 
the high-watermark
+                // of pre-write-buffer records will update the row-count async
+                new NoOpRowCountUpdater(),
+                // we should update auto-inc-id for pre-write-buffer, because 
id has been used.
+                autoIncIdRangeUpdater,
+                FetchIsolation.LOG_END,
+                resumeRecordApplier);
+
+        if (autoIncRange != null) {
+            AutoIncIDRange newRange = autoIncIdRangeUpdater.getNewRange();
+            kvTablet.updateAutoIncrementIDRange(newRange);
+            LOG.info(
+                    "Updated auto inc id range to [{}, {}] for table '{}' 
after recovering from log",
+                    newRange.getStart(),
+                    newRange.getEnd(),
+                    kvTablet.getTablePath());
+        }
     }
 
     private long readLogRecordsAndApply(
             long startFetchOffset,
+            RowCountUpdater rowCountUpdater,
+            AutoIncIDRangeUpdater autoIncIdRangeUpdater,
             FetchIsolation fetchIsolation,
             ThrowingConsumer<KeyValueAndLogOffset, Exception> 
resumeRecordConsumer)
             throws Exception {
@@ -151,14 +213,17 @@ public class KvRecoverHelper {
                             logRecordBatch.records(readContext)) {
                         while (logRecordIter.hasNext()) {
                             LogRecord logRecord = logRecordIter.next();
-                            if (logRecord.getChangeType() != 
ChangeType.UPDATE_BEFORE) {
+                            ChangeType changeType = logRecord.getChangeType();
+                            rowCountUpdater.applyChange(changeType);
+
+                            if (changeType != ChangeType.UPDATE_BEFORE) {
                                 InternalRow logRow = logRecord.getRow();
                                 byte[] key = keyEncoder.encodeKey(logRow);
                                 byte[] value = null;
-                                if (logRecord.getChangeType() != 
ChangeType.DELETE) {
+                                if (changeType != ChangeType.DELETE) {
                                     // the log row format may not compatible 
with kv row format,
                                     // e.g, arrow vs. compacted, thus needs a 
conversion here.
-                                    BinaryRow row = 
toKvRow(logRecord.getRow());
+                                    BinaryRow row = toKvRow(logRow);
                                     value =
                                             ValueEncoder.encodeValue(
                                                     
currentSchemaId.shortValue(), row);
@@ -166,6 +231,10 @@ public class KvRecoverHelper {
                                 resumeRecordConsumer.accept(
                                         new KeyValueAndLogOffset(
                                                 key, value, 
logRecord.logOffset()));
+
+                                // reuse the logRow instance which is usually 
a CompactedRow which
+                                // has been deserialized during toKvRow(..)
+                                autoIncIdRangeUpdater.applyRecord(changeType, 
logRow);
                             }
                         }
                     }
@@ -247,20 +316,169 @@ public class KvRecoverHelper {
     public static class KvRecoverContext {
 
         private final TablePath tablePath;
-        private final TableBucket tableBucket;
 
         private final ZooKeeperClient zkClient;
         private final int maxFetchLogSizeInRecoverKv;
 
         public KvRecoverContext(
-                TablePath tablePath,
-                TableBucket tableBucket,
-                ZooKeeperClient zkClient,
-                int maxFetchLogSizeInRecoverKv) {
+                TablePath tablePath, ZooKeeperClient zkClient, int 
maxFetchLogSizeInRecoverKv) {
             this.tablePath = tablePath;
-            this.tableBucket = tableBucket;
             this.zkClient = zkClient;
             this.maxFetchLogSizeInRecoverKv = maxFetchLogSizeInRecoverKv;
         }
     }
+
+    // 
------------------------------------------------------------------------------------------
+    // Below are some helpers for recovering tablet state from log
+    // 
------------------------------------------------------------------------------------------
+
+    /** A helper to update the latest row count during recovering from log. */
+    private interface RowCountUpdater {
+
+        /** Apply the change to the row count according to the change type. */
+        void applyChange(ChangeType changeType);
+
+        /** Get the latest row count. Returns -1 if this table doesn't support 
row count. */
+        long getRowCount();
+    }
+
+    /**
+     * A simple implementation of {@link RowCountUpdater} which maintains a 
row count by applying
+     * the change type of each log record.
+     */
+    private static class RowCountUpdaterImpl implements RowCountUpdater {
+        private long rowCount;
+
+        public RowCountUpdaterImpl(long initialRowCount) {
+            this.rowCount = initialRowCount;
+        }
+
+        @Override
+        public void applyChange(ChangeType changeType) {
+            if (changeType == ChangeType.INSERT || changeType == 
ChangeType.UPDATE_AFTER) {
+                rowCount++;
+            } else if (changeType == ChangeType.DELETE || changeType == 
ChangeType.UPDATE_BEFORE) {
+                rowCount--;
+            }
+        }
+
+        @Override
+        public long getRowCount() {
+            return rowCount;
+        }
+    }
+
+    /**
+     * A no-op implementation of {@link RowCountUpdater} for the table which 
doesn't support row
+     * count.
+     */
+    private static class NoOpRowCountUpdater implements RowCountUpdater {
+
+        @Override
+        public void applyChange(ChangeType changeType) {
+            // do nothing
+        }
+
+        @Override
+        public long getRowCount() {
+            return -1;
+        }
+    }
+
+    /** A helper to update the auto inc id range during recovering from log. */
+    private interface AutoIncIDRangeUpdater {
+        /**
+         * Apply the change to the auto inc id range according to the change 
type and log record.
+         */
+        void applyRecord(ChangeType changeType, InternalRow changelog);
+
+        /**
+         * Get a new auto inc id range for the next pre-write buffer. Returns 
null if this table
+         * doesn't support auto inc id.
+         */
+        AutoIncIDRange getNewRange();
+    }
+
+    /**
+     * A simple implementation of {@link AutoIncIDRangeUpdater} which 
maintains an auto inc id range
+     * by applying the change type and log record of each log record. It 
assumes the auto inc column
+     * is always increasing when a new record is inserted, and the auto inc 
column's value won't be
+     * updated once it's inserted.
+     */
+    private static class AutoIncIDRangeUpdaterImpl implements 
AutoIncIDRangeUpdater {
+        private final long autoIncrementCacheSize;
+        private final boolean isLongType;
+        private final int columnIndex;
+        private final int columnId;
+        private final long end;
+
+        private long current;
+
+        private AutoIncIDRangeUpdaterImpl(
+                Schema schema, AutoIncIDRange autoIncRange, long 
autoIncrementCacheSize) {
+            this.autoIncrementCacheSize = autoIncrementCacheSize;
+            this.end = autoIncRange.getEnd();
+            this.columnId = autoIncRange.getColumnId();
+            this.columnIndex = autoIncColumnIndex(schema, 
autoIncRange.getColumnId());
+            this.isLongType =
+                    
schema.getColumns().get(columnIndex).getDataType().is(DataTypeRoot.BIGINT);
+            // initialize current to be one less than the start of the range
+            this.current = autoIncRange.getStart() - 1;
+        }
+
+        private int autoIncColumnIndex(Schema schema, int columnId) {
+            List<Schema.Column> columns = schema.getColumns();
+            for (int i = 0; i < columns.size(); i++) {
+                if (columns.get(i).getColumnId() == columnId) {
+                    return i;
+                }
+            }
+            throw new IllegalArgumentException(
+                    "Auto inc column with column id " + columnId + " not found 
in schema");
+        }
+
+        @Override
+        public void applyRecord(ChangeType changeType, InternalRow changelog) {
+            // only INSERT records will have new sequence id, and it must be 
not null
+            if (changeType == ChangeType.INSERT) {
+                if (isLongType) {
+                    current = changelog.getLong(columnIndex);
+                } else {
+                    // currently, we only support long and int type for auto 
inc column, and they
+                    // must be not null
+                    current = changelog.getInt(columnIndex);
+                }
+            }
+        }
+
+        @Override
+        public AutoIncIDRange getNewRange() {
+            long newStart = current + 1;
+            // id range is starting from [1, end], so if newStart is greater 
than end, it means we
+            // have exhausted the id range, and the kv tablet has allocated a 
new range after the
+            // snapshot, so we calculate the new range based on the current 
new start
+            long newEnd = end;
+            while (newStart > newEnd) {
+                newEnd = newEnd + autoIncrementCacheSize;
+            }
+            return new AutoIncIDRange(columnId, newStart, newEnd);
+        }
+    }
+
+    /**
+     * A no-op implementation of {@link AutoIncIDRangeUpdater} for the table 
which doesn't support
+     * auto inc column.
+     */
+    private static class NoOpAutoIncIDRangeUpdater implements 
AutoIncIDRangeUpdater {
+
+        @Override
+        public void applyRecord(ChangeType changeType, InternalRow changelog) {
+            // do nothing
+        }
+
+        @Override
+        public AutoIncIDRange getNewRange() {
+            return null;
+        }
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 8d279555a..ca89fe829 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -46,6 +46,7 @@ import org.apache.fluss.row.arrow.ArrowWriterPool;
 import org.apache.fluss.row.arrow.ArrowWriterProvider;
 import org.apache.fluss.row.encode.ValueDecoder;
 import org.apache.fluss.rpc.protocol.MergeMode;
+import org.apache.fluss.server.kv.autoinc.AutoIncIDRange;
 import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
 import org.apache.fluss.server.kv.autoinc.AutoIncrementUpdater;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer;
@@ -59,6 +60,7 @@ import org.apache.fluss.server.kv.rowmerger.RowMerger;
 import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath;
 import org.apache.fluss.server.kv.snapshot.KvSnapshotDataUploader;
 import org.apache.fluss.server.kv.snapshot.RocksIncrementalSnapshot;
+import org.apache.fluss.server.kv.snapshot.TabletState;
 import org.apache.fluss.server.kv.wal.ArrowWalBuilder;
 import org.apache.fluss.server.kv.wal.CompactedWalBuilder;
 import org.apache.fluss.server.kv.wal.IndexWalBuilder;
@@ -96,6 +98,7 @@ import static 
org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
 @ThreadSafe
 public final class KvTablet {
     private static final Logger LOG = LoggerFactory.getLogger(KvTablet.class);
+    private static final long ROW_COUNT_DISABLED = -1;
 
     private final PhysicalTablePath physicalPath;
     private final TableBucket tableBucket;
@@ -136,6 +139,8 @@ public final class KvTablet {
      */
     private volatile long flushedLogOffset = 0;
 
+    private volatile long rowCount;
+
     @GuardedBy("kvLock")
     private volatile boolean isClosed = false;
 
@@ -178,6 +183,8 @@ public final class KvTablet {
         this.changelogImage = changelogImage;
         this.rocksDBStatistics = rocksDBStatistics;
         this.autoIncrementManager = autoIncrementManager;
+        // disable row count for WAL image mode.
+        this.rowCount = changelogImage == ChangelogImage.WAL ? 
ROW_COUNT_DISABLED : 0L;
     }
 
     public static KvTablet create(
@@ -253,6 +260,14 @@ public final class KvTablet {
         return physicalPath.getTablePath();
     }
 
+    public long getAutoIncrementCacheSize() {
+        return autoIncrementManager.getAutoIncrementCacheSize();
+    }
+
+    public void updateAutoIncrementIDRange(AutoIncIDRange newRange) {
+        autoIncrementManager.updateIDRange(newRange);
+    }
+
     @Nullable
     public String getPartitionName() {
         return physicalPath.getPartitionName();
@@ -276,8 +291,24 @@ public final class KvTablet {
         this.flushedLogOffset = flushedLogOffset;
     }
 
-    public long getFlushedLogOffset() {
-        return flushedLogOffset;
+    void setRowCount(long rowCount) {
+        this.rowCount = rowCount;
+    }
+
+    /**
+     * Get the current state of the tablet, including the log offset, row 
count and auto-increment
+     * ID range. This is used for snapshot and recovery to capture the state 
of the tablet at a
+     * specific log offset.
+     *
+     * <p>Note: this method must be called under the kvLock to ensure the 
consistency between the
+     * returned state and the log offset.
+     */
+    @GuardedBy("kvLock")
+    public TabletState getTabletState() {
+        return new TabletState(
+                flushedLogOffset,
+                rowCount == ROW_COUNT_DISABLED ? null : rowCount,
+                autoIncrementManager.getCurrentIDRanges());
     }
 
     /**
@@ -632,8 +663,13 @@ public final class KvTablet {
                                 tableBucket);
                     } else {
                         try {
-                            kvPreWriteBuffer.flush(exclusiveUpToLogOffset);
+                            int rowCountDiff = 
kvPreWriteBuffer.flush(exclusiveUpToLogOffset);
                             flushedLogOffset = exclusiveUpToLogOffset;
+                            if (rowCount != ROW_COUNT_DISABLED) {
+                                // row count is enabled, we update the row 
count after flush.
+                                long currentRowCount = rowCount;
+                                rowCount = currentRowCount + rowCountDiff;
+                            }
                         } catch (Throwable t) {
                             fatalErrorHandler.onFatalError(
                                     new KvStorageException("Failed to flush kv 
pre-write buffer."));
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncIDRange.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncIDRange.java
new file mode 100644
index 000000000..3c1297c67
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncIDRange.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of IDs allocated for auto-increment purposes. This class 
encapsulates the
+ * auto-increment column id, the start and end of the ID range. The range is 
[start, end]. There is
+ * possible that the start > end which means the range is empty.
+ */
+public class AutoIncIDRange {
+
+    private final int columnId;
+    private final long start;
+    private final long end;
+
+    public AutoIncIDRange(int columnId, long start, long end) {
+        this.columnId = columnId;
+        this.start = start;
+        this.end = end;
+    }
+
+    /**
+     * Returns the column ID of the auto-increment column that is associated 
with this auto-increment
+     * ID range.
+     */
+    public int getColumnId() {
+        return columnId;
+    }
+
+    /** Returns the starting ID of the range (inclusive). */
+    public long getStart() {
+        return start;
+    }
+
+    /** Returns the ending ID of the range (inclusive). */
+    public long getEnd() {
+        return end;
+    }
+
+    /** Checks if the ID range is empty (i.e., start is greater than end). */
+    public boolean isEmpty() {
+        return start > end;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AutoIncIDRange autoIncIdRange = (AutoIncIDRange) o;
+        return columnId == autoIncIdRange.getColumnId()
+                && start == autoIncIdRange.start
+                && end == autoIncIdRange.end;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(columnId, start, end);
+    }
+
+    @Override
+    public String toString() {
+        return "IDRange{" + "columnId=" + columnId + ", start=" + start + ", 
end=" + end + '}';
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
index fca383c8f..8a69e1f1b 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
@@ -27,9 +27,11 @@ import org.apache.fluss.metadata.TablePath;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.fluss.utils.Preconditions.checkState;
@@ -43,6 +45,7 @@ public class AutoIncrementManager {
     // No-op implementation that returns the input unchanged.
     public static final AutoIncrementUpdater NO_OP_UPDATER = rowValue -> 
rowValue;
 
+    private final long autoIncrementCacheSize;
     private final SchemaGetter schemaGetter;
     private final Cache<Integer, AutoIncrementUpdater> 
autoIncrementUpdaterCache;
     private final int autoIncrementColumnId;
@@ -54,6 +57,7 @@ public class AutoIncrementManager {
             TableConfig tableConf,
             SequenceGeneratorFactory seqGeneratorFactory) {
         this.schemaGetter = schemaGetter;
+        this.autoIncrementCacheSize = tableConf.getAutoIncrementCacheSize();
         int schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId();
         Schema schema = schemaGetter.getSchema(schemaId);
         List<String> autoIncrementColumnNames = 
schema.getAutoIncrementColumnNames();
@@ -68,7 +72,7 @@ public class AutoIncrementManager {
             autoIncrementColumnId = autoIncrementColumn.getColumnId();
             sequenceGenerator =
                     seqGeneratorFactory.createSequenceGenerator(
-                            tablePath, autoIncrementColumn, 
tableConf.getAutoIncrementCacheSize());
+                            tablePath, autoIncrementColumn, 
autoIncrementCacheSize);
             autoIncrementUpdaterCache =
                     Caffeine.newBuilder()
                             .maximumSize(5)
@@ -93,6 +97,28 @@ public class AutoIncrementManager {
         }
     }
 
+    @Nullable
+    public List<AutoIncIDRange> getCurrentIDRanges() {
+        if (autoIncrementColumnId == -1) {
+            return null;
+        } else {
+            return 
Collections.singletonList(sequenceGenerator.currentSequenceRange());
+        }
+    }
+
+    public void updateIDRange(AutoIncIDRange newRange) {
+        if (autoIncrementColumnId != -1) {
+            sequenceGenerator.updateSequenceRange(newRange);
+        } else {
+            throw new IllegalStateException(
+                    "Cannot update ID range for a table without auto-increment 
column.");
+        }
+    }
+
+    public long getAutoIncrementCacheSize() {
+        return autoIncrementCacheSize;
+    }
+
     private AutoIncrementUpdater createAutoIncrementUpdater(KvFormat kvFormat, 
int schemaId) {
         Schema schema = schemaGetter.getSchema(schemaId);
         int[] autoIncrementColumnIds = schema.getAutoIncrementColumnIds();
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java
index 44afe717e..1198f6b85 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java
@@ -36,6 +36,7 @@ public class BoundedSegmentSequenceGenerator implements 
SequenceGenerator {
 
     private final SequenceIDCounter sequenceIDCounter;
     private final TablePath tablePath;
+    private final int columnId;
     private final String columnName;
     private final long cacheSize;
     private final long maxAllowedValue;
@@ -44,16 +45,18 @@ public class BoundedSegmentSequenceGenerator implements 
SequenceGenerator {
 
     public BoundedSegmentSequenceGenerator(
             TablePath tablePath,
+            int columnId,
             String columnName,
             SequenceIDCounter sequenceIDCounter,
             long idCacheSize,
             long maxAllowedValue) {
         this.cacheSize = idCacheSize;
+        this.columnId = columnId;
         this.columnName = columnName;
         this.tablePath = tablePath;
         this.sequenceIDCounter = sequenceIDCounter;
-        this.segment = IdSegment.EMPTY;
         this.maxAllowedValue = maxAllowedValue;
+        this.segment = IdSegment.EMPTY;
     }
 
     private void fetchSegment() {
@@ -93,6 +96,22 @@ public class BoundedSegmentSequenceGenerator implements 
SequenceGenerator {
         return id;
     }
 
+    @Override
+    public AutoIncIDRange currentSequenceRange() {
+        return new AutoIncIDRange(columnId, segment.current, segment.end);
+    }
+
+    @Override
+    public void updateSequenceRange(AutoIncIDRange newRange) {
+        if (newRange.getColumnId() != columnId) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Column ID mismatch when updating sequence range. 
Expected column ID: %d, but got: %d.",
+                            columnId, newRange.getColumnId()));
+        }
+        this.segment = new IdSegment(newRange.getStart(), newRange.getEnd());
+    }
+
     private static class IdSegment {
         private static final IdSegment EMPTY = new IdSegment(0, -1);
         final long end;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java
index d448ffa4c..648479376 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java
@@ -29,4 +29,18 @@ public interface SequenceGenerator {
      * @return the next sequential value of the auto-increment column
      */
     long nextVal();
+
+    /**
+     * Returns the current sequence range (from the next unallocated value to the 
end) that has been
+     * allocated. This is useful for monitoring the current state of the sequence 
generator.
+     */
+    AutoIncIDRange currentSequenceRange();
+
+    /**
+     * Updates the sequence range with a new range. This method is typically 
called when the kv
+     * tablet is restored from snapshot.
+     *
+     * @param newRange the new sequence range to be set
+     */
+    void updateSequenceRange(AutoIncIDRange newRange);
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/ZkSequenceGeneratorFactory.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/ZkSequenceGeneratorFactory.java
index 1842f47f6..998893aaf 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/ZkSequenceGeneratorFactory.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/ZkSequenceGeneratorFactory.java
@@ -21,6 +21,7 @@ package org.apache.fluss.server.kv.autoinc;
 
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.SequenceIDCounter;
 import org.apache.fluss.server.zk.ZkSequenceIDCounter;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.data.ZkData;
@@ -38,24 +39,30 @@ public class ZkSequenceGeneratorFactory implements 
SequenceGeneratorFactory {
     @Override
     public SequenceGenerator createSequenceGenerator(
             TablePath tablePath, Schema.Column autoIncrementColumn, long 
idCacheSize) {
-        DataTypeRoot typeRoot = 
autoIncrementColumn.getDataType().getTypeRoot();
-        final long maxAllowedValue;
+        return new BoundedSegmentSequenceGenerator(
+                tablePath,
+                autoIncrementColumn.getColumnId(),
+                autoIncrementColumn.getName(),
+                createSequenceIDCounter(tablePath, 
autoIncrementColumn.getColumnId()),
+                idCacheSize,
+                checkMaxAllowedValue(autoIncrementColumn));
+    }
+
+    private SequenceIDCounter createSequenceIDCounter(TablePath tablePath, int 
columnId) {
+        return new ZkSequenceIDCounter(
+                zkClient.getCuratorClient(),
+                ZkData.AutoIncrementColumnZNode.path(tablePath, columnId));
+    }
+
+    private static long checkMaxAllowedValue(Schema.Column column) {
+        DataTypeRoot typeRoot = column.getDataType().getTypeRoot();
         if (typeRoot == DataTypeRoot.INTEGER) {
-            maxAllowedValue = Integer.MAX_VALUE;
+            return Integer.MAX_VALUE;
         } else if (typeRoot == DataTypeRoot.BIGINT) {
-            maxAllowedValue = Long.MAX_VALUE;
+            return Long.MAX_VALUE;
         } else {
             throw new IllegalArgumentException(
                     "Auto-increment column must be of type INTEGER or BIGINT");
         }
-        return new BoundedSegmentSequenceGenerator(
-                tablePath,
-                autoIncrementColumn.getName(),
-                new ZkSequenceIDCounter(
-                        zkClient.getCuratorClient(),
-                        ZkData.AutoIncrementColumnZNode.path(
-                                tablePath, autoIncrementColumn.getColumnId())),
-                idCacheSize,
-                maxAllowedValue);
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
index d040967f3..5908ba4b1 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
@@ -200,8 +200,10 @@ public class KvPreWriteBuffer implements AutoCloseable {
      *
      * @param exclusiveUpToLogSequenceNumber the exclusive upper bound of the 
log sequence number to
      *     be flushed
+     * @return the row count difference of the kv entries in the buffer after 
flushing.
      */
-    public void flush(long exclusiveUpToLogSequenceNumber) throws IOException {
+    public int flush(long exclusiveUpToLogSequenceNumber) throws IOException {
+        int rowCountDiff = 0;
         int flushedCount = 0;
         for (Iterator<KvEntry> it = allKvEntries.iterator(); it.hasNext(); ) {
             KvEntry entry = it.next();
@@ -218,9 +220,11 @@ public class KvPreWriteBuffer implements AutoCloseable {
             Value value = entry.getValue();
             if (value.value != null) {
                 flushedCount += 1;
+                rowCountDiff += 1;
                 kvBatchWriter.put(entry.getKey().key, value.value);
             } else {
                 flushedCount += 1;
+                rowCountDiff -= 1;
                 kvBatchWriter.delete(entry.getKey().key);
             }
 
@@ -233,6 +237,8 @@ public class KvPreWriteBuffer implements AutoCloseable {
         if (flushedCount > 0) {
             kvBatchWriter.flush();
         }
+
+        return rowCountDiff;
     }
 
     @VisibleForTesting
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java
index b58e1728c..86cd25858 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java
@@ -21,12 +21,15 @@ import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.fs.FileSystem;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.kv.autoinc.AutoIncIDRange;
 import org.apache.fluss.utils.concurrent.FutureUtils;
 
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -68,6 +71,18 @@ public class CompletedSnapshot {
     /** The next log offset when the snapshot is triggered. */
     private final long logOffset;
 
+    /**
+     * The row count of the snapshot, null for legacy tables that don't 
support row count
+     * statistics.
+     */
+    @Nullable private final Long rowCount;
+
+    /**
+     * The auto-increment ID ranges of the snapshot, null for legacy tables 
that don't support or
+     * don't have auto-increment columns.
+     */
+    @Nullable private final List<AutoIncIDRange> autoIncIDRanges;
+
     /** The location where the snapshot is stored. */
     private final FsPath snapshotLocation;
 
@@ -78,12 +93,16 @@ public class CompletedSnapshot {
             long snapshotID,
             FsPath snapshotLocation,
             KvSnapshotHandle kvSnapshotHandle,
-            long logOffset) {
+            long logOffset,
+            @Nullable Long rowCount,
+            @Nullable List<AutoIncIDRange> autoIncIDRanges) {
         this.tableBucket = tableBucket;
         this.snapshotID = snapshotID;
         this.snapshotLocation = snapshotLocation;
         this.kvSnapshotHandle = kvSnapshotHandle;
         this.logOffset = logOffset;
+        this.rowCount = rowCount;
+        this.autoIncIDRanges = autoIncIDRanges;
     }
 
     @VisibleForTesting
@@ -92,7 +111,7 @@ public class CompletedSnapshot {
             long snapshotID,
             FsPath snapshotLocation,
             KvSnapshotHandle kvSnapshotHandle) {
-        this(tableBucket, snapshotID, snapshotLocation, kvSnapshotHandle, 0);
+        this(tableBucket, snapshotID, snapshotLocation, kvSnapshotHandle, 0, 
null, null);
     }
 
     public long getSnapshotID() {
@@ -111,6 +130,24 @@ public class CompletedSnapshot {
         return logOffset;
     }
 
+    @Nullable
+    public Long getRowCount() {
+        return rowCount;
+    }
+
+    @Nullable
+    public List<AutoIncIDRange> getAutoIncIDRanges() {
+        return autoIncIDRanges;
+    }
+
+    @Nullable
+    public AutoIncIDRange getFirstAutoIncIDRange() {
+        if (autoIncIDRanges == null || autoIncIDRanges.isEmpty()) {
+            return null;
+        }
+        return autoIncIDRanges.get(0);
+    }
+
     public long getSnapshotSize() {
         return kvSnapshotHandle.getSnapshotSize();
     }
@@ -188,11 +225,20 @@ public class CompletedSnapshot {
                 && logOffset == that.logOffset
                 && Objects.equals(tableBucket, that.tableBucket)
                 && Objects.equals(kvSnapshotHandle, that.kvSnapshotHandle)
+                && Objects.equals(rowCount, that.rowCount)
+                && Objects.equals(autoIncIDRanges, that.autoIncIDRanges)
                 && Objects.equals(snapshotLocation, that.snapshotLocation);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(tableBucket, snapshotID, kvSnapshotHandle, 
logOffset, snapshotLocation);
+        return Objects.hash(
+                tableBucket,
+                snapshotID,
+                kvSnapshotHandle,
+                logOffset,
+                rowCount,
+                autoIncIDRanges,
+                snapshotLocation);
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerde.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerde.java
index 6b79966da..99a7c3d23 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerde.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerde.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.kv.snapshot;
 
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.kv.autoinc.AutoIncIDRange;
 import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.fluss.utils.json.JsonDeserializer;
@@ -55,8 +56,17 @@ public class CompletedSnapshotJsonSerde
     private static final String KV_FILE_LOCAL_PATH = "local_path";
     private static final String SNAPSHOT_INCREMENTAL_SIZE = 
"snapshot_incremental_size";
 
-    // -- for the next log offset when the snapshot is triggered;
+    // 
---------------------------------------------------------------------------------
+    // kv tablet state for the snapshot
+    // 
---------------------------------------------------------------------------------
+
+    // for the next log offset when the snapshot is triggered;
     private static final String LOG_OFFSET = "log_offset";
+    private static final String ROW_COUNT = "row_count";
+    private static final String AUTO_INC_ID_RANGE = "auto_inc_id_range";
+    private static final String AUTO_INC_COLUMN_ID = "column_id";
+    private static final String AUTO_INC_ID_START = "start";
+    private static final String AUTO_INC_ID_END = "end";
 
     @Override
     public void serialize(CompletedSnapshot completedSnapshot, JsonGenerator 
generator)
@@ -103,6 +113,29 @@ public class CompletedSnapshotJsonSerde
         // serialize log offset
         generator.writeNumberField(LOG_OFFSET, 
completedSnapshot.getLogOffset());
 
+        // ROW_COUNT and AUTO_INC_ID_RANGE are added in v0.9, but they are 
nullable and optional, so
+        // we don't bump JSON version here to guarantee the RPC protocol 
compatibility between
+        // TabletServer and CoordinatorServer. See 
CoordinatorGateway#commitKvSnapshot RPC.
+
+        // serialize row count if exists
+        if (completedSnapshot.getRowCount() != null) {
+            generator.writeNumberField(ROW_COUNT, 
completedSnapshot.getRowCount());
+        }
+
+        // serialize auto-increment id range for each auto-increment column
+        if (completedSnapshot.getAutoIncIDRanges() != null
+                && !completedSnapshot.getAutoIncIDRanges().isEmpty()) {
+            generator.writeArrayFieldStart(AUTO_INC_ID_RANGE);
+            for (AutoIncIDRange autoIncIDRange : 
completedSnapshot.getAutoIncIDRanges()) {
+                generator.writeStartObject();
+                generator.writeNumberField(AUTO_INC_COLUMN_ID, 
autoIncIDRange.getColumnId());
+                generator.writeNumberField(AUTO_INC_ID_START, 
autoIncIDRange.getStart());
+                generator.writeNumberField(AUTO_INC_ID_END, 
autoIncIDRange.getEnd());
+                generator.writeEndObject();
+            }
+            generator.writeEndArray();
+        }
+
         generator.writeEndObject();
     }
 
@@ -161,8 +194,31 @@ public class CompletedSnapshotJsonSerde
         // construct CompletedSnapshot
         KvSnapshotHandle kvSnapshotHandle =
                 new KvSnapshotHandle(sharedFileHandles, privateFileHandles, 
incrementalSize);
+
+        Long rowCount = null;
+        if (node.has(ROW_COUNT)) {
+            rowCount = node.get(ROW_COUNT).asLong();
+        }
+
+        List<AutoIncIDRange> autoIncIDRanges = null;
+        if (node.has(AUTO_INC_ID_RANGE)) {
+            autoIncIDRanges = new ArrayList<>();
+            for (JsonNode autoIncIDRangeNode : node.get(AUTO_INC_ID_RANGE)) {
+                int columnId = 
autoIncIDRangeNode.get(AUTO_INC_COLUMN_ID).asInt();
+                long start = 
autoIncIDRangeNode.get(AUTO_INC_ID_START).asLong();
+                long end = autoIncIDRangeNode.get(AUTO_INC_ID_END).asLong();
+                autoIncIDRanges.add(new AutoIncIDRange(columnId, start, end));
+            }
+        }
+
         return new CompletedSnapshot(
-                tableBucket, snapshotId, new FsPath(snapshotLocation), 
kvSnapshotHandle, logOffset);
+                tableBucket,
+                snapshotId,
+                new FsPath(snapshotLocation),
+                kvSnapshotHandle,
+                logOffset,
+                rowCount,
+                autoIncIDRanges);
     }
 
     private List<KvFileHandleAndLocalPath> deserializeKvFileHandles(
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
index e61a4999a..569d75c6e 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java
@@ -67,7 +67,7 @@ public class KvTabletSnapshotTarget implements 
PeriodicSnapshotManager.SnapshotT
     private final int snapshotWriteBufferSize;
     private final FileSystem remoteFileSystem;
     private final SequenceIDCounter snapshotIdCounter;
-    private final Supplier<Long> logOffsetSupplier;
+    private final Supplier<TabletState> tabletStateSupplier;
     private final Consumer<Long> updateMinRetainOffset;
     private final Supplier<Integer> bucketLeaderEpochSupplier;
     private final Supplier<Integer> coordinatorEpochSupplier;
@@ -94,7 +94,7 @@ public class KvTabletSnapshotTarget implements 
PeriodicSnapshotManager.SnapshotT
             Executor ioExecutor,
             CloseableRegistry cancelStreamRegistry,
             SequenceIDCounter snapshotIdCounter,
-            Supplier<Long> logOffsetSupplier,
+            Supplier<TabletState> tabletStateSupplier,
             Consumer<Long> updateMinRetainOffset,
             Supplier<Integer> bucketLeaderEpochSupplier,
             Supplier<Integer> coordinatorEpochSupplier,
@@ -111,7 +111,7 @@ public class KvTabletSnapshotTarget implements 
PeriodicSnapshotManager.SnapshotT
                 ioExecutor,
                 cancelStreamRegistry,
                 snapshotIdCounter,
-                logOffsetSupplier,
+                tabletStateSupplier,
                 updateMinRetainOffset,
                 bucketLeaderEpochSupplier,
                 coordinatorEpochSupplier,
@@ -129,7 +129,7 @@ public class KvTabletSnapshotTarget implements 
PeriodicSnapshotManager.SnapshotT
             Executor ioExecutor,
             CloseableRegistry cancelStreamRegistry,
             SequenceIDCounter snapshotIdCounter,
-            Supplier<Long> logOffsetSupplier,
+            Supplier<TabletState> tabletStateSupplier,
             Consumer<Long> updateMinRetainOffset,
             Supplier<Integer> bucketLeaderEpochSupplier,
             Supplier<Integer> coordinatorEpochSupplier,
@@ -145,7 +145,7 @@ public class KvTabletSnapshotTarget implements 
PeriodicSnapshotManager.SnapshotT
         this.snapshotWriteBufferSize = snapshotWriteBufferSize;
         this.remoteFileSystem = remoteKvTabletDir.getFileSystem();
         this.snapshotIdCounter = snapshotIdCounter;
-        this.logOffsetSupplier = logOffsetSupplier;
+        this.tabletStateSupplier = tabletStateSupplier;
         this.updateMinRetainOffset = updateMinRetainOffset;
         this.bucketLeaderEpochSupplier = bucketLeaderEpochSupplier;
         this.coordinatorEpochSupplier = coordinatorEpochSupplier;
@@ -168,7 +168,8 @@ public class KvTabletSnapshotTarget implements 
PeriodicSnapshotManager.SnapshotT
 
     @Override
     public Optional<PeriodicSnapshotManager.SnapshotRunnable> initSnapshot() 
throws Exception {
-        long logOffset = logOffsetSupplier.get();
+        TabletState tabletState = tabletStateSupplier.get();
+        long logOffset = tabletState.getFlushedLogOffset();
         if (logOffset <= logOffsetOfLatestSnapshot) {
             LOG.debug(
                     "The current offset for the log whose kv data is flushed 
is {}, "
@@ -187,7 +188,8 @@ public class KvTabletSnapshotTarget implements 
PeriodicSnapshotManager.SnapshotT
         try {
             PeriodicSnapshotManager.SnapshotRunnable snapshotRunnable =
                     new PeriodicSnapshotManager.SnapshotRunnable(
-                            snapshotRunner.snapshot(currentSnapshotId, 
logOffset, snapshotLocation),
+                            snapshotRunner.snapshot(
+                                    currentSnapshotId, tabletState, 
snapshotLocation),
                             currentSnapshotId,
                             coordinatorEpoch,
                             bucketLeaderEpoch,
@@ -220,13 +222,16 @@ public class KvTabletSnapshotTarget implements 
PeriodicSnapshotManager.SnapshotT
             SnapshotLocation snapshotLocation,
             SnapshotResult snapshotResult)
             throws Throwable {
+        TabletState tabletState = snapshotResult.getTabletState();
         CompletedSnapshot completedSnapshot =
                 new CompletedSnapshot(
                         tableBucket,
                         snapshotId,
                         snapshotResult.getSnapshotPath(),
                         snapshotResult.getKvSnapshotHandle(),
-                        snapshotResult.getLogOffset());
+                        tabletState.getFlushedLogOffset(),
+                        tabletState.getRowCount(),
+                        tabletState.getAutoIncIDRanges());
         try {
             // commit the completed snapshot
             completedKvSnapshotCommitter.commitKvSnapshot(
@@ -270,12 +275,13 @@ public class KvTabletSnapshotTarget implements 
PeriodicSnapshotManager.SnapshotT
      * minimum offset to retain.
      */
     private void updateStateOnCommitSuccess(long snapshotId, SnapshotResult 
snapshotResult) {
+        long flushedLogOffset = 
snapshotResult.getTabletState().getFlushedLogOffset();
         // notify the snapshot complete
         rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId);
-        logOffsetOfLatestSnapshot = snapshotResult.getLogOffset();
+        logOffsetOfLatestSnapshot = flushedLogOffset;
         snapshotSize = snapshotResult.getSnapshotSize();
         // update LogTablet to notify the lowest offset that should be retained
-        updateMinRetainOffset.accept(snapshotResult.getLogOffset());
+        updateMinRetainOffset.accept(flushedLogOffset);
     }
 
     /**
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java
index fbfa42e1f..3ae7318f6 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java
@@ -98,11 +98,11 @@ public class RocksIncrementalSnapshot implements 
AutoCloseable {
     public SnapshotResultSupplier asyncSnapshot(
             NativeRocksDBSnapshotResources snapshotResources,
             long snapshotId,
-            long logOffset,
+            TabletState tabletState,
             @Nonnull SnapshotLocation snapshotLocation) {
         return new RocksDBIncrementalSnapshotOperation(
                 snapshotId,
-                logOffset,
+                tabletState,
                 snapshotLocation,
                 snapshotResources.previousSnapshot,
                 snapshotResources.snapshotDirectory);
@@ -185,7 +185,7 @@ public class RocksIncrementalSnapshot implements 
AutoCloseable {
         private final File localSnapshotDirectory;
 
         private final long snapshotId;
-        private final long logOffset;
+        private final TabletState tabletState;
 
         /** The target snapshot location and factory that creates the output 
streams to DFS. */
         @Nonnull private final SnapshotLocation snapshotLocation;
@@ -194,12 +194,12 @@ public class RocksIncrementalSnapshot implements 
AutoCloseable {
 
         public RocksDBIncrementalSnapshotOperation(
                 long snapshotId,
-                long logOffset,
+                TabletState tabletState,
                 @Nonnull SnapshotLocation snapshotLocation,
                 PreviousSnapshot previousSnapshot,
                 File localSnapshotDirectory) {
             this.snapshotId = snapshotId;
-            this.logOffset = logOffset;
+            this.tabletState = tabletState;
             this.snapshotLocation = snapshotLocation;
             this.previousSnapshot = previousSnapshot;
             this.localSnapshotDirectory = localSnapshotDirectory;
@@ -223,7 +223,7 @@ public class RocksIncrementalSnapshot implements 
AutoCloseable {
                 final KvSnapshotHandle kvSnapshotHandle =
                         new KvSnapshotHandle(sstFiles, miscFiles, 
snapshotIncrementalSize);
                 return new SnapshotResult(
-                        kvSnapshotHandle, 
snapshotLocation.getSnapshotDirectory(), logOffset);
+                        kvSnapshotHandle, 
snapshotLocation.getSnapshotDirectory(), tabletState);
             } finally {
                 if (!completed) {
                     cleanupIncompleteSnapshot(tmpResourcesRegistry);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotResult.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotResult.java
index 13d1e75aa..b8bdafddb 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotResult.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotResult.java
@@ -33,12 +33,14 @@ public class SnapshotResult implements Serializable {
     private final KvSnapshotHandle kvSnapshotHandle;
 
     private final FsPath snapshotPath;
-    private final long logOffset;
 
-    public SnapshotResult(KvSnapshotHandle kvSnapshotHandle, FsPath 
snapshotPath, long logOffset) {
+    private final TabletState tabletState;
+
+    public SnapshotResult(
+            KvSnapshotHandle kvSnapshotHandle, FsPath snapshotPath, 
TabletState tabletState) {
         this.kvSnapshotHandle = kvSnapshotHandle;
         this.snapshotPath = snapshotPath;
-        this.logOffset = logOffset;
+        this.tabletState = tabletState;
     }
 
     public KvSnapshotHandle getKvSnapshotHandle() {
@@ -49,8 +51,8 @@ public class SnapshotResult implements Serializable {
         return snapshotPath;
     }
 
-    public long getLogOffset() {
-        return logOffset;
+    public TabletState getTabletState() {
+        return tabletState;
     }
 
     public long getSnapshotSize() {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotRunner.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotRunner.java
index 82a8cb291..830dd944a 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotRunner.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotRunner.java
@@ -50,7 +50,7 @@ public class SnapshotRunner {
     }
 
     public RunnableFuture<SnapshotResult> snapshot(
-            long snapshotId, long logOffset, @Nonnull SnapshotLocation 
snapshotLocation)
+            long snapshotId, TabletState tabletState, @Nonnull 
SnapshotLocation snapshotLocation)
             throws Exception {
         long startTime = System.currentTimeMillis();
         RocksIncrementalSnapshot.NativeRocksDBSnapshotResources 
snapshotResources =
@@ -59,7 +59,7 @@ public class SnapshotRunner {
 
         SnapshotResultSupplier asyncSnapshot =
                 rocksIncrementalSnapshot.asyncSnapshot(
-                        snapshotResources, snapshotId, logOffset, 
snapshotLocation);
+                        snapshotResources, snapshotId, tabletState, 
snapshotLocation);
 
         return new AsyncSnapshotCallable<SnapshotResult>() {
             @Override
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/TabletState.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/TabletState.java
new file mode 100644
index 000000000..a5f19cee7
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/TabletState.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.kv.snapshot;
+
+import org.apache.fluss.server.kv.autoinc.AutoIncIDRange;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * TabletState represents the state of a kv tablet at a certain log offset. It 
contains the flushed
+ * log offset, the row count of the tablet at that log offset, and the 
auto-increment ID ranges of
+ * the tablet at that log offset. The row count and auto-increment ID ranges 
are optional, and may
+ * be null if the information is not available or not needed for a particular 
use case.
+ */
+public class TabletState {
+
+    private final long flushedLogOffset;
+    @Nullable private final Long rowCount;
+    @Nullable private final List<AutoIncIDRange> autoIncIDRanges;
+
+    public TabletState(
+            long flushedLogOffset,
+            @Nullable Long rowCount,
+            @Nullable List<AutoIncIDRange> autoIncIDRanges) {
+        this.flushedLogOffset = flushedLogOffset;
+        this.rowCount = rowCount;
+        this.autoIncIDRanges = autoIncIDRanges;
+    }
+
+    public long getFlushedLogOffset() {
+        return flushedLogOffset;
+    }
+
+    @Nullable
+    public Long getRowCount() {
+        return rowCount;
+    }
+
+    @Nullable
+    public List<AutoIncIDRange> getAutoIncIDRanges() {
+        return autoIncIDRanges;
+    }
+
+    @Override
+    public String toString() {
+        return "TabletState{"
+                + "flushedLogOffset="
+                + flushedLogOffset
+                + ", rowCount="
+                + rowCount
+                + ", autoIncIDRanges="
+                + autoIncIDRanges
+                + '}';
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 536f8b2be..8a1621a84 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -31,6 +31,7 @@ import org.apache.fluss.exception.NonPrimaryKeyTableException;
 import org.apache.fluss.exception.NotEnoughReplicasException;
 import org.apache.fluss.exception.NotLeaderOrFollowerException;
 import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.ChangelogImage;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.SchemaGetter;
@@ -52,6 +53,7 @@ import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
 import org.apache.fluss.server.kv.KvManager;
 import org.apache.fluss.server.kv.KvRecoverHelper;
 import org.apache.fluss.server.kv.KvTablet;
+import org.apache.fluss.server.kv.autoinc.AutoIncIDRange;
 import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder;
 import org.apache.fluss.server.kv.snapshot.CompletedKvSnapshotCommitter;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
@@ -697,6 +699,8 @@ public final class Replica {
         long restoreStartOffset = 0;
         Optional<CompletedSnapshot> optCompletedSnapshot = 
getLatestSnapshot(tableBucket);
         try {
+            Long rowCount;
+            AutoIncIDRange autoIncIDRange;
             if (optCompletedSnapshot.isPresent()) {
                 LOG.info(
                         "Use snapshot {} to restore kv tablet for {} of table 
{}.",
@@ -714,6 +718,9 @@ public final class Replica {
 
                 checkNotNull(kvTablet, "kv tablet should not be null.");
                 restoreStartOffset = completedSnapshot.getLogOffset();
+                rowCount = completedSnapshot.getRowCount();
+                // currently, we only support one auto-increment column.
+                autoIncIDRange = completedSnapshot.getFirstAutoIncIDRange();
             } else {
                 LOG.info(
                         "No snapshot found for {} of {}, restore from log.",
@@ -731,10 +738,19 @@ public final class Replica {
                                 schemaGetter,
                                 tableConfig,
                                 arrowCompressionInfo);
+
+                // row count is not tracked (null) for WAL changelog image; otherwise start from 0
+                rowCount = tableConfig.getChangelogImage() == 
ChangelogImage.WAL ? null : 0L;
+                // TODO: it is possible that this is a recovered kv tablet 
without kv snapshot but
+                //  with changelogs, in this case, the kv tablet should also 
have the
+                //  autoIncIDRange, we may need to get it from the changelog 
in the future.
+                //  but currently, we set it to null for simplification, as 
the auto-inc id
+                //  will be fetched from zk if it does not exist locally.
+                autoIncIDRange = null;
             }
 
             logTablet.updateMinRetainOffset(restoreStartOffset);
-            recoverKvTablet(restoreStartOffset);
+            recoverKvTablet(restoreStartOffset, rowCount, autoIncIDRange);
         } catch (Exception e) {
             throw new KvStorageException(
                     String.format(
@@ -800,14 +816,16 @@ public final class Replica {
         return Optional.empty();
     }
 
-    private void recoverKvTablet(long startRecoverLogOffset) {
+    private void recoverKvTablet(
+            long startRecoverLogOffset,
+            @Nullable Long rowCount,
+            @Nullable AutoIncIDRange autoIncIDRange) {
         long start = clock.milliseconds();
         checkNotNull(kvTablet, "kv tablet should not be null.");
         try {
             KvRecoverHelper.KvRecoverContext recoverContext =
                     new KvRecoverHelper.KvRecoverContext(
                             getTablePath(),
-                            tableBucket,
                             snapshotContext.getZooKeeperClient(),
                             snapshotContext.maxFetchLogSizeInRecoverKv());
             KvRecoverHelper kvRecoverHelper =
@@ -815,6 +833,8 @@ public final class Replica {
                             kvTablet,
                             logTablet,
                             startRecoverLogOffset,
+                            rowCount,
+                            autoIncIDRange,
                             recoverContext,
                             tableConfig.getKvFormat(),
                             tableConfig.getLogFormat(),
@@ -900,7 +920,7 @@ public final class Replica {
                             snapshotContext.getAsyncOperationsThreadPool(),
                             closeableRegistryForKv,
                             snapshotIDCounter,
-                            kvTablet::getFlushedLogOffset,
+                            kvTablet::getTabletState,
                             logTablet::updateMinRetainOffset,
                             bucketLeaderEpochSupplier,
                             coordinatorEpochSupplier,
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java
index c7c90ab6c..0070f1563 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java
@@ -37,6 +37,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 class SegmentSequenceGeneratorTest {
 
     private static final TablePath TABLE_PATH = new TablePath("test_db", 
"test_table");
+    private static final int COLUMN_ID = 1;
     private static final String COLUMN_NAME = "id";
     private static final long CACHE_SIZE = 100;
 
@@ -52,6 +53,7 @@ class SegmentSequenceGeneratorTest {
         BoundedSegmentSequenceGenerator generator =
                 new BoundedSegmentSequenceGenerator(
                         TABLE_PATH,
+                        COLUMN_ID,
                         COLUMN_NAME,
                         new TestingSequenceIDCounter(snapshotIdGenerator),
                         CACHE_SIZE,
@@ -77,6 +79,7 @@ class SegmentSequenceGeneratorTest {
                                 BoundedSegmentSequenceGenerator generator =
                                         new BoundedSegmentSequenceGenerator(
                                                 new TablePath("test_db", 
"table1"),
+                                                COLUMN_ID,
                                                 COLUMN_NAME,
                                                 new 
TestingSequenceIDCounter(snapshotIdGenerator),
                                                 CACHE_SIZE,
@@ -103,6 +106,7 @@ class SegmentSequenceGeneratorTest {
         BoundedSegmentSequenceGenerator generator =
                 new BoundedSegmentSequenceGenerator(
                         new TablePath("test_db", "table1"),
+                        COLUMN_ID,
                         COLUMN_NAME,
                         new TestingSequenceIDCounter(snapshotIdGenerator, 2),
                         CACHE_SIZE,
@@ -125,6 +129,7 @@ class SegmentSequenceGeneratorTest {
         BoundedSegmentSequenceGenerator generator =
                 new BoundedSegmentSequenceGenerator(
                         new TablePath("test_db", "table1"),
+                        COLUMN_ID,
                         COLUMN_NAME,
                         new TestingSequenceIDCounter(snapshotIdGenerator),
                         CACHE_SIZE,
@@ -144,4 +149,37 @@ class SegmentSequenceGeneratorTest {
                                 "Reached maximum value of sequence \"<%s>\" 
(%d).",
                                 COLUMN_NAME, Integer.MAX_VALUE));
     }
+
+    @Test
+    void testUpdateIDRange() {
+        BoundedSegmentSequenceGenerator generator =
+                new BoundedSegmentSequenceGenerator(
+                        TABLE_PATH,
+                        COLUMN_ID,
+                        COLUMN_NAME,
+                        new TestingSequenceIDCounter(snapshotIdGenerator),
+                        CACHE_SIZE,
+                        Long.MAX_VALUE);
+        for (long i = 1; i <= CACHE_SIZE; i++) {
+            assertThat(generator.nextVal()).isEqualTo(i);
+        }
+
+        assertThat(generator.currentSequenceRange())
+                .isEqualTo(new AutoIncIDRange(COLUMN_ID, 101, CACHE_SIZE));
+
+        // update the ID range to [1000, 1100]
+        generator.updateSequenceRange(new AutoIncIDRange(COLUMN_ID, 1000, 
1100));
+
+        for (long i = 1000; i <= 1050; i++) {
+            assertThat(generator.nextVal()).isEqualTo(i);
+        }
+        assertThat(generator.currentSequenceRange())
+                .isEqualTo(new AutoIncIDRange(COLUMN_ID, 1051, 1100));
+
+        assertThatThrownBy(
+                        () -> generator.updateSequenceRange(new 
AutoIncIDRange(9999, 1000, 1100)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Column ID mismatch when updating sequence range. 
Expected column ID: 1, but got: 9999.");
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceGeneratorFactory.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceGeneratorFactory.java
index 2bafcd670..bd2d764a7 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceGeneratorFactory.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceGeneratorFactory.java
@@ -32,6 +32,7 @@ public class TestingSequenceGeneratorFactory implements 
SequenceGeneratorFactory
             TablePath tablePath, Schema.Column autoIncrementColumn, long 
idCacheSize) {
         return new BoundedSegmentSequenceGenerator(
                 tablePath,
+                autoIncrementColumn.getColumnId(),
                 autoIncrementColumn.getName(),
                 new TestingSequenceIDCounter(new AtomicLong(0)),
                 idCacheSize,
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerdeTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerdeTest.java
index d89087ab2..2bd933977 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerdeTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerdeTest.java
@@ -19,9 +19,11 @@ package org.apache.fluss.server.kv.snapshot;
 
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.kv.autoinc.AutoIncIDRange;
 import org.apache.fluss.utils.json.JsonSerdeTestBase;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /** Test for {@link 
org.apache.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde}. */
@@ -55,14 +57,18 @@ class CompletedSnapshotJsonSerdeTest extends 
JsonSerdeTestBase<CompletedSnapshot
                         1,
                         new FsPath("oss://bucket/snapshot"),
                         new KvSnapshotHandle(sharedFileHandles, 
privateFileHandles, 5),
-                        10);
+                        10,
+                        null,
+                        null);
         CompletedSnapshot completedSnapshot2 =
                 new CompletedSnapshot(
                         new TableBucket(1, 10L, 1),
                         1,
                         new FsPath("oss://bucket/snapshot"),
                         new KvSnapshotHandle(sharedFileHandles, 
privateFileHandles, 5),
-                        10);
+                        10,
+                        1234L,
+                        Collections.singletonList(new AutoIncIDRange(2, 10000, 
20000)));
         return new CompletedSnapshot[] {completedSnapshot1, 
completedSnapshot2};
     }
 
@@ -88,7 +94,7 @@ class CompletedSnapshotJsonSerdeTest extends 
JsonSerdeTestBase<CompletedSnapshot
                     + 
"{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/shared/t2.sst\",\"size\":2},\"local_path\":\"localPath2\"}],"
                     + 
"\"private_file_handles\":[{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/snapshot1/t3\",\"size\":3},\"local_path\":\"localPath3\"},"
                     + 
"{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/snapshot1/t4\",\"size\":4},\"local_path\":\"localPath4\"}],"
-                    + "\"snapshot_incremental_size\":5},\"log_offset\":10}"
+                    + 
"\"snapshot_incremental_size\":5},\"log_offset\":10,\"row_count\":1234,\"auto_inc_id_range\":[{\"column_id\":2,\"start\":10000,\"end\":20000}]}"
         };
     }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
index 98ca31605..714835ce5 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
@@ -501,7 +501,7 @@ class KvTabletSnapshotTargetTest {
                 executor,
                 cancelStreamRegistry,
                 testingSnapshotIdCounter,
-                logOffsetGenerator::get,
+                this::getCurrentTabletState,
                 updateMinRetainOffsetConsumer::set,
                 bucketLeaderEpochSupplier,
                 coordinatorEpochSupplier,
@@ -539,7 +539,7 @@ class KvTabletSnapshotTargetTest {
                 executor,
                 cancelStreamRegistry,
                 testingSnapshotIdCounter,
-                logOffsetGenerator::get,
+                this::getCurrentTabletState,
                 updateMinRetainOffsetConsumer::set,
                 bucketLeaderEpochSupplier,
                 coordinatorEpochSupplier,
@@ -547,6 +547,10 @@ class KvTabletSnapshotTargetTest {
                 0L);
     }
 
+    private TabletState getCurrentTabletState() {
+        return new TabletState(logOffsetGenerator.get(), null, null);
+    }
+
     private RocksIncrementalSnapshot 
createIncrementalSnapshot(SnapshotFailType snapshotFailType)
             throws IOException {
         long lastCompletedSnapshotId = -1L;
@@ -626,10 +630,11 @@ class KvTabletSnapshotTargetTest {
             this.snapshotFailType = snapshotFailType;
         }
 
+        @Override
         public SnapshotResultSupplier asyncSnapshot(
                 NativeRocksDBSnapshotResources snapshotResources,
                 long snapshotId,
-                long logOffset,
+                TabletState tabletState,
                 @Nonnull SnapshotLocation snapshotLocation) {
             if (snapshotFailType == SnapshotFailType.SYNC_PHASE) {
                 throw new FlussRuntimeException("Fail in snapshot sync 
phase.");
@@ -639,7 +644,7 @@ class KvTabletSnapshotTargetTest {
                 };
             } else {
                 return super.asyncSnapshot(
-                        snapshotResources, snapshotId, logOffset, 
snapshotLocation);
+                        snapshotResources, snapshotId, tabletState, 
snapshotLocation);
             }
         }
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
index 31e53490d..c719c9683 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
@@ -223,8 +223,8 @@ class PeriodicSnapshotManagerTest {
                                 if (exceptionMessage != null) {
                                     throw new 
FlussRuntimeException(exceptionMessage);
                                 } else {
-                                    final long logOffset = 0;
-                                    return new SnapshotResult(null, 
snapshotPath, logOffset);
+                                    return new SnapshotResult(
+                                            null, snapshotPath, new 
TabletState(0, null, null));
                                 }
                             });
             int snapshotId = 1;
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshotTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshotTest.java
index 2e4b4fbca..64398516a 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshotTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshotTest.java
@@ -190,7 +190,11 @@ class RocksIncrementalSnapshotTest {
                 incrementalSnapshot.syncPrepareResources(snapshotId);
 
         return incrementalSnapshot
-                .asyncSnapshot(nativeRocksDBSnapshotResources, snapshotId, 0L, 
snapshotLocation)
+                .asyncSnapshot(
+                        nativeRocksDBSnapshotResources,
+                        snapshotId,
+                        new TabletState(0L, null, null),
+                        snapshotLocation)
                 .get(closeableRegistry)
                 .getKvSnapshotHandle();
     }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/KvTestUtils.java 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/KvTestUtils.java
index 273858d31..1ff32c7e7 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/KvTestUtils.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/KvTestUtils.java
@@ -129,7 +129,9 @@ public class KvTestUtils {
                                         tableBucket.getBucket(),
                                         snapshotId)),
                 new KvSnapshotHandle(Collections.emptyList(), 
Collections.emptyList(), 0),
-                0);
+                0,
+                null,
+                null);
     }
 
     public static int getKeyCounts(RocksDB rocksDB) {

Reply via email to