platinumhamburg commented on code in PR #2161:
URL: https://github.com/apache/fluss/pull/2161#discussion_r2667449472
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java:
##########
@@ -454,6 +455,177 @@ void testInvalidPrefixLookup() throws Exception {
+ "because the lookup columns [b, a] must
contain all bucket keys [a, b] in order.");
}
+ @Test
+ void testSingleBucketPutAutoIncColumnAndLookup() throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .column("col1", DataTypes.STRING())
+ .withComment("col1 is first column")
+ .column("col2", DataTypes.BIGINT())
+ .withComment("col2 is second column, auto increment
column")
+ .column("col3", DataTypes.STRING())
+ .withComment("col3 is third column")
+ .enableAutoIncrement("col2")
+ .primaryKey("col1")
+ .build();
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder().schema(schema).distributedBy(1,
"col1").build();
+ // create the table
+ TablePath tablePath =
+ TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(),
"test_pk_table_auto_inc");
+ createTable(tablePath, tableDescriptor, true);
+ Table autoIncTable = conn.getTable(tablePath);
+ UpsertWriter upsertWriter =
+ autoIncTable.newUpsert().partialUpdate("col1",
"col3").createWriter();
+ Object[][] records = {
+ {"a", 0L, "batch1"},
+ {"b", 1L, "batch1"},
+ {"c", 2L, "batch1"},
+ {"d", 3L, "batch1"},
+ {"e", 4L, "batch1"}
Review Comment:
The tests are generally good, but they can be further improved by
abstracting the logic for writing partial updates to a specified schema and
performing query assertions into a common utility method to enhance
readability—this would significantly simplify the tests. Additionally, the test
data should include records with duplicate primary keys to strengthen the
effectiveness of behavioral validation.
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncProcessor.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.BinaryValue;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.RowEncoder;
+import org.apache.fluss.server.zk.ZkSequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.types.DataType;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** AutoIncProcessor is used to process auto increment column. */
+@NotThreadSafe
+public class AutoIncProcessor {
+ private static final AutoIncUpdater NO_OP_UPDATER = oldValue -> oldValue;
+
+ private final SchemaGetter schemaGetter;
+ private final KvFormat kvFormat;
+ private AutoIncUpdater autoIncUpdater;
+ private int schemaId;
+ private final Map<Integer, SequenceGenerator> sequenceGeneratorMap = new
HashMap<>();
+
+ public AutoIncProcessor(
+ SchemaGetter schemaGetter,
+ KvFormat kvFormat,
+ TablePath tablePath,
+ Configuration properties,
+ ZooKeeperClient zkClient) {
+ this.schemaGetter = schemaGetter;
+ this.kvFormat = kvFormat;
+ this.schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId();
+ Schema schema = schemaGetter.getSchema(schemaId);
+ int[] autoIncColumnIds = schema.getAutoIncColumnIds();
+
+ if (autoIncColumnIds.length > 1) {
+ throw new IllegalStateException(
+ "Only support one auto increment column for a table, but
got "
+ + autoIncColumnIds.length);
+ }
+
+ for (int autoIncColumnId : autoIncColumnIds) {
+ ZkSequenceIDCounter zkSequenceIDCounter =
+ new ZkSequenceIDCounter(
+ zkClient.getCuratorClient(),
+ ZkData.AutoIncrementColumnZNode.path(tablePath,
autoIncColumnId));
+ SequenceGenerator sequenceGenerator =
+ new SegmentSequenceGenerator(
+ tablePath,
+ autoIncColumnId,
+ schema.getColumnName(autoIncColumnId),
+ zkSequenceIDCounter,
+ properties);
+ sequenceGeneratorMap.put(autoIncColumnId, sequenceGenerator);
+ }
+
+ if (autoIncColumnIds.length > 0) {
+ autoIncUpdater =
+ new DefaultAutoIncUpdater(
+ kvFormat,
+ (short) schemaId,
+ schema,
+ autoIncColumnIds[0],
+ sequenceGeneratorMap.get(autoIncColumnIds[0]));
+ } else {
+ autoIncUpdater = NO_OP_UPDATER;
+ }
+ }
+
+ // Supports removing or reordering columns; does NOT support adding an
auto-increment column to
+ // an existing table.
+ public void configureSchema(int latestSchemaId) {
+ if (latestSchemaId != this.schemaId) {
+ Schema schema = schemaGetter.getSchema(latestSchemaId);
+ int[] autoIncColumnIds = schema.getAutoIncColumnIds();
Review Comment:
From the schema implementation perspective, this is a high-overhead
operation and should not be repeatedly computed unnecessarily; at the very
least, some form of caching mechanism should be in place.
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncProcessor.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.BinaryValue;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.RowEncoder;
+import org.apache.fluss.server.zk.ZkSequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.types.DataType;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** AutoIncProcessor is used to process auto increment column. */
+@NotThreadSafe
+public class AutoIncProcessor {
+ private static final AutoIncUpdater NO_OP_UPDATER = oldValue -> oldValue;
+
+ private final SchemaGetter schemaGetter;
+ private final KvFormat kvFormat;
+ private AutoIncUpdater autoIncUpdater;
+ private int schemaId;
+ private final Map<Integer, SequenceGenerator> sequenceGeneratorMap = new
HashMap<>();
+
+ public AutoIncProcessor(
+ SchemaGetter schemaGetter,
+ KvFormat kvFormat,
+ TablePath tablePath,
+ Configuration properties,
+ ZooKeeperClient zkClient) {
+ this.schemaGetter = schemaGetter;
+ this.kvFormat = kvFormat;
+ this.schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId();
+ Schema schema = schemaGetter.getSchema(schemaId);
+ int[] autoIncColumnIds = schema.getAutoIncColumnIds();
+
+ if (autoIncColumnIds.length > 1) {
+ throw new IllegalStateException(
+ "Only support one auto increment column for a table, but
got "
+ + autoIncColumnIds.length);
+ }
+
+ for (int autoIncColumnId : autoIncColumnIds) {
+ ZkSequenceIDCounter zkSequenceIDCounter =
+ new ZkSequenceIDCounter(
+ zkClient.getCuratorClient(),
+ ZkData.AutoIncrementColumnZNode.path(tablePath,
autoIncColumnId));
+ SequenceGenerator sequenceGenerator =
+ new SegmentSequenceGenerator(
+ tablePath,
+ autoIncColumnId,
+ schema.getColumnName(autoIncColumnId),
+ zkSequenceIDCounter,
+ properties);
+ sequenceGeneratorMap.put(autoIncColumnId, sequenceGenerator);
+ }
+
+ if (autoIncColumnIds.length > 0) {
+ autoIncUpdater =
+ new DefaultAutoIncUpdater(
+ kvFormat,
+ (short) schemaId,
+ schema,
+ autoIncColumnIds[0],
+ sequenceGeneratorMap.get(autoIncColumnIds[0]));
+ } else {
+ autoIncUpdater = NO_OP_UPDATER;
+ }
+ }
+
+ // Supports removing or reordering columns; does NOT support adding an
auto-increment column to
+ // an existing table.
+ public void configureSchema(int latestSchemaId) {
+ if (latestSchemaId != this.schemaId) {
+ Schema schema = schemaGetter.getSchema(latestSchemaId);
+ int[] autoIncColumnIds = schema.getAutoIncColumnIds();
+ if (autoIncColumnIds.length > 1) {
+ throw new IllegalStateException(
+ "Only support one auto increment column for a table,
but got "
+ + autoIncColumnIds.length);
+ } else if (autoIncColumnIds.length == 1) {
+ int autoIncColumnId = autoIncColumnIds[0];
+ if (sequenceGeneratorMap.containsKey(autoIncColumnId)) {
+ this.autoIncUpdater =
+ new DefaultAutoIncUpdater(
+ kvFormat,
+ (short) latestSchemaId,
+ schemaGetter.getSchema(latestSchemaId),
+ autoIncColumnId,
+ sequenceGeneratorMap.get(autoIncColumnId));
+ } else {
+ throw new IllegalStateException(
+ "Not supported add auto increment column for a
table.");
Review Comment:
I believe intercepting here is unreasonable—the DDL has already succeeded,
and the client has even attempted to write data. If this needs to be
prohibited, it should be blocked on the DDL side.
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java:
##########
@@ -454,6 +455,177 @@ void testInvalidPrefixLookup() throws Exception {
+ "because the lookup columns [b, a] must
contain all bucket keys [a, b] in order.");
}
+ @Test
+ void testSingleBucketPutAutoIncColumnAndLookup() throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .column("col1", DataTypes.STRING())
+ .withComment("col1 is first column")
+ .column("col2", DataTypes.BIGINT())
+ .withComment("col2 is second column, auto increment
column")
+ .column("col3", DataTypes.STRING())
+ .withComment("col3 is third column")
+ .enableAutoIncrement("col2")
+ .primaryKey("col1")
+ .build();
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder().schema(schema).distributedBy(1,
"col1").build();
+ // create the table
+ TablePath tablePath =
+ TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(),
"test_pk_table_auto_inc");
+ createTable(tablePath, tableDescriptor, true);
+ Table autoIncTable = conn.getTable(tablePath);
+ UpsertWriter upsertWriter =
+ autoIncTable.newUpsert().partialUpdate("col1",
"col3").createWriter();
+ Object[][] records = {
+ {"a", 0L, "batch1"},
+ {"b", 1L, "batch1"},
+ {"c", 2L, "batch1"},
+ {"d", 3L, "batch1"},
+ {"e", 4L, "batch1"}
+ };
+ for (Object[] record : records) {
+ upsertWriter.upsert(row(record[0], null, record[2]));
+ }
+ upsertWriter.flush();
+
+ Lookuper lookuper = autoIncTable.newLookup().createLookuper();
+ ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes());
+
+ for (Object[] record : records) {
+ assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row(record))))
+ .withSchema(schema.getRowType())
+ .isEqualTo(row(record));
+ }
+
+ for (Object[] record : records) {
+ record[2] = "batch2";
+ upsertWriter.upsert(row(record[0], null, record[2]));
+ }
+ upsertWriter.flush();
+
+ for (Object[] record : records) {
+ assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row(record))))
+ .withSchema(schema.getRowType())
+ .isEqualTo(row(record));
+ }
+
+ admin.alterTable(
+ tablePath,
+ Collections.singletonList(
+ TableChange.addColumn(
+ "col4",
+ DataTypes.INT(),
+ null,
+ TableChange.ColumnPosition.last())),
+ false)
+ .get();
+ Table newSchemaTable = conn.getTable(tablePath);
+ Schema newSchema = newSchemaTable.getTableInfo().getSchema();
+ Lookuper newLookuper = newSchemaTable.newLookup().createLookuper();
+
+ Object[][] recordsWithNewSchema = {
+ {"a", 0L, "batch2", 10},
+ {"b", 1L, "batch2", 11},
+ {"c", 2L, "batch2", 12},
+ {"d", 3L, "batch2", 13},
+ {"e", 4L, "batch2", 14}
+ };
+ // schema change case1: read new data with new schema.
+ for (Object[] record : recordsWithNewSchema) {
+ assertThatRow(lookupRow(newLookuper,
keyRow.replaceRow(row(record))))
+ .withSchema(newSchema.getRowType())
+ .isEqualTo(row(record[0], record[1], record[2], null));
+ }
+
+ // schema change case2: update new data with new schema.
+ UpsertWriter newUpsertWriter =
+ newSchemaTable.newUpsert().partialUpdate("col1", "col3",
"col4").createWriter();
+ for (Object[] record : recordsWithNewSchema) {
+ record[2] = "batch3";
+ newUpsertWriter.upsert(row(record));
+ }
+ newUpsertWriter.flush();
+
+ // schema change case3: read data with old schema.
+ for (Object[] record : records) {
+ record[2] = "batch3";
+ assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row(record))))
+ .withSchema(schema.getRowType())
+ .isEqualTo(row(record));
+ }
+
+ // schema change case4: read data with new schema.
+ for (Object[] record : recordsWithNewSchema) {
+ assertThatRow(lookupRow(newLookuper,
keyRow.replaceRow(row(record))))
+ .withSchema(newSchema.getRowType())
+ .isEqualTo(row(record));
+ }
+ }
+
+ @Test
+ void testPutAutoIncColumnAndLookup() throws Exception {
Review Comment:
This test doesn't appear to include any additional behavioral verification;
it could potentially be merged with the previous test
(testSingleBucketPutAutoIncColumnAndLookup).
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncProcessor.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.BinaryValue;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.RowEncoder;
+import org.apache.fluss.server.zk.ZkSequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.types.DataType;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** AutoIncProcessor is used to process auto increment column. */
+@NotThreadSafe
+public class AutoIncProcessor {
+ private static final AutoIncUpdater NO_OP_UPDATER = oldValue -> oldValue;
+
+ private final SchemaGetter schemaGetter;
+ private final KvFormat kvFormat;
+ private AutoIncUpdater autoIncUpdater;
+ private int schemaId;
+ private final Map<Integer, SequenceGenerator> sequenceGeneratorMap = new
HashMap<>();
+
+ public AutoIncProcessor(
+ SchemaGetter schemaGetter,
+ KvFormat kvFormat,
+ TablePath tablePath,
+ Configuration properties,
+ ZooKeeperClient zkClient) {
+ this.schemaGetter = schemaGetter;
+ this.kvFormat = kvFormat;
+ this.schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId();
+ Schema schema = schemaGetter.getSchema(schemaId);
+ int[] autoIncColumnIds = schema.getAutoIncColumnIds();
+
+ if (autoIncColumnIds.length > 1) {
+ throw new IllegalStateException(
+ "Only support one auto increment column for a table, but
got "
+ + autoIncColumnIds.length);
+ }
+
+ for (int autoIncColumnId : autoIncColumnIds) {
+ ZkSequenceIDCounter zkSequenceIDCounter =
+ new ZkSequenceIDCounter(
+ zkClient.getCuratorClient(),
+ ZkData.AutoIncrementColumnZNode.path(tablePath,
autoIncColumnId));
+ SequenceGenerator sequenceGenerator =
+ new SegmentSequenceGenerator(
+ tablePath,
+ autoIncColumnId,
+ schema.getColumnName(autoIncColumnId),
+ zkSequenceIDCounter,
+ properties);
+ sequenceGeneratorMap.put(autoIncColumnId, sequenceGenerator);
+ }
+
+ if (autoIncColumnIds.length > 0) {
+ autoIncUpdater =
+ new DefaultAutoIncUpdater(
+ kvFormat,
+ (short) schemaId,
+ schema,
+ autoIncColumnIds[0],
+ sequenceGeneratorMap.get(autoIncColumnIds[0]));
+ } else {
+ autoIncUpdater = NO_OP_UPDATER;
+ }
+ }
+
+ // Supports removing or reordering columns; does NOT support adding an
auto-increment column to
+ // an existing table.
+ public void configureSchema(int latestSchemaId) {
+ if (latestSchemaId != this.schemaId) {
+ Schema schema = schemaGetter.getSchema(latestSchemaId);
+ int[] autoIncColumnIds = schema.getAutoIncColumnIds();
+ if (autoIncColumnIds.length > 1) {
+ throw new IllegalStateException(
+ "Only support one auto increment column for a table,
but got "
+ + autoIncColumnIds.length);
+ } else if (autoIncColumnIds.length == 1) {
+ int autoIncColumnId = autoIncColumnIds[0];
+ if (sequenceGeneratorMap.containsKey(autoIncColumnId)) {
+ this.autoIncUpdater =
+ new DefaultAutoIncUpdater(
Review Comment:
It seems that different versions of DefaultAutoIncUpdater can be managed
using a cache.
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncProcessor.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.BinaryValue;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.RowEncoder;
+import org.apache.fluss.server.zk.ZkSequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.types.DataType;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** AutoIncProcessor is used to process auto increment column. */
+@NotThreadSafe
+public class AutoIncProcessor {
+ private static final AutoIncUpdater NO_OP_UPDATER = oldValue -> oldValue;
+
+ private final SchemaGetter schemaGetter;
+ private final KvFormat kvFormat;
+ private AutoIncUpdater autoIncUpdater;
+ private int schemaId;
+ private final Map<Integer, SequenceGenerator> sequenceGeneratorMap = new
HashMap<>();
+
+ public AutoIncProcessor(
+ SchemaGetter schemaGetter,
+ KvFormat kvFormat,
+ TablePath tablePath,
+ Configuration properties,
+ ZooKeeperClient zkClient) {
+ this.schemaGetter = schemaGetter;
+ this.kvFormat = kvFormat;
+ this.schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId();
+ Schema schema = schemaGetter.getSchema(schemaId);
+ int[] autoIncColumnIds = schema.getAutoIncColumnIds();
+
+ if (autoIncColumnIds.length > 1) {
+ throw new IllegalStateException(
+ "Only support one auto increment column for a table, but
got "
+ + autoIncColumnIds.length);
+ }
+
+ for (int autoIncColumnId : autoIncColumnIds) {
+ ZkSequenceIDCounter zkSequenceIDCounter =
+ new ZkSequenceIDCounter(
+ zkClient.getCuratorClient(),
+ ZkData.AutoIncrementColumnZNode.path(tablePath,
autoIncColumnId));
+ SequenceGenerator sequenceGenerator =
+ new SegmentSequenceGenerator(
+ tablePath,
+ autoIncColumnId,
+ schema.getColumnName(autoIncColumnId),
+ zkSequenceIDCounter,
+ properties);
+ sequenceGeneratorMap.put(autoIncColumnId, sequenceGenerator);
+ }
+
+ if (autoIncColumnIds.length > 0) {
+ autoIncUpdater =
+ new DefaultAutoIncUpdater(
+ kvFormat,
+ (short) schemaId,
+ schema,
+ autoIncColumnIds[0],
+ sequenceGeneratorMap.get(autoIncColumnIds[0]));
+ } else {
+ autoIncUpdater = NO_OP_UPDATER;
+ }
+ }
+
+ // Supports removing or reordering columns; does NOT support adding an
auto-increment column to
+ // an existing table.
+ public void configureSchema(int latestSchemaId) {
+ if (latestSchemaId != this.schemaId) {
+ Schema schema = schemaGetter.getSchema(latestSchemaId);
+ int[] autoIncColumnIds = schema.getAutoIncColumnIds();
+ if (autoIncColumnIds.length > 1) {
+ throw new IllegalStateException(
+ "Only support one auto increment column for a table,
but got "
+ + autoIncColumnIds.length);
+ } else if (autoIncColumnIds.length == 1) {
+ int autoIncColumnId = autoIncColumnIds[0];
+ if (sequenceGeneratorMap.containsKey(autoIncColumnId)) {
+ this.autoIncUpdater =
+ new DefaultAutoIncUpdater(
+ kvFormat,
+ (short) latestSchemaId,
+ schemaGetter.getSchema(latestSchemaId),
+ autoIncColumnId,
+ sequenceGeneratorMap.get(autoIncColumnId));
+ } else {
+ throw new IllegalStateException(
+ "Not supported add auto increment column for a
table.");
Review Comment:
Alternatively, the validation/assertion here may be acceptable, but the
error message is inappropriate—please verify whether the DDL-side interception
is comprehensive.
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncProcessor.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.BinaryValue;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.RowEncoder;
+import org.apache.fluss.server.zk.ZkSequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.types.DataType;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** AutoIncProcessor is used to process auto increment column. */
+@NotThreadSafe
+public class AutoIncProcessor {
+ private static final AutoIncUpdater NO_OP_UPDATER = oldValue -> oldValue;
+
+ private final SchemaGetter schemaGetter;
+ private final KvFormat kvFormat;
+ private AutoIncUpdater autoIncUpdater;
+ private int schemaId;
+ private final Map<Integer, SequenceGenerator> sequenceGeneratorMap = new
HashMap<>();
+
+ public AutoIncProcessor(
+ SchemaGetter schemaGetter,
+ KvFormat kvFormat,
+ TablePath tablePath,
+ Configuration properties,
+ ZooKeeperClient zkClient) {
+ this.schemaGetter = schemaGetter;
+ this.kvFormat = kvFormat;
+ this.schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId();
+ Schema schema = schemaGetter.getSchema(schemaId);
+ int[] autoIncColumnIds = schema.getAutoIncColumnIds();
+
+ if (autoIncColumnIds.length > 1) {
+ throw new IllegalStateException(
+ "Only support one auto increment column for a table, but
got "
+ + autoIncColumnIds.length);
+ }
+
+ for (int autoIncColumnId : autoIncColumnIds) {
+ ZkSequenceIDCounter zkSequenceIDCounter =
+ new ZkSequenceIDCounter(
+ zkClient.getCuratorClient(),
+ ZkData.AutoIncrementColumnZNode.path(tablePath,
autoIncColumnId));
+ SequenceGenerator sequenceGenerator =
+ new SegmentSequenceGenerator(
+ tablePath,
+ autoIncColumnId,
+ schema.getColumnName(autoIncColumnId),
+ zkSequenceIDCounter,
+ properties);
+ sequenceGeneratorMap.put(autoIncColumnId, sequenceGenerator);
+ }
+
+ if (autoIncColumnIds.length > 0) {
+ autoIncUpdater =
+ new DefaultAutoIncUpdater(
+ kvFormat,
+ (short) schemaId,
+ schema,
+ autoIncColumnIds[0],
+ sequenceGeneratorMap.get(autoIncColumnIds[0]));
+ } else {
+ autoIncUpdater = NO_OP_UPDATER;
+ }
+ }
+
+ // Supports removing or reordering columns; does NOT support adding an
auto-increment column to
+ // an existing table.
+ public void configureSchema(int latestSchemaId) {
+ if (latestSchemaId != this.schemaId) {
Review Comment:
The condition here should be inverted with an early return to avoid deep
indentation of a large code block, which would otherwise degrade readability.
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussFailServerTableITCase.java:
##########
@@ -147,6 +153,64 @@ void testLogScan() throws Exception {
}
}
+ @Test
+ void testPutAutoIncColumnAndLookup() throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .column("col1", DataTypes.STRING())
+ .withComment("col1 is first column")
+ .column("col2", DataTypes.BIGINT())
+ .withComment("col2 is second column, auto increment
column")
+ .column("col3", DataTypes.STRING())
+ .withComment("col3 is third column")
+ .enableAutoIncrement("col2")
+ .primaryKey("col1")
+ .build();
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder().schema(schema).distributedBy(2,
"col1").build();
+ createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false);
+ Object[][] records = {
+ {"a", 0L, "batch1"},
+ {"b", 100000L, "batch1"},
+ {"c", 1L, "batch1"},
+ {"d", 100001L, "batch1"}
+ };
+
+ try (Table table = conn.getTable(DATA1_TABLE_PATH_PK)) {
+ UpsertWriter upsertWriter =
+ table.newUpsert().partialUpdate("col1",
"col3").createWriter();
+ for (Object[] record : records) {
+ upsertWriter.upsert(row(record[0], null, record[2])).get();
+ }
+
+ Lookuper lookuper = table.newLookup().createLookuper();
+ ProjectedRow keyRow =
ProjectedRow.from(schema.getPrimaryKeyIndexes());
+ for (Object[] record : records) {
+ assertThatRow(lookupRow(lookuper,
keyRow.replaceRow(row(record))))
+ .withSchema(schema.getRowType())
+ .isEqualTo(row(record));
+ }
+
+ // kill and restart all tablet server
+ for (int i = 0; i < 3; i++) {
+ FLUSS_CLUSTER_EXTENSION.stopTabletServer(i);
+ FLUSS_CLUSTER_EXTENSION.startTabletServer(i);
+ }
+
+ for (Object[] record : records) {
+ assertThatRow(lookupRow(lookuper,
keyRow.replaceRow(row(record))))
+ .withSchema(schema.getRowType())
+ .isEqualTo(row(record));
+ }
+ upsertWriter.upsert(row("e", null, "batch2")).get();
+ // The auto-increment column should start from a new segment for
now, and local cached
+ // IDs have been discarded.
+ assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("e", null,
null))))
Review Comment:
This is a good test addition; however, there's no need to create a new test
class. The scenario can be simply incorporated into the existing tests in
FlussTableITCase.java.
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncProcessor.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.BinaryValue;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.RowEncoder;
+import org.apache.fluss.server.zk.ZkSequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.types.DataType;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** AutoIncProcessor is used to process auto increment column. */
+@NotThreadSafe
+public class AutoIncProcessor {
+ private static final AutoIncUpdater NO_OP_UPDATER = oldValue -> oldValue;
+
+ private final SchemaGetter schemaGetter;
+ private final KvFormat kvFormat;
+ private AutoIncUpdater autoIncUpdater;
+ private int schemaId;
+ private final Map<Integer, SequenceGenerator> sequenceGeneratorMap = new
HashMap<>();
+
+ public AutoIncProcessor(
+ SchemaGetter schemaGetter,
+ KvFormat kvFormat,
+ TablePath tablePath,
+ Configuration properties,
+ ZooKeeperClient zkClient) {
+ this.schemaGetter = schemaGetter;
+ this.kvFormat = kvFormat;
+ this.schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId();
+ Schema schema = schemaGetter.getSchema(schemaId);
+ int[] autoIncColumnIds = schema.getAutoIncColumnIds();
+
+ if (autoIncColumnIds.length > 1) {
+ throw new IllegalStateException(
+ "Only support one auto increment column for a table, but
got "
+ + autoIncColumnIds.length);
+ }
+
+ for (int autoIncColumnId : autoIncColumnIds) {
+ ZkSequenceIDCounter zkSequenceIDCounter =
+ new ZkSequenceIDCounter(
+ zkClient.getCuratorClient(),
+ ZkData.AutoIncrementColumnZNode.path(tablePath,
autoIncColumnId));
+ SequenceGenerator sequenceGenerator =
+ new SegmentSequenceGenerator(
+ tablePath,
+ autoIncColumnId,
+ schema.getColumnName(autoIncColumnId),
+ zkSequenceIDCounter,
+ properties);
+ sequenceGeneratorMap.put(autoIncColumnId, sequenceGenerator);
+ }
+
+ if (autoIncColumnIds.length > 0) {
+ autoIncUpdater =
+ new DefaultAutoIncUpdater(
+ kvFormat,
+ (short) schemaId,
+ schema,
+ autoIncColumnIds[0],
+ sequenceGeneratorMap.get(autoIncColumnIds[0]));
+ } else {
+ autoIncUpdater = NO_OP_UPDATER;
+ }
+ }
+
+ // Supports removing or reordering columns; does NOT support adding an
auto-increment column to
+ // an existing table.
+ public void configureSchema(int latestSchemaId) {
+ if (latestSchemaId != this.schemaId) {
+ Schema schema = schemaGetter.getSchema(latestSchemaId);
+ int[] autoIncColumnIds = schema.getAutoIncColumnIds();
+ if (autoIncColumnIds.length > 1) {
+ throw new IllegalStateException(
+ "Only support one auto increment column for a table,
but got "
+ + autoIncColumnIds.length);
+ } else if (autoIncColumnIds.length == 1) {
+ int autoIncColumnId = autoIncColumnIds[0];
+ if (sequenceGeneratorMap.containsKey(autoIncColumnId)) {
+ this.autoIncUpdater =
+ new DefaultAutoIncUpdater(
+ kvFormat,
+ (short) latestSchemaId,
+ schemaGetter.getSchema(latestSchemaId),
+ autoIncColumnId,
+ sequenceGeneratorMap.get(autoIncColumnId));
+ } else {
+ throw new IllegalStateException(
+ "Not supported add auto increment column for a
table.");
+ }
+ } else {
+ this.autoIncUpdater = NO_OP_UPDATER;
+ this.sequenceGeneratorMap.clear();
+ }
+ this.schemaId = latestSchemaId;
+ }
+ }
+
+ /**
+ * Process auto increment for a given old value.
+ *
+ * @param oldValue the old value
+ * @return the new value with auto incremented column value
+ */
+ public BinaryValue processAutoInc(BinaryValue oldValue) {
+ return autoIncUpdater.updateAutoInc(oldValue);
+ }
+
+ public boolean isNoOpUpdate() {
+ return autoIncUpdater.equals(NO_OP_UPDATER);
+ }
+
+ private interface AutoIncUpdater {
+ BinaryValue updateAutoInc(BinaryValue oldValue);
+ }
+
+ /** Default auto increment column updater. */
+ @NotThreadSafe
+ private static class DefaultAutoIncUpdater implements
AutoIncProcessor.AutoIncUpdater {
+ private final InternalRow.FieldGetter[] flussFieldGetters;
+ private final RowEncoder rowEncoder;
+ private final DataType[] fieldDataTypes;
+ private final int targetColumnIdx;
+ private final SequenceGenerator idGenerator;
+ private final short schemaId;
+
+ public DefaultAutoIncUpdater(
+ KvFormat kvFormat,
+ short schemaId,
+ Schema schema,
+ int autoIncColumnId,
+ SequenceGenerator sequenceGenerator) {
+ DataType[] fieldDataTypes =
schema.getRowType().getChildren().toArray(new DataType[0]);
+
+ // getter for the fields in row
+ InternalRow.FieldGetter[] flussFieldGetters =
+ new InternalRow.FieldGetter[fieldDataTypes.length];
+ for (int i = 0; i < fieldDataTypes.length; i++) {
+ flussFieldGetters[i] =
InternalRow.createFieldGetter(fieldDataTypes[i], i);
+ }
+ this.idGenerator = sequenceGenerator;
+ this.schemaId = schemaId;
+ this.targetColumnIdx =
schema.getColumnIds().indexOf(autoIncColumnId);
+ if (targetColumnIdx == -1) {
+ throw new IllegalStateException(
+ String.format(
+ "Auto-increment column ID %d not found in
schema columns: %s",
+ autoIncColumnId, schema.getColumnIds()));
+ }
+ this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes);
+ this.fieldDataTypes = fieldDataTypes;
+ this.flussFieldGetters = flussFieldGetters;
+ }
+
+ public BinaryValue updateAutoInc(BinaryValue oldValue) {
+ rowEncoder.startNewRow();
+ for (int i = 0; i < fieldDataTypes.length; i++) {
+ if (targetColumnIdx == i) {
+ rowEncoder.encodeField(i, idGenerator.nextVal());
+ } else {
+ // use the old row value
+ if (oldValue == null) {
+ rowEncoder.encodeField(i, null);
+ } else {
+ rowEncoder.encodeField(
+ i,
flussFieldGetters[i].getFieldOrNull(oldValue.row));
+ }
Review Comment:
Could oldValue here be null? I don't think that's possible—this doesn't
appear to be a scenario that RowMerger would encounter.
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncProcessor.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.BinaryValue;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.RowEncoder;
+import org.apache.fluss.server.zk.ZkSequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.types.DataType;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** AutoIncProcessor is used to process auto increment column. */
+@NotThreadSafe
+public class AutoIncProcessor {
+ private static final AutoIncUpdater NO_OP_UPDATER = oldValue -> oldValue;
+
+ private final SchemaGetter schemaGetter;
+ private final KvFormat kvFormat;
+ private AutoIncUpdater autoIncUpdater;
+ private int schemaId;
+ private final Map<Integer, SequenceGenerator> sequenceGeneratorMap = new
HashMap<>();
+
+ public AutoIncProcessor(
+ SchemaGetter schemaGetter,
+ KvFormat kvFormat,
+ TablePath tablePath,
+ Configuration properties,
+ ZooKeeperClient zkClient) {
+ this.schemaGetter = schemaGetter;
+ this.kvFormat = kvFormat;
+ this.schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId();
+ Schema schema = schemaGetter.getSchema(schemaId);
+ int[] autoIncColumnIds = schema.getAutoIncColumnIds();
+
+ if (autoIncColumnIds.length > 1) {
+ throw new IllegalStateException(
+ "Only support one auto increment column for a table, but
got "
+ + autoIncColumnIds.length);
+ }
+
+ for (int autoIncColumnId : autoIncColumnIds) {
+ ZkSequenceIDCounter zkSequenceIDCounter =
+ new ZkSequenceIDCounter(
+ zkClient.getCuratorClient(),
+ ZkData.AutoIncrementColumnZNode.path(tablePath,
autoIncColumnId));
+ SequenceGenerator sequenceGenerator =
+ new SegmentSequenceGenerator(
+ tablePath,
+ autoIncColumnId,
+ schema.getColumnName(autoIncColumnId),
+ zkSequenceIDCounter,
+ properties);
+ sequenceGeneratorMap.put(autoIncColumnId, sequenceGenerator);
+ }
+
+ if (autoIncColumnIds.length > 0) {
+ autoIncUpdater =
+ new DefaultAutoIncUpdater(
+ kvFormat,
+ (short) schemaId,
+ schema,
+ autoIncColumnIds[0],
+ sequenceGeneratorMap.get(autoIncColumnIds[0]));
+ } else {
+ autoIncUpdater = NO_OP_UPDATER;
+ }
+ }
+
+ // Supports removing or reordering columns; does NOT support adding an
auto-increment column to
+ // an existing table.
+ public void configureSchema(int latestSchemaId) {
+ if (latestSchemaId != this.schemaId) {
+ Schema schema = schemaGetter.getSchema(latestSchemaId);
+ int[] autoIncColumnIds = schema.getAutoIncColumnIds();
+ if (autoIncColumnIds.length > 1) {
+ throw new IllegalStateException(
+ "Only support one auto increment column for a table,
but got "
+ + autoIncColumnIds.length);
+ } else if (autoIncColumnIds.length == 1) {
+ int autoIncColumnId = autoIncColumnIds[0];
+ if (sequenceGeneratorMap.containsKey(autoIncColumnId)) {
+ this.autoIncUpdater =
+ new DefaultAutoIncUpdater(
+ kvFormat,
+ (short) latestSchemaId,
+ schemaGetter.getSchema(latestSchemaId),
+ autoIncColumnId,
+ sequenceGeneratorMap.get(autoIncColumnId));
+ } else {
+ throw new IllegalStateException(
+ "Not supported add auto increment column for a
table.");
+ }
+ } else {
+ this.autoIncUpdater = NO_OP_UPDATER;
+ this.sequenceGeneratorMap.clear();
+ }
+ this.schemaId = latestSchemaId;
+ }
+ }
+
+ /**
+ * Process auto increment for a given old value.
+ *
+ * @param oldValue the old value
+ * @return the new value with auto incremented column value
+ */
+ public BinaryValue processAutoInc(BinaryValue oldValue) {
+ return autoIncUpdater.updateAutoInc(oldValue);
+ }
+
+ public boolean isNoOpUpdate() {
+ return autoIncUpdater.equals(NO_OP_UPDATER);
+ }
+
+ private interface AutoIncUpdater {
+ BinaryValue updateAutoInc(BinaryValue oldValue);
+ }
+
+ /** Default auto increment column updater. */
+ @NotThreadSafe
+ private static class DefaultAutoIncUpdater implements
AutoIncProcessor.AutoIncUpdater {
+ private final InternalRow.FieldGetter[] flussFieldGetters;
+ private final RowEncoder rowEncoder;
+ private final DataType[] fieldDataTypes;
+ private final int targetColumnIdx;
+ private final SequenceGenerator idGenerator;
+ private final short schemaId;
+
+ public DefaultAutoIncUpdater(
+ KvFormat kvFormat,
+ short schemaId,
+ Schema schema,
+ int autoIncColumnId,
+ SequenceGenerator sequenceGenerator) {
+ DataType[] fieldDataTypes =
schema.getRowType().getChildren().toArray(new DataType[0]);
+
+ // getter for the fields in row
+ InternalRow.FieldGetter[] flussFieldGetters =
+ new InternalRow.FieldGetter[fieldDataTypes.length];
+ for (int i = 0; i < fieldDataTypes.length; i++) {
+ flussFieldGetters[i] =
InternalRow.createFieldGetter(fieldDataTypes[i], i);
+ }
+ this.idGenerator = sequenceGenerator;
+ this.schemaId = schemaId;
+ this.targetColumnIdx =
schema.getColumnIds().indexOf(autoIncColumnId);
+ if (targetColumnIdx == -1) {
+ throw new IllegalStateException(
+ String.format(
+ "Auto-increment column ID %d not found in
schema columns: %s",
+ autoIncColumnId, schema.getColumnIds()));
+ }
+ this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes);
+ this.fieldDataTypes = fieldDataTypes;
+ this.flussFieldGetters = flussFieldGetters;
+ }
+
+ public BinaryValue updateAutoInc(BinaryValue oldValue) {
+ rowEncoder.startNewRow();
+ for (int i = 0; i < fieldDataTypes.length; i++) {
+ if (targetColumnIdx == i) {
+ rowEncoder.encodeField(i, idGenerator.nextVal());
+ } else {
+ // use the old row value
+ if (oldValue == null) {
+ rowEncoder.encodeField(i, null);
+ } else {
+ rowEncoder.encodeField(
+ i,
flussFieldGetters[i].getFieldOrNull(oldValue.row));
+ }
Review Comment:
Here, oldValue actually corresponds to newValue in RowMerger.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]