platinumhamburg commented on code in PR #2161:
URL: https://github.com/apache/fluss/pull/2161#discussion_r2650235464
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java:
##########
@@ -252,6 +252,30 @@ public static String path() {
}
}
+ /**
+ * The znode for auto increment columns of a table. The znode path is:
+ *
+ * <p>/metadata/databases/[databaseName]/tables/[tableName]/auto_inc
+ */
+ public static final class AutoIncrementColumnsZNode {
+ public static String path(TablePath tablePath) {
+ return TableZNode.path(tablePath) + "/auto_inc";
+ }
+ }
+
+ /**
+ * The znode for auto increment column. The znode path is:
+ *
+ * <p>/metadata/databases/[databaseName]/tables/[tableName]/auto_inc/col_[columnIdx]
+ */
+ public static final class AutoIncrementColumnZNode {
Review Comment:
Schema evolution (adding a column) is already supported, so it would be better
to use column IDs instead of column indices here.
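A minimal sketch of what an ID-based path could look like. The helper name
`AutoIncPaths`, the `col_[columnId]` layout, and the plain string standing in
for `TableZNode.path(tablePath)` are assumptions for illustration, not code
from this PR. The point is only that the znode is keyed by a column ID rather
than by a positional index.

```java
// Hypothetical helper, not part of Fluss: builds the per-column znode path
// from a column ID instead of a column index.
final class AutoIncPaths {
    private AutoIncPaths() {}

    /**
     * @param tableZNodePath stands in for TableZNode.path(tablePath) (assumption)
     * @param columnId the ID of the auto-increment column
     */
    static String columnPath(String tableZNodePath, int columnId) {
        return tableZNodePath + "/auto_inc/col_" + columnId;
    }
}
```

With this layout, the path depends only on the column ID that is passed in,
not on the column's position in the current schema.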
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncProcessor.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.BinaryValue;
+import org.apache.fluss.server.zk.ZkSequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+
+/** AutoIncProcessor is used to process auto increment column. */
+public interface AutoIncProcessor {
+
+ /**
+ * Process auto increment column.
+ *
+ * @param originalValue the original value.
+ * @return the processed value.
+ */
+ BinaryValue processAutoInc(BinaryValue originalValue);
+
+ static AutoIncProcessor create(
+ TablePath tablePath,
+ int schemaId,
+ Configuration properties,
+ TableConfig tableConf,
+ Schema schema,
Review Comment:
Use SchemaGetter instead of Schema.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java:
##########
@@ -59,4 +59,19 @@ public long getAndIncrement() throws Exception {
throw new Exception("Failed to increment sequence id counter.");
}
}
+
+ /**
+ * Atomically adds the given delta to the current sequence ID.
+ *
+ * @return The previous sequence ID
+ */
+ @Override
+ public long getAndAdd(Long delta) throws Exception {
+ AtomicValue<Long> incrementValue = sequenceIdCounter.add(delta);
+ if (incrementValue.succeeded()) {
+ return incrementValue.preValue();
+ } else {
+ throw new Exception("Failed to add sequence id counter.");
Review Comment:
Perhaps we could make the exception message clearer, for example by
including the sequenceIDPath.
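A rough sketch of the kind of message I have in mind. The class
`SequenceIdErrors` and its method are hypothetical and only show the message
construction; how the path is held inside `ZkSequenceIDCounter` is an
assumption.

```java
// Hypothetical sketch: builds a failure message that names the znode path
// and the delta, so the failing counter can be identified from the log.
final class SequenceIdErrors {
    private SequenceIdErrors() {}

    static String addFailed(String sequenceIdPath, long delta) {
        return "Failed to add "
                + delta
                + " to sequence id counter at path '"
                + sequenceIdPath
                + "'.";
    }
}
```

The else branch could then throw `new Exception(SequenceIdErrors.addFailed(path, delta))`,
where `path` is whatever field holds the sequence ID path.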
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/IncIDGenerator.java:
##########
@@ -16,24 +16,15 @@
* limitations under the License.
*/
-package org.apache.fluss.row.columnar;
+package org.apache.fluss.server.kv.autoinc;
-import org.apache.fluss.annotation.Internal;
-import org.apache.fluss.row.InternalRow;
-
-/**
- * Row {@link ColumnVector}.
- *
- * @since 0.9
- */
-@Internal
-public interface RowColumnVector extends ColumnVector {
+/** IncIDGenerator is used to generate auto increment column ID. */
+public interface IncIDGenerator {
/**
- * Return row value.
+ * Returns the next auto increment column ID.
Review Comment:
Consider renaming this method: "column ID" is misleading because it collides
with the actual ID of a Column object. Moreover, generating UIDs is only one
use case for auto-increment columns, so a name containing "ID" is not
advisable.
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncProcessor.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.BinaryValue;
+import org.apache.fluss.server.zk.ZkSequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+
+/** AutoIncProcessor is used to process auto increment column. */
+public interface AutoIncProcessor {
+
+ /**
+ * Process auto increment column.
+ *
+ * @param originalValue the original value.
+ * @return the processed value.
+ */
+ BinaryValue processAutoInc(BinaryValue originalValue);
+
+ static AutoIncProcessor create(
+ TablePath tablePath,
+ int schemaId,
+ Configuration properties,
+ TableConfig tableConf,
+ Schema schema,
+ ZooKeeperClient zkClient) {
+ int[] autoIncColumnIndexes = schema.getAutoIncColumnIndexes();
Review Comment:
Use column IDs instead of column indices.
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SegmentIncIDGenerator.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.SequenceIDCounter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/** Segment ID generator, fetch ID with a batch size. */
+@NotThreadSafe
+public class SegmentIncIDGenerator implements IncIDGenerator {
+ private static final Logger LOG = LoggerFactory.getLogger(SegmentIncIDGenerator.class);
+
+ private final SequenceIDCounter sequenceIDCounter;
+ private final TablePath tablePath;
+ private final int columnIdx;
+ private final String columnName;
+
+ private AutoIncIdSegment segment = new AutoIncIdSegment(0, 0);
+
+ private final long batchSize;
+
+ public SegmentIncIDGenerator(
+ TablePath tablePath,
+ int columnIdx,
+ String columnName,
+ SequenceIDCounter sequenceIDCounter,
+ Configuration properties) {
+ batchSize = properties.getLong(ConfigOptions.TABLE_AUTO_INC_BATCH_SIZE);
+ this.columnName = columnName;
+ this.tablePath = tablePath;
+ this.columnIdx = columnIdx;
+ this.sequenceIDCounter = sequenceIDCounter;
+ }
+
+ private void fetchSegment() {
+ try {
+ long start = sequenceIDCounter.getAndAdd(batchSize);
+ LOG.info(
+ "Successfully fetch auto-increment values range [{}, {}), table_path={}, column_idx={}, column_name={}.",
+ start,
+ start + batchSize,
+ tablePath,
+ columnIdx,
+ columnName);
+ segment = new AutoIncIdSegment(start, batchSize);
+ } catch (Exception e) {
+ throw new FlussRuntimeException(
+ String.format(
+ "Failed to fetch auto-increment values, table_path=%s, column_idx=%d, column_name=%s.",
+ tablePath, columnIdx, columnName),
+ e);
+ }
+ }
+
+ @Override
+ public long nextVal() {
+ if (segment.remaining() <= 0) {
+ fetchSegment();
+ }
+ return segment.tryNextVal();
+ }
+
+ private static class AutoIncIdSegment {
+ private long current;
+ private final long end;
+
+ public AutoIncIdSegment(long start, long length) {
+ this.end = start + length;
+ this.current = start;
+ }
+
+ public long remaining() {
Review Comment:
This part is a bit confusing to me: the outer check (`segment.remaining() <= 0`)
already handles negative values, so the Math.max here seems like unnecessary
overhead.
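Something like the following would be enough. This is a simplified,
self-contained sketch under my own naming (`AutoIncSegmentSketch`, `next()`),
not the PR's `AutoIncIdSegment`. It assumes the caller keeps the
`remaining() <= 0` check before taking a value.

```java
// Simplified sketch of a value segment [start, start + length).
// remaining() returns the raw difference; the caller is assumed to test
// remaining() <= 0, which covers zero and negative results alike.
final class AutoIncSegmentSketch {
    private long current;
    private final long end;

    AutoIncSegmentSketch(long start, long length) {
        this.current = start;
        this.end = start + length;
    }

    /** Number of values left in the segment; no clamping to zero. */
    long remaining() {
        return end - current;
    }

    /** Returns the current value and advances; call only when remaining() > 0. */
    long next() {
        return current++;
    }
}
```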