This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 84b204421d7af24db28b9582626c6e06423ffd48 Author: Jark Wu <[email protected]> AuthorDate: Tue Feb 3 17:21:56 2026 +0800 [kv] Improve validation for target columns on auto-increment tables --- .../java/org/apache/fluss/metadata/Schema.java | 11 --- .../row/decode/paimon/PaimonKeyDecoderTest.java | 3 +- .../fluss/flink/sink/FlinkTableSinkITCase.java | 93 ++++++++++++++++++++++ .../java/org/apache/fluss/server/kv/KvTablet.java | 17 +--- .../server/kv/autoinc/AutoIncrementManager.java | 20 +++-- .../server/kv/autoinc/AutoIncrementUpdater.java | 14 ++++ .../kv/autoinc/PerSchemaAutoIncrementUpdater.java | 33 ++++++++ .../server/kv/partialupdate/PartialUpdater.java | 7 ++ .../fluss/server/replica/ReplicaManager.java | 32 ++++---- .../org/apache/fluss/server/kv/KvTabletTest.java | 60 ++++++++++---- .../fluss/server/replica/ReplicaManagerTest.java | 50 +++--------- 11 files changed, 239 insertions(+), 101 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index a97f09ee9..92b675c42 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -151,17 +151,6 @@ public final class Schema implements Serializable { .orElseGet(() -> new int[0]); } - /** - * Returns column indexes excluding auto-increment columns, used for partial update operations. - */ - public @Nullable int[] getNonAutoIncrementColumnIndexes() { - if (autoIncrementColumnNames.isEmpty()) { - return null; - } - int autoIncIdx = getColumnIndexes(autoIncrementColumnNames)[0]; - return IntStream.range(0, columns.size()).filter(i -> i != autoIncIdx).toArray(); - } - /** Returns the auto-increment columnIds, if any, otherwise returns an empty array. */ public int[] getAutoIncrementColumnIds() { if (autoIncrementColumnNames.isEmpty()) { diff --git a/fluss-common/src/test/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoderTest.java b/fluss-common/src/test/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoderTest.java index 6066d827f..76c8065ea 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoderTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoderTest.java @@ -141,7 +141,8 @@ class PaimonKeyDecoderTest { PaimonKeyDecoder decoder = new PaimonKeyDecoder(rowType, rowType.getFieldNames()); // Test short strings (≤7 bytes with 0x80 marker) and long strings (>7 bytes) - for (String testStr : Arrays.asList("", "a", "1234567", "12345678", "Hello, Paimon!")) { + for (String testStr : + Arrays.asList("", "a", "1234567", "12345678", "Hello, Paimon!", "你好,世界!")) { InternalRow original = createStringRow(testStr); byte[] encoded = encoder.encodeKey(original); InternalRow decoded = decoder.decodeKey(encoded); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java index f342379b1..73324a6ab 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java @@ -1670,6 +1670,99 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase { assertQueryResultExactOrder(tEnv, aggQuery, expectedAggResults); } + @Test + void testAutoIncrementWithTargetColumns() throws Exception { + // use single parallelism to make result ordering stable + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + + String tableName = "auto_increment_pk_table"; + // Create a table with auto increment column + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " auto_increment_id bigint," + + " amount bigint," + + " primary key (id) not enforced" + + ") with ('auto-increment.fields'='auto_increment_id')", + tableName)); + + // Insert initial data specifying only id and amount + tEnv.executeSql( + String.format( + "INSERT INTO %s (id, amount) VALUES " + + "(1, 100), " + + "(2, 200), " + + "(3, 150), " + + "(4, 250)", + tableName)) + .await(); + + tEnv.executeSql( + String.format( + "INSERT INTO %s (id, amount) VALUES " + "(3, 350), " + "(5, 500)", + tableName)) + .await(); + + List<String> expectedResults = + Arrays.asList( + "+I[1, 1, 100]", + "+I[2, 2, 200]", + "+I[3, 3, 150]", + "+I[4, 4, 250]", + "-U[3, 3, 150]", + "+U[3, 3, 350]", + "+I[5, 5, 500]"); + + // Collect results with timeout + assertQueryResultExactOrder( + tEnv, + String.format( + "SELECT id, auto_increment_id, amount FROM %s /*+ OPTIONS('scan.startup.mode' = 'earliest') */", + tableName), + expectedResults); + + // verify invalid target columns that include auto-increment column + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "INSERT INTO %s (id, auto_increment_id) VALUES " + + "(6, 10)", + tableName)) + .await()) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage( + "Explicitly specifying values for the auto increment column auto_increment_id is not allowed."); + + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "INSERT INTO %s VALUES " + "(6, 10, 600)", + tableName)) + .await()) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage( + "This table has auto increment column [auto_increment_id]. " + + "Explicitly specifying values for an auto increment column is not allowed. " + + "Please specify non-auto-increment columns as target columns using partialUpdate first."); + + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "INSERT INTO %s (id, amount, auto_increment_id) VALUES " + + "(6, 600, 10)", + tableName)) + .await()) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage( + "This table has auto increment column [auto_increment_id]. " + + "Explicitly specifying values for an auto increment column is not allowed. " + + "Please specify non-auto-increment columns as target columns using partialUpdate first."); + } + @Test void testWalModeWithAutoIncrement() throws Exception { // use single parallelism to make result ordering stable diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index 0c711f4f3..8d279555a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -332,7 +332,7 @@ public final class KvTablet { autoIncrementManager.getUpdaterForSchema(kvFormat, latestSchemaId); // Validate targetColumns doesn't contain auto-increment column - validateTargetColumns(targetColumns, currentAutoIncrementUpdater, latestSchema); + currentAutoIncrementUpdater.validateTargetColumns(targetColumns); // Determine the row merger based on mergeMode: // - DEFAULT: Use the configured merge engine (rowMerger) @@ -411,21 +411,6 @@ public final class KvTablet { } } - private void validateTargetColumns( - int[] targetColumns, AutoIncrementUpdater autoIncrementUpdater, Schema schema) { - if (!autoIncrementUpdater.hasAutoIncrement() || targetColumns == null) { - return; - } - List<String> autoIncrementColumnNames = schema.getAutoIncrementColumnNames(); - for (int colIdx : targetColumns) { - if (autoIncrementColumnNames.contains(schema.getColumnName(colIdx))) { - throw new IllegalArgumentException( - "targetColumns must not include auto-increment column name: " - + schema.getColumnName(colIdx)); - } - } - } - private void processKvRecords( KvRecordBatch kvRecords, short schemaIdOfNewData, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java index 971ca4fa8..fca383c8f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java @@ -53,11 +53,6 @@ public class AutoIncrementManager { TablePath tablePath, TableConfig tableConf, SequenceGeneratorFactory seqGeneratorFactory) { - this.autoIncrementUpdaterCache = - Caffeine.newBuilder() - .maximumSize(5) - .expireAfterAccess(Duration.ofMinutes(5)) - .build(); this.schemaGetter = schemaGetter; int schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId(); Schema schema = schemaGetter.getSchema(schemaId); @@ -74,17 +69,28 @@ public class AutoIncrementManager { sequenceGenerator = seqGeneratorFactory.createSequenceGenerator( tablePath, autoIncrementColumn, tableConf.getAutoIncrementCacheSize()); + autoIncrementUpdaterCache = + Caffeine.newBuilder() + .maximumSize(5) + .expireAfterAccess(Duration.ofMinutes(5)) + .build(); } else { autoIncrementColumnId = -1; sequenceGenerator = null; + autoIncrementUpdaterCache = null; } } // Supports removing or reordering columns; does NOT support adding an auto-increment column to // an existing table. public AutoIncrementUpdater getUpdaterForSchema(KvFormat kvFormat, int latestSchemaId) { - return autoIncrementUpdaterCache.get( - latestSchemaId, k -> createAutoIncrementUpdater(kvFormat, k)); + if (autoIncrementColumnId == -1) { + // return no-op updater directly if there is no auto-increment column + return NO_OP_UPDATER; + } else { + return autoIncrementUpdaterCache.get( + latestSchemaId, k -> createAutoIncrementUpdater(kvFormat, k)); + } } private AutoIncrementUpdater createAutoIncrementUpdater(KvFormat kvFormat, int schemaId) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementUpdater.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementUpdater.java index 4451cc0f0..aa555661a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementUpdater.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementUpdater.java @@ -18,8 +18,11 @@ package org.apache.fluss.server.kv.autoinc; +import org.apache.fluss.exception.InvalidTargetColumnException; import org.apache.fluss.record.BinaryValue; +import javax.annotation.Nullable; + /** A updater to auto increment column . */ public interface AutoIncrementUpdater { @@ -35,6 +38,17 @@ public interface AutoIncrementUpdater { */ BinaryValue updateAutoIncrementColumns(BinaryValue rowValue); + /** + * Validates the target column indexes for auto-increment updates. + * + * @param targetColumnIndexes the indexes of the target columns to be validated, may be {@code + * null} + * @throws InvalidTargetColumnException if the target columns are invalid + */ + default void validateTargetColumns(@Nullable int[] targetColumnIndexes) { + // no op + } + /** * Returns whether this updater actually performs auto-increment logic. * diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java index b3e98fb9d..a07698a65 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java @@ -18,6 +18,7 @@ package org.apache.fluss.server.kv.autoinc; +import org.apache.fluss.exception.InvalidTargetColumnException; import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.Schema; import org.apache.fluss.record.BinaryValue; @@ -26,6 +27,7 @@ import org.apache.fluss.row.encode.RowEncoder; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DataTypeRoot; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; /** @@ -42,6 +44,7 @@ public class PerSchemaAutoIncrementUpdater implements AutoIncrementUpdater { private final RowEncoder rowEncoder; private final int fieldLength; private final int targetColumnIdx; + private final String autoIncrementColumnName; private final SequenceGenerator sequenceGenerator; private final short schemaId; private final boolean requireInteger; @@ -69,11 +72,13 @@ public class PerSchemaAutoIncrementUpdater implements AutoIncrementUpdater { "Auto-increment column ID %d not found in schema columns: %s", autoIncrementColumnId, schema.getColumnIds())); } + this.autoIncrementColumnName = schema.getAutoIncrementColumnNames().get(0); this.requireInteger = fieldDataTypes[targetColumnIdx].is(DataTypeRoot.INTEGER); this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes); this.flussFieldGetters = flussFieldGetters; } + @Override public BinaryValue updateAutoIncrementColumns(BinaryValue rowValue) { rowEncoder.startNewRow(); for (int i = 0; i < fieldLength; i++) { @@ -93,6 +98,34 @@ public class PerSchemaAutoIncrementUpdater implements AutoIncrementUpdater { return new BinaryValue(schemaId, rowEncoder.finishRow()); } + @Override + public void validateTargetColumns(@Nullable int[] targetColumnIndexes) { + if (targetColumnIndexes != null) { + boolean found = false; + for (int idx : targetColumnIndexes) { + if (idx == targetColumnIdx) { + found = true; + break; + } + } + if (found) { + throw new InvalidTargetColumnException( + String.format( + "Auto-increment column [%s] at index %d must not be included in target columns.", + autoIncrementColumnName, targetColumnIdx)); + } + } else { + // The current table contains an auto-increment column, but no target columns have been + // specified, which implies all columns (including the auto-increment one) would be + // updated. This is not allowed. + throw new InvalidTargetColumnException( + String.format( + "The table contains an auto-increment column [%s], but update target columns are not explicitly specified. " + + "Please specify the update target columns and exclude the auto-increment column from them.", + autoIncrementColumnName)); + } + } + @Override public boolean hasAutoIncrement() { return true; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java index d8741e010..426bd37e6 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java @@ -41,6 +41,7 @@ public class PartialUpdater { private final BitSet partialUpdateCols = new BitSet(); private final BitSet primaryKeyCols = new BitSet(); + private final boolean updatePrimaryKeyOnly; private final DataType[] fieldDataTypes; public PartialUpdater(KvFormat kvFormat, short schemaId, Schema schema, int[] targetColumns) { @@ -60,6 +61,7 @@ public class PartialUpdater { flussFieldGetters[i] = InternalRow.createFieldGetter(fieldDataTypes[i], i); } this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes); + this.updatePrimaryKeyOnly = partialUpdateCols.equals(primaryKeyCols); } private void sanityCheck(Schema schema, int[] targetColumns) { @@ -100,6 +102,11 @@ public class PartialUpdater { * @return the updated value (schema id + row bytes) */ public BinaryValue updateRow(@Nullable BinaryValue oldValue, BinaryValue partialValue) { + if (updatePrimaryKeyOnly && oldValue != null) { + // only primary key columns are updated, return the old value directly + return oldValue; + } + rowEncoder.startNewRow(); // write each field for (int i = 0; i < fieldDataTypes.length; i++) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index d1010918e..dda8cef8a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -142,6 +142,7 @@ import static org.apache.fluss.config.ConfigOptions.KV_FORMAT_VERSION_2; import static org.apache.fluss.server.TabletManagerBase.getTableInfo; import static org.apache.fluss.utils.FileUtils.isDirectoryEmpty; import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkNotNull; import static org.apache.fluss.utils.Preconditions.checkState; import static org.apache.fluss.utils.concurrent.LockUtils.inLock; @@ -597,13 +598,14 @@ public class ReplicaManager { /** * Collect missing keys from lookup results for insertion, populating both context and batch - * maps. + * maps. And returns the schema of the table, may return null if there is no missing keys. */ - private void collectMissingKeysForInsert( + private Schema collectMissingKeysForInsert( Map<TableBucket, List<byte[]>> entriesPerBucket, Map<TableBucket, LookupResultForBucket> lookupResults, Map<TableBucket, MissingKeysContext> missingKeysContextMap, Map<TableBucket, KvRecordBatch> kvRecordBatchMap) { + Schema schema = null; for (Map.Entry<TableBucket, List<byte[]>> entry : entriesPerBucket.entrySet()) { TableBucket tb = entry.getKey(); LookupResultForBucket lookupResult = lookupResults.get(tb); @@ -622,13 +624,14 @@ public class ReplicaManager { } } if (!missingKeys.isEmpty()) { + TableInfo tableInfo = getReplicaOrException(tb).getTableInfo(); missingKeysContextMap.put(tb, new MissingKeysContext(missingIndexes, missingKeys)); - kvRecordBatchMap.put( - tb, - KeyRecordBatch.create( - missingKeys, getReplicaOrException(tb).getTableInfo())); + kvRecordBatchMap.put(tb, KeyRecordBatch.create(missingKeys, tableInfo)); + schema = tableInfo.getSchema(); } } + // this assumes all table buckets belong to the same table + return schema; } /** Lookup a single key value. */ @@ -706,19 +709,18 @@ public class ReplicaManager { checkArgument( timeoutMs != null && requiredAcks != null, "timeoutMs and requiredAcks must be set"); + Map<TableBucket, MissingKeysContext> entriesPerBucketToInsert = new HashMap<>(); Map<TableBucket, KvRecordBatch> produceEntryData = new HashMap<>(); - collectMissingKeysForInsert( - entriesPerBucket, - lookupResultForBucketMap, - entriesPerBucketToInsert, - produceEntryData); + Schema schema = + collectMissingKeysForInsert( + entriesPerBucket, + lookupResultForBucketMap, + entriesPerBucketToInsert, + produceEntryData); if (!produceEntryData.isEmpty()) { - // Compute target columns: exclude auto-increment to prevent overwriting - TableBucket firstBucket = produceEntryData.keySet().iterator().next(); - Schema schema = getReplicaOrException(firstBucket).getTableInfo().getSchema(); - + checkNotNull(schema, "Schema must be available for insert-if-not-exists"); // TODO: Performance optimization: during lookup-with-insert-if-not-exists flow, // the original key bytes are wrapped in KeyRecordBatch, then during putRecordsToKv // they are decoded to rows and immediately re-encoded back to key bytes, causing diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 582d27ece..1d59af396 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -290,17 +290,11 @@ class KvTabletTest { 0, Arrays.asList( // -- for batch 1 - ChangeType.INSERT, - ChangeType.INSERT, - ChangeType.UPDATE_BEFORE, - ChangeType.UPDATE_AFTER), + ChangeType.INSERT, ChangeType.INSERT), Arrays.asList( // for k1 new Object[] {1, null, null}, - // for k2: +I - new Object[] {2, null, null}, - // for k2: -U, +U - new Object[] {2, null, null}, + // for k2: +I, the second K2 update is skipped new Object[] {2, null, null}))); LogRecords actualLogRecords = readLogRecords(); @@ -618,7 +612,8 @@ class KvTabletTest { .build(); initLogTabletAndKvTablet(schema, new HashMap<>()); KvRecordTestUtils.KvRecordFactory recordFactory = - KvRecordTestUtils.KvRecordFactory.of(schema.getRowType()); + KvRecordTestUtils.KvRecordFactory.of( + schema.getRowType().project(Collections.singletonList("user_name"))); // start threads to put records List<Future<LogAppendInfo>> putFutures = new ArrayList<>(); @@ -629,12 +624,12 @@ class KvTabletTest { KvRecordBatch kvRecordBatch1 = kvRecordBatchFactory.ofRecords( Arrays.asList( - recordFactory.ofRecord(k1.getBytes(), new Object[] {k1, null}), - recordFactory.ofRecord(k2.getBytes(), new Object[] {k2, null}), - recordFactory.ofRecord( - k3.getBytes(), new Object[] {k3, null}))); + recordFactory.ofRecord(k1.getBytes(), new Object[] {k1}), + recordFactory.ofRecord(k2.getBytes(), new Object[] {k2}), + recordFactory.ofRecord(k3.getBytes(), new Object[] {k3}))); // test concurrent putting to test thread-safety of AutoIncrementManager - putFutures.add(executor.submit(() -> kvTablet.putAsLeader(kvRecordBatch1, null))); + putFutures.add( + executor.submit(() -> kvTablet.putAsLeader(kvRecordBatch1, new int[] {0}))); } // wait for all putting finished @@ -662,6 +657,43 @@ class KvTabletTest { assertThat(actualUids).isEqualTo(expectedUids); } + @Test + void testAutoIncrementWithInvalidTargetColumns() throws Exception { + Schema schema = + Schema.newBuilder() + .column("user_name", DataTypes.STRING()) + .column("uid", DataTypes.INT()) + .column("age", DataTypes.INT()) + .primaryKey("user_name") + .enableAutoIncrement("uid") + .build(); + initLogTabletAndKvTablet(schema, new HashMap<>()); + KvRecordTestUtils.KvRecordFactory recordFactory = + KvRecordTestUtils.KvRecordFactory.of(schema.getRowType()); + + KvRecordBatch kvRecordBatch = + kvRecordBatchFactory.ofRecords( + Arrays.asList( + recordFactory.ofRecord( + "k1".getBytes(), new Object[] {"k1", null, null}), + recordFactory.ofRecord( + "k2".getBytes(), new Object[] {"k2", null, null}))); + // target columns contain auto-increment column + assertThatThrownBy(() -> kvTablet.putAsLeader(kvRecordBatch, new int[] {0, 1})) + .isInstanceOf(InvalidTargetColumnException.class) + .hasMessageContaining( + "Auto-increment column [uid] at index 1 must not be included in target columns."); + + // no specify target columns, which is also invalid for auto-increment + assertThatThrownBy(() -> kvTablet.putAsLeader(kvRecordBatch, null)) + .isInstanceOf(InvalidTargetColumnException.class) + .hasMessageContaining( + "The table contains an auto-increment column [uid], but update target columns are not explicitly specified."); + + // valid case: target columns don't contain auto-increment column + kvTablet.putAsLeader(kvRecordBatch, new int[] {0, 2}); + } + @Test void testPutAsLeaderWithOutOfOrderSequenceException() throws Exception { initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java index 8472964ab..2fd268d5e 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java @@ -906,7 +906,6 @@ class ReplicaManagerTest extends ReplicaTestBase { assertLogRecordsEquals(DATA3_ROW_TYPE, records, expected, ChangeType.INSERT, schemaGetter2); } - // Fixme @Test void testConcurrentLookupWithInsertIfNotExistsAutoIncrement() throws Exception { TableBucket tb = new TableBucket(DATA3_TABLE_ID_PK_AUTO_INC, 1); @@ -992,45 +991,22 @@ class ReplicaManagerTest extends ReplicaTestBase { // Values should be 1, 2, 3 (in any order due to concurrency) assertThat(autoIncrementValues).containsExactlyInAnyOrder(1L, 2L, 3L); - // Verify WAL: 3 INSERTs minimum, up to 15 if all concurrent updates execute + // Verify exactly 3 changelog entries were written (one per unique key) FetchLogResultForBucket logResult = fetchLog(tb, 0L); + // Only the first upsert for a given primary key generates changelog records. Subsequent + // upserts on the same primary key produce empty batches, but still increment the log + // offset. As a result, the High Watermark may exceed 3 due to these empty batches advancing + // the offset. assertThat(logResult.getHighWatermark()).isBetween(3L, 15L); - - // Decode all WAL records + // Verify log records contain the expected keys with INSERT change type LogRecords records = logResult.records(); - LogRecordReadContext readContext = - LogRecordReadContext.createArrowReadContext( - DATA3_ROW_TYPE, - DEFAULT_SCHEMA_ID, - new TestingSchemaGetter(DEFAULT_SCHEMA_ID, DATA3_SCHEMA_PK_AUTO_INC)); - - List<Tuple2<ChangeType, Long>> recordsWithAutoInc = new ArrayList<>(); - for (LogRecordBatch batch : records.batches()) { - try (CloseableIterator<LogRecord> iterator = batch.records(readContext)) { - while (iterator.hasNext()) { - LogRecord record = iterator.next(); - recordsWithAutoInc.add( - Tuple2.of(record.getChangeType(), record.getRow().getLong(2))); - } - } - } - - // First 3 must be INSERTs with unique auto-increment values (1, 2, 3) - assertThat(recordsWithAutoInc).hasSizeGreaterThanOrEqualTo(3); - Set<Long> insertAutoIncs = new HashSet<>(); - for (int i = 0; i < 3; i++) { - assertThat(recordsWithAutoInc.get(i).f0).isEqualTo(ChangeType.INSERT); - insertAutoIncs.add(recordsWithAutoInc.get(i).f1); - } - assertThat(insertAutoIncs).containsExactlyInAnyOrder(1L, 2L, 3L); - - // Remaining 12 must be UPDATE pairs (UPDATE_BEFORE + UPDATE_AFTER) preserving - // auto-increment - for (int i = 3; i < recordsWithAutoInc.size(); i++) { - Tuple2<ChangeType, Long> record = recordsWithAutoInc.get(i); - assertThat(record.f0).isIn(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER); - assertThat(record.f1).isIn(1L, 2L, 3L); - } + List<Object[]> expected = + Arrays.asList( + new Object[] {100, null, 1L}, + new Object[] {200, null, 2L}, + new Object[] {300, null, 3L}); + // only 3 INSERT changelogs generated, the rest are empty batches + assertLogRecordsEquals(DATA3_ROW_TYPE, records, expected, ChangeType.INSERT, schemaGetter); } @Test
