wuchong commented on code in PR #2161:
URL: https://github.com/apache/fluss/pull/2161#discussion_r2679474574


##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncUpdater.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.record.BinaryValue;
+
+/** A updater to auto increment column . */
+public interface AutoIncUpdater {
+
+    /**
+     * Updates the auto-increment column in the given row by replacing its 
value with a new sequence
+     * number.
+     *
+     * <p>This method may return a new {@link BinaryValue} instance or the 
same instance if no
+     * update is needed (e.g., in a no-op implementation).
+     *
+     * @param rowValue the input row in binary form, must not be {@code null}
+     * @return a {@link BinaryValue} representing the updated row; never 
{@code null}
+     */
+    BinaryValue updateAutoInc(BinaryValue rowValue);

Review Comment:
   nit: `updateAutoIncColumns` to be more specific. 



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncUpdater.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.record.BinaryValue;
+
+/** A updater to auto increment column . */
+public interface AutoIncUpdater {

Review Comment:
   ditto. rename to `AutoIncrementUpdater`, and also the sub-classes. 



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncUpdater.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.record.BinaryValue;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.RowEncoder;
+import org.apache.fluss.types.DataType;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * An {@link AutoIncUpdater} implementation that assigns auto-increment values 
to a specific column
+ * based on a fixed schema. It is bound to a particular schema version and 
assumes the
+ * auto-increment column position remains constant within that schema.
+ *
+ * <p>This class is not thread-safe and is intended to be used within a 
single-threaded execution
+ * context.
+ */
+@NotThreadSafe
+public class PerSchemaAutoIncUpdater implements AutoIncUpdater {
+    private final InternalRow.FieldGetter[] flussFieldGetters;
+    private final RowEncoder rowEncoder;
+    private final DataType[] fieldDataTypes;
+    private final int targetColumnIdx;
+    private final SequenceGenerator idGenerator;
+    private final short schemaId;
+
+    public PerSchemaAutoIncUpdater(
+            KvFormat kvFormat,
+            short schemaId,
+            Schema schema,
+            int autoIncColumnId,
+            SequenceGenerator sequenceGenerator) {
+        DataType[] fieldDataTypes = 
schema.getRowType().getChildren().toArray(new DataType[0]);
+
+        // getter for the fields in row
+        InternalRow.FieldGetter[] flussFieldGetters =
+                new InternalRow.FieldGetter[fieldDataTypes.length];
+        for (int i = 0; i < fieldDataTypes.length; i++) {
+            flussFieldGetters[i] = 
InternalRow.createFieldGetter(fieldDataTypes[i], i);
+        }
+        this.idGenerator = sequenceGenerator;
+        this.schemaId = schemaId;
+        this.targetColumnIdx = schema.getColumnIds().indexOf(autoIncColumnId);
+        if (targetColumnIdx == -1) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Auto-increment column ID %d not found in schema 
columns: %s",
+                            autoIncColumnId, schema.getColumnIds()));
+        }
+        this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes);
+        this.fieldDataTypes = fieldDataTypes;
+        this.flussFieldGetters = flussFieldGetters;
+    }
+
+    public BinaryValue updateAutoInc(BinaryValue rowValue) {
+        rowEncoder.startNewRow();
+        for (int i = 0; i < fieldDataTypes.length; i++) {

Review Comment:
   nit: just have a `fieldLength` member variable is enough instead of 
`fieldDataTypes`



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGenerator.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.SequenceIDCounter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/** Segment ID generator, fetch ID with a batch size. */
+@NotThreadSafe
+public class SegmentSequenceGenerator implements SequenceGenerator {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SegmentSequenceGenerator.class);
+
+    private final SequenceIDCounter sequenceIDCounter;
+    private final TablePath tablePath;
+    private final int columnIdx;
+    private final String columnName;
+
+    private AutoIncIdSegment segment = new AutoIncIdSegment(0, 0);
+
+    private final long batchSize;
+
+    public SegmentSequenceGenerator(
+            TablePath tablePath,
+            int columnIdx,
+            String columnName,
+            SequenceIDCounter sequenceIDCounter,
+            Configuration properties) {
+        batchSize = 
properties.getLong(ConfigOptions.TABLE_AUTO_INC_BATCH_SIZE);

Review Comment:
   ```suggestion
           this.batchSize = 
properties.getLong(ConfigOptions.TABLE_AUTO_INC_BATCH_SIZE);
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGenerator.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.SequenceIDCounter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/** Segment ID generator, fetch ID with a batch size. */
+@NotThreadSafe
+public class SegmentSequenceGenerator implements SequenceGenerator {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SegmentSequenceGenerator.class);
+
+    private final SequenceIDCounter sequenceIDCounter;
+    private final TablePath tablePath;
+    private final int columnIdx;
+    private final String columnName;
+
+    private AutoIncIdSegment segment = new AutoIncIdSegment(0, 0);
+
+    private final long batchSize;
+
+    public SegmentSequenceGenerator(
+            TablePath tablePath,
+            int columnIdx,
+            String columnName,
+            SequenceIDCounter sequenceIDCounter,
+            Configuration properties) {
+        batchSize = 
properties.getLong(ConfigOptions.TABLE_AUTO_INC_BATCH_SIZE);
+        this.columnName = columnName;
+        this.tablePath = tablePath;
+        this.columnIdx = columnIdx;
+        this.sequenceIDCounter = sequenceIDCounter;
+    }
+
+    private void fetchSegment() {
+        try {
+            long start = sequenceIDCounter.getAndAdd(batchSize);
+            LOG.info(
+                    "Successfully fetch auto-increment values range [{}, {}), 
table_path={}, column_idx={}, column_name={}.",
+                    start,
+                    start + batchSize,
+                    tablePath,
+                    columnIdx,
+                    columnName);
+            segment = new AutoIncIdSegment(start, batchSize);
+        } catch (Exception e) {
+            throw new FlussRuntimeException(
+                    String.format(
+                            "Failed to fetch auto-increment values, 
table_path=%s, column_idx=%d, column_name=%s.",

Review Comment:
   I don't see the needs to log the `column_idx`, seems it's verbose here, 
maybe we can omit it. 



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncUpdater.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.record.BinaryValue;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.RowEncoder;
+import org.apache.fluss.types.DataType;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * An {@link AutoIncUpdater} implementation that assigns auto-increment values 
to a specific column
+ * based on a fixed schema. It is bound to a particular schema version and 
assumes the
+ * auto-increment column position remains constant within that schema.
+ *
+ * <p>This class is not thread-safe and is intended to be used within a 
single-threaded execution
+ * context.
+ */
+@NotThreadSafe
+public class PerSchemaAutoIncUpdater implements AutoIncUpdater {
+    private final InternalRow.FieldGetter[] flussFieldGetters;
+    private final RowEncoder rowEncoder;
+    private final DataType[] fieldDataTypes;
+    private final int targetColumnIdx;
+    private final SequenceGenerator idGenerator;
+    private final short schemaId;
+
+    public PerSchemaAutoIncUpdater(
+            KvFormat kvFormat,
+            short schemaId,
+            Schema schema,
+            int autoIncColumnId,
+            SequenceGenerator sequenceGenerator) {
+        DataType[] fieldDataTypes = 
schema.getRowType().getChildren().toArray(new DataType[0]);
+
+        // getter for the fields in row
+        InternalRow.FieldGetter[] flussFieldGetters =
+                new InternalRow.FieldGetter[fieldDataTypes.length];
+        for (int i = 0; i < fieldDataTypes.length; i++) {
+            flussFieldGetters[i] = 
InternalRow.createFieldGetter(fieldDataTypes[i], i);
+        }
+        this.idGenerator = sequenceGenerator;
+        this.schemaId = schemaId;
+        this.targetColumnIdx = schema.getColumnIds().indexOf(autoIncColumnId);
+        if (targetColumnIdx == -1) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Auto-increment column ID %d not found in schema 
columns: %s",
+                            autoIncColumnId, schema.getColumnIds()));
+        }
+        this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes);
+        this.fieldDataTypes = fieldDataTypes;
+        this.flussFieldGetters = flussFieldGetters;
+    }
+
+    public BinaryValue updateAutoInc(BinaryValue rowValue) {
+        rowEncoder.startNewRow();
+        for (int i = 0; i < fieldDataTypes.length; i++) {
+            if (targetColumnIdx == i) {
+                rowEncoder.encodeField(i, idGenerator.nextVal());

Review Comment:
   We should check int overflow here and throw a `ApiException`. We can setting 
an upper bounds in `SegmentSequenceGenerator` according the data types. And add 
test in `SegmentSequenceGeneratorTest`. 
   
   
   The exception can be `SequenceOverflowException` or 
`AutoIncrementOverflowException` with messages like 
   ```
   reached maximum value of sequence "<field_name>" (2147483647)
   ```



##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java:
##########
@@ -454,6 +455,181 @@ void testInvalidPrefixLookup() throws Exception {
                                 + "because the lookup columns [b, a] must 
contain all bucket keys [a, b] in order.");
     }
 
+    @Test
+    void testSingleBucketPutAutoIncColumnAndLookup() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("col1", DataTypes.STRING())
+                        .withComment("col1 is first column")
+                        .column("col2", DataTypes.BIGINT())
+                        .withComment("col2 is second column, auto increment 
column")
+                        .column("col3", DataTypes.STRING())
+                        .withComment("col3 is third column")
+                        .enableAutoIncrement("col2")
+                        .primaryKey("col1")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(1, 
"col1").build();
+        // create the table
+        TablePath tablePath =
+                TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), 
"test_pk_table_auto_inc");
+        createTable(tablePath, tableDescriptor, true);
+        Table autoIncTable = conn.getTable(tablePath);
+        Object[][] records = {
+            {"a", null, "batch1"},
+            {"b", null, "batch1"},
+            {"c", null, "batch1"},
+            {"d", null, "batch1"},
+            {"e", null, "batch1"},
+            {"d", null, "batch2"},
+            {"e", null, "batch2"}
+        };
+        partialUpdateRecords(new String[] {"col1", "col3"}, records, 
autoIncTable);
+
+        Object[][] expectedRecords = {
+            {"a", 0L, "batch1"},

Review Comment:
   Since `AUTO_INCREMENT` columns in StarRocks, MySQL, Postgres, they all start 
value from `1`, rather than `0`. I suggest also following them to start from 
`1`. We can introduce `table.auto-increment.start-value` in the future to allow 
users to specifc a different starting value, like zero. 
   
   
https://docs.starrocks.io/docs/sql-reference/sql-statements/table_bucket_part_index/auto_increment/#introduction



##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -1456,7 +1456,18 @@ public class ConfigOptions {
                                     + "The auto increment column can only be 
used in primary-key table."
                                     + "With an auto increment column in the 
table, whenever a new row is inserted into the table, the new row will be 
assigned with the next available value from the auto-increment sequence."
                                     + "The auto increment column can only be 
used in primary-key table. The data type of the auto increment column must be 
INT or BIGINT."
-                                    + "Currently a table can have only one 
auto-increment column.");
+                                    + "Currently a table can have only one 
auto-increment column."
+                                    + "Adding an auto increment column to an 
existing table is not supported.");
+
+    public static final ConfigOption<Long> TABLE_AUTO_INC_BATCH_SIZE =
+            key("table.auto-inc.batch-size")

Review Comment:
   Suggest renaming to `table.auto-increment.cache-size`.
   
   - Prefer full, descriptive names when the config key isn’t overly long, this 
reduces ambiguity and improves readability.
   - Both FIP and StarRocks (`auto_increment_cache_size`) use “cache size” 
instead of “batch size.” This is more accurate, as the setting controls the 
size of the pre-allocated ID cache, not the batch in which IDs are consumed or 
flushed.



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java:
##########
@@ -248,6 +249,10 @@ public KvTablet getOrCreateKv(
 
                     File tabletDir = getOrCreateTabletDir(tablePath, 
tableBucket);
                     RowMerger merger = RowMerger.create(tableConfig, kvFormat, 
schemaGetter);
+                    AutoIncManager autoIncManager =
+                            new AutoIncManager(
+                                    schemaGetter, tablePath.getTablePath(), 
conf, zkClient);

Review Comment:
   The `conf` is a cluster configuration, not the table config. It's better to 
pass in the `TableConfig` to avoid mistakes. 



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncManager.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.ZkSequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.time.Duration;
+
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Manages auto-increment logic for tables, providing schema-specific updaters 
that handle
+ * auto-increment column assignment during row writes.
+ */
+@NotThreadSafe
+public class AutoIncManager {

Review Comment:
   Rename to `AutoIncrementManager` as the class name is not too long, having 
the complete class name will help developers to understand it, as `inc` has 
many different meanings. 



##########
fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java:
##########
@@ -696,9 +710,7 @@ private static List<Column> normalizeColumns(
             }
 
             // primary key and auto increment column should not nullable
-            if ((pkSet.contains(column.getName())
-                            || 
autoIncrementColumnNames.contains(column.getName()))

Review Comment:
   Why remove this? Auto increment columns must be not nullable. 



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java:
##########
@@ -450,22 +462,30 @@ private long processUpsert(
             KvPreWriteBuffer.Key key,
             BinaryValue currentValue,
             RowMerger currentMerger,
+            AutoIncUpdater currentAutoIncUpdater,
             ValueDecoder valueDecoder,
             WalBuilder walBuilder,
             PaddingRow latestSchemaRow,
             long logOffset)
             throws Exception {
-        // Optimization: when using WAL mode and merger is DefaultRowMerger 
(full update, not
-        // partial update), we can skip fetching old value for better 
performance since it
-        // always returns new value. In this case, both INSERT and UPDATE will 
produce
-        // UPDATE_AFTER.
-        if (changelogImage == ChangelogImage.WAL && currentMerger instanceof 
DefaultRowMerger) {
+        // Optimization: IN WAL mode,when using DefaultRowMerger (full update, 
not partial update)
+        // and there is no auto-increment column, we can skip fetching old 
value for better
+        // performance since the result always reflects the new value. In this 
case, both INSERT and
+        // UPDATE will produce UPDATE_AFTER.
+        if (changelogImage == ChangelogImage.WAL
+                && !currentAutoIncUpdater.hasAutoIncrement()
+                && currentMerger instanceof DefaultRowMerger) {
             return applyUpdate(key, null, currentValue, walBuilder, 
latestSchemaRow, logOffset);
         }
 
         byte[] oldValueBytes = getFromBufferOrKv(key);
         if (oldValueBytes == null) {
-            return applyInsert(key, currentValue, walBuilder, latestSchemaRow, 
logOffset);
+            return applyInsert(
+                    key,
+                    currentAutoIncUpdater.updateAutoInc(currentValue),

Review Comment:
   I suggest making the updates within the `applyInsert` method so that the 
logic can be reused wherever `applyInsert` is called.



##########
fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java:
##########
@@ -128,6 +128,15 @@ public int[] getPrimaryKeyIndexes() {
                 .orElseGet(() -> new int[0]);
     }
 
+    /** Returns the auto-increment columnIds, if any, otherwise returns an 
empty array. */

Review Comment:
   We should also update `org.apache.fluss.metadata.Schema.Builder#fromSchema` 
to consider the autoIncrementFields. And add tests in `TableSchemaTest`



##########
fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java:
##########
@@ -128,6 +128,15 @@ public int[] getPrimaryKeyIndexes() {
                 .orElseGet(() -> new int[0]);
     }
 
+    /** Returns the auto-increment columnIds, if any, otherwise returns an 
empty array. */
+    public int[] getAutoIncColumnIds() {
+        Set<String> autoIncColSet = new 
HashSet<>(getAutoIncrementColumnNames());
+        return getColumns().stream()
+                .filter(column -> autoIncColSet.contains(column.getName()))
+                .mapToInt(Column::getColumnId)
+                .toArray();

Review Comment:
   ```suggestion
           if (autoIncrementColumnNames.isEmpty()) {
               return new int[0];
           } else {
               return getColumns().stream()
                       .filter(column -> 
autoIncrementColumnNames.contains(column.getName()))
                       .mapToInt(Column::getColumnId)
                       .toArray();
           }
   ```
   
   Improve the performance



##########
fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentIncIDGeneratorTest.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.SequenceIDCounter;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test class for {@link SegmentSequenceGenerator}. */
+class SegmentSequenceGeneratorTest {
+
+    private static final TablePath TABLE_PATH = new TablePath("test_db", 
"test_table");
+    private static final int COLUMN_IDX = 0;
+    private static final String COLUMN_NAME = "id";
+    private static final long BATCH_SIZE = 100;
+
+    private AtomicLong snapshotIdGenerator;
+    private Configuration configuration;
+
+    @BeforeEach
+    void setUp() {
+        snapshotIdGenerator = new AtomicLong(0);
+        Map<String, String> map = new HashMap<>();
+        map.put(ConfigOptions.TABLE_AUTO_INC_BATCH_SIZE.key(), 
String.valueOf(BATCH_SIZE));
+        configuration = Configuration.fromMap(map);
+    }
+
+    @Test
+    void testNextValBasicContinuousId() {
+        SegmentSequenceGenerator generator =
+                new SegmentSequenceGenerator(
+                        TABLE_PATH,
+                        COLUMN_IDX,
+                        COLUMN_NAME,
+                        new TestingSnapshotIDCounter(snapshotIdGenerator),
+                        configuration);
+        for (long i = 0; i < BATCH_SIZE; i++) {
+            assertThat(generator.nextVal()).isEqualTo(i);
+        }
+
+        for (long i = BATCH_SIZE; i < 2 * BATCH_SIZE; i++) {
+            assertThat(generator.nextVal()).isEqualTo(i);
+        }
+    }
+
+    @Test
+    void testMultiGenerator() {
+        ConcurrentLinkedDeque<Long> linkedDeque = new 
ConcurrentLinkedDeque<>();
+
+        for (int i = 0; i < 20; i++) {
+            new Thread(
+                            () -> {
+                                SegmentSequenceGenerator generator =
+                                        new SegmentSequenceGenerator(
+                                                new TablePath("test_db", 
"table1"),
+                                                COLUMN_IDX,
+                                                COLUMN_NAME + "_table1",
+                                                new 
TestingSnapshotIDCounter(snapshotIdGenerator),
+                                                configuration);
+                                for (int j = 0; j < 130; j++) {
+                                    linkedDeque.add(generator.nextVal());
+                                }
+                            })
+                    .start();

Review Comment:
   We should `.join()` the threads before assersion? Besides, would be better 
to also verify the distinct values size should be `130 * 20`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to