This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 84b204421d7af24db28b9582626c6e06423ffd48
Author: Jark Wu <[email protected]>
AuthorDate: Tue Feb 3 17:21:56 2026 +0800

    [kv] Improve validation for target columns on auto-increment tables
---
 .../java/org/apache/fluss/metadata/Schema.java     | 11 ---
 .../row/decode/paimon/PaimonKeyDecoderTest.java    |  3 +-
 .../fluss/flink/sink/FlinkTableSinkITCase.java     | 93 ++++++++++++++++++++++
 .../java/org/apache/fluss/server/kv/KvTablet.java  | 17 +---
 .../server/kv/autoinc/AutoIncrementManager.java    | 20 +++--
 .../server/kv/autoinc/AutoIncrementUpdater.java    | 14 ++++
 .../kv/autoinc/PerSchemaAutoIncrementUpdater.java  | 33 ++++++++
 .../server/kv/partialupdate/PartialUpdater.java    |  7 ++
 .../fluss/server/replica/ReplicaManager.java       | 32 ++++----
 .../org/apache/fluss/server/kv/KvTabletTest.java   | 60 ++++++++++----
 .../fluss/server/replica/ReplicaManagerTest.java   | 50 +++---------
 11 files changed, 239 insertions(+), 101 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
index a97f09ee9..92b675c42 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -151,17 +151,6 @@ public final class Schema implements Serializable {
                 .orElseGet(() -> new int[0]);
     }
 
