This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit be01cffa912b7ddffe8df677bd8adc4471be3390
Author: Jark Wu <[email protected]>
AuthorDate: Sat Jan 17 14:37:19 2026 +0800

    [kv] Improve the implementation of AUTO_INCREMENT column
---
 .../java/org/apache/fluss/config/TableConfig.java  |  2 +-
 .../java/org/apache/fluss/metadata/Schema.java     | 40 +++++++++---
 .../org/apache/fluss/metadata/TableSchemaTest.java |  2 +-
 .../apache/fluss/flink/FlinkConnectorOptions.java  |  5 +-
 .../fluss/flink/utils/FlinkConversionsTest.java    | 17 ++---
 .../java/org/apache/fluss/server/kv/KvManager.java | 12 +++-
 .../server/kv/autoinc/AutoIncrementManager.java    | 34 +++-------
 .../autoinc/BoundedSegmentSequenceGenerator.java   | 64 +++++++++----------
 .../kv/autoinc/PerSchemaAutoIncrementUpdater.java  | 34 ++++------
 .../kv/autoinc/SequenceGeneratorFactory.java       | 38 +++++++++++
 .../kv/autoinc/ZkSequenceGeneratorFactory.java     | 61 ++++++++++++++++++
 .../org/apache/fluss/server/kv/KvTabletTest.java   | 66 ++++++++++++++++++-
 .../kv/autoinc/SegmentSequenceGeneratorTest.java   | 73 ++++++----------------
 .../autoinc/TestingSequenceGeneratorFactory.java   | 40 ++++++++++++
 .../kv/autoinc/TestingSequenceIDCounter.java       | 54 ++++++++++++++++
 website/docs/engine-flink/ddl.md                   |  8 ++-
 website/docs/engine-flink/options.md               |  2 +-
 17 files changed, 386 insertions(+), 166 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java 
b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
index 86604b6be..984a1def4 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
@@ -142,7 +142,7 @@ public class TableConfig {
     }
 
     /** Gets the number of auto-increment IDs cached per segment. */
-    public Long getAutoIncrementCacheSize() {
+    public long getAutoIncrementCacheSize() {
         return config.get(ConfigOptions.TABLE_AUTO_INCREMENT_CACHE_SIZE);
     }
 }
diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
index 4c1f1828d..e2960dcc7 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -100,6 +100,23 @@ public final class Schema implements Serializable {
         return columns;
     }
 
+    /**
+     * Gets a column by its name.
+     *
+     * @param columnName the column name
+     * @return the column with the given name
+     * @throws IllegalArgumentException if the column does not exist
+     */
+    public Column getColumn(String columnName) {
+        for (Column column : columns) {
+            if (column.getName().equals(columnName)) {
+                return column;
+            }
+        }
+        throw new IllegalArgumentException(
+                String.format("Column %s does not exist in schema.", 
columnName));
+    }
+
     public Optional<PrimaryKey> getPrimaryKey() {
         return Optional.ofNullable(primaryKey);
     }
