This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit dc8cc6514e5b6a4a5673ad4dcde1617322fac554
Author: Junbo Wang <[email protected]>
AuthorDate: Tue Jan 6 14:50:55 2026 +0800

    [kv] Add basic implementation of AUTO_INCREMENT column (#2161)
---
 .../fluss/client/table/FlussTableITCase.java       | 176 ++++++++++++++++++++
 .../org/apache/fluss/config/ConfigOptions.java     |  19 +--
 .../java/org/apache/fluss/config/TableConfig.java  |   5 +
 .../fluss/exception/SequenceOverflowException.java |  30 ++++
 .../org/apache/fluss/metadata/ChangelogImage.java  |  11 +-
 .../java/org/apache/fluss/metadata/Schema.java     |  28 +++-
 .../org/apache/fluss/metadata/TableSchemaTest.java |  14 ++
 .../fluss/utils/json/SchemaJsonSerdeTest.java      |   2 +-
 .../apache/fluss/flink/FlinkConnectorOptions.java  |  18 +-
 .../fluss/flink/catalog/FlinkTableFactory.java     |   1 +
 .../apache/fluss/flink/utils/FlinkConversions.java |  17 +-
 .../fluss/flink/catalog/FlinkCatalogITCase.java    |   7 +
 .../fluss/flink/sink/FlinkTableSinkITCase.java     |  58 +++++++
 .../org/apache/fluss/server/SequenceIDCounter.java |   8 +
 .../fluss/server/coordinator/SchemaUpdate.java     |   5 +
 .../java/org/apache/fluss/server/kv/KvManager.java |  13 +-
 .../java/org/apache/fluss/server/kv/KvTablet.java  |  47 ++++--
 .../server/kv/autoinc/AutoIncrementManager.java    | 128 +++++++++++++++
 .../server/kv/autoinc/AutoIncrementUpdater.java    |  46 ++++++
 .../autoinc/BoundedSegmentSequenceGenerator.java   | 119 ++++++++++++++
 .../kv/autoinc/PerSchemaAutoIncrementUpdater.java  | 112 +++++++++++++
 .../fluss/server/kv/autoinc/SequenceGenerator.java |  32 ++++
 .../fluss/server/zk/ZkSequenceIDCounter.java       |  25 ++-
 .../org/apache/fluss/server/zk/data/ZkData.java    |  22 +++
 .../org/apache/fluss/server/kv/KvTabletTest.java   |  11 +-
 .../kv/autoinc/SegmentSequenceGeneratorTest.java   | 182 +++++++++++++++++++++
 .../kv/snapshot/KvTabletSnapshotTargetTest.java    |   5 +
 website/docs/engine-flink/options.md               |  55 ++++---
 28 files changed, 1125 insertions(+), 71 deletions(-)

diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index a65529648..c21c6a8bb 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -48,6 +48,7 @@ import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.ProjectedRow;
 import org.apache.fluss.row.indexed.IndexedRow;
 import org.apache.fluss.types.BigIntType;
 import org.apache.fluss.types.DataTypes;
@@ -454,6 +455,181 @@ class FlussTableITCase extends ClientToServerITCaseBase {
                                 + "because the lookup columns [b, a] must 
contain all bucket keys [a, b] in order.");
     }
 
+    @Test
+    void testSingleBucketPutAutoIncrementColumnAndLookup() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("col1", DataTypes.STRING())
+                        .withComment("col1 is first column")
+                        .column("col2", DataTypes.BIGINT())
+                        .withComment("col2 is second column, auto increment 
column")
+                        .column("col3", DataTypes.STRING())
+                        .withComment("col3 is third column")
+                        .enableAutoIncrement("col2")
+                        .primaryKey("col1")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(1, 
"col1").build();
+        // create the table
+        TablePath tablePath =
+                TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), 
"test_pk_table_auto_inc");
+        createTable(tablePath, tableDescriptor, true);
+        Table autoIncTable = conn.getTable(tablePath);
+        Object[][] records = {
+            {"a", null, "batch1"},
+            {"b", null, "batch1"},
+            {"c", null, "batch1"},
+            {"d", null, "batch1"},
+            {"e", null, "batch1"},
+            {"d", null, "batch2"},
+            {"e", null, "batch2"}
+        };
+        partialUpdateRecords(new String[] {"col1", "col3"}, records, 
autoIncTable);
+
+        Object[][] expectedRecords = {
+            {"a", 1L, "batch1"},
+            {"b", 2L, "batch1"},
+            {"c", 3L, "batch1"},
+            {"d", 4L, "batch2"},
+            {"e", 5L, "batch2"}
+        };
+        verifyRecords(expectedRecords, autoIncTable, schema);
+
+        admin.alterTable(
+                        tablePath,
+                        Collections.singletonList(
+                                TableChange.addColumn(
+                                        "col4",
+                                        DataTypes.INT(),
+                                        null,
+                                        TableChange.ColumnPosition.last())),
+                        false)
+                .get();
+        Table newSchemaTable = conn.getTable(tablePath);
+        Schema newSchema = newSchemaTable.getTableInfo().getSchema();
+
+        // schema change case1: read new data with new schema.
+        Object[][] expectedRecordsWithOldSchema = {
+            {"a", 1L, "batch1"},
+            {"b", 2L, "batch1"},
+            {"c", 3L, "batch1"},
+            {"d", 4L, "batch2"},
+            {"e", 5L, "batch2"}
+        };
+        verifyRecords(expectedRecordsWithOldSchema, autoIncTable, schema);
+
+        // schema change case2: update new data with new schema.
+
+        Object[][] recordsWithNewSchema = {
+            {"a", null, "batch3", 10},
+            {"b", null, "batch3", 11}
+        };
+        partialUpdateRecords(
+                new String[] {"col1", "col3", "col4"}, recordsWithNewSchema, 
newSchemaTable);
+
+        // schema change case3: read data with old schema.
+        expectedRecordsWithOldSchema[0][2] = "batch3";
+        expectedRecordsWithOldSchema[1][2] = "batch3";
+        verifyRecords(expectedRecordsWithOldSchema, autoIncTable, schema);
+
+        // schema change case4: read data with new schema.
+        Object[][] expectedRecordsWithNewSchema = {
+            {"a", 1L, "batch3", 10},
+            {"b", 2L, "batch3", 11},
+            {"c", 3L, "batch1", null},
+            {"d", 4L, "batch2", null},
+            {"e", 5L, "batch2", null}
+        };
+        verifyRecords(expectedRecordsWithNewSchema, newSchemaTable, newSchema);
+
+        // kill and restart all tablet server
+        for (int i = 0; i < 3; i++) {
+            FLUSS_CLUSTER_EXTENSION.stopTabletServer(i);
+            FLUSS_CLUSTER_EXTENSION.startTabletServer(i);
+        }
+
+        // reconnect fluss server
+        conn = ConnectionFactory.createConnection(clientConf);
+        newSchemaTable = conn.getTable(tablePath);
+        verifyRecords(expectedRecordsWithNewSchema, newSchemaTable, newSchema);
+
+        Object[][] restartWriteRecords = {{"f", null, "batch4", 12}};
+        partialUpdateRecords(
+                new String[] {"col1", "col3", "col4"}, restartWriteRecords, 
newSchemaTable);
+
+        // The auto-increment column should start from a new segment for now, and
+        // the locally cached IDs have been discarded.
+        Object[][] expectedRestartWriteRecords = {{"f", 100001L, "batch4", 
12}};
+        verifyRecords(expectedRestartWriteRecords, newSchemaTable, newSchema);
+    }
+
+    @Test
+    void testPutAutoIncrementColumnAndLookup() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("dt", DataTypes.STRING())
+                        .column("col1", DataTypes.STRING())
+                        .withComment("col1 is first column")
+                        .column("col2", DataTypes.BIGINT())
+                        .withComment("col2 is second column, auto increment 
column")
+                        .column("col3", DataTypes.STRING())
+                        .withComment("col3 is third column")
+                        .enableAutoIncrement("col2")
+                        .primaryKey("dt", "col1")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .partitionedBy("dt")
+                        .distributedBy(2, "col1")
+                        .build();
+        // create the table
+        TablePath tablePath =
+                TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), 
"test_pk_table_auto_inc");
+        createTable(tablePath, tableDescriptor, true);
+        Table autoIncTable = conn.getTable(tablePath);
+        Object[][] records = {
+            {"2026-01-06", "a", null, "batch1"},
+            {"2026-01-06", "b", null, "batch1"},
+            {"2026-01-06", "c", null, "batch1"},
+            {"2026-01-06", "d", null, "batch1"},
+            {"2026-01-07", "e", null, "batch1"},
+            {"2026-01-06", "a", null, "batch2"},
+            {"2026-01-06", "b", null, "batch2"},
+        };
+
+        // upsert records with auto inc column col1 null value
+        partialUpdateRecords(new String[] {"dt", "col1", "col3"}, records, 
autoIncTable);
+        Object[][] expectedRecords = {
+            {"2026-01-06", "a", 1L, "batch2"},
+            {"2026-01-06", "b", 100001L, "batch2"},
+            {"2026-01-06", "c", 2L, "batch1"},
+            {"2026-01-06", "d", 100002L, "batch1"},
+            {"2026-01-07", "e", 200001L, "batch1"}
+        };
+        verifyRecords(expectedRecords, autoIncTable, schema);
+    }
+
+    private void partialUpdateRecords(String[] targetColumns, Object[][] 
records, Table table) {
+        UpsertWriter upsertWriter = 
table.newUpsert().partialUpdate(targetColumns).createWriter();
+        for (Object[] record : records) {
+            upsertWriter.upsert(row(record));
+            // flush immediately to ensure auto-increment values are assigned 
sequentially across
+            // multiple buckets.
+            upsertWriter.flush();
+        }
+    }
+
+    private void verifyRecords(Object[][] records, Table table, Schema schema) 
throws Exception {
+        Lookuper lookuper = table.newLookup().createLookuper();
+        ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes());
+        for (Object[] record : records) {
+            assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row(record))))
+                    .withSchema(schema.getRowType())
+                    .isEqualTo(row(record));
+        }
+    }
+
     @Test
     void testLookupForNotReadyTable() throws Exception {
         TablePath tablePath = TablePath.of("test_db_1", 
"test_lookup_unready_table_t1");
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java 
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 9f4b603e9..d3f7cd2ff 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1447,16 +1447,15 @@ public class ConfigOptions {
                                     + "For tables with FIRST_ROW, VERSIONED, 
or AGGREGATION merge engines, this option defaults to `ignore`. "
                                     + "Note: For AGGREGATION merge engine, 
when set to `allow`, delete operations will remove the entire record.");
 
-    public static final ConfigOption<String> TABLE_AUTO_INCREMENT_FIELDS =
-            key("table.auto-increment.fields")
-                    .stringType()
-                    .noDefaultValue()
+    public static final ConfigOption<Long> TABLE_AUTO_INCREMENT_CACHE_SIZE =
+            key("table.auto-increment.cache-size")
+                    .longType()
+                    .defaultValue(100000L)
                     .withDescription(
-                            "Defines the auto increment columns. "
-                                    + "The auto increment column can only be 
used in primary-key table."
-                                    + "With an auto increment column in the 
table, whenever a new row is inserted into the table, the new row will be 
assigned with the next available value from the auto-increment sequence."
-                                    + "The auto increment column can only be 
used in primary-key table. The data type of the auto increment column must be 
INT or BIGINT."
-                                    + "Currently a table can have only one 
auto-increment column.");
+                            "The cache size of auto-increment IDs fetched from 
the distributed counter each time. "
+                                    + "This value determines the length of the 
locally cached ID segment. Default: 100000. "
+                                    + "A larger cache size may cause 
significant auto-increment ID gaps, especially when unused cached ID segments 
are discarded due to TabletServer restarts or abnormal terminations. "
+                                    + "Conversely, a smaller cache size 
increases the frequency of ID fetch requests to the distributed counter, 
introducing extra network overhead and reducing write throughput and 
performance.");
 
     public static final ConfigOption<ChangelogImage> TABLE_CHANGELOG_IMAGE =
             key("table.changelog.image")
@@ -1468,7 +1467,7 @@ public class ConfigOptions {
                                     + "The supported modes are `FULL` 
(default) and `WAL`. "
                                     + "The `FULL` mode produces both 
UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing 
complete information about updates and allowing tracking of previous values. "
                                     + "The `WAL` mode does not produce 
UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if allowed) 
records are emitted. "
-                                    + "When WAL mode is enabled with default 
merge engine (no merge engine configured) and full row updates (not partial 
update), an optimization is applied to skip looking up old values, "
+                                    + "When WAL mode is enabled, the default 
merge engine is used (no merge engine configured), updates are full row updates 
(not partial update), and there is no auto-increment column, an optimization is 
applied to skip looking up old values, "
                                     + "and in this case INSERT operations are 
converted to UPDATE_AFTER events. "
                                     + "This mode reduces storage and 
transmission costs but loses the ability to track previous values. "
                                     + "This option only affects primary key 
tables.");
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java 
b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
index 80d2ee8f7..86604b6be 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
@@ -140,4 +140,9 @@ public class TableConfig {
     public AutoPartitionStrategy getAutoPartitionStrategy() {
         return AutoPartitionStrategy.from(config);
     }
+
+    /** Gets the number of auto-increment IDs cached per segment. */
+    public Long getAutoIncrementCacheSize() {
+        return config.get(ConfigOptions.TABLE_AUTO_INCREMENT_CACHE_SIZE);
+    }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/exception/SequenceOverflowException.java
 
b/fluss-common/src/main/java/org/apache/fluss/exception/SequenceOverflowException.java
new file mode 100644
index 000000000..4a54cc0b8
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/exception/SequenceOverflowException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+/**
+ * Exception for increment number overflow.
+ *
+ * @since 0.9
+ */
+public class SequenceOverflowException extends ApiException {
+    public SequenceOverflowException(String message) {
+        super(message);
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java
index 02f7a5750..094b4601e 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java
@@ -36,11 +36,12 @@ public enum ChangelogImage {
 
     /**
      * WAL mode does not produce UPDATE_BEFORE records. Only INSERT, 
UPDATE_AFTER (and DELETE if
-     * allowed) records are emitted. When WAL mode is enabled with default 
merge engine (no merge
-     * engine configured) and full row updates (not partial update), an 
optimization is applied to
-     * skip looking up old values, and in this case INSERT operations are 
converted to UPDATE_AFTER
-     * events, similar to database WAL (Write-Ahead Log) behavior. This mode 
reduces storage and
-     * transmission costs but loses the ability to track previous values.
+     * allowed) records are emitted. When WAL mode is enabled, the default 
merge engine (no merge
+     * engine configured) is used, updates are full row (not partial update), 
and there is no
+     * auto-increment column, an optimization is applied to skip looking up 
old values, and in this
+     * case INSERT operations are converted to UPDATE_AFTER events, similar to 
database WAL
+     * (Write-Ahead Log) behavior. This mode reduces storage and transmission 
costs but loses the
+     * ability to track previous values.
      */
     WAL;
 
diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
index 21a3a3c93..4c1f1828d 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -134,6 +134,18 @@ public final class Schema implements Serializable {
                 .orElseGet(() -> new int[0]);
     }
 
+    /** Returns the auto-increment columnIds, if any, otherwise returns an 
empty array. */
+    public int[] getAutoIncrementColumnIds() {
+        if (autoIncrementColumnNames.isEmpty()) {
+            return new int[0];
+        } else {
+            return getColumns().stream()
+                    .filter(column -> 
autoIncrementColumnNames.contains(column.getName()))
+                    .mapToInt(Column::getColumnId)
+                    .toArray();
+        }
+    }
+
     /** Returns the primary key column names, if any, otherwise returns an 
empty array. */
     public List<String> getPrimaryKeyColumnNames() {
         return 
getPrimaryKey().map(PrimaryKey::getColumnNames).orElse(Collections.emptyList());
@@ -176,6 +188,11 @@ public final class Schema implements Serializable {
         return columnNames;
     }
 
+    /** Returns the column name in given column index. */
+    public String getColumnName(int columnIndex) {
+        return columns.get(columnIndex).columnName;
+    }
+
     /** Returns the indexes of the fields in the schema. */
     public int[] getColumnIndexes(List<String> keyNames) {
         int[] keyIndexes = new int[keyNames.size()];
@@ -250,6 +267,13 @@ public final class Schema implements Serializable {
             if (schema.primaryKey != null) {
                 primaryKeyNamed(schema.primaryKey.constraintName, 
schema.primaryKey.columnNames);
             }
+            if (schema.autoIncrementColumnNames != null
+                    && !schema.autoIncrementColumnNames.isEmpty()) {
+                checkState(
+                        schema.autoIncrementColumnNames.size() == 1,
+                        "Multiple auto increment columns are not supported 
yet.");
+                enableAutoIncrement(schema.autoIncrementColumnNames.get(0));
+            }
             this.highestFieldId = new AtomicInteger(schema.highestFieldId);
             return this;
         }
@@ -748,9 +772,7 @@ public final class Schema implements Serializable {
             }
 
             // primary key and auto increment column should not nullable
-            if ((pkSet.contains(column.getName())
-                            || 
autoIncrementColumnNames.contains(column.getName()))
-                    && column.getDataType().isNullable()) {
+            if (pkSet.contains(column.getName()) && 
column.getDataType().isNullable()) {
                 newColumns.add(
                         new Column(
                                 column.getName(),
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java 
b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
index 82e2fd76e..fe317b53d 100644
--- a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
@@ -358,6 +358,20 @@ class TableSchemaTest {
         
assertThat(copied.getAggFunction("max_val").get()).isEqualTo(AggFunctions.MAX());
     }
 
+    @Test
+    void testSchemaFromSchemaPreservesAutoIncrement() {
+        Schema original =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("value", DataTypes.BIGINT())
+                        .primaryKey("id")
+                        .enableAutoIncrement("value")
+                        .build();
+
+        Schema copied = Schema.newBuilder().fromSchema(original).build();
+        
assertThat(copied.getAutoIncrementColumnNames()).isEqualTo(Arrays.asList("value"));
+    }
+
     @Test
     void testListaggWithCustomDelimiter() {
         // Test LISTAGG with default delimiter (comma)
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
index 54c5f69ea..410be3cad 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
@@ -100,7 +100,7 @@ public class SchemaJsonSerdeTest extends 
JsonSerdeTestBase<Schema> {
             
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"BIGINT\"},\"comment\":\"a
 is first 
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b
 is second 
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6},\"comment\":\"c
 is third column\",\"id\":2}],\"highest_field_id\":2}";
 
     static final String SCHEMA_JSON_4 =
-            
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a
 is first 
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"b
 is second 
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c
 is third 
column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
+            
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a
 is first 
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"b
 is second 
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c
 is third 
column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
 
     static final String SCHEMA_JSON_5 =
             
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"a
 is first 
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"ROW\",\"fields\":[{\"name\":\"c\",\"field_type\":{\"type\":\"INTEGER\"},\"description\":\"a
 is first 
column\",\"field_id\":2},{\"name\":\"d\",\"field_type\":{\"type\":\"INTEGER\"},\"description\":\"a
 is first column\",\"field_id\":3}]},\"comment\":\"b is second 
column\",\"id\":1}],\"highest_field_id\":3}";
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
index 9c2f7aafa..56e9f9fce 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
@@ -36,6 +36,18 @@ import static 
org.apache.flink.configuration.description.TextElement.text;
 /** Options for flink connector. */
 public class FlinkConnectorOptions {
 
+    public static final ConfigOption<String> AUTO_INCREMENT_FIELDS =
+            ConfigOptions.key("auto-increment.fields")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the auto increment columns. "
+                                    + "The auto increment column can only be used in a primary-key table, and its data type must be INT or BIGINT. "
+                                    + "With an auto increment column in the table, whenever a new row is inserted into the table, the new row will be assigned the next available value from the auto-increment sequence. "
+                                    + "Currently a table can have only one auto-increment column. "
+                                    + "Adding an auto increment column to an existing table is not supported.");
+
     public static final ConfigOption<Integer> BUCKET_NUMBER =
             ConfigOptions.key("bucket.num")
                     .intType()
@@ -144,7 +156,11 @@ public class FlinkConnectorOptions {
     // 
--------------------------------------------------------------------------------------------
 
     public static final List<String> ALTER_DISALLOW_OPTIONS =
-            Arrays.asList(BUCKET_NUMBER.key(), BUCKET_KEY.key(), 
BOOTSTRAP_SERVERS.key());
+            Arrays.asList(
+                    AUTO_INCREMENT_FIELDS.key(),
+                    BUCKET_NUMBER.key(),
+                    BUCKET_KEY.key(),
+                    BOOTSTRAP_SERVERS.key());
 
     // 
-------------------------------------------------------------------------------------------
     // Only used internally to support materialized table
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index aba27735c..a52852e44 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -206,6 +206,7 @@ public class FlinkTableFactory implements 
DynamicTableSourceFactory, DynamicTabl
         HashSet<ConfigOption<?>> options =
                 new HashSet<>(
                         Arrays.asList(
+                                FlinkConnectorOptions.AUTO_INCREMENT_FIELDS,
                                 FlinkConnectorOptions.BUCKET_KEY,
                                 FlinkConnectorOptions.BUCKET_NUMBER,
                                 FlinkConnectorOptions.SCAN_STARTUP_MODE,
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index 9aef6c931..5fc721c40 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -67,8 +67,8 @@ import static 
org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes;
 import static org.apache.flink.table.utils.EncodingUtils.encodeBytesToBase64;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
-import static 
org.apache.fluss.config.ConfigOptions.TABLE_AUTO_INCREMENT_FIELDS;
 import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
+import static 
org.apache.fluss.flink.FlinkConnectorOptions.AUTO_INCREMENT_FIELDS;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER;
 import static 
org.apache.fluss.flink.FlinkConnectorOptions.MATERIALIZED_TABLE_DEFINITION_QUERY;
@@ -209,17 +209,18 @@ public class FlinkConversions {
                                     
.withComment(column.getComment().orElse(null));
                         });
 
-        // convert some flink options to fluss table configs.
-        Map<String, String> storageProperties =
-                convertFlinkOptionsToFlussTableProperties(flinkTableConf);
-
-        if (storageProperties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) {
+        // Configure auto-increment columns based on the 
'auto-increment.fields' option.
+        if (flinkTableConf.containsKey(AUTO_INCREMENT_FIELDS.key())) {
             for (String autoIncrementColumn :
-                    
storageProperties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) {
-                schemBuilder.enableAutoIncrement(autoIncrementColumn);
+                    flinkTableConf.get(AUTO_INCREMENT_FIELDS).split(",")) {
+                schemBuilder.enableAutoIncrement(autoIncrementColumn.trim());
             }
         }
 
+        // convert some flink options to fluss table configs.
+        Map<String, String> storageProperties =
+                convertFlinkOptionsToFlussTableProperties(flinkTableConf);
+
         // serialize computed column and watermark spec to custom properties
         Map<String, String> customProperties =
                 extractCustomProperties(flinkTableConf, storageProperties);
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index e544bfd1e..f42ba30ae 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -290,6 +290,13 @@ abstract class FlinkCatalogITCase {
                 .isInstanceOf(InvalidConfigException.class)
                 .hasMessage(
                         "Property 'paimon.file.format' is not supported to 
alter which is for datalake table.");
+
+        String unSupportedDml7 =
+                "alter table test_alter_table_append_only set 
('auto-increment.fields' = 'b')";
+        assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml7))
+                .rootCause()
+                .isInstanceOf(CatalogException.class)
+                .hasMessage("The option 'auto-increment.fields' is not 
supported to alter yet.");
     }
 
     @Test
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 15e59295c..80a01b62c 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -1453,4 +1453,62 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
         // Collect results with timeout
         assertQueryResultExactOrder(tEnv, aggQuery, expectedAggResults);
     }
+
+    @Test
+    void testWalModeWithAutoIncrement() throws Exception {
+        // use single parallelism to make result ordering stable
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);
+
+        String tableName = "wal_mode_pk_table";
+        // Create a table with WAL mode and auto increment column
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " id int not null,"
+                                + " auto_increment_id bigint,"
+                                + " amount bigint,"
+                                + " primary key (id) not enforced"
+                                + ") with ('table.changelog.image' = 'wal', 
'auto-increment.fields'='auto_increment_id')",
+                        tableName));
+
+        // Insert initial data
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s (id, amount) VALUES "
+                                        + "(1, 100), "
+                                        + "(2, 200), "
+                                        + "(3, 150), "
+                                        + "(4, 250)",
+                                tableName))
+                .await();
+
+        // Use batch mode to update and delete records
+
+        // Upsert data; updating/deleting rows in a table with an auto-inc 
column is not supported for now.
+        // TODO: Support Batch Update
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s (id, amount) VALUES " + "(1, 
120), " + "(3, 180)",
+                                tableName))
+                .await();
+
+        List<String> expectedResults =
+                Arrays.asList(
+                        "+I[1, 1, 100]",
+                        "+I[2, 2, 200]",
+                        "+I[3, 3, 150]",
+                        "+I[4, 4, 250]",
+                        "-U[1, 1, 100]",
+                        "+U[1, 1, 120]",
+                        "-U[3, 3, 150]",
+                        "+U[3, 3, 180]");
+
+        // Collect results with timeout
+        assertQueryResultExactOrder(
+                tEnv,
+                String.format(
+                        "SELECT id, auto_increment_id, amount FROM %s /*+ 
OPTIONS('scan.startup.mode' = 'earliest') */",
+                        tableName),
+                expectedResults);
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java 
b/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java
index 7145d9d7f..a5d3ff025 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java
@@ -26,4 +26,12 @@ public interface SequenceIDCounter {
      * @return The previous sequence ID
      */
     long getAndIncrement() throws Exception;
+
+    /**
+     * Atomically adds the given delta to the sequence ID.
+     *
+     * @param delta The delta to add
+     * @return The previous sequence ID
+     */
+    long getAndAdd(Long delta) throws Exception;
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
index eff79605a..e3527cb88 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
@@ -47,12 +47,14 @@ public class SchemaUpdate {
     private final AtomicInteger highestFieldId;
     private final List<String> primaryKeys;
     private final Map<String, Schema.Column> existedColumns;
+    private final List<String> autoIncrementColumns;
 
     public SchemaUpdate(TableInfo tableInfo) {
         this.columns = new ArrayList<>();
         this.existedColumns = new HashMap<>();
         this.highestFieldId = new 
AtomicInteger(tableInfo.getSchema().getHighestFieldId());
         this.primaryKeys = tableInfo.getPrimaryKeys();
+        this.autoIncrementColumns = 
tableInfo.getSchema().getAutoIncrementColumnNames();
         this.columns.addAll(tableInfo.getSchema().getColumns());
         for (Schema.Column column : columns) {
             existedColumns.put(column.getName(), column);
@@ -67,6 +69,9 @@ public class SchemaUpdate {
         if (!primaryKeys.isEmpty()) {
             builder.primaryKey(primaryKeys);
         }
+        for (String autoIncrementColumn : autoIncrementColumns) {
+            builder.enableAutoIncrement(autoIncrementColumn);
+        }
 
         return builder.build();
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
index 1637af39d..8d99b461e 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
@@ -36,6 +36,7 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.TabletManagerBase;
+import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
 import org.apache.fluss.server.kv.rowmerger.RowMerger;
 import org.apache.fluss.server.log.LogManager;
 import org.apache.fluss.server.log.LogTablet;
@@ -248,6 +249,10 @@ public final class KvManager extends TabletManagerBase 
implements ServerReconfig
 
                     File tabletDir = getOrCreateTabletDir(tablePath, 
tableBucket);
                     RowMerger merger = RowMerger.create(tableConfig, kvFormat, 
schemaGetter);
+                    AutoIncrementManager autoIncrementManager =
+                            new AutoIncrementManager(
+                                    schemaGetter, tablePath.getTablePath(), 
tableConfig, zkClient);
+
                     KvTablet tablet =
                             KvTablet.create(
                                     tablePath,
@@ -263,7 +268,8 @@ public final class KvManager extends TabletManagerBase 
implements ServerReconfig
                                     arrowCompressionInfo,
                                     schemaGetter,
                                     tableConfig.getChangelogImage(),
-                                    sharedRocksDBRateLimiter);
+                                    sharedRocksDBRateLimiter,
+                                    autoIncrementManager);
                     currentKvs.put(tableBucket, tablet);
 
                     LOG.info(
@@ -358,6 +364,8 @@ public final class KvManager extends TabletManagerBase 
implements ServerReconfig
         TableConfig tableConfig = tableInfo.getTableConfig();
         RowMerger rowMerger =
                 RowMerger.create(tableConfig, tableConfig.getKvFormat(), 
schemaGetter);
+        AutoIncrementManager autoIncrementManager =
+                new AutoIncrementManager(schemaGetter, tablePath, tableConfig, 
zkClient);
         KvTablet kvTablet =
                 KvTablet.create(
                         physicalTablePath,
@@ -373,7 +381,8 @@ public final class KvManager extends TabletManagerBase 
implements ServerReconfig
                         tableConfig.getArrowCompressionInfo(),
                         schemaGetter,
                         tableConfig.getChangelogImage(),
-                        sharedRocksDBRateLimiter);
+                        sharedRocksDBRateLimiter,
+                        autoIncrementManager);
         if (this.currentKvs.containsKey(tableBucket)) {
             throw new IllegalStateException(
                     String.format(
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index cfa691e66..79ab8eb21 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -45,6 +45,8 @@ import org.apache.fluss.row.PaddingRow;
 import org.apache.fluss.row.arrow.ArrowWriterPool;
 import org.apache.fluss.row.arrow.ArrowWriterProvider;
 import org.apache.fluss.row.encode.ValueDecoder;
+import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
+import org.apache.fluss.server.kv.autoinc.AutoIncrementUpdater;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason;
 import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
@@ -114,6 +116,7 @@ public final class KvTablet {
     // defines how to merge rows on the same primary key
     private final RowMerger rowMerger;
     private final ArrowCompressionInfo arrowCompressionInfo;
+    private final AutoIncrementManager autoIncrementManager;
 
     private final SchemaGetter schemaGetter;
 
@@ -148,7 +151,8 @@ public final class KvTablet {
             ArrowCompressionInfo arrowCompressionInfo,
             SchemaGetter schemaGetter,
             ChangelogImage changelogImage,
-            @Nullable RocksDBStatistics rocksDBStatistics) {
+            @Nullable RocksDBStatistics rocksDBStatistics,
+            AutoIncrementManager autoIncrementManager) {
         this.physicalPath = physicalPath;
         this.tableBucket = tableBucket;
         this.logTablet = logTablet;
@@ -166,6 +170,7 @@ public final class KvTablet {
         this.schemaGetter = schemaGetter;
         this.changelogImage = changelogImage;
         this.rocksDBStatistics = rocksDBStatistics;
+        this.autoIncrementManager = autoIncrementManager;
     }
 
     public static KvTablet create(
@@ -182,7 +187,8 @@ public final class KvTablet {
             ArrowCompressionInfo arrowCompressionInfo,
             SchemaGetter schemaGetter,
             ChangelogImage changelogImage,
-            RateLimiter sharedRateLimiter)
+            RateLimiter sharedRateLimiter,
+            AutoIncrementManager autoIncrementManager)
             throws IOException {
         RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, 
sharedRateLimiter);
 
@@ -214,7 +220,8 @@ public final class KvTablet {
                 arrowCompressionInfo,
                 schemaGetter,
                 changelogImage,
-                rocksDBStatistics);
+                rocksDBStatistics,
+                autoIncrementManager);
     }
 
     private static RocksDBKv buildRocksDBKv(
@@ -302,6 +309,8 @@ public final class KvTablet {
                     RowMerger currentMerger =
                             rowMerger.configureTargetColumns(
                                     targetColumns, latestSchemaId, 
latestSchema);
+                    AutoIncrementUpdater currentAutoIncrementUpdater =
+                            autoIncrementManager.getUpdaterForSchema(kvFormat, 
latestSchemaId);
 
                     RowType latestRowType = latestSchema.getRowType();
                     WalBuilder walBuilder = createWalBuilder(latestSchemaId, 
latestRowType);
@@ -318,6 +327,7 @@ public final class KvTablet {
                                 kvRecords,
                                 kvRecords.schemaId(),
                                 currentMerger,
+                                currentAutoIncrementUpdater,
                                 walBuilder,
                                 latestSchemaRow,
                                 logEndOffsetOfPrevBatch);
@@ -371,6 +381,7 @@ public final class KvTablet {
             KvRecordBatch kvRecords,
             short schemaIdOfNewData,
             RowMerger currentMerger,
+            AutoIncrementUpdater autoIncrementUpdater,
             WalBuilder walBuilder,
             PaddingRow latestSchemaRow,
             long startLogOffset)
@@ -403,6 +414,7 @@ public final class KvTablet {
                                 key,
                                 currentValue,
                                 currentMerger,
+                                autoIncrementUpdater,
                                 valueDecoder,
                                 walBuilder,
                                 latestSchemaRow,
@@ -452,22 +464,31 @@ public final class KvTablet {
             KvPreWriteBuffer.Key key,
             BinaryValue currentValue,
             RowMerger currentMerger,
+            AutoIncrementUpdater autoIncrementUpdater,
             ValueDecoder valueDecoder,
             WalBuilder walBuilder,
             PaddingRow latestSchemaRow,
             long logOffset)
             throws Exception {
-        // Optimization: when using WAL mode and merger is DefaultRowMerger 
(full update, not
-        // partial update), we can skip fetching old value for better 
performance since it
-        // always returns new value. In this case, both INSERT and UPDATE will 
produce
-        // UPDATE_AFTER.
-        if (changelogImage == ChangelogImage.WAL && currentMerger instanceof 
DefaultRowMerger) {
+        // Optimization: in WAL mode, when using DefaultRowMerger (full update, 
not partial update)
+        // and there is no auto-increment column, we can skip fetching old 
value for better
+        // performance since the result always reflects the new value. In this 
case, both INSERT and
+        // UPDATE will produce UPDATE_AFTER.
+        if (changelogImage == ChangelogImage.WAL
+                && !autoIncrementUpdater.hasAutoIncrement()
+                && currentMerger instanceof DefaultRowMerger) {
             return applyUpdate(key, null, currentValue, walBuilder, 
latestSchemaRow, logOffset);
         }
 
         byte[] oldValueBytes = getFromBufferOrKv(key);
         if (oldValueBytes == null) {
-            return applyInsert(key, currentValue, walBuilder, latestSchemaRow, 
logOffset);
+            return applyInsert(
+                    key,
+                    currentValue,
+                    walBuilder,
+                    latestSchemaRow,
+                    logOffset,
+                    autoIncrementUpdater);
         }
 
         BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
@@ -498,10 +519,12 @@ public final class KvTablet {
             BinaryValue currentValue,
             WalBuilder walBuilder,
             PaddingRow latestSchemaRow,
-            long logOffset)
+            long logOffset,
+            AutoIncrementUpdater autoIncrementUpdater)
             throws Exception {
-        walBuilder.append(ChangeType.INSERT, 
latestSchemaRow.replaceRow(currentValue.row));
-        kvPreWriteBuffer.put(key, currentValue.encodeValue(), logOffset);
+        BinaryValue newValue = 
autoIncrementUpdater.updateAutoIncrementColumns(currentValue);
+        walBuilder.append(ChangeType.INSERT, 
latestSchemaRow.replaceRow(newValue.row));
+        kvPreWriteBuffer.put(key, newValue.encodeValue(), logOffset);
         return logOffset + 1;
     }
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
new file mode 100644
index 000000000..1c1d36a42
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.ZkSequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.types.DataTypeRoot;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.time.Duration;
+
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Manages auto-increment logic for tables, providing schema-specific updaters 
that handle
+ * auto-increment column assignment during row writes.
+ */
+@NotThreadSafe
+public class AutoIncrementManager {
+    // No-op implementation that returns the input unchanged.
+    public static final AutoIncrementUpdater NO_OP_UPDATER = rowValue -> 
rowValue;
+
+    private final SchemaGetter schemaGetter;
+    private final Cache<Integer, AutoIncrementUpdater> 
autoIncrementUpdaterCache;
+    private final int autoIncrementColumnId;
+    private final SequenceGenerator sequenceGenerator;
+
+    public AutoIncrementManager(
+            SchemaGetter schemaGetter,
+            TablePath tablePath,
+            TableConfig tableConf,
+            ZooKeeperClient zkClient) {
+        this.autoIncrementUpdaterCache =
+                Caffeine.newBuilder()
+                        .maximumSize(5)
+                        .expireAfterAccess(Duration.ofMinutes(5))
+                        .build();
+        this.schemaGetter = schemaGetter;
+        int schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId();
+        Schema schema = schemaGetter.getSchema(schemaId);
+        int[] autoIncrementColumnIds = schema.getAutoIncrementColumnIds();
+
+        checkState(
+                autoIncrementColumnIds.length <= 1,
+                "Only support one auto increment column for a table, but got 
%d.",
+                autoIncrementColumnIds.length);
+
+        if (autoIncrementColumnIds.length == 1) {
+            autoIncrementColumnId = autoIncrementColumnIds[0];
+            boolean requiresIntOverflowCheck =
+                    schema.getRowType()
+                            
.getField(schema.getColumnName(autoIncrementColumnId))
+                            .getType()
+                            .is(DataTypeRoot.INTEGER);
+            sequenceGenerator =
+                    new BoundedSegmentSequenceGenerator(
+                            tablePath,
+                            schema.getColumnName(autoIncrementColumnId),
+                            new ZkSequenceIDCounter(
+                                    zkClient.getCuratorClient(),
+                                    ZkData.AutoIncrementColumnZNode.path(
+                                            tablePath, autoIncrementColumnId)),
+                            tableConf,
+                            requiresIntOverflowCheck ? Integer.MAX_VALUE : 
Long.MAX_VALUE);
+        } else {
+            autoIncrementColumnId = -1;
+            sequenceGenerator = null;
+        }
+    }
+
+    // Supports removing or reordering columns; does NOT support adding an 
auto-increment column to
+    // an existing table.
+    public AutoIncrementUpdater getUpdaterForSchema(KvFormat kvFormat, int 
latestSchemaId) {
+        return autoIncrementUpdaterCache.get(
+                latestSchemaId, k -> createAutoIncrementUpdater(kvFormat, k));
+    }
+
+    private AutoIncrementUpdater createAutoIncrementUpdater(KvFormat kvFormat, 
int schemaId) {
+        Schema schema = schemaGetter.getSchema(schemaId);
+        int[] autoIncrementColumnIds = schema.getAutoIncrementColumnIds();
+        if (autoIncrementColumnId == -1) {
+            checkState(
+                    autoIncrementColumnIds.length == 0,
+                    "Cannot add auto-increment column after table creation.");
+        } else {
+            checkState(
+                    autoIncrementColumnIds.length == 1
+                            && autoIncrementColumnIds[0] == 
autoIncrementColumnId,
+                    "Auto-increment column cannot be changed after table 
creation.");
+        }
+        if (autoIncrementColumnIds.length == 1) {
+            return new PerSchemaAutoIncrementUpdater(
+                    kvFormat,
+                    (short) schemaId,
+                    schema,
+                    autoIncrementColumnIds[0],
+                    sequenceGenerator);
+        } else {
+            return NO_OP_UPDATER;
+        }
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementUpdater.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementUpdater.java
new file mode 100644
index 000000000..4451cc0f0
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementUpdater.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.record.BinaryValue;
+
+/** An updater that assigns values to an auto-increment column. */
+public interface AutoIncrementUpdater {
+
+    /**
+     * Updates the auto-increment column in the given row by replacing its 
value with a new sequence
+     * number.
+     *
+     * <p>This method may return a new {@link BinaryValue} instance or the 
same instance if no
+     * update is needed (e.g., in a no-op implementation).
+     *
+     * @param rowValue the input row in binary form, must not be {@code null}
+     * @return a {@link BinaryValue} representing the updated row; never 
{@code null}
+     */
+    BinaryValue updateAutoIncrementColumns(BinaryValue rowValue);
+
+    /**
+     * Returns whether this updater actually performs auto-increment logic.
+     *
+     * @return {@code true} if auto-increment is active; {@code false} 
otherwise.
+     */
+    default boolean hasAutoIncrement() {
+        return false;
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java
new file mode 100644
index 000000000..f47831ae4
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.SequenceOverflowException;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.SequenceIDCounter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/** Segment-based sequence ID generator that fetches IDs in batches of a configured size. */
+@NotThreadSafe
+public class BoundedSegmentSequenceGenerator implements SequenceGenerator {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(BoundedSegmentSequenceGenerator.class);
+
+    private final SequenceIDCounter sequenceIDCounter;
+    private final TablePath tablePath;
+    private final String columnName;
+    private final long cacheSize;
+    private final long maxAllowedValue;
+
+    private AutoIncIdSegment segment;
+
+    public BoundedSegmentSequenceGenerator(
+            TablePath tablePath,
+            String columnName,
+            SequenceIDCounter sequenceIDCounter,
+            TableConfig tableConf,
+            long maxAllowedValue) {
+        this.cacheSize = tableConf.getAutoIncrementCacheSize();
+        this.columnName = columnName;
+        this.tablePath = tablePath;
+        this.sequenceIDCounter = sequenceIDCounter;
+        this.segment = AutoIncIdSegment.EMPTY;
+        this.maxAllowedValue = maxAllowedValue;
+    }
+
+    private void fetchSegment() {
+        try {
+            long start = sequenceIDCounter.getAndAdd(cacheSize);
+            if (start >= maxAllowedValue) {
+                throw new SequenceOverflowException(
+                        String.format(
+                                "Reached maximum value of sequence \"<%s>\" 
(%d).",
+                                columnName, maxAllowedValue));
+            }
+
+            long actualEnd = Math.min(start + cacheSize, maxAllowedValue - 1);
+            LOG.info(
+                    "Successfully fetch auto-increment values range ({}, {}], 
table_path={}, column_name={}.",
+                    start,
+                    actualEnd,
+                    tablePath,
+                    columnName);
+            segment = new AutoIncIdSegment(start, actualEnd - start);
+        } catch (SequenceOverflowException sequenceOverflowException) {
+            throw sequenceOverflowException;
+        } catch (Exception e) {
+            throw new FlussRuntimeException(
+                    String.format(
+                            "Failed to fetch auto-increment values, 
table_path=%s, column_name=%s.",
+                            tablePath, columnName),
+                    e);
+        }
+    }
+
+    @Override
+    public long nextVal() {
+        if (segment.remaining() <= 0) {
+            fetchSegment();
+        }
+        return segment.tryNextVal();
+    }
+
+    private static class AutoIncIdSegment {
+        private static final AutoIncIdSegment EMPTY = new AutoIncIdSegment(0, 
0);
+        private long current;
+        private final long end;
+
+        public AutoIncIdSegment(long start, long length) {
+            this.end = start + length;
+            this.current = start;
+        }
+
+        public long remaining() {
+            return end - current;
+        }
+
+        public long tryNextVal() {
+            long id = ++current;
+            if (id > end) {
+                throw new IllegalStateException("No more IDs available in 
current segment.");
+            }
+            return id;
+        }
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java
new file mode 100644
index 000000000..ff70f5783
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.exception.SequenceOverflowException;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.record.BinaryValue;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.RowEncoder;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.function.LongSupplier;
+
+/**
+ * An {@link AutoIncrementUpdater} implementation that assigns auto-increment 
values to a specific
+ * column based on a fixed schema. It is bound to a particular schema version 
and assumes the
+ * auto-increment column position remains constant within that schema.
+ *
+ * <p>This class is not thread-safe and is intended to be used within a 
single-threaded execution
+ * context.
+ */
+@NotThreadSafe
+public class PerSchemaAutoIncrementUpdater implements AutoIncrementUpdater {
+    private final InternalRow.FieldGetter[] flussFieldGetters;
+    private final RowEncoder rowEncoder;
+    private final int fieldLength;
+    private final int targetColumnIdx;
+    private final LongSupplier idSupplier;
+    private final short schemaId;
+    private final String targetColumnName;
+
+    public PerSchemaAutoIncrementUpdater(
+            KvFormat kvFormat,
+            short schemaId,
+            Schema schema,
+            int autoIncrementColumnId,
+            SequenceGenerator sequenceGenerator) {
+        DataType[] fieldDataTypes = 
schema.getRowType().getChildren().toArray(new DataType[0]);
+
+        fieldLength = fieldDataTypes.length;
+        // getter for the fields in row
+        InternalRow.FieldGetter[] flussFieldGetters = new 
InternalRow.FieldGetter[fieldLength];
+        for (int i = 0; i < fieldLength; i++) {
+            flussFieldGetters[i] = 
InternalRow.createFieldGetter(fieldDataTypes[i], i);
+        }
+        this.schemaId = schemaId;
+        this.targetColumnIdx = 
schema.getColumnIds().indexOf(autoIncrementColumnId);
+        this.targetColumnName = schema.getColumnName(targetColumnIdx);
+        if (targetColumnIdx == -1) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Auto-increment column ID %d not found in schema 
columns: %s",
+                            autoIncrementColumnId, schema.getColumnIds()));
+        }
+
+        if (fieldDataTypes[targetColumnIdx].is(DataTypeRoot.INTEGER)) {
+            this.idSupplier = () -> 
checkedNextInt(sequenceGenerator.nextVal());
+        } else {
+            this.idSupplier = sequenceGenerator::nextVal;
+        }
+        this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes);
+        this.flussFieldGetters = flussFieldGetters;
+    }
+
+    private long checkedNextInt(long value) {
+        if (value > Integer.MAX_VALUE) {
+            throw new SequenceOverflowException(
+                    String.format(
+                            "Reached maximum value of sequence \"<%s>\" 
(2147483647).",
+                            targetColumnName));
+        }
+        return value;
+    }
+
+    public BinaryValue updateAutoIncrementColumns(BinaryValue rowValue) {
+        rowEncoder.startNewRow();
+        for (int i = 0; i < fieldLength; i++) {
+            if (targetColumnIdx == i) {
+                rowEncoder.encodeField(i, idSupplier.getAsLong());
+            } else {
+                // use the row value
+                rowEncoder.encodeField(i, 
flussFieldGetters[i].getFieldOrNull(rowValue.row));
+            }
+        }
+        return new BinaryValue(schemaId, rowEncoder.finishRow());
+    }
+
+    @Override
+    public boolean hasAutoIncrement() {
+        return true;
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java
new file mode 100644
index 000000000..d448ffa4c
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+/** SequenceGenerator is used to generate auto increment column ID. */
+public interface SequenceGenerator {
+
+    /**
+     * Retrieves the next sequential value for the auto-increment column.
+     *
+     * <p>This method provides the next available value for auto-increment 
columns.
+     *
+     * @return the next sequential value of the auto-increment column
+     */
+    long nextVal();
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java
index acbecfa63..7781340bc 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java
@@ -34,9 +34,11 @@ public class ZkSequenceIDCounter implements 
SequenceIDCounter {
     private static final int BASE_SLEEP_MS = 100;
     private static final int MAX_SLEEP_MS = 1000;
 
+    private final String sequenceIDPath;
     private final DistributedAtomicLong sequenceIdCounter;
 
     public ZkSequenceIDCounter(CuratorFramework curatorClient, String 
sequenceIDPath) {
+        this.sequenceIDPath = sequenceIDPath;
         sequenceIdCounter =
                 new DistributedAtomicLong(
                         curatorClient,
@@ -56,7 +58,28 @@ public class ZkSequenceIDCounter implements 
SequenceIDCounter {
         if (incrementValue.succeeded()) {
             return incrementValue.preValue();
         } else {
-            throw new Exception("Failed to increment sequence id counter.");
+            throw new Exception(
+                    String.format(
+                            "Failed to increment sequence id counter. 
ZooKeeper sequence ID path: %s.",
+                            sequenceIDPath));
+        }
+    }
+
+    /**
+     * Atomically adds the given delta to the current sequence ID.
+     *
+     * @return The previous sequence ID
+     */
+    @Override
+    public long getAndAdd(Long delta) throws Exception {
+        AtomicValue<Long> incrementValue = sequenceIdCounter.add(delta);
+        if (incrementValue.succeeded()) {
+            return incrementValue.preValue();
+        } else {
+            throw new Exception(
+                    String.format(
+                            "Failed to increment sequence id counter. 
ZooKeeper sequence ID path: %s, Delta value: %d.",
+                            sequenceIDPath, delta));
         }
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
index 59a8644c8..467245567 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
@@ -252,6 +252,28 @@ public final class ZkData {
         }
     }
 
+    /**
+     * The znode for auto increment columns of a table. The znode path is:
+     *
+     * <p>/metadata/databases/[databaseName]/tables/[tableName]/auto_inc
+     */
+    public static final class AutoIncrementColumnsZNode {
+        public static String path(TablePath tablePath) {
+            return TableZNode.path(tablePath) + "/auto_inc";
+        }
+    }
+
+    /**
+     * The znode for auto increment column. The znode path is:
+     *
+     * 
<p>/metadata/databases/[databaseName]/tables/[tableName]/auto_inc/col_[columnId]
+     */
+    public static final class AutoIncrementColumnZNode {
+        public static String path(TablePath tablePath, int columnId) {
+            return AutoIncrementColumnsZNode.path(tablePath) + 
String.format("/col_%d", columnId);
+        }
+    }
+
     /**
      * The znode used to generate a sequence unique id for a partition. The 
znode path is:
      *
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index 52b1080ed..8c0a6b864 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -45,6 +45,7 @@ import org.apache.fluss.record.TestingSchemaGetter;
 import org.apache.fluss.record.bytesview.MultiBytesView;
 import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.encode.ValueEncoder;
+import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Key;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.KvEntry;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Value;
@@ -181,6 +182,13 @@ class KvTabletTest {
             throws Exception {
         TableConfig tableConf = new 
TableConfig(Configuration.fromMap(tableConfig));
         RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED, 
schemaGetter);
+        AutoIncrementManager autoIncrementManager =
+                new AutoIncrementManager(
+                        schemaGetter,
+                        tablePath.getTablePath(),
+                        new TableConfig(new Configuration()),
+                        null);
+
         return KvTablet.create(
                 tablePath,
                 tableBucket,
@@ -195,7 +203,8 @@ class KvTabletTest {
                 DEFAULT_COMPRESSION,
                 schemaGetter,
                 tableConf.getChangelogImage(),
-                KvManager.getDefaultRateLimiter());
+                KvManager.getDefaultRateLimiter(),
+                autoIncrementManager);
     }
 
     @Test
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java
new file mode 100644
index 000000000..685c2d539
--- /dev/null
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.SequenceOverflowException;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.SequenceIDCounter;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test class for {@link BoundedSegmentSequenceGenerator}. */
+class SegmentSequenceGeneratorTest {
+
+    private static final TablePath TABLE_PATH = new TablePath("test_db", 
"test_table");
+    private static final String COLUMN_NAME = "id";
+    private static final long CACHE_SIZE = 100;
+
+    private AtomicLong snapshotIdGenerator;
+    private Configuration configuration;
+    private TableConfig tableConfig;
+
+    @BeforeEach
+    void setUp() {
+        snapshotIdGenerator = new AtomicLong(0);
+        Map<String, String> map = new HashMap<>();
+        map.put(ConfigOptions.TABLE_AUTO_INCREMENT_CACHE_SIZE.key(), 
String.valueOf(CACHE_SIZE));
+        configuration = Configuration.fromMap(map);
+        tableConfig = new TableConfig(configuration);
+    }
+
+    @Test
+    void testNextValBasicContinuousId() {
+        BoundedSegmentSequenceGenerator generator =
+                new BoundedSegmentSequenceGenerator(
+                        TABLE_PATH,
+                        COLUMN_NAME,
+                        new TestingSnapshotIDCounter(snapshotIdGenerator),
+                        new TableConfig(configuration),
+                        Long.MAX_VALUE);
+        for (long i = 1; i <= CACHE_SIZE; i++) {
+            assertThat(generator.nextVal()).isEqualTo(i);
+        }
+
+        for (long i = CACHE_SIZE + 1; i <= 2 * CACHE_SIZE; i++) {
+            assertThat(generator.nextVal()).isEqualTo(i);
+        }
+    }
+
+    @Test
+    void testMultiGenerator() throws InterruptedException {
+        ConcurrentLinkedDeque<Long> linkedDeque = new 
ConcurrentLinkedDeque<>();
+        List<Thread> threads = new ArrayList<>();
+
+        for (int i = 0; i < 20; i++) {
+            Thread thread =
+                    new Thread(
+                            () -> {
+                                BoundedSegmentSequenceGenerator generator =
+                                        new BoundedSegmentSequenceGenerator(
+                                                new TablePath("test_db", 
"table1"),
+                                                COLUMN_NAME,
+                                                new 
TestingSnapshotIDCounter(snapshotIdGenerator),
+                                                tableConfig,
+                                                Long.MAX_VALUE);
+                                for (int j = 0; j < 130; j++) {
+                                    linkedDeque.add(generator.nextVal());
+                                }
+                            });
+            threads.add(thread);
+            thread.start();
+        }
+
+        for (Thread t : threads) {
+            t.join();
+        }
+
+        
assertThat(linkedDeque.stream().mapToLong(Long::longValue).max().orElse(0))
+                .isLessThanOrEqualTo(40 * CACHE_SIZE);
+        assertThat(linkedDeque.stream().distinct().count()).isEqualTo(130 * 
20);
+    }
+
+    @Test
+    void testFetchFailed() {
+        BoundedSegmentSequenceGenerator generator =
+                new BoundedSegmentSequenceGenerator(
+                        new TablePath("test_db", "table1"),
+                        COLUMN_NAME,
+                        new TestingSnapshotIDCounter(snapshotIdGenerator, 2),
+                        tableConfig,
+                        Long.MAX_VALUE);
+        for (int j = 1; j <= CACHE_SIZE; j++) {
+            assertThat(generator.nextVal()).isEqualTo(j);
+        }
+        assertThatThrownBy(generator::nextVal)
+                .isInstanceOf(FlussRuntimeException.class)
+                .hasMessage(
+                        String.format(
+                                "Failed to fetch auto-increment values, 
table_path=%s, column_name=%s.",
+                                "test_db.table1", COLUMN_NAME));
+    }
+
+    @Test
+    void testFetchIdOverFlow() {
+        BoundedSegmentSequenceGenerator generator =
+                new BoundedSegmentSequenceGenerator(
+                        new TablePath("test_db", "table1"),
+                        COLUMN_NAME,
+                        new TestingSnapshotIDCounter(snapshotIdGenerator),
+                        tableConfig,
+                        CACHE_SIZE + 9);
+        for (int j = 1; j < CACHE_SIZE + 9; j++) {
+            assertThat(generator.nextVal()).isEqualTo(j);
+        }
+        assertThatThrownBy(generator::nextVal)
+                .isInstanceOf(SequenceOverflowException.class)
+                .hasMessage(
+                        String.format(
+                                "Reached maximum value of sequence \"<%s>\" 
(%d).",
+                                COLUMN_NAME, CACHE_SIZE + 9));
+    }
+
+    private static class TestingSnapshotIDCounter implements SequenceIDCounter 
{
+
+        private final AtomicLong snapshotIdGenerator;
+        private int fetchTime;
+        private final int failedTrigger;
+
+        public TestingSnapshotIDCounter(AtomicLong snapshotIdGenerator) {
+            this(snapshotIdGenerator, Integer.MAX_VALUE);
+        }
+
+        public TestingSnapshotIDCounter(AtomicLong snapshotIdGenerator, int 
failedTrigger) {
+            this.snapshotIdGenerator = snapshotIdGenerator;
+            fetchTime = 0;
+            this.failedTrigger = failedTrigger;
+        }
+
+        @Override
+        public long getAndIncrement() {
+            return snapshotIdGenerator.getAndIncrement();
+        }
+
+        @Override
+        public long getAndAdd(Long delta) {
+            if (++fetchTime < failedTrigger) {
+                return snapshotIdGenerator.getAndAdd(delta);
+            }
+            throw new RuntimeException("Failed to get snapshot ID");
+        }
+    }
+}
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
index 9f9ea078d..9f26bc772 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
@@ -659,6 +659,11 @@ class KvTabletSnapshotTargetTest {
         public long getAndIncrement() {
             return snapshotIdGenerator.getAndIncrement();
         }
+
+        @Override
+        public long getAndAdd(Long delta) {
+            return snapshotIdGenerator.getAndAdd(delta);
+        }
     }
 
     private enum SnapshotFailType {
diff --git a/website/docs/engine-flink/options.md 
b/website/docs/engine-flink/options.md
index fb456b45e..8734396f3 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -61,33 +61,34 @@ See more details about [ALTER TABLE ... 
SET](engine-flink/ddl.md#set-properties)
 
 ## Storage Options
 
-| Option                                  | Type     | Default                 
            | Description                                                       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-|-----------------------------------------|----------|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| bucket.num                              | int      | The bucket number of 
Fluss cluster. | The number of buckets of a Fluss table.                        
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
-| bucket.key                              | String   | (None)                  
            | Specific the distribution policy of the Fluss table. Data will be 
distributed to each bucket according to the hash value of bucket-key (It must 
be a subset of the primary keys excluding partition keys of the primary key 
table). If you specify multiple fields, delimiter is `,`. If the table has a 
primary key and a bucket key is not specified, the bucket key will be used as 
primary key(excluding th [...]
-| table.log.ttl                           | Duration | 7 days                  
            | The time to live for log segments. The configuration controls the 
maximum time we will retain a log before we will delete old segments to free up 
space. If set to -1, the log will not be deleted.                               
                                                                                
                                                                                
              [...]
-| table.auto-partition.enabled            | Boolean  | false                   
            | Whether enable auto partition for the table. Disable by default. 
When auto partition is enabled, the partitions of the table will be created 
automatically.                                                                  
                                                                                
                                                                                
                   [...]
-| table.auto-partition.key                | String   | (None)                  
            | This configuration defines the time-based partition key to be 
used for auto-partitioning when a table is partitioned with multiple keys. 
Auto-partitioning utilizes a time-based partition key to handle partitions 
automatically, including creating new ones and removing outdated ones, by 
comparing the time value of the partition with the current system time. In the 
case of a table using multiple par [...]
-| table.auto-partition.time-unit          | ENUM     | DAY                     
            | The time granularity for auto created partitions. The default 
value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If 
the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If 
the value is `DAY`, the partition format for auto created is yyyyMMdd. If the 
value is `MONTH`, the partition format for auto created is yyyyMM. If the value 
is `QUARTER`, the parti [...]
-| table.auto-partition.num-precreate      | Integer  | 2                       
            | The number of partitions to pre-create for auto created 
partitions in each check for auto partition. For example, if the current check 
time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 
20241112, 20241113 will be pre-created. If any one partition exists, it'll skip 
creating the partition. The default value is 2, which means 2 partitions will 
be pre-created. If the `tab [...]
-| table.auto-partition.num-retention      | Integer  | 7                       
            | The number of history partitions to retain for auto created 
partitions in each check for auto partition. For example, if the current check 
time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then 
the history partitions 20241108, 20241109, 20241110 will be retained. The 
partitions earlier than 20241108 will be deleted. The default value is 7, which 
means that 7 partitions will  [...]
-| table.auto-partition.time-zone          | String   | the system time zone    
            | The time zone for auto partitions, which is by default the same 
as the system time zone.                                                        
                                                                                
                                                                                
                                                                                
                [...]
-| table.replication.factor                | Integer  | (None)                  
            | The replication factor for the log of the new table. When it's 
not set, Fluss will use the cluster's default replication factor configured by 
default.replication.factor. It should be a positive number and not larger than 
the number of tablet servers in the Fluss cluster. A value larger than the 
number of tablet servers in Fluss cluster will result in an error when the new 
table is created.        [...]
-| table.log.format                        | Enum     | ARROW                   
            | The format of the log records in log store. The default value is 
`ARROW`. The supported formats are `ARROW`, `INDEXED` and `COMPACTED`.          
                                                                                
                                                                                
                                                                                
               [...]
-| table.log.arrow.compression.type        | Enum     | ZSTD                    
            | The compression type of the log records if the log format is set 
to `ARROW`. The candidate compression type is `NONE`, `LZ4_FRAME`, `ZSTD`. The 
default value is `ZSTD`.                                                        
                                                                                
                                                                                
                [...]
-| table.log.arrow.compression.zstd.level  | Integer  | 3                       
            | The compression level of the log records if the log format is set 
to `ARROW` and the compression type is set to `ZSTD`. The valid range is 1 to 
22. The default value is 3.                                                     
                                                                                
                                                                                
                [...]
-| table.kv.format                         | Enum     | COMPACTED               
            | The format of the kv records in kv store. The default value is 
`COMPACTED`. The supported formats are `COMPACTED` and `INDEXED`.               
                                                                                
                                                                                
                                                                                
                 [...]
-| table.log.tiered.local-segments         | Integer  | 2                       
            | The number of log segments to retain in local for each table when 
log tiered storage is enabled. It must be greater that 0. The default is 2.     
                                                                                
                                                                                
                                                                                
              [...]
-| table.datalake.enabled                  | Boolean  | false                   
            | Whether enable lakehouse storage for the table. Disabled by 
default. When this option is set to ture and the datalake tiering service is 
up, the table will be tiered and compacted into datalake format stored on 
lakehouse storage.                                                              
                                                                                
                             [...]
-| table.datalake.format                   | Enum     | (None)                  
            | The data lake format of the table specifies the tiered Lakehouse 
storage format. Currently, supported formats are `paimon`, `iceberg`, and 
`lance`. In the future, more kinds of data lake format will be supported, such 
as DeltaLake or Hudi. Once the `table.datalake.format` property is configured, 
Fluss adopts the key encoding and bucketing strategy used by the corresponding 
data lake format. This  [...]
-| table.datalake.freshness                | Duration | 3min                    
            | It defines the maximum amount of time that the datalake table's 
content should lag behind updates to the Fluss table. Based on this target 
freshness, the Fluss service automatically moves data from the Fluss table and 
updates to the datalake table, so that the data in the datalake table is kept 
up to date within this target. If the data does not need to be as fresh, you 
can specify a longer targe [...]
-| table.datalake.auto-compaction          | Boolean  | false                   
            | If true, compaction will be triggered automatically when tiering 
service writes to the datalake. It is disabled by default.                      
                                                                                
                                                                                
                                                                                
               [...]
-| table.datalake.auto-expire-snapshot     | Boolean  | false                   
            | If true, snapshot expiration will be triggered automatically when 
tiering service commits to the datalake. It is disabled by default.             
                                                                                
                                                                                
                                                                                
              [...]
-| table.merge-engine                      | Enum     | (None)                  
            | Defines the merge engine for the primary key table. By default, 
primary key table uses the [default merge 
engine(last_row)](table-design/merge-engines/default.md). It also supports two 
merge engines are `first_row`, `versioned` and `aggregation`. The [first_row 
merge engine](table-design/merge-engines/first-row.md) will keep the first row 
of the same primary key. The [versioned merge engine](tabl [...]
-| table.merge-engine.versioned.ver-column | String   | (None)                  
            | The column name of the version column for the `versioned` merge 
engine. If the merge engine is set to `versioned`, the version column must be 
set.                                                                            
                                                                                
                                                                                
                  [...]
-| table.delete.behavior                   | Enum     | ALLOW                   
            | Controls the behavior of delete operations on primary key tables. 
Three modes are supported: `ALLOW` (default for default merge engine) - allows 
normal delete operations; `IGNORE` - silently ignores delete requests without 
errors; `DISABLE` - rejects delete requests and throws explicit errors. This 
configuration provides system-level guarantees for some downstream pipelines 
(e.g., Flink Delta Joi [...]
-| table.changelog.image                   | Enum     | FULL                    
            | Defines the changelog image mode for primary key tables. This 
configuration is inspired by similar settings in database systems like MySQL's 
`binlog_row_image` and PostgreSQL's `replica identity`. Two modes are 
supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER 
records for update operations, capturing complete information about updates and 
allowing tracking of previous val [...]
-
+| Option                                  | Type     | Default                 
            | Description                                                       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+|-----------------------------------------|----------|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| auto-increment.fields                   | String   | (None)                              | Defines the auto increment columns. An auto increment column can only be used in a primary-key table. With an auto increment column in the table, whenever a new row is inserted into the table, the new row will be assigned the next available value from the auto-increment sequence. The data type of the auto increment column must b [...]
+| bucket.num                              | int      | The bucket number of 
Fluss cluster. | The number of buckets of a Fluss table.                        
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
+| bucket.key                              | String   | (None)                  
            | Specifies the distribution policy of the Fluss table. Data will be 
distributed to each bucket according to the hash value of bucket-key (It must 
be a subset of the primary keys excluding partition keys of the primary key 
table). If you specify multiple fields, delimiter is `,`. If the table has a 
primary key and a bucket key is not specified, the primary key (excluding the 
partition keys) will be used as the bucket key [...]
+| table.log.ttl                           | Duration | 7 days                  
            | The time to live for log segments. The configuration controls the 
maximum time we will retain a log before we will delete old segments to free up 
space. If set to -1, the log will not be deleted.                               
                                                                                
                                                                                
              [...]
+| table.auto-partition.enabled            | Boolean  | false                   
            | Whether enable auto partition for the table. Disable by default. 
When auto partition is enabled, the partitions of the table will be created 
automatically.                                                                  
                                                                                
                                                                                
                   [...]
+| table.auto-partition.key                | String   | (None)                  
            | This configuration defines the time-based partition key to be 
used for auto-partitioning when a table is partitioned with multiple keys. 
Auto-partitioning utilizes a time-based partition key to handle partitions 
automatically, including creating new ones and removing outdated ones, by 
comparing the time value of the partition with the current system time. In the 
case of a table using multiple par [...]
+| table.auto-partition.time-unit          | Enum     | DAY                     
            | The time granularity for auto created partitions. The default 
value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If 
the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If 
the value is `DAY`, the partition format for auto created is yyyyMMdd. If the 
value is `MONTH`, the partition format for auto created is yyyyMM. If the value 
is `QUARTER`, the parti [...]
+| table.auto-partition.num-precreate      | Integer  | 2                       
            | The number of partitions to pre-create for auto created 
partitions in each check for auto partition. For example, if the current check 
time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 
20241112, 20241113 will be pre-created. If any one partition exists, it'll skip 
creating the partition. The default value is 2, which means 2 partitions will 
be pre-created. If the `tab [...]
+| table.auto-partition.num-retention      | Integer  | 7                       
            | The number of history partitions to retain for auto created 
partitions in each check for auto partition. For example, if the current check 
time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then 
the history partitions 20241108, 20241109, 20241110 will be retained. The 
partitions earlier than 20241108 will be deleted. The default value is 7, which 
means that 7 partitions will  [...]
+| table.auto-partition.time-zone          | String   | the system time zone    
            | The time zone for auto partitions, which is by default the same 
as the system time zone.                                                        
                                                                                
                                                                                
                                                                                
                [...]
+| table.replication.factor                | Integer  | (None)                  
            | The replication factor for the log of the new table. When it's 
not set, Fluss will use the cluster's default replication factor configured by 
default.replication.factor. It should be a positive number and not larger than 
the number of tablet servers in the Fluss cluster. A value larger than the 
number of tablet servers in Fluss cluster will result in an error when the new 
table is created.        [...]
+| table.log.format                        | Enum     | ARROW                   
            | The format of the log records in log store. The default value is 
`ARROW`. The supported formats are `ARROW`, `INDEXED` and `COMPACTED`.          
                                                                                
                                                                                
                                                                                
               [...]
+| table.log.arrow.compression.type        | Enum     | ZSTD                    
            | The compression type of the log records if the log format is set 
to `ARROW`. The candidate compression type is `NONE`, `LZ4_FRAME`, `ZSTD`. The 
default value is `ZSTD`.                                                        
                                                                                
                                                                                
                [...]
+| table.log.arrow.compression.zstd.level  | Integer  | 3                       
            | The compression level of the log records if the log format is set 
to `ARROW` and the compression type is set to `ZSTD`. The valid range is 1 to 
22. The default value is 3.                                                     
                                                                                
                                                                                
                [...]
+| table.kv.format                         | Enum     | COMPACTED               
            | The format of the kv records in kv store. The default value is 
`COMPACTED`. The supported formats are `COMPACTED` and `INDEXED`.               
                                                                                
                                                                                
                                                                                
                 [...]
+| table.log.tiered.local-segments         | Integer  | 2                       
            | The number of log segments to retain in local for each table when 
log tiered storage is enabled. It must be greater than 0. The default is 2.     
                                                                                
                                                                                
                                                                                
              [...]
+| table.datalake.enabled                  | Boolean  | false                   
            | Whether enable lakehouse storage for the table. Disabled by 
default. When this option is set to true and the datalake tiering service is 
up, the table will be tiered and compacted into datalake format stored on 
lakehouse storage.                                                              
                                                                                
                             [...]
+| table.datalake.format                   | Enum     | (None)                  
            | The data lake format of the table specifies the tiered Lakehouse 
storage format. Currently, supported formats are `paimon`, `iceberg`, and 
`lance`. In the future, more kinds of data lake format will be supported, such 
as DeltaLake or Hudi. Once the `table.datalake.format` property is configured, 
Fluss adopts the key encoding and bucketing strategy used by the corresponding 
data lake format. This  [...]
+| table.datalake.freshness                | Duration | 3min                    
            | It defines the maximum amount of time that the datalake table's 
content should lag behind updates to the Fluss table. Based on this target 
freshness, the Fluss service automatically moves data from the Fluss table and 
updates to the datalake table, so that the data in the datalake table is kept 
up to date within this target. If the data does not need to be as fresh, you 
can specify a longer targe [...]
+| table.datalake.auto-compaction          | Boolean  | false                   
            | If true, compaction will be triggered automatically when tiering 
service writes to the datalake. It is disabled by default.                      
                                                                                
                                                                                
                                                                                
               [...]
+| table.datalake.auto-expire-snapshot     | Boolean  | false                   
            | If true, snapshot expiration will be triggered automatically when 
tiering service commits to the datalake. It is disabled by default.             
                                                                                
                                                                                
                                                                                
              [...]
+| table.merge-engine                      | Enum     | (None)                  
            | Defines the merge engine for the primary key table. By default, 
primary key table uses the [default merge 
engine(last_row)](table-design/merge-engines/default.md). It also supports three 
other merge engines: `first_row`, `versioned` and `aggregation`. The [first_row 
merge engine](table-design/merge-engines/first-row.md) will keep the first row 
of the same primary key. The [versioned merge engine](tabl [...]
+| table.merge-engine.versioned.ver-column | String   | (None)                  
            | The column name of the version column for the `versioned` merge 
engine. If the merge engine is set to `versioned`, the version column must be 
set.                                                                            
                                                                                
                                                                                
                  [...]
+| table.delete.behavior                   | Enum     | ALLOW                   
            | Controls the behavior of delete operations on primary key tables. 
Three modes are supported: `ALLOW` (default for default merge engine) - allows 
normal delete operations; `IGNORE` - silently ignores delete requests without 
errors; `DISABLE` - rejects delete requests and throws explicit errors. This 
configuration provides system-level guarantees for some downstream pipelines 
(e.g., Flink Delta Joi [...]
+| table.changelog.image                   | Enum     | FULL                    
            | Defines the changelog image mode for primary key tables. This 
configuration is inspired by similar settings in database systems like MySQL's 
`binlog_row_image` and PostgreSQL's `replica identity`. Two modes are 
supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER 
records for update operations, capturing complete information about updates and 
allowing tracking of previous val [...]
+| table.auto-inc.batch-size               | Long     | 100000L                 
            | The batch size of auto-increment IDs fetched from the distributed 
counter each time. This value determines the length of the locally cached ID 
segment. Default: 100000. A larger batch size may cause significant 
auto-increment ID gaps, especially when unused cached ID segments are discarded 
due to TabletServer restarts or abnormal terminations. Conversely, a smaller 
batch size increases the freque [...]
 
 ## Read Options
 

Reply via email to