-    /**
-     * Returns column indexes excluding auto-increment columns, used for 
partial update operations.
-     */
-    public @Nullable int[] getNonAutoIncrementColumnIndexes() {
-        if (autoIncrementColumnNames.isEmpty()) {
-            return null;
-        }
-        int autoIncIdx = getColumnIndexes(autoIncrementColumnNames)[0];
-        return IntStream.range(0, columns.size()).filter(i -> i != 
autoIncIdx).toArray();
-    }
-
     /** Returns the auto-increment columnIds, if any, otherwise returns an 
empty array. */
     public int[] getAutoIncrementColumnIds() {
         if (autoIncrementColumnNames.isEmpty()) {
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoderTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoderTest.java
index 6066d827f..76c8065ea 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoderTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoderTest.java
@@ -141,7 +141,8 @@ class PaimonKeyDecoderTest {
         PaimonKeyDecoder decoder = new PaimonKeyDecoder(rowType, 
rowType.getFieldNames());
 
         // Test short strings (≤7 bytes with 0x80 marker) and long strings (>7 
bytes)
-        for (String testStr : Arrays.asList("", "a", "1234567", "12345678", 
"Hello, Paimon!")) {
+        for (String testStr :
+                Arrays.asList("", "a", "1234567", "12345678", "Hello, 
Paimon!", "你好,世界!")) {
             InternalRow original = createStringRow(testStr);
             byte[] encoded = encoder.encodeKey(original);
             InternalRow decoded = decoder.decodeKey(encoded);
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index f342379b1..73324a6ab 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -1670,6 +1670,99 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
         assertQueryResultExactOrder(tEnv, aggQuery, expectedAggResults);
     }
 
+    @Test
+    void testAutoIncrementWithTargetColumns() throws Exception {
+        // use single parallelism to make result ordering stable
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);
+
+        String tableName = "auto_increment_pk_table";
+        // Create a table with auto increment column
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " id int not null,"
+                                + " auto_increment_id bigint,"
+                                + " amount bigint,"
+                                + " primary key (id) not enforced"
+                                + ") with 
('auto-increment.fields'='auto_increment_id')",
+                        tableName));
+
+        // Insert initial data specifying only id and amount
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s (id, amount) VALUES "
+                                        + "(1, 100), "
+                                        + "(2, 200), "
+                                        + "(3, 150), "
+                                        + "(4, 250)",
+                                tableName))
+                .await();
+
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s (id, amount) VALUES " + "(3, 
350), " + "(5, 500)",
+                                tableName))
+                .await();
+
+        List<String> expectedResults =
+                Arrays.asList(
+                        "+I[1, 1, 100]",
+                        "+I[2, 2, 200]",
+                        "+I[3, 3, 150]",
+                        "+I[4, 4, 250]",
+                        "-U[3, 3, 150]",
+                        "+U[3, 3, 350]",
+                        "+I[5, 5, 500]");
+
+        // Collect results with timeout
+        assertQueryResultExactOrder(
+                tEnv,
+                String.format(
+                        "SELECT id, auto_increment_id, amount FROM %s /*+ 
OPTIONS('scan.startup.mode' = 'earliest') */",
+                        tableName),
+                expectedResults);
+
+        // verify invalid target columns that include auto-increment column
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                                String.format(
+                                                        "INSERT INTO %s (id, 
auto_increment_id) VALUES "
+                                                                + "(6, 10)",
+                                                        tableName))
+                                        .await())
+                .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(
+                        "Explicitly specifying values for the auto increment 
column auto_increment_id is not allowed.");
+
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                                String.format(
+                                                        "INSERT INTO %s VALUES 
" + "(6, 10, 600)",
+                                                        tableName))
+                                        .await())
+                .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(
+                        "This table has auto increment column 
[auto_increment_id]. "
+                                + "Explicitly specifying values for an auto 
increment column is not allowed. "
+                                + "Please specify non-auto-increment columns 
as target columns using partialUpdate first.");
+
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                                String.format(
+                                                        "INSERT INTO %s (id, 
amount, auto_increment_id) VALUES "
+                                                                + "(6, 600, 
10)",
+                                                        tableName))
+                                        .await())
+                .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(
+                        "This table has auto increment column 
[auto_increment_id]. "
+                                + "Explicitly specifying values for an auto 
increment column is not allowed. "
+                                + "Please specify non-auto-increment columns 
as target columns using partialUpdate first.");
+    }
+
     @Test
     void testWalModeWithAutoIncrement() throws Exception {
         // use single parallelism to make result ordering stable
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 0c711f4f3..8d279555a 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -332,7 +332,7 @@ public final class KvTablet {
                             autoIncrementManager.getUpdaterForSchema(kvFormat, 
latestSchemaId);
 
                     // Validate targetColumns doesn't contain auto-increment 
column
-                    validateTargetColumns(targetColumns, 
currentAutoIncrementUpdater, latestSchema);
+                    
currentAutoIncrementUpdater.validateTargetColumns(targetColumns);
 
                     // Determine the row merger based on mergeMode:
                     // - DEFAULT: Use the configured merge engine (rowMerger)
@@ -411,21 +411,6 @@ public final class KvTablet {
         }
     }
 
-    private void validateTargetColumns(
-            int[] targetColumns, AutoIncrementUpdater autoIncrementUpdater, 
Schema schema) {
-        if (!autoIncrementUpdater.hasAutoIncrement() || targetColumns == null) 
{
-            return;
-        }
-        List<String> autoIncrementColumnNames = 
schema.getAutoIncrementColumnNames();
-        for (int colIdx : targetColumns) {
-            if 
(autoIncrementColumnNames.contains(schema.getColumnName(colIdx))) {
-                throw new IllegalArgumentException(
-                        "targetColumns must not include auto-increment column 
name: "
-                                + schema.getColumnName(colIdx));
-            }
-        }
-    }
-
     private void processKvRecords(
             KvRecordBatch kvRecords,
             short schemaIdOfNewData,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
index 971ca4fa8..fca383c8f 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java
@@ -53,11 +53,6 @@ public class AutoIncrementManager {
             TablePath tablePath,
             TableConfig tableConf,
             SequenceGeneratorFactory seqGeneratorFactory) {
-        this.autoIncrementUpdaterCache =
-                Caffeine.newBuilder()
-                        .maximumSize(5)
-                        .expireAfterAccess(Duration.ofMinutes(5))
-                        .build();
         this.schemaGetter = schemaGetter;
         int schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId();
         Schema schema = schemaGetter.getSchema(schemaId);
@@ -74,17 +69,28 @@ public class AutoIncrementManager {
             sequenceGenerator =
                     seqGeneratorFactory.createSequenceGenerator(
                             tablePath, autoIncrementColumn, 
tableConf.getAutoIncrementCacheSize());
+            autoIncrementUpdaterCache =
+                    Caffeine.newBuilder()
+                            .maximumSize(5)
+                            .expireAfterAccess(Duration.ofMinutes(5))
+                            .build();
         } else {
             autoIncrementColumnId = -1;
             sequenceGenerator = null;
+            autoIncrementUpdaterCache = null;
         }
     }
 
     // Supports removing or reordering columns; does NOT support adding an 
auto-increment column to
     // an existing table.
     public AutoIncrementUpdater getUpdaterForSchema(KvFormat kvFormat, int 
latestSchemaId) {
-        return autoIncrementUpdaterCache.get(
-                latestSchemaId, k -> createAutoIncrementUpdater(kvFormat, k));
+        if (autoIncrementColumnId == -1) {
+            // return no-op updater directly if there is no auto-increment 
column
+            return NO_OP_UPDATER;
+        } else {
+            return autoIncrementUpdaterCache.get(
+                    latestSchemaId, k -> createAutoIncrementUpdater(kvFormat, 
k));
+        }
     }
 
     private AutoIncrementUpdater createAutoIncrementUpdater(KvFormat kvFormat, 
int schemaId) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementUpdater.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementUpdater.java
index 4451cc0f0..aa555661a 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementUpdater.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementUpdater.java
@@ -18,8 +18,11 @@
 
 package org.apache.fluss.server.kv.autoinc;
 
+import org.apache.fluss.exception.InvalidTargetColumnException;
 import org.apache.fluss.record.BinaryValue;
 
+import javax.annotation.Nullable;
+
 /** A updater to auto increment column . */
 public interface AutoIncrementUpdater {
 
@@ -35,6 +38,17 @@ public interface AutoIncrementUpdater {
      */
     BinaryValue updateAutoIncrementColumns(BinaryValue rowValue);
 
+    /**
+     * Validates the target column indexes for auto-increment updates.
+     *
+     * @param targetColumnIndexes the indexes of the target columns to be 
validated, may be {@code
+     *     null}
+     * @throws InvalidTargetColumnException if the target columns are invalid
+     */
+    default void validateTargetColumns(@Nullable int[] targetColumnIndexes) {
+        // no op
+    }
+
     /**
      * Returns whether this updater actually performs auto-increment logic.
      *
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java
index b3e98fb9d..a07698a65 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java
@@ -18,6 +18,7 @@
 
 package org.apache.fluss.server.kv.autoinc;
 
+import org.apache.fluss.exception.InvalidTargetColumnException;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.record.BinaryValue;
@@ -26,6 +27,7 @@ import org.apache.fluss.row.encode.RowEncoder;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DataTypeRoot;
 
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 
 /**
@@ -42,6 +44,7 @@ public class PerSchemaAutoIncrementUpdater implements 
AutoIncrementUpdater {
     private final RowEncoder rowEncoder;
     private final int fieldLength;
     private final int targetColumnIdx;
+    private final String autoIncrementColumnName;
     private final SequenceGenerator sequenceGenerator;
     private final short schemaId;
     private final boolean requireInteger;
@@ -69,11 +72,13 @@ public class PerSchemaAutoIncrementUpdater implements 
AutoIncrementUpdater {
                             "Auto-increment column ID %d not found in schema 
columns: %s",
                             autoIncrementColumnId, schema.getColumnIds()));
         }
+        this.autoIncrementColumnName = 
schema.getAutoIncrementColumnNames().get(0);
         this.requireInteger = 
fieldDataTypes[targetColumnIdx].is(DataTypeRoot.INTEGER);
         this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes);
         this.flussFieldGetters = flussFieldGetters;
     }
 
+    @Override
     public BinaryValue updateAutoIncrementColumns(BinaryValue rowValue) {
         rowEncoder.startNewRow();
         for (int i = 0; i < fieldLength; i++) {
@@ -93,6 +98,34 @@ public class PerSchemaAutoIncrementUpdater implements 
AutoIncrementUpdater {
         return new BinaryValue(schemaId, rowEncoder.finishRow());
     }
 
+    @Override
+    public void validateTargetColumns(@Nullable int[] targetColumnIndexes) {
+        if (targetColumnIndexes != null) {
+            boolean found = false;
+            for (int idx : targetColumnIndexes) {
+                if (idx == targetColumnIdx) {
+                    found = true;
+                    break;
+                }
+            }
+            if (found) {
+                throw new InvalidTargetColumnException(
+                        String.format(
+                                "Auto-increment column [%s] at index %d must 
not be included in target columns.",
+                                autoIncrementColumnName, targetColumnIdx));
+            }
+        } else {
+            // The current table contains an auto-increment column, but no 
target columns have been
+            // specified, which implies all columns (including the 
auto-increment one) would be
+            // updated. This is not allowed.
+            throw new InvalidTargetColumnException(
+                    String.format(
+                            "The table contains an auto-increment column [%s], 
but update target columns are not explicitly specified. "
+                                    + "Please specify the update target 
columns and exclude the auto-increment column from them.",
+                            autoIncrementColumnName));
+        }
+    }
+
     @Override
     public boolean hasAutoIncrement() {
         return true;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java
index d8741e010..426bd37e6 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java
@@ -41,6 +41,7 @@ public class PartialUpdater {
 
     private final BitSet partialUpdateCols = new BitSet();
     private final BitSet primaryKeyCols = new BitSet();
+    private final boolean updatePrimaryKeyOnly;
     private final DataType[] fieldDataTypes;
 
     public PartialUpdater(KvFormat kvFormat, short schemaId, Schema schema, 
int[] targetColumns) {
@@ -60,6 +61,7 @@ public class PartialUpdater {
             flussFieldGetters[i] = 
InternalRow.createFieldGetter(fieldDataTypes[i], i);
         }
         this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes);
+        this.updatePrimaryKeyOnly = partialUpdateCols.equals(primaryKeyCols);
     }
 
     private void sanityCheck(Schema schema, int[] targetColumns) {
@@ -100,6 +102,11 @@ public class PartialUpdater {
      * @return the updated value (schema id + row bytes)
      */
     public BinaryValue updateRow(@Nullable BinaryValue oldValue, BinaryValue 
partialValue) {
+        if (updatePrimaryKeyOnly && oldValue != null) {
+            // only primary key columns are updated, return the old value 
directly
+            return oldValue;
+        }
+
         rowEncoder.startNewRow();
         // write each field
         for (int i = 0; i < fieldDataTypes.length; i++) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index d1010918e..dda8cef8a 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -142,6 +142,7 @@ import static 
org.apache.fluss.config.ConfigOptions.KV_FORMAT_VERSION_2;
 import static org.apache.fluss.server.TabletManagerBase.getTableInfo;
 import static org.apache.fluss.utils.FileUtils.isDirectoryEmpty;
 import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
 import static org.apache.fluss.utils.Preconditions.checkState;
 import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
 
@@ -597,13 +598,14 @@ public class ReplicaManager {
 
     /**
      * Collect missing keys from lookup results for insertion, populating both 
context and batch
-     * maps.
maps. Returns the schema of the table; may return null if there are no missing keys.
      */
-    private void collectMissingKeysForInsert(
+    private Schema collectMissingKeysForInsert(
             Map<TableBucket, List<byte[]>> entriesPerBucket,
             Map<TableBucket, LookupResultForBucket> lookupResults,
             Map<TableBucket, MissingKeysContext> missingKeysContextMap,
             Map<TableBucket, KvRecordBatch> kvRecordBatchMap) {
+        Schema schema = null;
         for (Map.Entry<TableBucket, List<byte[]>> entry : 
entriesPerBucket.entrySet()) {
             TableBucket tb = entry.getKey();
             LookupResultForBucket lookupResult = lookupResults.get(tb);
@@ -622,13 +624,14 @@ public class ReplicaManager {
                 }
             }
             if (!missingKeys.isEmpty()) {
+                TableInfo tableInfo = getReplicaOrException(tb).getTableInfo();
                 missingKeysContextMap.put(tb, new 
MissingKeysContext(missingIndexes, missingKeys));
-                kvRecordBatchMap.put(
-                        tb,
-                        KeyRecordBatch.create(
-                                missingKeys, 
getReplicaOrException(tb).getTableInfo()));
+                kvRecordBatchMap.put(tb, KeyRecordBatch.create(missingKeys, 
tableInfo));
+                schema = tableInfo.getSchema();
             }
         }
+        // this assumes all table buckets belong to the same table
+        return schema;
     }
 
     /** Lookup a single key value. */
@@ -706,19 +709,18 @@ public class ReplicaManager {
             checkArgument(
                     timeoutMs != null && requiredAcks != null,
                     "timeoutMs and requiredAcks must be set");
+
             Map<TableBucket, MissingKeysContext> entriesPerBucketToInsert = 
new HashMap<>();
             Map<TableBucket, KvRecordBatch> produceEntryData = new HashMap<>();
-            collectMissingKeysForInsert(
-                    entriesPerBucket,
-                    lookupResultForBucketMap,
-                    entriesPerBucketToInsert,
-                    produceEntryData);
+            Schema schema =
+                    collectMissingKeysForInsert(
+                            entriesPerBucket,
+                            lookupResultForBucketMap,
+                            entriesPerBucketToInsert,
+                            produceEntryData);
 
             if (!produceEntryData.isEmpty()) {
-                // Compute target columns: exclude auto-increment to prevent 
overwriting
-                TableBucket firstBucket = 
produceEntryData.keySet().iterator().next();
-                Schema schema = 
getReplicaOrException(firstBucket).getTableInfo().getSchema();
-
+                checkNotNull(schema, "Schema must be available for 
insert-if-not-exists");
                 // TODO: Performance optimization: during 
lookup-with-insert-if-not-exists flow,
                 // the original key bytes are wrapped in KeyRecordBatch, then 
during putRecordsToKv
                 // they are decoded to rows and immediately re-encoded back to 
key bytes, causing
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index 582d27ece..1d59af396 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -290,17 +290,11 @@ class KvTabletTest {
                                 0,
                                 Arrays.asList(
                                         // -- for batch 1
-                                        ChangeType.INSERT,
-                                        ChangeType.INSERT,
-                                        ChangeType.UPDATE_BEFORE,
-                                        ChangeType.UPDATE_AFTER),
+                                        ChangeType.INSERT, ChangeType.INSERT),
                                 Arrays.asList(
                                         // for k1
                                         new Object[] {1, null, null},
-                                        // for k2: +I
-                                        new Object[] {2, null, null},
-                                        // for k2: -U, +U
-                                        new Object[] {2, null, null},
+                                        // for k2: +I, the second k2 update is 
skipped
                                         new Object[] {2, null, null})));
 
         LogRecords actualLogRecords = readLogRecords();
@@ -618,7 +612,8 @@ class KvTabletTest {
                         .build();
         initLogTabletAndKvTablet(schema, new HashMap<>());
         KvRecordTestUtils.KvRecordFactory recordFactory =
-                KvRecordTestUtils.KvRecordFactory.of(schema.getRowType());
+                KvRecordTestUtils.KvRecordFactory.of(
+                        
schema.getRowType().project(Collections.singletonList("user_name")));
 
         // start threads to put records
         List<Future<LogAppendInfo>> putFutures = new ArrayList<>();
@@ -629,12 +624,12 @@ class KvTabletTest {
             KvRecordBatch kvRecordBatch1 =
                     kvRecordBatchFactory.ofRecords(
                             Arrays.asList(
-                                    recordFactory.ofRecord(k1.getBytes(), new 
Object[] {k1, null}),
-                                    recordFactory.ofRecord(k2.getBytes(), new 
Object[] {k2, null}),
-                                    recordFactory.ofRecord(
-                                            k3.getBytes(), new Object[] {k3, 
null})));
+                                    recordFactory.ofRecord(k1.getBytes(), new 
Object[] {k1}),
+                                    recordFactory.ofRecord(k2.getBytes(), new 
Object[] {k2}),
+                                    recordFactory.ofRecord(k3.getBytes(), new 
Object[] {k3})));
             // test concurrent putting to test thread-safety of 
AutoIncrementManager
-            putFutures.add(executor.submit(() -> 
kvTablet.putAsLeader(kvRecordBatch1, null)));
+            putFutures.add(
+                    executor.submit(() -> kvTablet.putAsLeader(kvRecordBatch1, 
new int[] {0})));
         }
 
         // wait for all putting finished
@@ -662,6 +657,43 @@ class KvTabletTest {
         assertThat(actualUids).isEqualTo(expectedUids);
     }
 
+    @Test
+    void testAutoIncrementWithInvalidTargetColumns() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("user_name", DataTypes.STRING())
+                        .column("uid", DataTypes.INT())
+                        .column("age", DataTypes.INT())
+                        .primaryKey("user_name")
+                        .enableAutoIncrement("uid")
+                        .build();
+        initLogTabletAndKvTablet(schema, new HashMap<>());
+        KvRecordTestUtils.KvRecordFactory recordFactory =
+                KvRecordTestUtils.KvRecordFactory.of(schema.getRowType());
+
+        KvRecordBatch kvRecordBatch =
+                kvRecordBatchFactory.ofRecords(
+                        Arrays.asList(
+                                recordFactory.ofRecord(
+                                        "k1".getBytes(), new Object[] {"k1", 
null, null}),
+                                recordFactory.ofRecord(
+                                        "k2".getBytes(), new Object[] {"k2", 
null, null})));
+        // target columns contain auto-increment column
+        assertThatThrownBy(() -> kvTablet.putAsLeader(kvRecordBatch, new int[] 
{0, 1}))
+                .isInstanceOf(InvalidTargetColumnException.class)
+                .hasMessageContaining(
+                        "Auto-increment column [uid] at index 1 must not be 
included in target columns.");
+
+        // not specifying target columns is also invalid for auto-increment
+        assertThatThrownBy(() -> kvTablet.putAsLeader(kvRecordBatch, null))
+                .isInstanceOf(InvalidTargetColumnException.class)
+                .hasMessageContaining(
+                        "The table contains an auto-increment column [uid], 
but update target columns are not explicitly specified.");
+
+        // valid case: target columns don't contain auto-increment column
+        kvTablet.putAsLeader(kvRecordBatch, new int[] {0, 2});
+    }
+
     @Test
     void testPutAsLeaderWithOutOfOrderSequenceException() throws Exception {
         initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
index 8472964ab..2fd268d5e 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
@@ -906,7 +906,6 @@ class ReplicaManagerTest extends ReplicaTestBase {
         assertLogRecordsEquals(DATA3_ROW_TYPE, records, expected, 
ChangeType.INSERT, schemaGetter2);
     }
 
-    // Fixme
     @Test
     void testConcurrentLookupWithInsertIfNotExistsAutoIncrement() throws 
Exception {
         TableBucket tb = new TableBucket(DATA3_TABLE_ID_PK_AUTO_INC, 1);
@@ -992,45 +991,22 @@ class ReplicaManagerTest extends ReplicaTestBase {
         // Values should be 1, 2, 3 (in any order due to concurrency)
         assertThat(autoIncrementValues).containsExactlyInAnyOrder(1L, 2L, 3L);
 
-        // Verify WAL: 3 INSERTs minimum, up to 15 if all concurrent updates 
execute
+        // Verify exactly 3 changelog entries were written (one per unique key)
         FetchLogResultForBucket logResult = fetchLog(tb, 0L);
+        // Only the first upsert for a given primary key generates changelog 
records. Subsequent
+        // upserts on the same primary key produce empty batches, but still 
increment the log
+        // offset. As a result, the High Watermark may exceed 3 due to these 
empty batches advancing
+        // the offset.
         assertThat(logResult.getHighWatermark()).isBetween(3L, 15L);
-
-        // Decode all WAL records
+        // Verify log records contain the expected keys with INSERT change type
         LogRecords records = logResult.records();
-        LogRecordReadContext readContext =
-                LogRecordReadContext.createArrowReadContext(
-                        DATA3_ROW_TYPE,
-                        DEFAULT_SCHEMA_ID,
-                        new TestingSchemaGetter(DEFAULT_SCHEMA_ID, 
DATA3_SCHEMA_PK_AUTO_INC));
-
-        List<Tuple2<ChangeType, Long>> recordsWithAutoInc = new ArrayList<>();
-        for (LogRecordBatch batch : records.batches()) {
-            try (CloseableIterator<LogRecord> iterator = 
batch.records(readContext)) {
-                while (iterator.hasNext()) {
-                    LogRecord record = iterator.next();
-                    recordsWithAutoInc.add(
-                            Tuple2.of(record.getChangeType(), 
record.getRow().getLong(2)));
-                }
-            }
-        }
-
-        // First 3 must be INSERTs with unique auto-increment values (1, 2, 3)
-        assertThat(recordsWithAutoInc).hasSizeGreaterThanOrEqualTo(3);
-        Set<Long> insertAutoIncs = new HashSet<>();
-        for (int i = 0; i < 3; i++) {
-            
assertThat(recordsWithAutoInc.get(i).f0).isEqualTo(ChangeType.INSERT);
-            insertAutoIncs.add(recordsWithAutoInc.get(i).f1);
-        }
-        assertThat(insertAutoIncs).containsExactlyInAnyOrder(1L, 2L, 3L);
-
-        // Remaining 12 must be UPDATE pairs (UPDATE_BEFORE + UPDATE_AFTER) 
preserving
-        // auto-increment
-        for (int i = 3; i < recordsWithAutoInc.size(); i++) {
-            Tuple2<ChangeType, Long> record = recordsWithAutoInc.get(i);
-            assertThat(record.f0).isIn(ChangeType.UPDATE_BEFORE, 
ChangeType.UPDATE_AFTER);
-            assertThat(record.f1).isIn(1L, 2L, 3L);
-        }
+        List<Object[]> expected =
+                Arrays.asList(
+                        new Object[] {100, null, 1L},
+                        new Object[] {200, null, 2L},
+                        new Object[] {300, null, 3L});
+        // only 3 INSERT changelogs generated, the rest are empty batches
+        assertLogRecordsEquals(DATA3_ROW_TYPE, records, expected, 
ChangeType.INSERT, schemaGetter);
     }
 
     @Test

Reply via email to