This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit be01cffa912b7ddffe8df677bd8adc4471be3390 Author: Jark Wu <[email protected]> AuthorDate: Sat Jan 17 14:37:19 2026 +0800 [kv] Improve the implementation of AUTO_INCREMENT column --- .../java/org/apache/fluss/config/TableConfig.java | 2 +- .../java/org/apache/fluss/metadata/Schema.java | 40 +++++++++--- .../org/apache/fluss/metadata/TableSchemaTest.java | 2 +- .../apache/fluss/flink/FlinkConnectorOptions.java | 5 +- .../fluss/flink/utils/FlinkConversionsTest.java | 17 ++--- .../java/org/apache/fluss/server/kv/KvManager.java | 12 +++- .../server/kv/autoinc/AutoIncrementManager.java | 34 +++------- .../autoinc/BoundedSegmentSequenceGenerator.java | 64 +++++++++---------- .../kv/autoinc/PerSchemaAutoIncrementUpdater.java | 34 ++++------ .../kv/autoinc/SequenceGeneratorFactory.java | 38 +++++++++++ .../kv/autoinc/ZkSequenceGeneratorFactory.java | 61 ++++++++++++++++++ .../org/apache/fluss/server/kv/KvTabletTest.java | 66 ++++++++++++++++++- .../kv/autoinc/SegmentSequenceGeneratorTest.java | 73 ++++++---------------- .../autoinc/TestingSequenceGeneratorFactory.java | 40 ++++++++++++ .../kv/autoinc/TestingSequenceIDCounter.java | 54 ++++++++++++++++ website/docs/engine-flink/ddl.md | 8 ++- website/docs/engine-flink/options.md | 2 +- 17 files changed, 386 insertions(+), 166 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java index 86604b6be..984a1def4 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java @@ -142,7 +142,7 @@ public class TableConfig { } /** Gets the number of auto-increment IDs cached per segment. 
*/ - public Long getAutoIncrementCacheSize() { + public long getAutoIncrementCacheSize() { return config.get(ConfigOptions.TABLE_AUTO_INCREMENT_CACHE_SIZE); } } diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index 4c1f1828d..e2960dcc7 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -100,6 +100,23 @@ public final class Schema implements Serializable { return columns; } + /** + * Gets a column by its name. + * + * @param columnName the column name + * @return the column with the given name + * @throws IllegalArgumentException if the column does not exist + */ + public Column getColumn(String columnName) { + for (Column column : columns) { + if (column.getName().equals(columnName)) { + return column; + } + } + throw new IllegalArgumentException( + String.format("Column %s does not exist in schema.", columnName)); + } + public Optional<PrimaryKey> getPrimaryKey() { return Optional.ofNullable(primaryKey); } @@ -209,13 +226,16 @@ public final class Schema implements Serializable { @Override public String toString() { - final List<Object> components = new ArrayList<>(columns); - if (primaryKey != null) { - components.add(primaryKey); - } - return components.stream() - .map(Objects::toString) - .collect(Collectors.joining(",", "(", ")")); + return "Schema{" + + "columns=" + + columns + + ", primaryKey=" + + primaryKey + + ", autoIncrementColumnNames=" + + autoIncrementColumnNames + + ", highestFieldId=" + + highestFieldId + + '}'; } @Override @@ -228,12 +248,14 @@ public final class Schema implements Serializable { } Schema schema = (Schema) o; return Objects.equals(columns, schema.columns) - && Objects.equals(primaryKey, schema.primaryKey); + && Objects.equals(autoIncrementColumnNames, schema.autoIncrementColumnNames) + && Objects.equals(primaryKey, schema.primaryKey) + && 
highestFieldId == schema.highestFieldId; } @Override public int hashCode() { - return Objects.hash(columns, primaryKey); + return Objects.hash(columns, primaryKey, autoIncrementColumnNames, highestFieldId); } // -------------------------------------------------------------------------------------------- diff --git a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java index fe317b53d..a5ef1fed7 100644 --- a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java @@ -369,7 +369,7 @@ class TableSchemaTest { .build(); Schema copied = Schema.newBuilder().fromSchema(original).build(); - assertThat(copied.getAutoIncrementColumnNames()).isEqualTo(Arrays.asList("value")); + assertThat(copied).isEqualTo(original); } @Test diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java index 56e9f9fce..6e6f0921d 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java @@ -43,8 +43,9 @@ public class FlinkConnectorOptions { .withDescription( "Defines the auto increment columns. " + "The auto increment column can only be used in primary-key table." - + "With an auto increment column in the table, whenever a new row is inserted into the table, the new row will be assigned with the next available value from the auto-increment sequence." - + "The auto increment column can only be used in primary-key table. The data type of the auto increment column must be INT or BIGINT." 
+ + "With an auto increment column in the table, whenever a new row is inserted into the table, " + + "the new row will be assigned with the next available value from the auto-increment sequence." + + "The data type of the auto increment column must be INT or BIGINT." + "Currently a table can have only one auto-increment column." + "Adding an auto increment column to an existing table is not supported."); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java index ad142a6c5..b4546701e 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java @@ -217,12 +217,9 @@ public class FlinkConversionsTest { TableDescriptor flussTable = FlinkConversions.toFlussTable(new ResolvedCatalogTable(flinkTable, schema)); String expectFlussTableString = - "TableDescriptor{schema=(" - + "order_id STRING NOT NULL," - + "item ROW<`item_id` STRING, `item_price` STRING, `item_details` ROW<`category` STRING, `specifications` STRING>>," - + "orig_ts TIMESTAMP(6)," - + "CONSTRAINT PK_order_id PRIMARY KEY (order_id)" - + "), comment='test comment', partitionKeys=[], " + "TableDescriptor{schema=Schema{columns=[order_id STRING NOT NULL, item ROW<`item_id` STRING, `item_price` STRING, `item_details` ROW<`category` STRING, `specifications` STRING>>, orig_ts TIMESTAMP(6)], " + + "primaryKey=CONSTRAINT PK_order_id PRIMARY KEY (order_id), " + + "autoIncrementColumnNames=[], highestFieldId=7}, comment='test comment', partitionKeys=[], " + "tableDistribution={bucketKeys=[order_id] bucketCount=null}, " + "properties={}, " + "customProperties={" @@ -424,11 +421,9 @@ public class FlinkConversionsTest { FlinkConversions.toFlussTable( new 
ResolvedCatalogMaterializedTable(flinkMaterializedTable, schema)); String expectFlussTableString = - "TableDescriptor{schema=(" - + "order_id STRING NOT NULL," - + "orig_ts TIMESTAMP(6)," - + "CONSTRAINT PK_order_id PRIMARY KEY (order_id)" - + "), comment='test comment', partitionKeys=[], " + "TableDescriptor{schema=Schema{columns=[order_id STRING NOT NULL, orig_ts TIMESTAMP(6)], " + + "primaryKey=CONSTRAINT PK_order_id PRIMARY KEY (order_id), " + + "autoIncrementColumnNames=[], highestFieldId=1}, comment='test comment', partitionKeys=[], " + "tableDistribution={bucketKeys=[order_id] bucketCount=null}, " + "properties={}, " + "customProperties={materialized-table.definition-query=select order_id, orig_ts from t, " diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index 8d99b461e..1f0c9c268 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -37,6 +37,7 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.server.TabletManagerBase; import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; +import org.apache.fluss.server.kv.autoinc.ZkSequenceGeneratorFactory; import org.apache.fluss.server.kv.rowmerger.RowMerger; import org.apache.fluss.server.log.LogManager; import org.apache.fluss.server.log.LogTablet; @@ -251,7 +252,10 @@ public final class KvManager extends TabletManagerBase implements ServerReconfig RowMerger merger = RowMerger.create(tableConfig, kvFormat, schemaGetter); AutoIncrementManager autoIncrementManager = new AutoIncrementManager( - schemaGetter, tablePath.getTablePath(), tableConfig, zkClient); + schemaGetter, + tablePath.getTablePath(), + tableConfig, + new ZkSequenceGeneratorFactory(zkClient)); KvTablet tablet = KvTablet.create( @@ -365,7 +369,11 @@ public final class 
KvManager extends TabletManagerBase implements ServerReconfig RowMerger rowMerger = RowMerger.create(tableConfig, tableConfig.getKvFormat(), schemaGetter); AutoIncrementManager autoIncrementManager = - new AutoIncrementManager(schemaGetter, tablePath, tableConfig, zkClient); + new AutoIncrementManager( + schemaGetter, + tablePath, + tableConfig, + new ZkSequenceGeneratorFactory(zkClient)); KvTablet kvTablet = KvTablet.create( physicalTablePath, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java index 1c1d36a42..971ca4fa8 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncrementManager.java @@ -23,10 +23,6 @@ import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.SchemaGetter; import org.apache.fluss.metadata.TablePath; -import org.apache.fluss.server.zk.ZkSequenceIDCounter; -import org.apache.fluss.server.zk.ZooKeeperClient; -import org.apache.fluss.server.zk.data.ZkData; -import org.apache.fluss.types.DataTypeRoot; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -34,6 +30,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import javax.annotation.concurrent.NotThreadSafe; import java.time.Duration; +import java.util.List; import static org.apache.fluss.utils.Preconditions.checkState; @@ -55,7 +52,7 @@ public class AutoIncrementManager { SchemaGetter schemaGetter, TablePath tablePath, TableConfig tableConf, - ZooKeeperClient zkClient) { + SequenceGeneratorFactory seqGeneratorFactory) { this.autoIncrementUpdaterCache = Caffeine.newBuilder() .maximumSize(5) @@ -64,30 +61,19 @@ public class AutoIncrementManager { this.schemaGetter = schemaGetter; int schemaId = 
schemaGetter.getLatestSchemaInfo().getSchemaId(); Schema schema = schemaGetter.getSchema(schemaId); - int[] autoIncrementColumnIds = schema.getAutoIncrementColumnIds(); + List<String> autoIncrementColumnNames = schema.getAutoIncrementColumnNames(); checkState( - autoIncrementColumnIds.length <= 1, + autoIncrementColumnNames.size() <= 1, "Only support one auto increment column for a table, but got %d.", - autoIncrementColumnIds.length); + autoIncrementColumnNames.size()); - if (autoIncrementColumnIds.length == 1) { - autoIncrementColumnId = autoIncrementColumnIds[0]; - boolean requiresIntOverflowCheck = - schema.getRowType() - .getField(schema.getColumnName(autoIncrementColumnId)) - .getType() - .is(DataTypeRoot.INTEGER); + if (autoIncrementColumnNames.size() == 1) { + Schema.Column autoIncrementColumn = schema.getColumn(autoIncrementColumnNames.get(0)); + autoIncrementColumnId = autoIncrementColumn.getColumnId(); sequenceGenerator = - new BoundedSegmentSequenceGenerator( - tablePath, - schema.getColumnName(autoIncrementColumnId), - new ZkSequenceIDCounter( - zkClient.getCuratorClient(), - ZkData.AutoIncrementColumnZNode.path( - tablePath, autoIncrementColumnId)), - tableConf, - requiresIntOverflowCheck ? 
Integer.MAX_VALUE : Long.MAX_VALUE); + seqGeneratorFactory.createSequenceGenerator( + tablePath, autoIncrementColumn, tableConf.getAutoIncrementCacheSize()); } else { autoIncrementColumnId = -1; sequenceGenerator = null; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java index f47831ae4..44afe717e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/BoundedSegmentSequenceGenerator.java @@ -18,7 +18,6 @@ package org.apache.fluss.server.kv.autoinc; -import org.apache.fluss.config.TableConfig; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.SequenceOverflowException; import org.apache.fluss.metadata.TablePath; @@ -41,40 +40,33 @@ public class BoundedSegmentSequenceGenerator implements SequenceGenerator { private final long cacheSize; private final long maxAllowedValue; - private AutoIncIdSegment segment; + private IdSegment segment; public BoundedSegmentSequenceGenerator( TablePath tablePath, String columnName, SequenceIDCounter sequenceIDCounter, - TableConfig tableConf, + long idCacheSize, long maxAllowedValue) { - this.cacheSize = tableConf.getAutoIncrementCacheSize(); + this.cacheSize = idCacheSize; this.columnName = columnName; this.tablePath = tablePath; this.sequenceIDCounter = sequenceIDCounter; - this.segment = AutoIncIdSegment.EMPTY; + this.segment = IdSegment.EMPTY; this.maxAllowedValue = maxAllowedValue; } private void fetchSegment() { try { long start = sequenceIDCounter.getAndAdd(cacheSize); - if (start >= maxAllowedValue) { - throw new SequenceOverflowException( - String.format( - "Reached maximum value of sequence \"<%s>\" (%d).", - columnName, maxAllowedValue)); - } - - long actualEnd = Math.min(start + cacheSize, 
maxAllowedValue - 1); + // the initial value of ZNode is 0, but we start ID from 1 + segment = new IdSegment(start + 1, start + cacheSize); LOG.info( - "Successfully fetch auto-increment values range ({}, {}], table_path={}, column_name={}.", - start, - actualEnd, + "Successfully fetch auto-increment values range [{}, {}], table_path={}, column_name={}.", + segment.current, + segment.end, tablePath, columnName); - segment = new AutoIncIdSegment(start, actualEnd - start); } catch (SequenceOverflowException sequenceOverflowException) { throw sequenceOverflowException; } catch (Exception e) { @@ -88,32 +80,36 @@ public class BoundedSegmentSequenceGenerator implements SequenceGenerator { @Override public long nextVal() { - if (segment.remaining() <= 0) { + if (!segment.hasNext()) { fetchSegment(); } - return segment.tryNextVal(); + long id = segment.nextVal(); + if (id > maxAllowedValue) { + throw new SequenceOverflowException( + String.format( + "Reached maximum value of sequence \"<%s>\" (%d).", + columnName, maxAllowedValue)); + } + return id; } - private static class AutoIncIdSegment { - private static final AutoIncIdSegment EMPTY = new AutoIncIdSegment(0, 0); - private long current; - private final long end; + private static class IdSegment { + private static final IdSegment EMPTY = new IdSegment(0, -1); + final long end; + long current; - public AutoIncIdSegment(long start, long length) { - this.end = start + length; - this.current = start; + /** ID range from min (inclusive) to max (inclusive). 
*/ + public IdSegment(long min, long max) { + this.current = min; + this.end = max; } - public long remaining() { - return end - current; + public boolean hasNext() { + return current <= end; } - public long tryNextVal() { - long id = ++current; - if (id > end) { - throw new IllegalStateException("No more IDs available in current segment."); - } - return id; + public long nextVal() { + return current++; } } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java index ff70f5783..b3e98fb9d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncrementUpdater.java @@ -18,7 +18,6 @@ package org.apache.fluss.server.kv.autoinc; -import org.apache.fluss.exception.SequenceOverflowException; import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.Schema; import org.apache.fluss.record.BinaryValue; @@ -29,8 +28,6 @@ import org.apache.fluss.types.DataTypeRoot; import javax.annotation.concurrent.NotThreadSafe; -import java.util.function.LongSupplier; - /** * An {@link AutoIncrementUpdater} implementation that assigns auto-increment values to a specific * column based on a fixed schema. 
It is bound to a particular schema version and assumes the @@ -45,9 +42,9 @@ public class PerSchemaAutoIncrementUpdater implements AutoIncrementUpdater { private final RowEncoder rowEncoder; private final int fieldLength; private final int targetColumnIdx; - private final LongSupplier idSupplier; + private final SequenceGenerator sequenceGenerator; private final short schemaId; - private final String targetColumnName; + private final boolean requireInteger; public PerSchemaAutoIncrementUpdater( KvFormat kvFormat, @@ -63,40 +60,31 @@ public class PerSchemaAutoIncrementUpdater implements AutoIncrementUpdater { for (int i = 0; i < fieldLength; i++) { flussFieldGetters[i] = InternalRow.createFieldGetter(fieldDataTypes[i], i); } + this.sequenceGenerator = sequenceGenerator; this.schemaId = schemaId; this.targetColumnIdx = schema.getColumnIds().indexOf(autoIncrementColumnId); - this.targetColumnName = schema.getColumnName(targetColumnIdx); if (targetColumnIdx == -1) { throw new IllegalStateException( String.format( "Auto-increment column ID %d not found in schema columns: %s", autoIncrementColumnId, schema.getColumnIds())); } - - if (fieldDataTypes[targetColumnIdx].is(DataTypeRoot.INTEGER)) { - this.idSupplier = () -> checkedNextInt(sequenceGenerator.nextVal()); - } else { - this.idSupplier = sequenceGenerator::nextVal; - } + this.requireInteger = fieldDataTypes[targetColumnIdx].is(DataTypeRoot.INTEGER); this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes); this.flussFieldGetters = flussFieldGetters; } - private long checkedNextInt(long value) { - if (value > Integer.MAX_VALUE) { - throw new SequenceOverflowException( - String.format( - "Reached maximum value of sequence \"<%s>\" (2147483647).", - targetColumnName)); - } - return value; - } - public BinaryValue updateAutoIncrementColumns(BinaryValue rowValue) { rowEncoder.startNewRow(); for (int i = 0; i < fieldLength; i++) { if (targetColumnIdx == i) { - rowEncoder.encodeField(i, idSupplier.getAsLong()); + long 
seq = sequenceGenerator.nextVal(); + // cast to integer if needed + if (requireInteger) { + rowEncoder.encodeField(i, (int) seq); + } else { + rowEncoder.encodeField(i, seq); + } } else { // use the row value rowEncoder.encodeField(i, flussFieldGetters[i].getFieldOrNull(rowValue.row)); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGeneratorFactory.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGeneratorFactory.java new file mode 100644 index 000000000..a519c3f66 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGeneratorFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.server.kv.autoinc; + +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TablePath; + +/** Factory interface for creating {@link SequenceGenerator} instances. */ +public interface SequenceGeneratorFactory { + + /** + * Creates a {@link SequenceGenerator} for the specified table and auto-increment column. 
+ * + * @param tablePath the path of the table + * @param autoIncrementColumn the auto-increment column schema + * @param idCacheSize the size of ID cache for optimization + * @return a new instance of {@link SequenceGenerator} + */ + SequenceGenerator createSequenceGenerator( + TablePath tablePath, Schema.Column autoIncrementColumn, long idCacheSize); +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/ZkSequenceGeneratorFactory.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/ZkSequenceGeneratorFactory.java new file mode 100644 index 000000000..1842f47f6 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/ZkSequenceGeneratorFactory.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.server.kv.autoinc; + +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.ZkSequenceIDCounter; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.types.DataTypeRoot; + +/** ZooKeeper-based implementation of {@link SequenceGeneratorFactory}. 
*/ +public class ZkSequenceGeneratorFactory implements SequenceGeneratorFactory { + + private final ZooKeeperClient zkClient; + + public ZkSequenceGeneratorFactory(ZooKeeperClient zkClient) { + this.zkClient = zkClient; + } + + @Override + public SequenceGenerator createSequenceGenerator( + TablePath tablePath, Schema.Column autoIncrementColumn, long idCacheSize) { + DataTypeRoot typeRoot = autoIncrementColumn.getDataType().getTypeRoot(); + final long maxAllowedValue; + if (typeRoot == DataTypeRoot.INTEGER) { + maxAllowedValue = Integer.MAX_VALUE; + } else if (typeRoot == DataTypeRoot.BIGINT) { + maxAllowedValue = Long.MAX_VALUE; + } else { + throw new IllegalArgumentException( + "Auto-increment column must be of type INTEGER or BIGINT"); + } + return new BoundedSegmentSequenceGenerator( + tablePath, + autoIncrementColumn.getName(), + new ZkSequenceIDCounter( + zkClient.getCuratorClient(), + ZkData.AutoIncrementColumnZNode.path( + tablePath, autoIncrementColumn.getColumnId())), + idCacheSize, + maxAllowedValue); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 8c0a6b864..34f50691a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -36,6 +36,9 @@ import org.apache.fluss.record.FileLogProjection; import org.apache.fluss.record.KvRecord; import org.apache.fluss.record.KvRecordBatch; import org.apache.fluss.record.KvRecordTestUtils; +import org.apache.fluss.record.LogRecord; +import org.apache.fluss.record.LogRecordBatch; +import org.apache.fluss.record.LogRecordReadContext; import org.apache.fluss.record.LogRecords; import org.apache.fluss.record.LogTestBase; import org.apache.fluss.record.MemoryLogRecords; @@ -46,6 +49,7 @@ import org.apache.fluss.record.bytesview.MultiBytesView; import org.apache.fluss.row.BinaryRow; import 
org.apache.fluss.row.encode.ValueEncoder; import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; +import org.apache.fluss.server.kv.autoinc.TestingSequenceGeneratorFactory; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Key; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.KvEntry; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Value; @@ -61,6 +65,7 @@ import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.RowType; import org.apache.fluss.types.StringType; +import org.apache.fluss.utils.CloseableIterator; import org.apache.fluss.utils.clock.SystemClock; import org.apache.fluss.utils.concurrent.FlussScheduler; @@ -84,6 +89,8 @@ import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION; import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; @@ -122,7 +129,7 @@ class KvTabletTest { @BeforeEach void beforeEach() { - executor = Executors.newFixedThreadPool(2); + executor = Executors.newFixedThreadPool(3); } @AfterEach @@ -187,7 +194,7 @@ class KvTabletTest { schemaGetter, tablePath.getTablePath(), new TableConfig(new Configuration()), - null); + new TestingSequenceGeneratorFactory()); return KvTablet.create( tablePath, @@ -600,6 +607,61 @@ class KvTabletTest { } } + @Test + void testAutoIncrementWithMultiThread() throws Exception { + Schema schema = + Schema.newBuilder() + .column("user_name", DataTypes.STRING()) + .column("uid", DataTypes.INT()) + .primaryKey("user_name") + .enableAutoIncrement("uid") + .build(); + initLogTabletAndKvTablet(schema, new HashMap<>()); + KvRecordTestUtils.KvRecordFactory recordFactory = + 
KvRecordTestUtils.KvRecordFactory.of(schema.getRowType()); + + // start threads to put records + List<Future<LogAppendInfo>> putFutures = new ArrayList<>(); + for (int i = 1; i <= 30; ) { + String k1 = "k" + i++; + String k2 = "k" + i++; + String k3 = "k" + i++; + KvRecordBatch kvRecordBatch1 = + kvRecordBatchFactory.ofRecords( + Arrays.asList( + recordFactory.ofRecord(k1.getBytes(), new Object[] {k1, null}), + recordFactory.ofRecord(k2.getBytes(), new Object[] {k2, null}), + recordFactory.ofRecord( + k3.getBytes(), new Object[] {k3, null}))); + // test concurrent putting to test thread-safety of AutoIncrementManager + putFutures.add(executor.submit(() -> kvTablet.putAsLeader(kvRecordBatch1, null))); + } + + // wait for all putting finished + for (Future<LogAppendInfo> future : putFutures) { + future.get(); + } + + LogRecords actualLogRecords = readLogRecords(logTablet, 0L, null); + LogRecordReadContext context = + LogRecordReadContext.createArrowReadContext( + schema.getRowType(), schemaId, schemaGetter); + List<Integer> actualUids = new ArrayList<>(); + for (LogRecordBatch actualNext : actualLogRecords.batches()) { + CloseableIterator<LogRecord> iterator = actualNext.records(context); + while (iterator.hasNext()) { + LogRecord record = iterator.next(); + assertThat(record.getChangeType()).isEqualTo(ChangeType.INSERT); + assertThat(record.getRow().isNullAt(1)).isFalse(); + actualUids.add(record.getRow().getInt(1)); + } + } + // create a List from 1 to 30 + List<Integer> expectedUids = + IntStream.rangeClosed(1, 30).boxed().collect(Collectors.toList()); + assertThat(actualUids).isEqualTo(expectedUids); + } + @Test void testPutAsLeaderWithOutOfOrderSequenceException() throws Exception { initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java index 
685c2d539..c7c90ab6c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGeneratorTest.java @@ -18,21 +18,15 @@ package org.apache.fluss.server.kv.autoinc; -import org.apache.fluss.config.ConfigOptions; -import org.apache.fluss.config.Configuration; -import org.apache.fluss.config.TableConfig; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.SequenceOverflowException; import org.apache.fluss.metadata.TablePath; -import org.apache.fluss.server.SequenceIDCounter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicLong; @@ -47,16 +41,10 @@ class SegmentSequenceGeneratorTest { private static final long CACHE_SIZE = 100; private AtomicLong snapshotIdGenerator; - private Configuration configuration; - private TableConfig tableConfig; @BeforeEach void setUp() { snapshotIdGenerator = new AtomicLong(0); - Map<String, String> map = new HashMap<>(); - map.put(ConfigOptions.TABLE_AUTO_INCREMENT_CACHE_SIZE.key(), String.valueOf(CACHE_SIZE)); - configuration = Configuration.fromMap(map); - tableConfig = new TableConfig(configuration); } @Test @@ -65,8 +53,8 @@ class SegmentSequenceGeneratorTest { new BoundedSegmentSequenceGenerator( TABLE_PATH, COLUMN_NAME, - new TestingSnapshotIDCounter(snapshotIdGenerator), - new TableConfig(configuration), + new TestingSequenceIDCounter(snapshotIdGenerator), + CACHE_SIZE, Long.MAX_VALUE); for (long i = 1; i <= CACHE_SIZE; i++) { assertThat(generator.nextVal()).isEqualTo(i); @@ -90,8 +78,8 @@ class SegmentSequenceGeneratorTest { new BoundedSegmentSequenceGenerator( new TablePath("test_db", "table1"), COLUMN_NAME, - new 
TestingSnapshotIDCounter(snapshotIdGenerator), - tableConfig, + new TestingSequenceIDCounter(snapshotIdGenerator), + CACHE_SIZE, Long.MAX_VALUE); for (int j = 0; j < 130; j++) { linkedDeque.add(generator.nextVal()); @@ -116,8 +104,8 @@ class SegmentSequenceGeneratorTest { new BoundedSegmentSequenceGenerator( new TablePath("test_db", "table1"), COLUMN_NAME, - new TestingSnapshotIDCounter(snapshotIdGenerator, 2), - tableConfig, + new TestingSequenceIDCounter(snapshotIdGenerator, 2), + CACHE_SIZE, Long.MAX_VALUE); for (int j = 1; j <= CACHE_SIZE; j++) { assertThat(generator.nextVal()).isEqualTo(j); @@ -132,51 +120,28 @@ class SegmentSequenceGeneratorTest { @Test void testFetchIdOverFlow() { + int initialValue = Integer.MAX_VALUE - 10; + snapshotIdGenerator = new AtomicLong(initialValue); BoundedSegmentSequenceGenerator generator = new BoundedSegmentSequenceGenerator( new TablePath("test_db", "table1"), COLUMN_NAME, - new TestingSnapshotIDCounter(snapshotIdGenerator), - tableConfig, - CACHE_SIZE + 9); - for (int j = 1; j < CACHE_SIZE + 9; j++) { - assertThat(generator.nextVal()).isEqualTo(j); + new TestingSequenceIDCounter(snapshotIdGenerator), + CACHE_SIZE, + Integer.MAX_VALUE); + + int lastValue = 0; + for (int j = 1; j <= 10; j++) { + lastValue = (int) generator.nextVal(); + assertThat(lastValue).isEqualTo(initialValue + j); } + assertThat(lastValue).isEqualTo(Integer.MAX_VALUE); + assertThatThrownBy(generator::nextVal) .isInstanceOf(SequenceOverflowException.class) .hasMessage( String.format( "Reached maximum value of sequence \"<%s>\" (%d).", - COLUMN_NAME, CACHE_SIZE + 9)); - } - - private static class TestingSnapshotIDCounter implements SequenceIDCounter { - - private final AtomicLong snapshotIdGenerator; - private int fetchTime; - private final int failedTrigger; - - public TestingSnapshotIDCounter(AtomicLong snapshotIdGenerator) { - this(snapshotIdGenerator, Integer.MAX_VALUE); - } - - public TestingSnapshotIDCounter(AtomicLong snapshotIdGenerator, int 
failedTrigger) { - this.snapshotIdGenerator = snapshotIdGenerator; - fetchTime = 0; - this.failedTrigger = failedTrigger; - } - - @Override - public long getAndIncrement() { - return snapshotIdGenerator.getAndIncrement(); - } - - @Override - public long getAndAdd(Long delta) { - if (++fetchTime < failedTrigger) { - return snapshotIdGenerator.getAndAdd(delta); - } - throw new RuntimeException("Failed to get snapshot ID"); - } + COLUMN_NAME, Integer.MAX_VALUE)); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceGeneratorFactory.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceGeneratorFactory.java new file mode 100644 index 000000000..2bafcd670 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceGeneratorFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.server.kv.autoinc; + +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TablePath; + +import java.util.concurrent.atomic.AtomicLong; + +/** Factory for creating {@link SequenceGenerator} instances for testing. 
*/ +public class TestingSequenceGeneratorFactory implements SequenceGeneratorFactory { + + @Override + public SequenceGenerator createSequenceGenerator( + TablePath tablePath, Schema.Column autoIncrementColumn, long idCacheSize) { + return new BoundedSegmentSequenceGenerator( + tablePath, + autoIncrementColumn.getName(), + new TestingSequenceIDCounter(new AtomicLong(0)), + idCacheSize, + Long.MAX_VALUE); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceIDCounter.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceIDCounter.java new file mode 100644 index 000000000..72f10f5d4 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/TestingSequenceIDCounter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.server.kv.autoinc; + +import org.apache.fluss.server.SequenceIDCounter; + +import java.util.concurrent.atomic.AtomicLong; + +/** A testing implementation of {@link SequenceIDCounter} based on Java {@link AtomicLong}.
*/ +public class TestingSequenceIDCounter implements SequenceIDCounter { + private final AtomicLong idGenerator; + private int fetchTime; + private final int failedTrigger; + + public TestingSequenceIDCounter(AtomicLong idGenerator) { + this(idGenerator, Integer.MAX_VALUE); + } + + public TestingSequenceIDCounter(AtomicLong idGenerator, int failedTrigger) { + this.idGenerator = idGenerator; + fetchTime = 0; + this.failedTrigger = failedTrigger; + } + + @Override + public long getAndIncrement() { + return idGenerator.getAndIncrement(); + } + + @Override + public long getAndAdd(Long delta) { + if (++fetchTime < failedTrigger) { + return idGenerator.getAndAdd(delta); + } + throw new RuntimeException("Failed to get snapshot ID"); + } +} diff --git a/website/docs/engine-flink/ddl.md b/website/docs/engine-flink/ddl.md index b333a1400..40e12a9c1 100644 --- a/website/docs/engine-flink/ddl.md +++ b/website/docs/engine-flink/ddl.md @@ -236,9 +236,13 @@ Currently, this feature has the following characteristics: - **Position**: New columns are always appended to the end of the existing column list. - **Nullability**: Only nullable columns can be added to an existing table to ensure compatibility with existing data. - **Type Support**: You can add columns of any data type, including complex types such as `ROW`, `MAP`, and `ARRAY`. -- **Nested Fields**: Currently, adding fields within an existing nested `ROW` is not supported. Such operations are categorized as "updating column types" and will be supported in future versions. -You can add a single column or multiple columns using the `ALTER TABLE statement. +The following limitations currently apply but will be supported in the future: + +- **Nested Fields**: Adding fields within an existing nested `ROW` is not supported. Such operations are categorized as "updating column types".
+- **AUTO INCREMENT**: Adding `AUTO_INCREMENT` columns by using `ALTER TABLE` is not supported; such columns must be defined when the table is created. + +You can add a single column or multiple columns using the `ALTER TABLE` statement. ```sql title="Flink SQL" -- Add a single column at the end of the table diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index 8734396f3..33cf9e68b 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -63,7 +63,7 @@ See more details about [ALTER TABLE ... SET](engine-flink/ddl.md#set-properties) | Option | Type | Default | Description [...] |-----------------------------------------|----------|-------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] -| auto-increment.fields | String | (None) | Defines the auto increment columns. The auto increment column can only be used in primary-key table. With an auto increment column in the table, whenever a new row is inserted into the table, the new row will be assigned with the next available value from the auto-increment sequence. The auto increment column can only be used in primary-key table. The data type of the auto increment column must b [...] +| auto-increment.fields | String | (None) | Defines the auto increment columns. The auto increment column can only be used in primary-key table. With an auto increment column in the table, whenever a new row is inserted into the table, the new row will be assigned with the next available value from the auto-increment sequence. 
The data type of the auto increment column must be INT or BIGINT. Currently a table can have only one auto-increme [...] | bucket.num | int | The bucket number of Fluss cluster. | The number of buckets of a Fluss table. [...] | bucket.key | String | (None) | Specific the distribution policy of the Fluss table. Data will be distributed to each bucket according to the hash value of bucket-key (It must be a subset of the primary keys excluding partition keys of the primary key table). If you specify multiple fields, delimiter is `,`. If the table has a primary key and a bucket key is not specified, the bucket key will be used as primary key(excluding th [...] | table.log.ttl | Duration | 7 days | The time to live for log segments. The configuration controls the maximum time we will retain a log before we will delete old segments to free up space. If set to -1, the log will not be deleted. [...]