@@ -209,13 +226,16 @@ public final class Schema implements Serializable {
 
     @Override
     public String toString() {
-        final List<Object> components = new ArrayList<>(columns);
-        if (primaryKey != null) {
-            components.add(primaryKey);
-        }
-        return components.stream()
-                .map(Objects::toString)
-                .collect(Collectors.joining(",", "(", ")"));
+        return "Schema{"
+                + "columns="
+                + columns
+                + ", primaryKey="
+                + primaryKey
+                + ", autoIncrementColumnNames="
+                + autoIncrementColumnNames
+                + ", highestFieldId="
+                + highestFieldId
+                + '}';
     }
 
     @Override
@@ -228,12 +248,14 @@ public final class Schema implements Serializable {
         }
         Schema schema = (Schema) o;
         return Objects.equals(columns, schema.columns)
-                && Objects.equals(primaryKey, schema.primaryKey);
+                && Objects.equals(autoIncrementColumnNames, 
schema.autoIncrementColumnNames)
+                && Objects.equals(primaryKey, schema.primaryKey)
+                && highestFieldId == schema.highestFieldId;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(columns, primaryKey);
+        return Objects.hash(columns, primaryKey, autoIncrementColumnNames, 
highestFieldId);
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java 
b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
index fe317b53d..a5ef1fed7 100644
--- a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
@@ -369,7 +369,7 @@ class TableSchemaTest {
                         .build();
 
         Schema copied = Schema.newBuilder().fromSchema(original).build();
-        
assertThat(copied.getAutoIncrementColumnNames()).isEqualTo(Arrays.asList("value"));
+        assertThat(copied).isEqualTo(original);
     }
 
     @Test
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
index 56e9f9fce..6e6f0921d 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
@@ -43,8 +43,9 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "Defines the auto increment columns. "
                                     + "The auto increment column can only be 
used in primary-key table."
-                                    + "With an auto increment column in the 
table, whenever a new row is inserted into the table, the new row will be 
assigned with the next available value from the auto-increment sequence."
-                                    + "The auto increment column can only be 
used in primary-key table. The data type of the auto increment column must be 
INT or BIGINT."
+                                    + "With an auto increment column in the 
table, whenever a new row is inserted into the table, "
+                                    + "the new row will be assigned with the 
next available value from the auto-increment sequence."
+                                    + "The data type of the auto increment 
column must be INT or BIGINT."
                                     + "Currently a table can have only one 
auto-increment column."
                                     + "Adding an auto increment column to an 
existing table is not supported.");
 
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
index ad142a6c5..b4546701e 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
@@ -217,12 +217,9 @@ public class FlinkConversionsTest {
         TableDescriptor flussTable =
                 FlinkConversions.toFlussTable(new 
ResolvedCatalogTable(flinkTable, schema));
         String expectFlussTableString =
-                "TableDescriptor{schema=("
-                        + "order_id STRING NOT NULL,"
-                        + "item ROW<`item_id` STRING, `item_price` STRING, 
`item_details` ROW<`category` STRING, `specifications` STRING>>,"
-                        + "orig_ts TIMESTAMP(6),"
-                        + "CONSTRAINT PK_order_id PRIMARY KEY (order_id)"
-                        + "), comment='test comment', partitionKeys=[], "
+                "TableDescriptor{schema=Schema{columns=[order_id STRING NOT 
NULL, item ROW<`item_id` STRING, `item_price` STRING, `item_details` 
ROW<`category` STRING, `specifications` STRING>>, orig_ts TIMESTAMP(6)], "
+                        + "primaryKey=CONSTRAINT PK_order_id PRIMARY KEY 
(order_id), "
+                        + "autoIncrementColumnNames=[], highestFieldId=7}, 
comment='test comment', partitionKeys=[], "
                         + "tableDistribution={bucketKeys=[order_id] 
bucketCount=null}, "
                         + "properties={}, "
                         + "customProperties={"
@@ -424,11 +421,9 @@ public class FlinkConversionsTest {
                 FlinkConversions.toFlussTable(
                         new 
ResolvedCatalogMaterializedTable(flinkMaterializedTable, schema));
         String expectFlussTableString =
-                "TableDescriptor{schema=("
-                        + "order_id STRING NOT NULL,"
-                        + "orig_ts TIMESTAMP(6),"
-                        + "CONSTRAINT PK_order_id PRIMARY KEY (order_id)"
-                        + "), comment='test comment', partitionKeys=[], "
+                "TableDescriptor{schema=Schema{columns=[order_id STRING NOT 
NULL, orig_ts TIMESTAMP(6)], "
+                        + "primaryKey=CONSTRAINT PK_order_id PRIMARY KEY 
(order_id), "
+                        + "autoIncrementColumnNames=[], highestFieldId=1}, 
comment='test comment', partitionKeys=[], "
                         + "tableDistribution={bucketKeys=[order_id] 
bucketCount=null}, "
                         + "properties={}, "
                         + 
"customProperties={materialized-table.definition-query=select order_id, orig_ts 
from t, "
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
index 8d99b461e..1f0c9c268 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
@@ -37,6 +37,7 @@ import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.TabletManagerBase;
 import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
+import org.apache.fluss.server.kv.autoinc.ZkSequenceGeneratorFactory;
 import org.apache.fluss.server.kv.rowmerger.RowMerger;
 import org.apache.fluss.server.log.LogManager;
 import org.apache.fluss.server.log.LogTablet;
