This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit dc8cc6514e5b6a4a5673ad4dcde1617322fac554 Author: Junbo Wang <[email protected]> AuthorDate: Tue Jan 6 14:50:55 2026 +0800 [kv] Add basic implementation of AUTO_INCREMENT column (#2161) --- .../fluss/client/table/FlussTableITCase.java | 176 ++++++++++++++++++++ .../org/apache/fluss/config/ConfigOptions.java | 19 +-- .../java/org/apache/fluss/config/TableConfig.java | 5 + .../fluss/exception/SequenceOverflowException.java | 30 ++++ .../org/apache/fluss/metadata/ChangelogImage.java | 11 +- .../java/org/apache/fluss/metadata/Schema.java | 28 +++- .../org/apache/fluss/metadata/TableSchemaTest.java | 14 ++ .../fluss/utils/json/SchemaJsonSerdeTest.java | 2 +- .../apache/fluss/flink/FlinkConnectorOptions.java | 18 +- .../fluss/flink/catalog/FlinkTableFactory.java | 1 + .../apache/fluss/flink/utils/FlinkConversions.java | 17 +- .../fluss/flink/catalog/FlinkCatalogITCase.java | 7 + .../fluss/flink/sink/FlinkTableSinkITCase.java | 58 +++++++ .../org/apache/fluss/server/SequenceIDCounter.java | 8 + .../fluss/server/coordinator/SchemaUpdate.java | 5 + .../java/org/apache/fluss/server/kv/KvManager.java | 13 +- .../java/org/apache/fluss/server/kv/KvTablet.java | 47 ++++-- .../server/kv/autoinc/AutoIncrementManager.java | 128 +++++++++++++++ .../server/kv/autoinc/AutoIncrementUpdater.java | 46 ++++++ .../autoinc/BoundedSegmentSequenceGenerator.java | 119 ++++++++++++++ .../kv/autoinc/PerSchemaAutoIncrementUpdater.java | 112 +++++++++++++ .../fluss/server/kv/autoinc/SequenceGenerator.java | 32 ++++ .../fluss/server/zk/ZkSequenceIDCounter.java | 25 ++- .../org/apache/fluss/server/zk/data/ZkData.java | 22 +++ .../org/apache/fluss/server/kv/KvTabletTest.java | 11 +- .../kv/autoinc/SegmentSequenceGeneratorTest.java | 182 +++++++++++++++++++++ .../kv/snapshot/KvTabletSnapshotTargetTest.java | 5 + website/docs/engine-flink/options.md | 55 ++++--- 28 files changed, 1125 insertions(+), 71 deletions(-) diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java index a65529648..c21c6a8bb 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java @@ -48,6 +48,7 @@ import org.apache.fluss.record.ChangeType; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.ProjectedRow; import org.apache.fluss.row.indexed.IndexedRow; import org.apache.fluss.types.BigIntType; import org.apache.fluss.types.DataTypes; @@ -454,6 +455,181 @@ class FlussTableITCase extends ClientToServerITCaseBase { + "because the lookup columns [b, a] must contain all bucket keys [a, b] in order."); } + @Test + void testSingleBucketPutAutoIncrementColumnAndLookup() throws Exception { + Schema schema = + Schema.newBuilder() + .column("col1", DataTypes.STRING()) + .withComment("col1 is first column") + .column("col2", DataTypes.BIGINT()) + .withComment("col2 is second column, auto increment column") + .column("col3", DataTypes.STRING()) + .withComment("col3 is third column") + .enableAutoIncrement("col2") + .primaryKey("col1") + .build(); + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(schema).distributedBy(1, "col1").build(); + // create the table + TablePath tablePath = + TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc"); + createTable(tablePath, tableDescriptor, true); + Table autoIncTable = conn.getTable(tablePath); + Object[][] records = { + {"a", null, "batch1"}, + {"b", null, "batch1"}, + {"c", null, "batch1"}, + {"d", null, "batch1"}, + {"e", null, "batch1"}, + {"d", null, "batch2"}, + {"e", null, "batch2"} + }; + partialUpdateRecords(new String[] {"col1", "col3"}, records, autoIncTable); + + Object[][] expectedRecords = { + {"a", 
1L, "batch1"}, + {"b", 2L, "batch1"}, + {"c", 3L, "batch1"}, + {"d", 4L, "batch2"}, + {"e", 5L, "batch2"} + }; + verifyRecords(expectedRecords, autoIncTable, schema); + + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.addColumn( + "col4", + DataTypes.INT(), + null, + TableChange.ColumnPosition.last())), + false) + .get(); + Table newSchemaTable = conn.getTable(tablePath); + Schema newSchema = newSchemaTable.getTableInfo().getSchema(); + + // schema change case1: read new data with new schema. + Object[][] expectedRecordsWithOldSchema = { + {"a", 1L, "batch1"}, + {"b", 2L, "batch1"}, + {"c", 3L, "batch1"}, + {"d", 4L, "batch2"}, + {"e", 5L, "batch2"} + }; + verifyRecords(expectedRecordsWithOldSchema, autoIncTable, schema); + + // schema change case2: update new data with new schema. + + Object[][] recordsWithNewSchema = { + {"a", null, "batch3", 10}, + {"b", null, "batch3", 11} + }; + partialUpdateRecords( + new String[] {"col1", "col3", "col4"}, recordsWithNewSchema, newSchemaTable); + + // schema change case3: read data with old schema. + expectedRecordsWithOldSchema[0][2] = "batch3"; + expectedRecordsWithOldSchema[1][2] = "batch3"; + verifyRecords(expectedRecordsWithOldSchema, autoIncTable, schema); + + // schema change case4: read data with new schema. 
+ Object[][] expectedRecordsWithNewSchema = { + {"a", 1L, "batch3", 10}, + {"b", 2L, "batch3", 11}, + {"c", 3L, "batch1", null}, + {"d", 4L, "batch2", null}, + {"e", 5L, "batch2", null} + }; + verifyRecords(expectedRecordsWithNewSchema, newSchemaTable, newSchema); + + // kill and restart all tablet server + for (int i = 0; i < 3; i++) { + FLUSS_CLUSTER_EXTENSION.stopTabletServer(i); + FLUSS_CLUSTER_EXTENSION.startTabletServer(i); + } + + // reconnect fluss server + conn = ConnectionFactory.createConnection(clientConf); + newSchemaTable = conn.getTable(tablePath); + verifyRecords(expectedRecordsWithNewSchema, newSchemaTable, newSchema); + + Object[][] restartWriteRecords = {{"f", null, "batch4", 12}}; + partialUpdateRecords( + new String[] {"col1", "col3", "col4"}, restartWriteRecords, newSchemaTable); + + // The auto-increment column should start from a new segment for now, and local cached + // IDs have been discarded. + Object[][] expectedRestartWriteRecords = {{"f", 100001L, "batch4", 12}}; + verifyRecords(expectedRestartWriteRecords, newSchemaTable, newSchema); + } + + @Test + void testPutAutoIncrementColumnAndLookup() throws Exception { + Schema schema = + Schema.newBuilder() + .column("dt", DataTypes.STRING()) + .column("col1", DataTypes.STRING()) + .withComment("col1 is first column") + .column("col2", DataTypes.BIGINT()) + .withComment("col2 is second column, auto increment column") + .column("col3", DataTypes.STRING()) + .withComment("col3 is third column") + .enableAutoIncrement("col2") + .primaryKey("dt", "col1") + .build(); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(schema) + .partitionedBy("dt") + .distributedBy(2, "col1") + .build(); + // create the table + TablePath tablePath = + TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc"); + createTable(tablePath, tableDescriptor, true); + Table autoIncTable = conn.getTable(tablePath); + Object[][] records = { + {"2026-01-06", "a", null, "batch1"}, + 
{"2026-01-06", "b", null, "batch1"}, + {"2026-01-06", "c", null, "batch1"}, + {"2026-01-06", "d", null, "batch1"}, + {"2026-01-07", "e", null, "batch1"}, + {"2026-01-06", "a", null, "batch2"}, + {"2026-01-06", "b", null, "batch2"}, + }; + + // upsert records with auto inc column col2 null value + partialUpdateRecords(new String[] {"dt", "col1", "col3"}, records, autoIncTable); + Object[][] expectedRecords = { + {"2026-01-06", "a", 1L, "batch2"}, + {"2026-01-06", "b", 100001L, "batch2"}, + {"2026-01-06", "c", 2L, "batch1"}, + {"2026-01-06", "d", 100002L, "batch1"}, + {"2026-01-07", "e", 200001L, "batch1"} + }; + verifyRecords(expectedRecords, autoIncTable, schema); + } + + private void partialUpdateRecords(String[] targetColumns, Object[][] records, Table table) { + UpsertWriter upsertWriter = table.newUpsert().partialUpdate(targetColumns).createWriter(); + for (Object[] record : records) { + upsertWriter.upsert(row(record)); + // flush immediately to ensure auto-increment values are assigned sequentially across + // multiple buckets. 
+ upsertWriter.flush(); + } + } + + private void verifyRecords(Object[][] records, Table table, Schema schema) throws Exception { + Lookuper lookuper = table.newLookup().createLookuper(); + ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes()); + for (Object[] record : records) { + assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row(record)))) + .withSchema(schema.getRowType()) + .isEqualTo(row(record)); + } + } + @Test void testLookupForNotReadyTable() throws Exception { TablePath tablePath = TablePath.of("test_db_1", "test_lookup_unready_table_t1"); diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 9f4b603e9..d3f7cd2ff 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1447,16 +1447,15 @@ public class ConfigOptions { + "For tables with FIRST_ROW, VERSIONED, or AGGREGATION merge engines, this option defaults to `ignore`. " + "Note: For AGGREGATION merge engine, when set to `allow`, delete operations will remove the entire record."); - public static final ConfigOption<String> TABLE_AUTO_INCREMENT_FIELDS = - key("table.auto-increment.fields") - .stringType() - .noDefaultValue() + public static final ConfigOption<Long> TABLE_AUTO_INCREMENT_CACHE_SIZE = + key("table.auto-increment.cache-size") + .longType() + .defaultValue(100000L) .withDescription( - "Defines the auto increment columns. " - + "The auto increment column can only be used in primary-key table." - + "With an auto increment column in the table, whenever a new row is inserted into the table, the new row will be assigned with the next available value from the auto-increment sequence." - + "The auto increment column can only be used in primary-key table. The data type of the auto increment column must be INT or BIGINT." 
- + "Currently a table can have only one auto-increment column."); + "The cache size of auto-increment IDs fetched from the distributed counter each time. " + + "This value determines the length of the locally cached ID segment. Default: 100000. " + + "A larger cache size may cause significant auto-increment ID gaps, especially when unused cached ID segments are discarded due to TabletServer restarts or abnormal terminations. " + + "Conversely, a smaller cache size increases the frequency of ID fetch requests to the distributed counter, introducing extra network overhead and reducing write throughput and performance."); public static final ConfigOption<ChangelogImage> TABLE_CHANGELOG_IMAGE = key("table.changelog.image") @@ -1468,7 +1467,7 @@ public class ConfigOptions { + "The supported modes are `FULL` (default) and `WAL`. " + "The `FULL` mode produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous values. " + "The `WAL` mode does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if allowed) records are emitted. " - + "When WAL mode is enabled with default merge engine (no merge engine configured) and full row updates (not partial update), an optimization is applied to skip looking up old values, " + + "When WAL mode is enabled, the default merge engine is used (no merge engine configured), updates are full row updates (not partial update), and there is no auto-increment column, an optimization is applied to skip looking up old values, " + "and in this case INSERT operations are converted to UPDATE_AFTER events. " + "This mode reduces storage and transmission costs but loses the ability to track previous values. 
" + "This option only affects primary key tables."); diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java index 80d2ee8f7..86604b6be 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java @@ -140,4 +140,9 @@ public class TableConfig { public AutoPartitionStrategy getAutoPartitionStrategy() { return AutoPartitionStrategy.from(config); } + + /** Gets the number of auto-increment IDs cached per segment. */ + public Long getAutoIncrementCacheSize() { + return config.get(ConfigOptions.TABLE_AUTO_INCREMENT_CACHE_SIZE); + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/SequenceOverflowException.java b/fluss-common/src/main/java/org/apache/fluss/exception/SequenceOverflowException.java new file mode 100644 index 000000000..4a54cc0b8 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/SequenceOverflowException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +/** + * Exception for increment number overflow. 
+ * + * @since 0.9 + */ +public class SequenceOverflowException extends ApiException { + public SequenceOverflowException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java b/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java index 02f7a5750..094b4601e 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java @@ -36,11 +36,12 @@ public enum ChangelogImage { /** * WAL mode does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if - * allowed) records are emitted. When WAL mode is enabled with default merge engine (no merge - * engine configured) and full row updates (not partial update), an optimization is applied to - * skip looking up old values, and in this case INSERT operations are converted to UPDATE_AFTER - * events, similar to database WAL (Write-Ahead Log) behavior. This mode reduces storage and - * transmission costs but loses the ability to track previous values. + * allowed) records are emitted. When WAL mode is enabled, the default merge engine (no merge + * engine configured) is used, updates are full row (not partial update), and there is no + * auto-increment column, an optimization is applied to skip looking up old values, and in this + * case INSERT operations are converted to UPDATE_AFTER events, similar to database WAL + * (Write-Ahead Log) behavior. This mode reduces storage and + * transmission costs but loses the + * ability to track previous values. 
*/ WAL; diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index 21a3a3c93..4c1f1828d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -134,6 +134,18 @@ public final class Schema implements Serializable { .orElseGet(() -> new int[0]); } + /** Returns the auto-increment columnIds, if any, otherwise returns an empty array. */ + public int[] getAutoIncrementColumnIds() { + if (autoIncrementColumnNames.isEmpty()) { + return new int[0]; + } else { + return getColumns().stream() + .filter(column -> autoIncrementColumnNames.contains(column.getName())) + .mapToInt(Column::getColumnId) + .toArray(); + } + } + /** Returns the primary key column names, if any, otherwise returns an empty array. */ public List<String> getPrimaryKeyColumnNames() { return getPrimaryKey().map(PrimaryKey::getColumnNames).orElse(Collections.emptyList()); @@ -176,6 +188,11 @@ public final class Schema implements Serializable { return columnNames; } + /** Returns the column name in given column index. */ + public String getColumnName(int columnIndex) { + return columns.get(columnIndex).columnName; + } + /** Returns the indexes of the fields in the schema. 
*/ public int[] getColumnIndexes(List<String> keyNames) { int[] keyIndexes = new int[keyNames.size()]; @@ -250,6 +267,13 @@ public final class Schema implements Serializable { if (schema.primaryKey != null) { primaryKeyNamed(schema.primaryKey.constraintName, schema.primaryKey.columnNames); } + if (schema.autoIncrementColumnNames != null + && !schema.autoIncrementColumnNames.isEmpty()) { + checkState( + schema.autoIncrementColumnNames.size() == 1, + "Multiple auto increment columns are not supported yet."); + enableAutoIncrement(schema.autoIncrementColumnNames.get(0)); + } this.highestFieldId = new AtomicInteger(schema.highestFieldId); return this; } @@ -748,9 +772,7 @@ public final class Schema implements Serializable { } // primary key and auto increment column should not nullable - if ((pkSet.contains(column.getName()) - || autoIncrementColumnNames.contains(column.getName())) - && column.getDataType().isNullable()) { + if (pkSet.contains(column.getName()) && column.getDataType().isNullable()) { newColumns.add( new Column( column.getName(), diff --git a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java index 82e2fd76e..fe317b53d 100644 --- a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java @@ -358,6 +358,20 @@ class TableSchemaTest { assertThat(copied.getAggFunction("max_val").get()).isEqualTo(AggFunctions.MAX()); } + @Test + void testSchemaFromSchemaPreservesAutoIncrement() { + Schema original = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("value", DataTypes.BIGINT()) + .primaryKey("id") + .enableAutoIncrement("value") + .build(); + + Schema copied = Schema.newBuilder().fromSchema(original).build(); + assertThat(copied.getAutoIncrementColumnNames()).isEqualTo(Arrays.asList("value")); + } + @Test void testListaggWithCustomDelimiter() { // Test 
LISTAGG with default delimiter (comma) diff --git a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java index 54c5f69ea..410be3cad 100644 --- a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java @@ -100,7 +100,7 @@ public class SchemaJsonSerdeTest extends JsonSerdeTestBase<Schema> { "{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"BIGINT\"},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6},\"comment\":\"c is third column\",\"id\":2}],\"highest_field_id\":2}"; static final String SCHEMA_JSON_4 = - "{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}"; + "{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}"; static final String SCHEMA_JSON_5 = 
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"ROW\",\"fields\":[{\"name\":\"c\",\"field_type\":{\"type\":\"INTEGER\"},\"description\":\"a is first column\",\"field_id\":2},{\"name\":\"d\",\"field_type\":{\"type\":\"INTEGER\"},\"description\":\"a is first column\",\"field_id\":3}]},\"comment\":\"b is second column\",\"id\":1}],\"highest_field_id\":3}"; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java index 9c2f7aafa..56e9f9fce 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java @@ -36,6 +36,18 @@ import static org.apache.flink.configuration.description.TextElement.text; /** Options for flink connector. */ public class FlinkConnectorOptions { + public static final ConfigOption<String> AUTO_INCREMENT_FIELDS = + ConfigOptions.key("auto-increment.fields") + .stringType() + .noDefaultValue() + .withDescription( + "Defines the auto increment columns. " + + "The auto increment column can only be used in primary-key table." + + "With an auto increment column in the table, whenever a new row is inserted into the table, the new row will be assigned with the next available value from the auto-increment sequence." + + "The auto increment column can only be used in primary-key table. The data type of the auto increment column must be INT or BIGINT." + + "Currently a table can have only one auto-increment column." 
+ + "Adding an auto increment column to an existing table is not supported."); + public static final ConfigOption<Integer> BUCKET_NUMBER = ConfigOptions.key("bucket.num") .intType() @@ -144,7 +156,11 @@ public class FlinkConnectorOptions { // -------------------------------------------------------------------------------------------- public static final List<String> ALTER_DISALLOW_OPTIONS = - Arrays.asList(BUCKET_NUMBER.key(), BUCKET_KEY.key(), BOOTSTRAP_SERVERS.key()); + Arrays.asList( + AUTO_INCREMENT_FIELDS.key(), + BUCKET_NUMBER.key(), + BUCKET_KEY.key(), + BOOTSTRAP_SERVERS.key()); // ------------------------------------------------------------------------------------------- // Only used internally to support materialized table diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index aba27735c..a52852e44 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -206,6 +206,7 @@ public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTabl HashSet<ConfigOption<?>> options = new HashSet<>( Arrays.asList( + FlinkConnectorOptions.AUTO_INCREMENT_FIELDS, FlinkConnectorOptions.BUCKET_KEY, FlinkConnectorOptions.BUCKET_NUMBER, FlinkConnectorOptions.SCAN_STARTUP_MODE, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java index 9aef6c931..5fc721c40 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java @@ -67,8 +67,8 @@ import 
static org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes; import static org.apache.flink.table.utils.EncodingUtils.encodeBytesToBase64; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; -import static org.apache.fluss.config.ConfigOptions.TABLE_AUTO_INCREMENT_FIELDS; import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig; +import static org.apache.fluss.flink.FlinkConnectorOptions.AUTO_INCREMENT_FIELDS; import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY; import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER; import static org.apache.fluss.flink.FlinkConnectorOptions.MATERIALIZED_TABLE_DEFINITION_QUERY; @@ -209,17 +209,18 @@ public class FlinkConversions { .withComment(column.getComment().orElse(null)); }); - // convert some flink options to fluss table configs. - Map<String, String> storageProperties = - convertFlinkOptionsToFlussTableProperties(flinkTableConf); - - if (storageProperties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) { + // Configure auto-increment columns based on the 'auto-increment.fields' option. + if (flinkTableConf.containsKey(AUTO_INCREMENT_FIELDS.key())) { for (String autoIncrementColumn : - storageProperties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) { - schemBuilder.enableAutoIncrement(autoIncrementColumn); + flinkTableConf.get(AUTO_INCREMENT_FIELDS).split(",")) { + schemBuilder.enableAutoIncrement(autoIncrementColumn.trim()); } } + // convert some flink options to fluss table configs. 
+ Map<String, String> storageProperties = + convertFlinkOptionsToFlussTableProperties(flinkTableConf); + // serialize computed column and watermark spec to custom properties Map<String, String> customProperties = extractCustomProperties(flinkTableConf, storageProperties); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index e544bfd1e..f42ba30ae 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -290,6 +290,13 @@ abstract class FlinkCatalogITCase { .isInstanceOf(InvalidConfigException.class) .hasMessage( "Property 'paimon.file.format' is not supported to alter which is for datalake table."); + + String unSupportedDml7 = + "alter table test_alter_table_append_only set ('auto-increment.fields' = 'b')"; + assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml7)) + .rootCause() + .isInstanceOf(CatalogException.class) + .hasMessage("The option 'auto-increment.fields' is not supported to alter yet."); } @Test diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java index 15e59295c..80a01b62c 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java @@ -1453,4 +1453,62 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase { // Collect results with timeout assertQueryResultExactOrder(tEnv, aggQuery, expectedAggResults); } + + @Test + void testWalModeWithAutoIncrement() throws Exception { + // use single parallelism to 
make result ordering stable + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + + String tableName = "wal_mode_pk_table"; + // Create a table with WAL mode and auto increment column + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " auto_increment_id bigint," + + " amount bigint," + + " primary key (id) not enforced" + + ") with ('table.changelog.image' = 'wal', 'auto-increment.fields'='auto_increment_id')", + tableName)); + + // Insert initial data + tEnv.executeSql( + String.format( + "INSERT INTO %s (id, amount) VALUES " + + "(1, 100), " + + "(2, 200), " + + "(3, 150), " + + "(4, 250)", + tableName)) + .await(); + + // Use batch mode to update and delete records + + // Upsert data, not support update/delete rows in table with auto-inc column for now. + // TODO: Support Batch Update + tEnv.executeSql( + String.format( + "INSERT INTO %s (id, amount) VALUES " + "(1, 120), " + "(3, 180)", + tableName)) + .await(); + + List<String> expectedResults = + Arrays.asList( + "+I[1, 1, 100]", + "+I[2, 2, 200]", + "+I[3, 3, 150]", + "+I[4, 4, 250]", + "-U[1, 1, 100]", + "+U[1, 1, 120]", + "-U[3, 3, 150]", + "+U[3, 3, 180]"); + + // Collect results with timeout + assertQueryResultExactOrder( + tEnv, + String.format( + "SELECT id, auto_increment_id, amount FROM %s /*+ OPTIONS('scan.startup.mode' = 'earliest') */", + tableName), + expectedResults); + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java b/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java index 7145d9d7f..a5d3ff025 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java @@ -26,4 +26,12 @@ public interface SequenceIDCounter { * @return The previous sequence ID */ long getAndIncrement() throws Exception; + + /** + * Atomically adds the given delta to the sequence ID. 
+ * + * @param delta The delta to add + * @return The previous sequence ID + */ + long getAndAdd(Long delta) throws Exception; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java index eff79605a..e3527cb88 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java @@ -47,12 +47,14 @@ public class SchemaUpdate { private final AtomicInteger highestFieldId; private final List<String> primaryKeys; private final Map<String, Schema.Column> existedColumns; + private final List<String> autoIncrementColumns; public SchemaUpdate(TableInfo tableInfo) { this.columns = new ArrayList<>(); this.existedColumns = new HashMap<>(); this.highestFieldId = new AtomicInteger(tableInfo.getSchema().getHighestFieldId()); this.primaryKeys = tableInfo.getPrimaryKeys(); + this.autoIncrementColumns = tableInfo.getSchema().getAutoIncrementColumnNames(); this.columns.addAll(tableInfo.getSchema().getColumns()); for (Schema.Column column : columns) { existedColumns.put(column.getName(), column); @@ -67,6 +69,9 @@ public class SchemaUpdate { if (!primaryKeys.isEmpty()) { builder.primaryKey(primaryKeys); } + for (String autoIncrementColumn : autoIncrementColumns) { + builder.enableAutoIncrement(autoIncrementColumn); + } return builder.build(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index 1637af39d..8d99b461e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -36,6 +36,7 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import 
org.apache.fluss.server.TabletManagerBase; +import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; import org.apache.fluss.server.kv.rowmerger.RowMerger; import org.apache.fluss.server.log.LogManager; import org.apache.fluss.server.log.LogTablet; @@ -248,6 +249,10 @@ public final class KvManager extends TabletManagerBase implements ServerReconfig File tabletDir = getOrCreateTabletDir(tablePath, tableBucket); RowMerger merger = RowMerger.create(tableConfig, kvFormat, schemaGetter); + AutoIncrementManager autoIncrementManager = + new AutoIncrementManager( + schemaGetter, tablePath.getTablePath(), tableConfig, zkClient); + KvTablet tablet = KvTablet.create( tablePath, @@ -263,7 +268,8 @@ public final class KvManager extends TabletManagerBase implements ServerReconfig arrowCompressionInfo, schemaGetter, tableConfig.getChangelogImage(), - sharedRocksDBRateLimiter); + sharedRocksDBRateLimiter, + autoIncrementManager); currentKvs.put(tableBucket, tablet); LOG.info( @@ -358,6 +364,8 @@ public final class KvManager extends TabletManagerBase implements ServerReconfig TableConfig tableConfig = tableInfo.getTableConfig(); RowMerger rowMerger = RowMerger.create(tableConfig, tableConfig.getKvFormat(), schemaGetter); + AutoIncrementManager autoIncrementManager = + new AutoIncrementManager(schemaGetter, tablePath, tableConfig, zkClient); KvTablet kvTablet = KvTablet.create( physicalTablePath, @@ -373,7 +381,8 @@ public final class KvManager extends TabletManagerBase implements ServerReconfig tableConfig.getArrowCompressionInfo(), schemaGetter, tableConfig.getChangelogImage(), - sharedRocksDBRateLimiter); + sharedRocksDBRateLimiter, + autoIncrementManager); if (this.currentKvs.containsKey(tableBucket)) { throw new IllegalStateException( String.format( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index cfa691e66..79ab8eb21 100644 --- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -45,6 +45,8 @@ import org.apache.fluss.row.PaddingRow; import org.apache.fluss.row.arrow.ArrowWriterPool; import org.apache.fluss.row.arrow.ArrowWriterProvider; import org.apache.fluss.row.encode.ValueDecoder; +import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; +import org.apache.fluss.server.kv.autoinc.AutoIncrementUpdater; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason; import org.apache.fluss.server.kv.rocksdb.RocksDBKv; @@ -114,6 +116,7 @@ public final class KvTablet { // defines how to merge rows on the same primary key private final RowMerger rowMerger; private final ArrowCompressionInfo arrowCompressionInfo; + private final AutoIncrementManager autoIncrementManager; private final SchemaGetter schemaGetter; @@ -148,7 +151,8 @@ public final class KvTablet { ArrowCompressionInfo arrowCompressionInfo, SchemaGetter schemaGetter, ChangelogImage changelogImage, - @Nullable RocksDBStatistics rocksDBStatistics) { + @Nullable RocksDBStatistics rocksDBStatistics, + AutoIncrementManager autoIncrementManager) { this.physicalPath = physicalPath; this.tableBucket = tableBucket; this.logTablet = logTablet; @@ -166,6 +170,7 @@ public final class KvTablet { this.schemaGetter = schemaGetter; this.changelogImage = changelogImage; this.rocksDBStatistics = rocksDBStatistics; + this.autoIncrementManager = autoIncrementManager; } public static KvTablet create( @@ -182,7 +187,8 @@ public final class KvTablet { ArrowCompressionInfo arrowCompressionInfo, SchemaGetter schemaGetter, ChangelogImage changelogImage, - RateLimiter sharedRateLimiter) + RateLimiter sharedRateLimiter, + AutoIncrementManager autoIncrementManager) throws IOException { RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, sharedRateLimiter); @@ -214,7 +220,8 @@ public 
final class KvTablet { arrowCompressionInfo, schemaGetter, changelogImage, - rocksDBStatistics); + rocksDBStatistics, + autoIncrementManager); } private static RocksDBKv buildRocksDBKv( @@ -302,6 +309,8 @@ public final class KvTablet { RowMerger currentMerger = rowMerger.configureTargetColumns( targetColumns, latestSchemaId, latestSchema); + AutoIncrementUpdater currentAutoIncrementUpdater = + autoIncrementManager.getUpdaterForSchema(kvFormat, latestSchemaId); RowType latestRowType = latestSchema.getRowType(); WalBuilder walBuilder = createWalBuilder(latestSchemaId, latestRowType); @@ -318,6 +327,7 @@ public final class KvTablet { kvRecords, kvRecords.schemaId(), currentMerger, + currentAutoIncrementUpdater, walBuilder, latestSchemaRow, logEndOffsetOfPrevBatch); @@ -371,6 +381,7 @@ public final class KvTablet { KvRecordBatch kvRecords, short schemaIdOfNewData, RowMerger currentMerger, + AutoIncrementUpdater autoIncrementUpdater, WalBuilder walBuilder, PaddingRow latestSchemaRow, long startLogOffset) @@ -403,6 +414,7 @@ public final class KvTablet { key, currentValue, currentMerger, + autoIncrementUpdater, valueDecoder, walBuilder, latestSchemaRow, @@ -452,22 +464,31 @@ public final class KvTablet { KvPreWriteBuffer.Key key, BinaryValue currentValue, RowMerger currentMerger, + AutoIncrementUpdater autoIncrementUpdater, ValueDecoder valueDecoder, WalBuilder walBuilder, PaddingRow latestSchemaRow, long logOffset) throws Exception { - // Optimization: when using WAL mode and merger is DefaultRowMerger (full update, not - // partial update), we can skip fetching old value for better performance since it - // always returns new value. In this case, both INSERT and UPDATE will produce - // UPDATE_AFTER. 
- if (changelogImage == ChangelogImage.WAL && currentMerger instanceof DefaultRowMerger) { + // Optimization: In WAL mode, when using DefaultRowMerger (full update, not partial update) + // and there is no auto-increment column, we can skip fetching old value for better + // performance since the result always reflects the new value. In this case, both INSERT and + // UPDATE will produce UPDATE_AFTER. + if (changelogImage == ChangelogImage.WAL + && !autoIncrementUpdater.hasAutoIncrement() + && currentMerger instanceof DefaultRowMerger) { return applyUpdate(key, null, currentValue, walBuilder, latestSchemaRow, logOffset); } byte[] oldValueBytes = getFromBufferOrKv(key); if (oldValueBytes == null) { - return applyInsert(key, currentValue, walBuilder, latestSchemaRow, logOffset); + return applyInsert( + key, + currentValue, + walBuilder, + latestSchemaRow, + logOffset, + autoIncrementUpdater); } BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes); @@ -498,10 +519,12 @@ public final class KvTablet { BinaryValue currentValue, WalBuilder walBuilder, PaddingRow latestSchemaRow, - long logOffset) + long logOffset, + AutoIncrementUpdater autoIncrementUpdater) throws Exception { - walBuilder.append(ChangeType.INSERT, latestSchemaRow.replaceRow(currentValue.row)); - kvPreWriteBuffer.put(key, currentValue.encodeValue(), logOffset); + BinaryValue newValue = autoIncrementUpdater.updateAutoIncrementColumns(currentValue); + walBuilder.append(ChangeType.INSERT, latestSchemaRow.replaceRow(newValue.row)); + kvPreWriteBuffer.put(key, newValue.encodeValue(), logOffset); return logOffset + 1; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java new file mode 100644 index 000000000..1c1d36a42 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java @@ -0,0 +1,128 @@ +/* + * Licensed to the
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.autoinc; + +import org.apache.fluss.config.TableConfig; +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.SchemaGetter; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.ZkSequenceIDCounter; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.types.DataTypeRoot; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.time.Duration; + +import static org.apache.fluss.utils.Preconditions.checkState; + +/** + * Manages auto-increment logic for tables, providing schema-specific updaters that handle + * auto-increment column assignment during row writes. + */ +@NotThreadSafe +public class AutoIncrementManager { + // No-op implementation that returns the input unchanged. 
+ public static final AutoIncrementUpdater NO_OP_UPDATER = rowValue -> rowValue; + + private final SchemaGetter schemaGetter; + private final Cache<Integer, AutoIncrementUpdater> autoIncrementUpdaterCache; + private final int autoIncrementColumnId; + private final SequenceGenerator sequenceGenerator; + + public AutoIncrementManager( + SchemaGetter schemaGetter, + TablePath tablePath, + TableConfig tableConf, + ZooKeeperClient zkClient) { + this.autoIncrementUpdaterCache = + Caffeine.newBuilder() + .maximumSize(5) + .expireAfterAccess(Duration.ofMinutes(5)) + .build(); + this.schemaGetter = schemaGetter; + int schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId(); + Schema schema = schemaGetter.getSchema(schemaId); + int[] autoIncrementColumnIds = schema.getAutoIncrementColumnIds(); + + checkState( + autoIncrementColumnIds.length <= 1, + "Only support one auto increment column for a table, but got %d.", + autoIncrementColumnIds.length); + + if (autoIncrementColumnIds.length == 1) { + autoIncrementColumnId = autoIncrementColumnIds[0]; + boolean requiresIntOverflowCheck = + schema.getRowType() + .getField(schema.getColumnName(autoIncrementColumnId)) + .getType() + .is(DataTypeRoot.INTEGER); + sequenceGenerator = + new BoundedSegmentSequenceGenerator( + tablePath, + schema.getColumnName(autoIncrementColumnId), + new ZkSequenceIDCounter( + zkClient.getCuratorClient(), + ZkData.AutoIncrementColumnZNode.path( + tablePath, autoIncrementColumnId)), + tableConf, + requiresIntOverflowCheck ? Integer.MAX_VALUE : Long.MAX_VALUE); + } else { + autoIncrementColumnId = -1; + sequenceGenerator = null; + } + } + + // Supports removing or reordering columns; does NOT support adding an auto-increment column to + // an existing table. 
+ public AutoIncrementUpdater getUpdaterForSchema(KvFormat kvFormat, int latestSchemaId) { + return autoIncrementUpdaterCache.get( + latestSchemaId, k -> createAutoIncrementUpdater(kvFormat, k)); + } + + private AutoIncrementUpdater createAutoIncrementUpdater(KvFormat kvFormat, int schemaId) { + Schema schema = schemaGetter.getSchema(schemaId); + int[] autoIncrementColumnIds = schema.getAutoIncrementColumnIds(); + if (autoIncrementColumnId == -1) { + checkState( + autoIncrementColumnIds.length == 0, + "Cannot add auto-increment column after table creation."); + } else { + checkState( + autoIncrementColumnIds.length == 1 + && autoIncrementColumnIds[0] == autoIncrementColumnId, + "Auto-increment column cannot be changed after table creation."); + } + if (autoIncrementColumnIds.length == 1) { + return new PerSchemaAutoIncrementUpdater( + kvFormat, + (short) schemaId, + schema, + autoIncrementColumnIds[0], + sequenceGenerator); + } else { + return NO_OP_UPDATER; + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementUpdater.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementUpdater.java new file mode 100644 index 000000000..4451cc0f0 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementUpdater.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.autoinc; + +import org.apache.fluss.record.BinaryValue; + +/** An updater for the auto-increment column of a row. */ +public interface AutoIncrementUpdater { + + /** + * Updates the auto-increment column in the given row by replacing its value with a new sequence + * number. + * + * <p>This method may return a new {@link BinaryValue} instance or the same instance if no + * update is needed (e.g., in a no-op implementation). + * + * @param rowValue the input row in binary form, must not be {@code null} + * @return a {@link BinaryValue} representing the updated row; never {@code null} + */ + BinaryValue updateAutoIncrementColumns(BinaryValue rowValue); + + /** + * Returns whether this updater actually performs auto-increment logic. + * + * @return {@code true} if auto-increment is active; {@code false} otherwise. + */ + default boolean hasAutoIncrement() { + return false; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java new file mode 100644 index 000000000..f47831ae4 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.autoinc; + +import org.apache.fluss.config.TableConfig; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.SequenceOverflowException; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.SequenceIDCounter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; + +/** Segment ID generator that fetches IDs in batches of a configured size.
*/ +@NotThreadSafe +public class BoundedSegmentSequenceGenerator implements SequenceGenerator { + private static final Logger LOG = + LoggerFactory.getLogger(BoundedSegmentSequenceGenerator.class); + + private final SequenceIDCounter sequenceIDCounter; + private final TablePath tablePath; + private final String columnName; + private final long cacheSize; + private final long maxAllowedValue; + + private AutoIncIdSegment segment; + + public BoundedSegmentSequenceGenerator( + TablePath tablePath, + String columnName, + SequenceIDCounter sequenceIDCounter, + TableConfig tableConf, + long maxAllowedValue) { + this.cacheSize = tableConf.getAutoIncrementCacheSize(); + this.columnName = columnName; + this.tablePath = tablePath; + this.sequenceIDCounter = sequenceIDCounter; + this.segment = AutoIncIdSegment.EMPTY; + this.maxAllowedValue = maxAllowedValue; + } + + private void fetchSegment() { + try { + long start = sequenceIDCounter.getAndAdd(cacheSize); + if (start >= maxAllowedValue) { + throw new SequenceOverflowException( + String.format( + "Reached maximum value of sequence \"<%s>\" (%d).", + columnName, maxAllowedValue)); + } + + long actualEnd = Math.min(start + cacheSize, maxAllowedValue - 1); + LOG.info( + "Successfully fetch auto-increment values range ({}, {}], table_path={}, column_name={}.", + start, + actualEnd, + tablePath, + columnName); + segment = new AutoIncIdSegment(start, actualEnd - start); + } catch (SequenceOverflowException sequenceOverflowException) { + throw sequenceOverflowException; + } catch (Exception e) { + throw new FlussRuntimeException( + String.format( + "Failed to fetch auto-increment values, table_path=%s, column_name=%s.", + tablePath, columnName), + e); + } + } + + @Override + public long nextVal() { + if (segment.remaining() <= 0) { + fetchSegment(); + } + return segment.tryNextVal(); + } + + private static class AutoIncIdSegment { + private static final AutoIncIdSegment EMPTY = new AutoIncIdSegment(0, 0); + private long current; + 
private final long end; + + public AutoIncIdSegment(long start, long length) { + this.end = start + length; + this.current = start; + } + + public long remaining() { + return end - current; + } + + public long tryNextVal() { + long id = ++current; + if (id > end) { + throw new IllegalStateException("No more IDs available in current segment."); + } + return id; + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java new file mode 100644 index 000000000..ff70f5783 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.kv.autoinc; + +import org.apache.fluss.exception.SequenceOverflowException; +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.record.BinaryValue; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.encode.RowEncoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypeRoot; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.function.LongSupplier; + +/** + * An {@link AutoIncrementUpdater} implementation that assigns auto-increment values to a specific + * column based on a fixed schema. It is bound to a particular schema version and assumes the + * auto-increment column position remains constant within that schema. + * + * <p>This class is not thread-safe and is intended to be used within a single-threaded execution + * context. + */ +@NotThreadSafe +public class PerSchemaAutoIncrementUpdater implements AutoIncrementUpdater { + private final InternalRow.FieldGetter[] flussFieldGetters; + private final RowEncoder rowEncoder; + private final int fieldLength; + private final int targetColumnIdx; + private final LongSupplier idSupplier; + private final short schemaId; + private final String targetColumnName; + + public PerSchemaAutoIncrementUpdater( + KvFormat kvFormat, + short schemaId, + Schema schema, + int autoIncrementColumnId, + SequenceGenerator sequenceGenerator) { + DataType[] fieldDataTypes = schema.getRowType().getChildren().toArray(new DataType[0]); + + fieldLength = fieldDataTypes.length; + // getter for the fields in row + InternalRow.FieldGetter[] flussFieldGetters = new InternalRow.FieldGetter[fieldLength]; + for (int i = 0; i < fieldLength; i++) { + flussFieldGetters[i] = InternalRow.createFieldGetter(fieldDataTypes[i], i); + } + this.schemaId = schemaId; + this.targetColumnIdx = schema.getColumnIds().indexOf(autoIncrementColumnId); + this.targetColumnName = 
schema.getColumnName(targetColumnIdx); + if (targetColumnIdx == -1) { + throw new IllegalStateException( + String.format( + "Auto-increment column ID %d not found in schema columns: %s", + autoIncrementColumnId, schema.getColumnIds())); + } + + if (fieldDataTypes[targetColumnIdx].is(DataTypeRoot.INTEGER)) { + this.idSupplier = () -> checkedNextInt(sequenceGenerator.nextVal()); + } else { + this.idSupplier = sequenceGenerator::nextVal; + } + this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes); + this.flussFieldGetters = flussFieldGetters; + } + + private long checkedNextInt(long value) { + if (value > Integer.MAX_VALUE) { + throw new SequenceOverflowException( + String.format( + "Reached maximum value of sequence \"<%s>\" (2147483647).", + targetColumnName)); + } + return value; + } + + public BinaryValue updateAutoIncrementColumns(BinaryValue rowValue) { + rowEncoder.startNewRow(); + for (int i = 0; i < fieldLength; i++) { + if (targetColumnIdx == i) { + rowEncoder.encodeField(i, idSupplier.getAsLong()); + } else { + // use the row value + rowEncoder.encodeField(i, flussFieldGetters[i].getFieldOrNull(rowValue.row)); + } + } + return new BinaryValue(schemaId, rowEncoder.finishRow()); + } + + @Override + public boolean hasAutoIncrement() { + return true; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java new file mode 100644 index 000000000..d448ffa4c --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.autoinc; + +/** SequenceGenerator is used to generate auto increment column ID. */ +public interface SequenceGenerator { + + /** + * Retrieves the next sequential value for the auto-increment column. + * + * <p>This method provides the next available value for auto-increment columns. + * + * @return the next sequential value of the auto-increment column + */ + long nextVal(); +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java index acbecfa63..7781340bc 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java @@ -34,9 +34,11 @@ public class ZkSequenceIDCounter implements SequenceIDCounter { private static final int BASE_SLEEP_MS = 100; private static final int MAX_SLEEP_MS = 1000; + private final String sequenceIDPath; private final DistributedAtomicLong sequenceIdCounter; public ZkSequenceIDCounter(CuratorFramework curatorClient, String sequenceIDPath) { + this.sequenceIDPath = sequenceIDPath; sequenceIdCounter = new DistributedAtomicLong( curatorClient, @@ -56,7 +58,28 @@ public class ZkSequenceIDCounter implements SequenceIDCounter { if (incrementValue.succeeded()) { return incrementValue.preValue(); } 
else { - throw new Exception("Failed to increment sequence id counter."); + throw new Exception( + String.format( + "Failed to increment sequence id counter. ZooKeeper sequence ID path: %s.", + sequenceIDPath)); + } + } + + /** + * Atomically adds the given delta to the current sequence ID. + * + * @return The previous sequence ID + */ + @Override + public long getAndAdd(Long delta) throws Exception { + AtomicValue<Long> incrementValue = sequenceIdCounter.add(delta); + if (incrementValue.succeeded()) { + return incrementValue.preValue(); + } else { + throw new Exception( + String.format( + "Failed to increment sequence id counter. ZooKeeper sequence ID path: %s, Delta value: %d.", + sequenceIDPath, delta)); } } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java index 59a8644c8..467245567 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java @@ -252,6 +252,28 @@ public final class ZkData { } } + /** + * The znode for auto increment columns of a table. The znode path is: + * + * <p>/metadata/databases/[databaseName]/tables/[tableName]/auto_inc + */ + public static final class AutoIncrementColumnsZNode { + public static String path(TablePath tablePath) { + return TableZNode.path(tablePath) + "/auto_inc"; + } + } + + /** + * The znode for auto increment column. The znode path is: + * + * <p>/metadata/databases/[databaseName]/tables/[tableName]/auto_inc/col_[columnId] + */ + public static final class AutoIncrementColumnZNode { + public static String path(TablePath tablePath, int columnId) { + return AutoIncrementColumnsZNode.path(tablePath) + String.format("/col_%d", columnId); + } + } + /** * The znode used to generate a sequence unique id for a partition. 
The znode path is: * diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 52b1080ed..8c0a6b864 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -45,6 +45,7 @@ import org.apache.fluss.record.TestingSchemaGetter; import org.apache.fluss.record.bytesview.MultiBytesView; import org.apache.fluss.row.BinaryRow; import org.apache.fluss.row.encode.ValueEncoder; +import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Key; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.KvEntry; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Value; @@ -181,6 +182,13 @@ class KvTabletTest { throws Exception { TableConfig tableConf = new TableConfig(Configuration.fromMap(tableConfig)); RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED, schemaGetter); + AutoIncrementManager autoIncrementManager = + new AutoIncrementManager( + schemaGetter, + tablePath.getTablePath(), + new TableConfig(new Configuration()), + null); + return KvTablet.create( tablePath, tableBucket, @@ -195,7 +203,8 @@ class KvTabletTest { DEFAULT_COMPRESSION, schemaGetter, tableConf.getChangelogImage(), - KvManager.getDefaultRateLimiter()); + KvManager.getDefaultRateLimiter(), + autoIncrementManager); } @Test diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java new file mode 100644 index 000000000..685c2d539 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.autoinc; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.TableConfig; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.SequenceOverflowException; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.SequenceIDCounter; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicLong; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test class for {@link BoundedSegmentSequenceGenerator}. 
*/ +class SegmentSequenceGeneratorTest { + + private static final TablePath TABLE_PATH = new TablePath("test_db", "test_table"); + private static final String COLUMN_NAME = "id"; + private static final long CACHE_SIZE = 100; + + private AtomicLong snapshotIdGenerator; + private Configuration configuration; + private TableConfig tableConfig; + + @BeforeEach + void setUp() { + snapshotIdGenerator = new AtomicLong(0); + Map<String, String> map = new HashMap<>(); + map.put(ConfigOptions.TABLE_AUTO_INCREMENT_CACHE_SIZE.key(), String.valueOf(CACHE_SIZE)); + configuration = Configuration.fromMap(map); + tableConfig = new TableConfig(configuration); + } + + @Test + void testNextValBasicContinuousId() { + BoundedSegmentSequenceGenerator generator = + new BoundedSegmentSequenceGenerator( + TABLE_PATH, + COLUMN_NAME, + new TestingSnapshotIDCounter(snapshotIdGenerator), + new TableConfig(configuration), + Long.MAX_VALUE); + for (long i = 1; i <= CACHE_SIZE; i++) { + assertThat(generator.nextVal()).isEqualTo(i); + } + + for (long i = CACHE_SIZE + 1; i <= 2 * CACHE_SIZE; i++) { + assertThat(generator.nextVal()).isEqualTo(i); + } + } + + @Test + void testMultiGenerator() throws InterruptedException { + ConcurrentLinkedDeque<Long> linkedDeque = new ConcurrentLinkedDeque<>(); + List<Thread> threads = new ArrayList<>(); + + for (int i = 0; i < 20; i++) { + Thread thread = + new Thread( + () -> { + BoundedSegmentSequenceGenerator generator = + new BoundedSegmentSequenceGenerator( + new TablePath("test_db", "table1"), + COLUMN_NAME, + new TestingSnapshotIDCounter(snapshotIdGenerator), + tableConfig, + Long.MAX_VALUE); + for (int j = 0; j < 130; j++) { + linkedDeque.add(generator.nextVal()); + } + }); + threads.add(thread); + thread.start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertThat(linkedDeque.stream().mapToLong(Long::longValue).max().orElse(0)) + .isLessThanOrEqualTo(40 * CACHE_SIZE); + assertThat(linkedDeque.stream().distinct().count()).isEqualTo(130 * 20); + 
} + + @Test + void testFetchFailed() { + BoundedSegmentSequenceGenerator generator = + new BoundedSegmentSequenceGenerator( + new TablePath("test_db", "table1"), + COLUMN_NAME, + new TestingSnapshotIDCounter(snapshotIdGenerator, 2), + tableConfig, + Long.MAX_VALUE); + for (int j = 1; j <= CACHE_SIZE; j++) { + assertThat(generator.nextVal()).isEqualTo(j); + } + assertThatThrownBy(generator::nextVal) + .isInstanceOf(FlussRuntimeException.class) + .hasMessage( + String.format( + "Failed to fetch auto-increment values, table_path=%s, column_name=%s.", + "test_db.table1", COLUMN_NAME)); + } + + @Test + void testFetchIdOverFlow() { + BoundedSegmentSequenceGenerator generator = + new BoundedSegmentSequenceGenerator( + new TablePath("test_db", "table1"), + COLUMN_NAME, + new TestingSnapshotIDCounter(snapshotIdGenerator), + tableConfig, + CACHE_SIZE + 9); + for (int j = 1; j < CACHE_SIZE + 9; j++) { + assertThat(generator.nextVal()).isEqualTo(j); + } + assertThatThrownBy(generator::nextVal) + .isInstanceOf(SequenceOverflowException.class) + .hasMessage( + String.format( + "Reached maximum value of sequence \"<%s>\" (%d).", + COLUMN_NAME, CACHE_SIZE + 9)); + } + + private static class TestingSnapshotIDCounter implements SequenceIDCounter { + + private final AtomicLong snapshotIdGenerator; + private int fetchTime; + private final int failedTrigger; + + public TestingSnapshotIDCounter(AtomicLong snapshotIdGenerator) { + this(snapshotIdGenerator, Integer.MAX_VALUE); + } + + public TestingSnapshotIDCounter(AtomicLong snapshotIdGenerator, int failedTrigger) { + this.snapshotIdGenerator = snapshotIdGenerator; + fetchTime = 0; + this.failedTrigger = failedTrigger; + } + + @Override + public long getAndIncrement() { + return snapshotIdGenerator.getAndIncrement(); + } + + @Override + public long getAndAdd(Long delta) { + if (++fetchTime < failedTrigger) { + return snapshotIdGenerator.getAndAdd(delta); + } + throw new RuntimeException("Failed to get snapshot ID"); + } + } +} diff 
--git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java index 9f9ea078d..9f26bc772 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java @@ -659,6 +659,11 @@ class KvTabletSnapshotTargetTest { public long getAndIncrement() { return snapshotIdGenerator.getAndIncrement(); } + + @Override + public long getAndAdd(Long delta) { + return snapshotIdGenerator.getAndAdd(delta); + } } private enum SnapshotFailType { diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index fb456b45e..8734396f3 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -61,33 +61,34 @@ See more details about [ALTER TABLE ... SET](engine-flink/ddl.md#set-properties) ## Storage Options -| Option | Type | Default | Description [...] -|-----------------------------------------|----------|-------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] -| bucket.num | int | The bucket number of Fluss cluster. | The number of buckets of a Fluss table. [...] -| bucket.key | String | (None) | Specific the distribution policy of the Fluss table. Data will be distributed to each bucket according to the hash value of bucket-key (It must be a subset of the primary keys excluding partition keys of the primary key table). 
If you specify multiple fields, delimiter is `,`. If the table has a primary key and a bucket key is not specified, the bucket key will be used as primary key(excluding th [...] -| table.log.ttl | Duration | 7 days | The time to live for log segments. The configuration controls the maximum time we will retain a log before we will delete old segments to free up space. If set to -1, the log will not be deleted. [...] -| table.auto-partition.enabled | Boolean | false | Whether enable auto partition for the table. Disable by default. When auto partition is enabled, the partitions of the table will be created automatically. [...] -| table.auto-partition.key | String | (None) | This configuration defines the time-based partition key to be used for auto-partitioning when a table is partitioned with multiple keys. Auto-partitioning utilizes a time-based partition key to handle partitions automatically, including creating new ones and removing outdated ones, by comparing the time value of the partition with the current system time. In the case of a table using multiple par [...] -| table.auto-partition.time-unit | ENUM | DAY | The time granularity for auto created partitions. The default value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If the value is `DAY`, the partition format for auto created is yyyyMMdd. If the value is `MONTH`, the partition format for auto created is yyyyMM. If the value is `QUARTER`, the parti [...] -| table.auto-partition.num-precreate | Integer | 2 | The number of partitions to pre-create for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 20241112, 20241113 will be pre-created. If any one partition exists, it'll skip creating the partition. The default value is 2, which means 2 partitions will be pre-created. 
If the `tab [...] -| table.auto-partition.num-retention | Integer | 7 | The number of history partitions to retain for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then the history partitions 20241108, 20241109, 20241110 will be retained. The partitions earlier than 20241108 will be deleted. The default value is 7, which means that 7 partitions will [...] -| table.auto-partition.time-zone | String | the system time zone | The time zone for auto partitions, which is by default the same as the system time zone. [...] -| table.replication.factor | Integer | (None) | The replication factor for the log of the new table. When it's not set, Fluss will use the cluster's default replication factor configured by default.replication.factor. It should be a positive number and not larger than the number of tablet servers in the Fluss cluster. A value larger than the number of tablet servers in Fluss cluster will result in an error when the new table is created. [...] -| table.log.format | Enum | ARROW | The format of the log records in log store. The default value is `ARROW`. The supported formats are `ARROW`, `INDEXED` and `COMPACTED`. [...] -| table.log.arrow.compression.type | Enum | ZSTD | The compression type of the log records if the log format is set to `ARROW`. The candidate compression type is `NONE`, `LZ4_FRAME`, `ZSTD`. The default value is `ZSTD`. [...] -| table.log.arrow.compression.zstd.level | Integer | 3 | The compression level of the log records if the log format is set to `ARROW` and the compression type is set to `ZSTD`. The valid range is 1 to 22. The default value is 3. [...] -| table.kv.format | Enum | COMPACTED | The format of the kv records in kv store. The default value is `COMPACTED`. The supported formats are `COMPACTED` and `INDEXED`. [...] 
-| table.log.tiered.local-segments | Integer | 2 | The number of log segments to retain in local for each table when log tiered storage is enabled. It must be greater that 0. The default is 2. [...] -| table.datalake.enabled | Boolean | false | Whether enable lakehouse storage for the table. Disabled by default. When this option is set to ture and the datalake tiering service is up, the table will be tiered and compacted into datalake format stored on lakehouse storage. [...] -| table.datalake.format | Enum | (None) | The data lake format of the table specifies the tiered Lakehouse storage format. Currently, supported formats are `paimon`, `iceberg`, and `lance`. In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi. Once the `table.datalake.format` property is configured, Fluss adopts the key encoding and bucketing strategy used by the corresponding data lake format. This [...] -| table.datalake.freshness | Duration | 3min | It defines the maximum amount of time that the datalake table's content should lag behind updates to the Fluss table. Based on this target freshness, the Fluss service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target. If the data does not need to be as fresh, you can specify a longer targe [...] -| table.datalake.auto-compaction | Boolean | false | If true, compaction will be triggered automatically when tiering service writes to the datalake. It is disabled by default. [...] -| table.datalake.auto-expire-snapshot | Boolean | false | If true, snapshot expiration will be triggered automatically when tiering service commits to the datalake. It is disabled by default. [...] -| table.merge-engine | Enum | (None) | Defines the merge engine for the primary key table. By default, primary key table uses the [default merge engine(last_row)](table-design/merge-engines/default.md). 
It also supports two merge engines are `first_row`, `versioned` and `aggregation`. The [first_row merge engine](table-design/merge-engines/first-row.md) will keep the first row of the same primary key. The [versioned merge engine](tabl [...] -| table.merge-engine.versioned.ver-column | String | (None) | The column name of the version column for the `versioned` merge engine. If the merge engine is set to `versioned`, the version column must be set. [...] -| table.delete.behavior | Enum | ALLOW | Controls the behavior of delete operations on primary key tables. Three modes are supported: `ALLOW` (default for default merge engine) - allows normal delete operations; `IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects delete requests and throws explicit errors. This configuration provides system-level guarantees for some downstream pipelines (e.g., Flink Delta Joi [...] -| table.changelog.image | Enum | FULL | Defines the changelog image mode for primary key tables. This configuration is inspired by similar settings in database systems like MySQL's `binlog_row_image` and PostgreSQL's `replica identity`. Two modes are supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous val [...] - +| Option | Type | Default | Description [...] +|-----------------------------------------|----------|-------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] +| auto-increment.fields | String | (None) | Defines the auto increment columns. 
The auto increment column can only be used in primary-key table. With an auto increment column in the table, whenever a new row is inserted into the table, the new row will be assigned with the next available value from the auto-increment sequence. The auto increment column can only be used in primary-key table. The data type of the auto increment column must b [...] +| bucket.num | int | The bucket number of Fluss cluster. | The number of buckets of a Fluss table. [...] +| bucket.key | String | (None) | Specific the distribution policy of the Fluss table. Data will be distributed to each bucket according to the hash value of bucket-key (It must be a subset of the primary keys excluding partition keys of the primary key table). If you specify multiple fields, delimiter is `,`. If the table has a primary key and a bucket key is not specified, the bucket key will be used as primary key(excluding th [...] +| table.log.ttl | Duration | 7 days | The time to live for log segments. The configuration controls the maximum time we will retain a log before we will delete old segments to free up space. If set to -1, the log will not be deleted. [...] +| table.auto-partition.enabled | Boolean | false | Whether enable auto partition for the table. Disable by default. When auto partition is enabled, the partitions of the table will be created automatically. [...] +| table.auto-partition.key | String | (None) | This configuration defines the time-based partition key to be used for auto-partitioning when a table is partitioned with multiple keys. Auto-partitioning utilizes a time-based partition key to handle partitions automatically, including creating new ones and removing outdated ones, by comparing the time value of the partition with the current system time. In the case of a table using multiple par [...] +| table.auto-partition.time-unit | ENUM | DAY | The time granularity for auto created partitions. The default value is `DAY`. 
Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If the value is `DAY`, the partition format for auto created is yyyyMMdd. If the value is `MONTH`, the partition format for auto created is yyyyMM. If the value is `QUARTER`, the parti [...] +| table.auto-partition.num-precreate | Integer | 2 | The number of partitions to pre-create for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 20241112, 20241113 will be pre-created. If any one partition exists, it'll skip creating the partition. The default value is 2, which means 2 partitions will be pre-created. If the `tab [...] +| table.auto-partition.num-retention | Integer | 7 | The number of history partitions to retain for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then the history partitions 20241108, 20241109, 20241110 will be retained. The partitions earlier than 20241108 will be deleted. The default value is 7, which means that 7 partitions will [...] +| table.auto-partition.time-zone | String | the system time zone | The time zone for auto partitions, which is by default the same as the system time zone. [...] +| table.replication.factor | Integer | (None) | The replication factor for the log of the new table. When it's not set, Fluss will use the cluster's default replication factor configured by default.replication.factor. It should be a positive number and not larger than the number of tablet servers in the Fluss cluster. A value larger than the number of tablet servers in Fluss cluster will result in an error when the new table is created. [...] +| table.log.format | Enum | ARROW | The format of the log records in log store. The default value is `ARROW`. 
The supported formats are `ARROW`, `INDEXED` and `COMPACTED`. [...] +| table.log.arrow.compression.type | Enum | ZSTD | The compression type of the log records if the log format is set to `ARROW`. The candidate compression type is `NONE`, `LZ4_FRAME`, `ZSTD`. The default value is `ZSTD`. [...] +| table.log.arrow.compression.zstd.level | Integer | 3 | The compression level of the log records if the log format is set to `ARROW` and the compression type is set to `ZSTD`. The valid range is 1 to 22. The default value is 3. [...] +| table.kv.format | Enum | COMPACTED | The format of the kv records in kv store. The default value is `COMPACTED`. The supported formats are `COMPACTED` and `INDEXED`. [...] +| table.log.tiered.local-segments | Integer | 2 | The number of log segments to retain in local for each table when log tiered storage is enabled. It must be greater that 0. The default is 2. [...] +| table.datalake.enabled | Boolean | false | Whether enable lakehouse storage for the table. Disabled by default. When this option is set to ture and the datalake tiering service is up, the table will be tiered and compacted into datalake format stored on lakehouse storage. [...] +| table.datalake.format | Enum | (None) | The data lake format of the table specifies the tiered Lakehouse storage format. Currently, supported formats are `paimon`, `iceberg`, and `lance`. In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi. Once the `table.datalake.format` property is configured, Fluss adopts the key encoding and bucketing strategy used by the corresponding data lake format. This [...] +| table.datalake.freshness | Duration | 3min | It defines the maximum amount of time that the datalake table's content should lag behind updates to the Fluss table. 
Based on this target freshness, the Fluss service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target. If the data does not need to be as fresh, you can specify a longer targe [...] +| table.datalake.auto-compaction | Boolean | false | If true, compaction will be triggered automatically when tiering service writes to the datalake. It is disabled by default. [...] +| table.datalake.auto-expire-snapshot | Boolean | false | If true, snapshot expiration will be triggered automatically when tiering service commits to the datalake. It is disabled by default. [...] +| table.merge-engine | Enum | (None) | Defines the merge engine for the primary key table. By default, primary key table uses the [default merge engine(last_row)](table-design/merge-engines/default.md). It also supports two merge engines are `first_row`, `versioned` and `aggregation`. The [first_row merge engine](table-design/merge-engines/first-row.md) will keep the first row of the same primary key. The [versioned merge engine](tabl [...] +| table.merge-engine.versioned.ver-column | String | (None) | The column name of the version column for the `versioned` merge engine. If the merge engine is set to `versioned`, the version column must be set. [...] +| table.delete.behavior | Enum | ALLOW | Controls the behavior of delete operations on primary key tables. Three modes are supported: `ALLOW` (default for default merge engine) - allows normal delete operations; `IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects delete requests and throws explicit errors. This configuration provides system-level guarantees for some downstream pipelines (e.g., Flink Delta Joi [...] +| table.changelog.image | Enum | FULL | Defines the changelog image mode for primary key tables. 
This configuration is inspired by similar settings in database systems like MySQL's `binlog_row_image` and PostgreSQL's `replica identity`. Two modes are supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous val [...] +| table.auto-inc.batch-size | Long | 100000L | The batch size of auto-increment IDs fetched from the distributed counter each time. This value determines the length of the locally cached ID segment. Default: 100000. A larger batch size may cause significant auto-increment ID gaps, especially when unused cached ID segments are discarded due to TabletServer restarts or abnormal terminations. Conversely, a smaller batch size increases the freque [...] ## Read Options