@@ -251,7 +252,10 @@ public final class KvManager extends TabletManagerBase 
implements ServerReconfig
                     RowMerger merger = RowMerger.create(tableConfig, kvFormat, 
schemaGetter);
                     AutoIncrementManager autoIncrementManager =
                             new AutoIncrementManager(
-                                    schemaGetter, tablePath.getTablePath(), 
tableConfig, zkClient);
+                                    schemaGetter,
+                                    tablePath.getTablePath(),
+                                    tableConfig,
+                                    new ZkSequenceGeneratorFactory(zkClient));
 
                     KvTablet tablet =
                             KvTablet.create(
@@ -365,7 +369,11 @@ public final class KvManager extends TabletManagerBase 
implements ServerReconfig
         RowMerger rowMerger =
                 RowMerger.create(tableConfig, tableConfig.getKvFormat(), 
schemaGetter);
         AutoIncrementManager autoIncrementManager =
-                new AutoIncrementManager(schemaGetter, tablePath, tableConfig, 
zkClient);
+                new AutoIncrementManager(
+                        schemaGetter,
+                        tablePath,
+                        tableConfig,
+                        new ZkSequenceGeneratorFactory(zkClient));
         KvTablet kvTablet =
                 KvTablet.create(
                         physicalTablePath,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
index 1c1d36a42..971ca4fa8 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
@@ -23,10 +23,6 @@ import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.SchemaGetter;
 import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.server.zk.ZkSequenceIDCounter;
-import org.apache.fluss.server.zk.ZooKeeperClient;
-import org.apache.fluss.server.zk.data.ZkData;
-import org.apache.fluss.types.DataTypeRoot;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
@@ -34,6 +30,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.time.Duration;
+import java.util.List;
 
 import static org.apache.fluss.utils.Preconditions.checkState;
 
@@ -55,7 +52,7 @@ public class AutoIncrementManager {
             SchemaGetter schemaGetter,
             TablePath tablePath,
             TableConfig tableConf,
-            ZooKeeperClient zkClient) {
+            SequenceGeneratorFactory seqGeneratorFactory) {
         this.autoIncrementUpdaterCache =
                 Caffeine.newBuilder()
                         .maximumSize(5)
@@ -64,30 +61,19 @@ public class AutoIncrementManager {
         this.schemaGetter = schemaGetter;
         int schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId();
         Schema schema = schemaGetter.getSchema(schemaId);
-        int[] autoIncrementColumnIds = schema.getAutoIncrementColumnIds();
+        List<String> autoIncrementColumnNames = 
schema.getAutoIncrementColumnNames();
 
         checkState(
-                autoIncrementColumnIds.length <= 1,
+                autoIncrementColumnNames.size() <= 1,
                 "Only support one auto increment column for a table, but got 
%d.",
-                autoIncrementColumnIds.length);
+                autoIncrementColumnNames.size());
 
-        if (autoIncrementColumnIds.length == 1) {
-            autoIncrementColumnId = autoIncrementColumnIds[0];
-            boolean requiresIntOverflowCheck =
-                    schema.getRowType()
-                            
.getField(schema.getColumnName(autoIncrementColumnId))
-                            .getType()
-                            .is(DataTypeRoot.INTEGER);
+        if (autoIncrementColumnNames.size() == 1) {
+            Schema.Column autoIncrementColumn = 
schema.getColumn(autoIncrementColumnNames.get(0));
+            autoIncrementColumnId = autoIncrementColumn.getColumnId();
             sequenceGenerator =
-                    new BoundedSegmentSequenceGenerator(
-                            tablePath,
-                            schema.getColumnName(autoIncrementColumnId),
-                            new ZkSequenceIDCounter(
-                                    zkClient.getCuratorClient(),
-                                    ZkData.AutoIncrementColumnZNode.path(
-                                            tablePath, autoIncrementColumnId)),
-                            tableConf,
-                            requiresIntOverflowCheck ? Integer.MAX_VALUE : 
Long.MAX_VALUE);
+                    seqGeneratorFactory.createSequenceGenerator(
+                            tablePath, autoIncrementColumn, 
tableConf.getAutoIncrementCacheSize());
         } else {
             autoIncrementColumnId = -1;
             sequenceGenerator = null;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java
index f47831ae4..44afe717e 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java
@@ -18,7 +18,6 @@
 
 package org.apache.fluss.server.kv.autoinc;
 
-import org.apache.fluss.config.TableConfig;
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.SequenceOverflowException;
 import org.apache.fluss.metadata.TablePath;
@@ -41,40 +40,33 @@ public class BoundedSegmentSequenceGenerator implements 
SequenceGenerator {
     private final long cacheSize;
     private final long maxAllowedValue;
 
-    private AutoIncIdSegment segment;
+    private IdSegment segment;
 
     public BoundedSegmentSequenceGenerator(
             TablePath tablePath,
             String columnName,
             SequenceIDCounter sequenceIDCounter,
-            TableConfig tableConf,
+            long idCacheSize,
             long maxAllowedValue) {
-        this.cacheSize = tableConf.getAutoIncrementCacheSize();
+        this.cacheSize = idCacheSize;
         this.columnName = columnName;
         this.tablePath = tablePath;
         this.sequenceIDCounter = sequenceIDCounter;
-        this.segment = AutoIncIdSegment.EMPTY;
+        this.segment = IdSegment.EMPTY;
         this.maxAllowedValue = maxAllowedValue;
     }
 
     private void fetchSegment() {
         try {
             long start = sequenceIDCounter.getAndAdd(cacheSize);
-            if (start >= maxAllowedValue) {
-                throw new SequenceOverflowException(
-                        String.format(
-                                "Reached maximum value of sequence \"<%s>\" 
(%d).",
-                                columnName, maxAllowedValue));
-            }
-
-            long actualEnd = Math.min(start + cacheSize, maxAllowedValue - 1);
+            // the initial value of ZNode is 0, but we start ID from 1
+            segment = new IdSegment(start + 1, start + cacheSize);
             LOG.info(
-                    "Successfully fetch auto-increment values range ({}, {}], 
table_path={}, column_name={}.",
-                    start,
-                    actualEnd,
+                    "Successfully fetch auto-increment values range [{}, {}], 
table_path={}, column_name={}.",
+                    segment.current,
+                    segment.end,
                     tablePath,
                     columnName);
-            segment = new AutoIncIdSegment(start, actualEnd - start);
         } catch (SequenceOverflowException sequenceOverflowException) {
             throw sequenceOverflowException;
         } catch (Exception e) {
@@ -88,32 +80,36 @@ public class BoundedSegmentSequenceGenerator implements 
SequenceGenerator {
 
     @Override
     public long nextVal() {
-        if (segment.remaining() <= 0) {
+        if (!segment.hasNext()) {
             fetchSegment();
         }
-        return segment.tryNextVal();
+        long id = segment.nextVal();
+        if (id > maxAllowedValue) {
+            throw new SequenceOverflowException(
+                    String.format(
+                            "Reached maximum value of sequence \"<%s>\" (%d).",
+                            columnName, maxAllowedValue));
+        }
+        return id;
     }
 
-    private static class AutoIncIdSegment {
-        private static final AutoIncIdSegment EMPTY = new AutoIncIdSegment(0, 
0);
-        private long current;
-        private final long end;
+    private static class IdSegment {
+        private static final IdSegment EMPTY = new IdSegment(0, -1);
+        final long end;
+        long current;
 
-        public AutoIncIdSegment(long start, long length) {
-            this.end = start + length;
-            this.current = start;
+        /** ID range from min (inclusive) to max (inclusive). */
+        public IdSegment(long min, long max) {
+            this.current = min;
+            this.end = max;
         }
 
-        public long remaining() {
-            return end - current;
+        public boolean hasNext() {
+            return current <= end;
         }
 
-        public long tryNextVal() {
-            long id = ++current;
-            if (id > end) {
-                throw new IllegalStateException("No more IDs available in 
current segment.");
-            }
-            return id;
+        public long nextVal() {
+            return current++;
         }
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java
index ff70f5783..b3e98fb9d 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java
@@ -18,7 +18,6 @@
 
 package org.apache.fluss.server.kv.autoinc;
 
-import org.apache.fluss.exception.SequenceOverflowException;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.record.BinaryValue;
@@ -29,8 +28,6 @@ import org.apache.fluss.types.DataTypeRoot;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.util.function.LongSupplier;
-
 /**
  * An {@link AutoIncrementUpdater} implementation that assigns auto-increment 
values to a specific
  * column based on a fixed schema. It is bound to a particular schema version 
and assumes the
@@ -45,9 +42,9 @@ public class PerSchemaAutoIncrementUpdater implements 
AutoIncrementUpdater {
     private final RowEncoder rowEncoder;
     private final int fieldLength;
     private final int targetColumnIdx;
-    private final LongSupplier idSupplier;
+    private final SequenceGenerator sequenceGenerator;
     private final short schemaId;
-    private final String targetColumnName;
+    private final boolean requireInteger;
 
     public PerSchemaAutoIncrementUpdater(
             KvFormat kvFormat,
@@ -63,40 +60,31 @@ public class PerSchemaAutoIncrementUpdater implements 
AutoIncrementUpdater {
         for (int i = 0; i < fieldLength; i++) {
             flussFieldGetters[i] = 
InternalRow.createFieldGetter(fieldDataTypes[i], i);
         }
+        this.sequenceGenerator = sequenceGenerator;
         this.schemaId = schemaId;
         this.targetColumnIdx = 
schema.getColumnIds().indexOf(autoIncrementColumnId);
-        this.targetColumnName = schema.getColumnName(targetColumnIdx);
         if (targetColumnIdx == -1) {
             throw new IllegalStateException(
                     String.format(
                             "Auto-increment column ID %d not found in schema 
columns: %s",
                             autoIncrementColumnId, schema.getColumnIds()));
         }
-
-        if (fieldDataTypes[targetColumnIdx].is(DataTypeRoot.INTEGER)) {
-            this.idSupplier = () -> 
checkedNextInt(sequenceGenerator.nextVal());
-        } else {
-            this.idSupplier = sequenceGenerator::nextVal;
-        }
+        this.requireInteger = 
fieldDataTypes[targetColumnIdx].is(DataTypeRoot.INTEGER);
         this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes);
         this.flussFieldGetters = flussFieldGetters;
     }
 
-    private long checkedNextInt(long value) {
-        if (value > Integer.MAX_VALUE) {
-            throw new SequenceOverflowException(
-                    String.format(
-                            "Reached maximum value of sequence \"<%s>\" 
(2147483647).",
-                            targetColumnName));
-        }
-        return value;
-    }
-
     public BinaryValue updateAutoIncrementColumns(BinaryValue rowValue) {
         rowEncoder.startNewRow();
         for (int i = 0; i < fieldLength; i++) {
             if (targetColumnIdx == i) {
-                rowEncoder.encodeField(i, idSupplier.getAsLong());
+                long seq = sequenceGenerator.nextVal();
+                // cast to integer if needed
+                if (requireInteger) {
+                    rowEncoder.encodeField(i, (int) seq);
+                } else {
+                    rowEncoder.encodeField(i, seq);
+                }
             } else {
                 // use the row value
                 rowEncoder.encodeField(i, 
flussFieldGetters[i].getFieldOrNull(rowValue.row));
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGeneratorFactory.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGeneratorFactory.java
new file mode 100644
index 000000000..a519c3f66
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGeneratorFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TablePath;
+
+/** Factory interface for creating {@link SequenceGenerator} instances. */
+public interface SequenceGeneratorFactory {
+
+    /**
+     * Creates a {@link SequenceGenerator} for the specified table and 
auto-increment column.
+     *
+     * @param tablePath the path of the table
+     * @param autoIncrementColumn the auto-increment column schema
+     * @param idCacheSize the size of ID cache for optimization
+     * @return a new instance of {@link SequenceGenerator}
+     */
+    SequenceGenerator createSequenceGenerator(
+            TablePath tablePath, Schema.Column autoIncrementColumn, long 
idCacheSize);
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/ZkSequenceGeneratorFactory.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/ZkSequenceGeneratorFactory.java
new file mode 100644
index 000000000..1842f47f6
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/ZkSequenceGeneratorFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.ZkSequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.types.DataTypeRoot;
+
+/** ZooKeeper-based implementation of {@link SequenceGeneratorFactory}. */
+public class ZkSequenceGeneratorFactory implements SequenceGeneratorFactory {
+
+    private final ZooKeeperClient zkClient;
+
+    public ZkSequenceGeneratorFactory(ZooKeeperClient zkClient) {
+        this.zkClient = zkClient;
+    }
+
+    @Override
+    public SequenceGenerator createSequenceGenerator(
+            TablePath tablePath, Schema.Column autoIncrementColumn, long 
idCacheSize) {
+        DataTypeRoot typeRoot = 
autoIncrementColumn.getDataType().getTypeRoot();
+        final long maxAllowedValue;
+        if (typeRoot == DataTypeRoot.INTEGER) {
+            maxAllowedValue = Integer.MAX_VALUE;
+        } else if (typeRoot == DataTypeRoot.BIGINT) {
+            maxAllowedValue = Long.MAX_VALUE;
+        } else {
+            throw new IllegalArgumentException(
+                    "Auto-increment column must be of type INTEGER or BIGINT");
+        }
+        return new BoundedSegmentSequenceGenerator(
+                tablePath,
+                autoIncrementColumn.getName(),
+                new ZkSequenceIDCounter(
+                        zkClient.getCuratorClient(),
+                        ZkData.AutoIncrementColumnZNode.path(
+                                tablePath, autoIncrementColumn.getColumnId())),
+                idCacheSize,
+                maxAllowedValue);
+    }
+}
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index 8c0a6b864..34f50691a 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -36,6 +36,9 @@ import org.apache.fluss.record.FileLogProjection;
 import org.apache.fluss.record.KvRecord;
 import org.apache.fluss.record.KvRecordBatch;
 import org.apache.fluss.record.KvRecordTestUtils;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.record.LogRecordBatch;
+import org.apache.fluss.record.LogRecordReadContext;
 import org.apache.fluss.record.LogRecords;
 import org.apache.fluss.record.LogTestBase;
 import org.apache.fluss.record.MemoryLogRecords;
@@ -46,6 +49,7 @@ import org.apache.fluss.record.bytesview.MultiBytesView;
 import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.encode.ValueEncoder;
 import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
+import org.apache.fluss.server.kv.autoinc.TestingSequenceGeneratorFactory;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Key;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.KvEntry;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Value;
@@ -61,6 +65,7 @@ import 
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
 import org.apache.fluss.types.StringType;
+import org.apache.fluss.utils.CloseableIterator;
 import org.apache.fluss.utils.clock.SystemClock;
 import org.apache.fluss.utils.concurrent.FlussScheduler;
 
@@ -84,6 +89,8 @@ import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static 
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
 import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
@@ -122,7 +129,7 @@ class KvTabletTest {
 
     @BeforeEach
     void beforeEach() {
-        executor = Executors.newFixedThreadPool(2);
+        executor = Executors.newFixedThreadPool(3);
     }
 
     @AfterEach
@@ -187,7 +194,7 @@ class KvTabletTest {
                         schemaGetter,
                         tablePath.getTablePath(),
                         new TableConfig(new Configuration()),
-                        null);
+                        new TestingSequenceGeneratorFactory());
 
         return KvTablet.create(
                 tablePath,
@@ -600,6 +607,61 @@ class KvTabletTest {
         }
     }
 
+    @Test
+    void testAutoIncrementWithMultiThread() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("user_name", DataTypes.STRING())
+                        .column("uid", DataTypes.INT())
+                        .primaryKey("user_name")
+                        .enableAutoIncrement("uid")
+                        .build();
+        initLogTabletAndKvTablet(schema, new HashMap<>());
+        KvRecordTestUtils.KvRecordFactory recordFactory =
+                KvRecordTestUtils.KvRecordFactory.of(schema.getRowType());
+
+        // start threads to put records
+        List<Future<LogAppendInfo>> putFutures = new ArrayList<>();
+        for (int i = 1; i <= 30; ) {
+            String k1 = "k" + i++;
+            String k2 = "k" + i++;
+            String k3 = "k" + i++;
+            KvRecordBatch kvRecordBatch1 =
+                    kvRecordBatchFactory.ofRecords(
+                            Arrays.asList(
+                                    recordFactory.ofRecord(k1.getBytes(), new 
Object[] {k1, null}),
+                                    recordFactory.ofRecord(k2.getBytes(), new 
Object[] {k2, null}),
+                                    recordFactory.ofRecord(
+                                            k3.getBytes(), new Object[] {k3, 
null})));
+            // put concurrently to verify thread-safety of 
AutoIncrementManager
+            putFutures.add(executor.submit(() -> 
kvTablet.putAsLeader(kvRecordBatch1, null)));
+        }
+
+        // wait for all puts to finish
+        for (Future<LogAppendInfo> future : putFutures) {
+            future.get();
+        }
+
+        LogRecords actualLogRecords = readLogRecords(logTablet, 0L, null);
+        LogRecordReadContext context =
+                LogRecordReadContext.createArrowReadContext(
+                        schema.getRowType(), schemaId, schemaGetter);
+        List<Integer> actualUids = new ArrayList<>();
+        for (LogRecordBatch actualNext : actualLogRecords.batches()) {
+            CloseableIterator<LogRecord> iterator = 
actualNext.records(context);
+            while (iterator.hasNext()) {
+                LogRecord record = iterator.next();
+                
assertThat(record.getChangeType()).isEqualTo(ChangeType.INSERT);
+                assertThat(record.getRow().isNullAt(1)).isFalse();
+                actualUids.add(record.getRow().getInt(1));
+            }
+        }
+        // create a List from 1 to 30
+        List<Integer> expectedUids =
+                IntStream.rangeClosed(1, 
30).boxed().collect(Collectors.toList());
+        assertThat(actualUids).isEqualTo(expectedUids);
+    }
+
     @Test
     void testPutAsLeaderWithOutOfOrderSequenceException() throws Exception {
         initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java
index 685c2d539..c7c90ab6c 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java
@@ -18,21 +18,15 @@
 
 package org.apache.fluss.server.kv.autoinc;
 
-import org.apache.fluss.config.ConfigOptions;
-import org.apache.fluss.config.Configuration;
-import org.apache.fluss.config.TableConfig;
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.SequenceOverflowException;
 import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.server.SequenceIDCounter;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -47,16 +41,10 @@ class SegmentSequenceGeneratorTest {
     private static final long CACHE_SIZE = 100;
 
     private AtomicLong snapshotIdGenerator;
-    private Configuration configuration;
-    private TableConfig tableConfig;
 
     @BeforeEach
     void setUp() {
         snapshotIdGenerator = new AtomicLong(0);
-        Map<String, String> map = new HashMap<>();
-        map.put(ConfigOptions.TABLE_AUTO_INCREMENT_CACHE_SIZE.key(), 
String.valueOf(CACHE_SIZE));
-        configuration = Configuration.fromMap(map);
-        tableConfig = new TableConfig(configuration);
     }
 
     @Test
@@ -65,8 +53,8 @@ class SegmentSequenceGeneratorTest {
                 new BoundedSegmentSequenceGenerator(
                         TABLE_PATH,
                         COLUMN_NAME,
-                        new TestingSnapshotIDCounter(snapshotIdGenerator),
-                        new TableConfig(configuration),
+                        new TestingSequenceIDCounter(snapshotIdGenerator),
+                        CACHE_SIZE,
                         Long.MAX_VALUE);
         for (long i = 1; i <= CACHE_SIZE; i++) {
             assertThat(generator.nextVal()).isEqualTo(i);
@@ -90,8 +78,8 @@ class SegmentSequenceGeneratorTest {
                                         new BoundedSegmentSequenceGenerator(
                                                 new TablePath("test_db", 
"table1"),
                                                 COLUMN_NAME,
-                                                new 
TestingSnapshotIDCounter(snapshotIdGenerator),
-                                                tableConfig,
+                                                new 
TestingSequenceIDCounter(snapshotIdGenerator),
+                                                CACHE_SIZE,
                                                 Long.MAX_VALUE);
                                 for (int j = 0; j < 130; j++) {
                                     linkedDeque.add(generator.nextVal());
@@ -116,8 +104,8 @@ class SegmentSequenceGeneratorTest {
                 new BoundedSegmentSequenceGenerator(
                         new TablePath("test_db", "table1"),
                         COLUMN_NAME,
-                        new TestingSnapshotIDCounter(snapshotIdGenerator, 2),
-                        tableConfig,
+                        new TestingSequenceIDCounter(snapshotIdGenerator, 2),
+                        CACHE_SIZE,
                         Long.MAX_VALUE);
         for (int j = 1; j <= CACHE_SIZE; j++) {
             assertThat(generator.nextVal()).isEqualTo(j);
@@ -132,51 +120,28 @@ class SegmentSequenceGeneratorTest {
 
     @Test
     void testFetchIdOverFlow() {
+        int initialValue = Integer.MAX_VALUE - 10;
+        snapshotIdGenerator = new AtomicLong(initialValue);
         BoundedSegmentSequenceGenerator generator =
                 new BoundedSegmentSequenceGenerator(
                         new TablePath("test_db", "table1"),
                         COLUMN_NAME,
-                        new TestingSnapshotIDCounter(snapshotIdGenerator),
-                        tableConfig,
-                        CACHE_SIZE + 9);
-        for (int j = 1; j < CACHE_SIZE + 9; j++) {
-            assertThat(generator.nextVal()).isEqualTo(j);
+                        new TestingSequenceIDCounter(snapshotIdGenerator),
+                        CACHE_SIZE,
+                        Integer.MAX_VALUE);
+
+        int lastValue = 0;
+        for (int j = 1; j <= 10; j++) {
+            lastValue = (int) generator.nextVal();
+            assertThat(lastValue).isEqualTo(initialValue + j);
         }
+        assertThat(lastValue).isEqualTo(Integer.MAX_VALUE);
+
         assertThatThrownBy(generator::nextVal)
                 .isInstanceOf(SequenceOverflowException.class)
                 .hasMessage(
                         String.format(
                                 "Reached maximum value of sequence \"<%s>\" 
(%d).",
-                                COLUMN_NAME, CACHE_SIZE + 9));
-    }
-
-    private static class TestingSnapshotIDCounter implements SequenceIDCounter 
{
-
-        private final AtomicLong snapshotIdGenerator;
-        private int fetchTime;
-        private final int failedTrigger;
-
-        public TestingSnapshotIDCounter(AtomicLong snapshotIdGenerator) {
-            this(snapshotIdGenerator, Integer.MAX_VALUE);
-        }
-
-        public TestingSnapshotIDCounter(AtomicLong snapshotIdGenerator, int 
failedTrigger) {
-            this.snapshotIdGenerator = snapshotIdGenerator;
-            fetchTime = 0;
-            this.failedTrigger = failedTrigger;
-        }
-
-        @Override
-        public long getAndIncrement() {
-            return snapshotIdGenerator.getAndIncrement();
-        }
-
-        @Override
-        public long getAndAdd(Long delta) {
-            if (++fetchTime < failedTrigger) {
-                return snapshotIdGenerator.getAndAdd(delta);
-            }
-            throw new RuntimeException("Failed to get snapshot ID");
-        }
+                                COLUMN_NAME, Integer.MAX_VALUE));
     }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceGeneratorFactory.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceGeneratorFactory.java
new file mode 100644
index 000000000..2bafcd670
--- /dev/null
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceGeneratorFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TablePath;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Factory for creating {@link SequenceGenerator} instances for testing. */
+public class TestingSequenceGeneratorFactory implements 
SequenceGeneratorFactory {
+
+    @Override
+    public SequenceGenerator createSequenceGenerator(
+            TablePath tablePath, Schema.Column autoIncrementColumn, long 
idCacheSize) {
+        return new BoundedSegmentSequenceGenerator(
+                tablePath,
+                autoIncrementColumn.getName(),
+                new TestingSequenceIDCounter(new AtomicLong(0)),
+                idCacheSize,
+                Long.MAX_VALUE);
+    }
+}
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceIDCounter.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceIDCounter.java
new file mode 100644
index 000000000..72f10f5d4
--- /dev/null
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceIDCounter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.server.SequenceIDCounter;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A testing implementation of {@link SequenceIDCounter} based on {@link 
AtomicLong}. */
+public class TestingSequenceIDCounter implements SequenceIDCounter {
+    private final AtomicLong idGenerator;
+    private int fetchTime;
+    private final int failedTrigger;
+
+    public TestingSequenceIDCounter(AtomicLong idGenerator) {
+        this(idGenerator, Integer.MAX_VALUE);
+    }
+
+    public TestingSequenceIDCounter(AtomicLong idGenerator, int failedTrigger) 
{
+        this.idGenerator = idGenerator;
+        fetchTime = 0;
+        this.failedTrigger = failedTrigger;
+    }
+
+    @Override
+    public long getAndIncrement() {
+        return idGenerator.getAndIncrement();
+    }
+
+    @Override
+    public long getAndAdd(Long delta) {
+        if (++fetchTime < failedTrigger) {
+            return idGenerator.getAndAdd(delta);
+        }
+        throw new RuntimeException("Failed to get snapshot ID");
+    }
+}
diff --git a/website/docs/engine-flink/ddl.md b/website/docs/engine-flink/ddl.md
index b333a1400..40e12a9c1 100644
--- a/website/docs/engine-flink/ddl.md
+++ b/website/docs/engine-flink/ddl.md
@@ -236,9 +236,13 @@ Currently, this feature has the following characteristics:
 - **Position**: New columns are always appended to the end of the existing 
column list.
 - **Nullability**: Only nullable columns can be added to an existing table to 
ensure compatibility with existing data.
 - **Type Support**: You can add columns of any data type, including complex 
types such as `ROW`, `MAP`, and `ARRAY`.
-- **Nested Fields**: Currently, adding fields within an existing nested `ROW` 
is not supported. Such operations are categorized as "updating column types" 
and will be supported in future versions.
 
-You can add a single column or multiple columns using the `ALTER TABLE 
statement.
+The following limitations currently apply but will be supported in the future:
+
+- **Nested Fields**: Adding fields within an existing nested `ROW` is not 
supported. Such operations are categorized as "updating column types" and will 
be supported in future versions.
+- **AUTO INCREMENT**: Adding `AUTO_INCREMENT` columns by using `ALTER TABLE` 
is not supported; such columns must be defined when the table is created.
+
+You can add a single column or multiple columns using the `ALTER TABLE` 
statement.
 
 ```sql title="Flink SQL"
 -- Add a single column at the end of the table
diff --git a/website/docs/engine-flink/options.md 
b/website/docs/engine-flink/options.md
index 8734396f3..33cf9e68b 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -63,7 +63,7 @@ See more details about [ALTER TABLE ... 
SET](engine-flink/ddl.md#set-properties)
 
 | Option                                  | Type     | Default                 
            | Description                                                       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 
|-----------------------------------------|----------|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| auto-increment.fields                   | String   | (None)                  
            | Defines the auto increment columns. The auto increment column can 
only be used in primary-key table. With an auto increment column in the table, 
whenever a new row is inserted into the table, the new row will be assigned 
with the next available value from the auto-increment sequence. The auto 
increment column can only be used in primary-key table. The data type of the 
auto increment column must b [...]
+| auto-increment.fields                   | String   | (None)                  
            | Defines the auto increment columns. The auto increment column can 
only be used in primary-key table. With an auto increment column in the table, 
whenever a new row is inserted into the table, the new row will be assigned 
with the next available value from the auto-increment sequence. The data type 
of the auto increment column must be INT or BIGINT. Currently a table can have 
only one auto-increme [...]
 | bucket.num                              | int      | The bucket number of 
Fluss cluster. | The number of buckets of a Fluss table.                        
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
 | bucket.key                              | String   | (None)                  
            | Specific the distribution policy of the Fluss table. Data will be 
distributed to each bucket according to the hash value of bucket-key (It must 
be a subset of the primary keys excluding partition keys of the primary key 
table). If you specify multiple fields, delimiter is `,`. If the table has a 
primary key and a bucket key is not specified, the bucket key will be used as 
primary key(excluding th [...]
 | table.log.ttl                           | Duration | 7 days                  
            | The time to live for log segments. The configuration controls the 
maximum time we will retain a log before we will delete old segments to free up 
space. If set to -1, the log will not be deleted.                               
                                                                                
                                                                                
              [...]

Reply via email to