Copilot commented on code in PR #2322:
URL: https://github.com/apache/fluss/pull/2322#discussion_r2667687154
##########
fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java:
##########
@@ -137,10 +143,90 @@ protected String[] expectedJsons() {
"{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"nullable\":false,\"precision\":3}",
"{\"type\":\"ARRAY\",\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
"{\"type\":\"ARRAY\",\"nullable\":false,\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
- "{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
- "{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
- "{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
- "{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
+ "{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\"},\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\"},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":0},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}",
+ "{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":0},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}"
Review Comment:
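Both MAP expectations need fixing. Line 146 closes the `key_type` object right after "BIGINT", so that string is invalid JSON. Line 147 parses, but its key type no longer carries `"nullable":false`, unlike the previous expectation. The non-nullable MAP should be: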
"{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}"
```suggestion
"{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
"{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
"{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":0},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}",
"{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":0},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}"
```
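A cheap way to catch this class of mistake is to run a structural check over every expected string before comparing it with the serializer output. The sketch below is a hypothetical helper (`JsonShapeCheck` is not a Fluss or library class) that only tracks string literals and brace/bracket nesting, so it is not a JSON validator. It rejects the line-146 string because the top-level object closes before `,"value_type"`, and it rejects a string with a stray quote such as `"nullable":false","fields"`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical test helper, not part of Fluss or any JSON library. It checks only that
 * string literals are terminated and that braces/brackets nest and close exactly once;
 * keys, values and separators are not validated.
 */
final class JsonShapeCheck {

    static boolean isWellNested(String json) {
        Deque<Character> open = new ArrayDeque<>();
        boolean inString = false;
        boolean topLevelClosed = false;
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                if (c == '\\') {
                    i++; // skip the escaped character
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            if (Character.isWhitespace(c)) {
                continue;
            }
            if (topLevelClosed) {
                return false; // trailing content after the top-level value has closed
            }
            if (open.isEmpty() && c != '{' && c != '[') {
                return false; // the top-level value must be an object or an array
            }
            switch (c) {
                case '"':
                    inString = true;
                    break;
                case '{':
                case '[':
                    open.push(c);
                    break;
                case '}':
                case ']': {
                    char expected = (c == '}') ? '{' : '[';
                    if (open.pop() != expected) {
                        return false; // mismatched closer, e.g. '{' closed by ']'
                    }
                    topLevelClosed = open.isEmpty();
                    break;
                }
                default:
                    break; // names, literals, ':' and ',' are not inspected
            }
        }
        return topLevelClosed;
    }
}
```

Parsing each expected string with a real JSON library, for example Jackson's `ObjectMapper#readTree`, would be the stronger check; the helper only avoids pulling a parser into the sketch.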
##########
fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java:
##########
@@ -301,16 +301,50 @@ public static MapType MAP(DataType keyType, DataType valueType) {
return new MapType(keyType, valueType);
}
- /** Field definition with field name and data type. */
public static DataField FIELD(String name, DataType type) {
return new DataField(name, type);
}
- /** Field definition with field name, data type, and a description. */
+ /**
+ * Creates a field definition with field name, data type, and field ID.
+ *
+ * @param name the field name
+ * @param type the field data type
+ * @param fieldId the field ID for schema evolution
+ * @return a new data field without description
+ */
+ public static DataField FIELD(String name, DataType type, int fieldId) {
+ return new DataField(name, type, fieldId);
+ }
+
+ /**
+ * Creates a field definition with field name, data type, and a description.
+ *
+ * @param name the field name
+ * @param type the field data type
+ * @param description the field description
+ * @return a new data field with description but with default field ID (0)
+ * @deprecated Use {@link #FIELD(String, DataType, String, int)} instead to explicitly specify
+ * field ID. Field ID is required for schema evolution support.
+ */
+ @Deprecated
public static DataField FIELD(String name, DataType type, String description) {
return new DataField(name, type, description);
Review Comment:
The `@return` tag in the new JavaDoc says the method returns "a new data field with description but with default field ID (0)", but the implementation calls `new DataField(name, type, description)`, which assigns a field ID of -1, not 0. Since 0 is a valid assigned ID, the documentation should say "default field ID (-1)" to be accurate.
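Why the exact number matters: in this PR, `RowType` treats -1 as "no ID yet" and, when every field carries -1, assigns IDs 0, 1, 2, ... in order, so 0 is an ordinary assigned ID rather than a default. The delegation pattern the comment describes can be sketched with a hypothetical stand-in (`FieldSketch` is illustrative only, not Fluss's `DataField`; the -1 sentinel is taken from the comment above):

```java
/**
 * Hypothetical, simplified stand-in for DataField (not Fluss's class) showing how a
 * constructor without a field ID can delegate with an "unassigned" sentinel.
 */
final class FieldSketch {
    /** Assumed sentinel for "no field ID yet"; -1 per the review comment. */
    static final int UNASSIGNED_FIELD_ID = -1;

    final String name;
    final String description; // may be null
    final int fieldId;

    FieldSketch(String name, String description, int fieldId) {
        this.name = name;
        this.description = description;
        this.fieldId = fieldId;
    }

    /** Legacy-style overload: no ID supplied, so the sentinel is stored, not 0. */
    FieldSketch(String name, String description) {
        this(name, description, UNASSIGNED_FIELD_ID);
    }

    boolean hasAssignedId() {
        return fieldId != UNASSIGNED_FIELD_ID;
    }
}
```

With that pattern, the `@return` text could read along the lines of "a new data field with description and an unassigned field ID (-1)".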
##########
fluss-common/src/main/java/org/apache/fluss/types/RowType.java:
##########
@@ -172,23 +171,102 @@ public int hashCode() {
return Objects.hash(super.hashCode(), fields);
}
+ /**
+ * Compares this RowType with another RowType, ignoring field IDs.
+ *
+ * @param other the other RowType to compare with
+ * @return true if the RowTypes are equal ignoring field IDs, false otherwise
+ */
+ public boolean equalsIgnoreFieldId(RowType other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null) {
+ return false;
+ }
+ if (this.isNullable() != other.isNullable()) {
+ return false;
+ }
+ if (this.fields.size() != other.fields.size()) {
+ return false;
+ }
+ for (int i = 0; i < fields.size(); i++) {
+ DataField thisField = fields.get(i);
+ DataField otherField = other.fields.get(i);
+ if (!thisField.getName().equals(otherField.getName())) {
+ return false;
+ }
+ if (!thisField.getType().equals(otherField.getType())) {
Review Comment:
The equalsIgnoreFieldId method doesn't recursively handle nested RowTypes.
When comparing field types on line 199, it uses equals() which will include
field IDs in the comparison for nested RowTypes. For truly ignoring field IDs
in nested structures, the comparison should recursively call
equalsIgnoreFieldId when the field type is a RowType. This is particularly
important since this PR is about adding field_id support for nested rows.
```suggestion
DataType thisType = thisField.getType();
DataType otherType = otherField.getType();
if (thisType instanceof RowType && otherType instanceof RowType) {
if (!((RowType) thisType).equalsIgnoreFieldId((RowType) otherType)) {
return false;
}
} else if (!thisType.equals(otherType)) {
```
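The suggestion covers a field whose type is itself a ROW, but a ROW nested inside an ARRAY or MAP (such as the `array_of_rows array<row<a int, b string>>` column used in `FlinkComplexTypeITCase` in this PR) would still go through `equals()` and, by the reasoning above, still compare field IDs. A fully recursive comparison might look like the following sketch. It uses a hypothetical, simplified type model: `FieldIdCompareSketch` and its nested classes are illustrative, not Fluss's `DataType` classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Hypothetical, simplified type model; NOT Fluss's DataType hierarchy. */
final class FieldIdCompareSketch {

    abstract static class Type {
        final boolean nullable;
        Type(boolean nullable) { this.nullable = nullable; }
    }

    static final class Atomic extends Type {
        final String name; // e.g. "INT", "STRING"
        Atomic(String name, boolean nullable) { super(nullable); this.name = name; }
    }

    static final class ArrayType extends Type {
        final Type element;
        ArrayType(Type element, boolean nullable) { super(nullable); this.element = element; }
    }

    static final class MapType extends Type {
        final Type key;
        final Type value;
        MapType(Type key, Type value, boolean nullable) {
            super(nullable);
            this.key = key;
            this.value = value;
        }
    }

    static final class Field {
        final String name;
        final Type type;
        final String description; // may be null
        final int fieldId;
        Field(String name, Type type, String description, int fieldId) {
            this.name = name;
            this.type = type;
            this.description = description;
            this.fieldId = fieldId;
        }
    }

    static final class RowType extends Type {
        final List<Field> fields;
        RowType(boolean nullable, Field... fields) {
            super(nullable);
            this.fields = Arrays.asList(fields);
        }
    }

    /** Structural equality that ignores field IDs at every nesting level. */
    static boolean equalsIgnoreFieldId(Type a, Type b) {
        if (a == b) {
            return true;
        }
        if (a == null || b == null || a.getClass() != b.getClass() || a.nullable != b.nullable) {
            return false;
        }
        if (a instanceof Atomic) {
            return ((Atomic) a).name.equals(((Atomic) b).name);
        }
        if (a instanceof ArrayType) {
            return equalsIgnoreFieldId(((ArrayType) a).element, ((ArrayType) b).element);
        }
        if (a instanceof MapType) {
            MapType ma = (MapType) a;
            MapType mb = (MapType) b;
            return equalsIgnoreFieldId(ma.key, mb.key) && equalsIgnoreFieldId(ma.value, mb.value);
        }
        RowType ra = (RowType) a; // only RowType is left in this model
        RowType rb = (RowType) b;
        if (ra.fields.size() != rb.fields.size()) {
            return false;
        }
        for (int i = 0; i < ra.fields.size(); i++) {
            Field fa = ra.fields.get(i);
            Field fb = rb.fields.get(i);
            // fieldId is deliberately not compared; the recursive call handles nested types.
            if (!fa.name.equals(fb.name)
                    || !Objects.equals(fa.description, fb.description)
                    || !equalsIgnoreFieldId(fa.type, fb.type)) {
                return false;
            }
        }
        return true;
    }
}
```

Checking `getClass()` first means a ROW is never compared against, or cast to, an ARRAY or MAP.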
##########
fluss-common/src/main/java/org/apache/fluss/types/ReassignFieldId.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.types;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Reassign field id by given field id. */
Review Comment:
The class-level comment "Reassign field id by given field id." is unclear
and grammatically awkward. Consider revising to something more descriptive like
"Visitor that reassigns field IDs in nested data types using a provided
counter." or "Recursively reassigns field IDs to nested data type structures."
```suggestion
/** Visitor that recursively reassigns field IDs in nested data types using a provided counter. */
```
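What such a visitor does can be sketched as follows, under stated assumptions: types are reduced to named fields with an optional nested ROW (ARRAY and MAP wrappers are left out), and IDs are taken in depth-first pre-order from a shared `AtomicInteger` holding the highest ID handed out so far, similar in spirit to the `highestFieldId` counter in `Schema.Builder`. `ReassignSketch` is hypothetical, and the numbering order is an assumption; the PR's `ReassignFieldId` may order nested fields differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of recursive field-ID reassignment; NOT the PR's ReassignFieldId. */
final class ReassignSketch {

    static final class Field {
        final String name;
        final int fieldId;
        final List<Field> nested; // fields of a nested ROW type; empty for other types

        Field(String name, int fieldId, List<Field> nested) {
            this.name = name;
            this.fieldId = fieldId;
            this.nested = Collections.unmodifiableList(new ArrayList<>(nested));
        }
    }

    /** Returns copies of {@code fields} with fresh IDs drawn from {@code highestFieldId}. */
    static List<Field> reassign(List<Field> fields, AtomicInteger highestFieldId) {
        List<Field> result = new ArrayList<>(fields.size());
        for (Field field : fields) {
            int id = highestFieldId.incrementAndGet(); // parent before its children (pre-order)
            result.add(new Field(field.name, id, reassign(field.nested, highestFieldId)));
        }
        return result;
    }
}
```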
##########
fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java:
##########
@@ -137,10 +143,90 @@ protected String[] expectedJsons() {
"{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"nullable\":false,\"precision\":3}",
"{\"type\":\"ARRAY\",\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
"{\"type\":\"ARRAY\",\"nullable\":false,\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
- "{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
- "{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
- "{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
- "{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
+ "{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\"},\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\"},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":0},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}",
+ "{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":0},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}"
};
}
+
+ @Test
+ void testCompatibilityFromJsonLackOfFieldId() {
+ String[] jsonLackOfFieldId =
+ new String[] {
+ "{\"type\":\"BOOLEAN\"}",
+ "{\"type\":\"BOOLEAN\",\"nullable\":false}",
+ "{\"type\":\"TINYINT\"}",
+ "{\"type\":\"TINYINT\",\"nullable\":false}",
+ "{\"type\":\"SMALLINT\"}",
+ "{\"type\":\"SMALLINT\",\"nullable\":false}",
+ "{\"type\":\"INTEGER\"}",
+ "{\"type\":\"INTEGER\",\"nullable\":false}",
+ "{\"type\":\"BIGINT\"}",
+ "{\"type\":\"BIGINT\",\"nullable\":false}",
+ "{\"type\":\"FLOAT\"}",
+ "{\"type\":\"FLOAT\",\"nullable\":false}",
+ "{\"type\":\"DOUBLE\"}",
+ "{\"type\":\"DOUBLE\",\"nullable\":false}",
+ "{\"type\":\"DECIMAL\",\"precision\":10,\"scale\":0}",
+ "{\"type\":\"DECIMAL\",\"nullable\":false,\"precision\":10,\"scale\":0}",
+ "{\"type\":\"DECIMAL\",\"precision\":15,\"scale\":5}",
+ "{\"type\":\"DECIMAL\",\"nullable\":false,\"precision\":15,\"scale\":5}",
+ "{\"type\":\"CHAR\",\"length\":1}",
+ "{\"type\":\"CHAR\",\"nullable\":false,\"length\":1}",
+ "{\"type\":\"CHAR\",\"length\":5}",
+ "{\"type\":\"CHAR\",\"nullable\":false,\"length\":5}",
+ "{\"type\":\"STRING\"}",
+ "{\"type\":\"STRING\",\"nullable\":false}",
+ "{\"type\":\"BINARY\",\"length\":1}",
+ "{\"type\":\"BINARY\",\"nullable\":false,\"length\":1}",
+ "{\"type\":\"BINARY\",\"length\":100}",
+ "{\"type\":\"BINARY\",\"nullable\":false,\"length\":100}",
+ "{\"type\":\"BYTES\"}",
+ "{\"type\":\"BYTES\",\"nullable\":false}",
+ "{\"type\":\"DATE\"}",
+ "{\"type\":\"DATE\",\"nullable\":false}",
+ "{\"type\":\"TIME_WITHOUT_TIME_ZONE\",\"precision\":0}",
+ "{\"type\":\"TIME_WITHOUT_TIME_ZONE\",\"nullable\":false,\"precision\":0}",
+ "{\"type\":\"TIME_WITHOUT_TIME_ZONE\",\"precision\":3}",
+ "{\"type\":\"TIME_WITHOUT_TIME_ZONE\",\"nullable\":false,\"precision\":3}",
+ "{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6}",
+ "{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"nullable\":false,\"precision\":6}",
+ "{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":3}",
+ "{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"nullable\":false,\"precision\":3}",
+ "{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"precision\":6}",
+ "{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"nullable\":false,\"precision\":6}",
+ "{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"precision\":3}",
+ "{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"nullable\":false,\"precision\":3}",
+ "{\"type\":\"ARRAY\",\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"ARRAY\",\"nullable\":false,\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\"},\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\"},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
+ "{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
Review Comment:
The JSON string for the MAP type compatibility test has a syntax error - the
closing brace after "BIGINT" is misplaced. This should be:
"{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}"
```suggestion
"{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
"{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\"},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
"{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
"{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
```
##########
fluss-common/src/main/java/org/apache/fluss/types/RowType.java:
##########
@@ -172,23 +171,102 @@ public int hashCode() {
return Objects.hash(super.hashCode(), fields);
}
+ /**
+ * Compares this RowType with another RowType, ignoring field IDs.
+ *
+ * @param other the other RowType to compare with
+ * @return true if the RowTypes are equal ignoring field IDs, false otherwise
+ */
+ public boolean equalsIgnoreFieldId(RowType other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null) {
+ return false;
+ }
+ if (this.isNullable() != other.isNullable()) {
+ return false;
+ }
+ if (this.fields.size() != other.fields.size()) {
+ return false;
+ }
+ for (int i = 0; i < fields.size(); i++) {
+ DataField thisField = fields.get(i);
+ DataField otherField = other.fields.get(i);
+ if (!thisField.getName().equals(otherField.getName())) {
+ return false;
+ }
+ if (!thisField.getType().equals(otherField.getType())) {
+ return false;
+ }
+ if (!Objects.equals(thisField.getDescription(), otherField.getDescription())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
// --------------------------------------------------------------------------------------------
- private static void validateFields(List<DataField> fields) {
+ private static List<DataField> validateFields(List<DataField> fields) {
+ // Validate field names
final List<String> fieldNames =
fields.stream().map(DataField::getName).collect(Collectors.toList());
if (fieldNames.stream().anyMatch(StringUtils::isNullOrWhitespaceOnly)) {
throw new IllegalArgumentException(
"Field names must contain at least one non-whitespace character.");
}
- final Set<String> duplicates =
+ final Set<String> duplicateNames =
fieldNames.stream()
.filter(n -> Collections.frequency(fieldNames, n) > 1)
.collect(Collectors.toSet());
- if (!duplicates.isEmpty()) {
+ if (!duplicateNames.isEmpty()) {
throw new IllegalArgumentException(
- String.format("Field names must be unique. Found duplicates: %s", duplicates));
+ String.format(
+ "Field names must be unique. Found duplicates: %s", duplicateNames));
}
+
+ // Validate and process field IDs
+ final List<Integer> fieldIds =
+ fields.stream().map(DataField::getFieldId).collect(Collectors.toList());
+ long negativeIdCount = fieldIds.stream().filter(id -> id == -1).count();
+
+ List<DataField> processedFields;
+ if (negativeIdCount == fields.size()) {
+ // All fields have ID -1, assign IDs in order
+ processedFields = new ArrayList<>();
+ for (int i = 0; i < fields.size(); i++) {
+ DataField field = fields.get(i);
+ processedFields.add(
+ new DataField(
+ field.getName(),
+ field.getType(),
+ field.getDescription().orElse(null),
+ i));
+ }
+ } else if (negativeIdCount > 0) {
+ // Some fields have ID -1 and some don't, throw error
+ throw new IllegalArgumentException(
+ "Field IDs must be either all -1 or all non-negative. "
+ + "Mixed field IDs are not allowed.");
+ } else {
+ // All fields have non-negative IDs
+ if (fieldIds.stream().anyMatch(id -> id < 0)) {
+ throw new IllegalArgumentException("Field ID must not be negative.");
+ }
Review Comment:
The condition on line 254 looks redundant. In the else branch at line 252, negativeIdCount is 0, so no field has ID -1. However, negativeIdCount only counts IDs equal to -1, not all negative IDs, so the `id < 0` check still fires for values such as -2. It is dead code only if IDs below -1 are rejected earlier (for example in the `DataField` constructor); in that case it can be removed.
```suggestion
```
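Reducing the branch logic to bare integer IDs makes the reachability question concrete. `FieldIdCheckSketch` and its `Outcome` enum are hypothetical, not the PR's code; only the `id == -1` filter and the `id < 0` check are copied from the hunk above. An ID of -2 is not counted as -1, so it falls through to the final check:

```java
import java.util.List;

/** Hypothetical reduction of the field-ID branches in validateFields to bare IDs. */
final class FieldIdCheckSketch {

    enum Outcome { ASSIGN_IN_ORDER, MIXED_REJECTED, NEGATIVE_REJECTED, OK }

    static Outcome check(List<Integer> fieldIds) {
        // Same filter as the PR: counts IDs equal to -1, not every negative ID.
        long negativeIdCount = fieldIds.stream().filter(id -> id == -1).count();
        if (negativeIdCount == fieldIds.size()) {
            return Outcome.ASSIGN_IN_ORDER; // all -1: IDs 0..n-1 would be assigned
        } else if (negativeIdCount > 0) {
            return Outcome.MIXED_REJECTED; // some -1, some not
        } else if (fieldIds.stream().anyMatch(id -> id < 0)) {
            return Outcome.NEGATIVE_REJECTED; // e.g. -2, which the filter above ignored
        }
        return Outcome.OK;
    }
}
```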
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java:
##########
@@ -592,4 +597,72 @@ void testExceptionsForComplexTypesUsage() {
"Bucket key column 'info' has unsupported data type ROW<`name` STRING, `age` INT>. "
+ "Currently, bucket key column does not support types: [ARRAY, MAP, ROW].");
}
+
+ @Test
+ void testProjectionAndAddColumnInLogTable() throws Exception {
+ tEnv.executeSql(
+ "create table row_log_test ("
+ + "id int, "
+ + "simple_row row<a int, b string>, "
+ + "nested_row row<x int, y row<z int, w string>, v string>, "
+ + "array_of_rows array<row<a int, b string>>"
+ + ") with ('bucket.num' = '3')");
+
+ tEnv.executeSql(
+ "INSERT INTO row_log_test VALUES "
+ + "(1, ROW(10, 'hello'), ROW(20, ROW(30, 'nested'), 'row1'), "
+ + "ARRAY[ROW(1, 'a'), ROW(2, 'b')]), "
+ + "(2, ROW(40, 'world'), ROW(50, ROW(60, 'test'), 'row2'), "
+ + "ARRAY[ROW(3, 'c')])")
+ .await();
+
+ CloseableIterator<Row> rowIter = tEnv.executeSql("select * from row_log_test").collect();
+ List<String> expectedRows =
+ Arrays.asList(
+ "+I[1, +I[10, hello], +I[20, +I[30, nested], row1], [+I[1, a], +I[2, b]]]",
+ "+I[2, +I[40, world], +I[50, +I[60, test], row2], [+I[3, c]]]");
+ assertResultsIgnoreOrder(rowIter, expectedRows, true);
+
+ // Currently, flink not supported push down nested row projection because
+ // FlinkTableSource.supportsNestedProjection returns false.
+ // Todo: supported nested row projection down in https://github.com/apache/fluss/issues/2311
Review Comment:
Typo: "supported" should be "support". The comment should read "Todo:
support nested row projection down in..." or "Todo: add support for nested row
projection pushdown in..."
```suggestion
// Todo: support nested row projection pushdown in https://github.com/apache/fluss/issues/2311
```
##########
fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java:
##########
@@ -310,20 +314,82 @@ public Builder fromColumns(List<Column> inputColumns) {
return this;
}
+ public Builder highestFieldId(int highestFieldId) {
+ this.highestFieldId = new AtomicInteger(highestFieldId);
+ return this;
+ }
+
+ /**
+ * Adopts the field names and data types from the given {@link RowType} as physical columns
+ * of the schema.
+ *
+ * <p>This method internally calls {@link #column(String, DataType)} for each field, which
+ * means: The original field IDs in the RowType will be ignored and replaced with new ones.
+ * If you need to preserve existing field IDs, use {@link #fromColumns(List)} or {@link
+ * #fromSchema(Schema)} instead.
+ *
+ * @param rowType the row type to adopt fields from
+ * @return this builder for fluent API
+ */
+ public Builder fromRowType(RowType rowType) {
+ checkNotNull(rowType, "rowType must not be null.");
+ final List<DataType> fieldDataTypes = rowType.getChildren();
+ final List<String> fieldNames = rowType.getFieldNames();
+ IntStream.range(0, fieldDataTypes.size())
+ .forEach(i -> column(fieldNames.get(i), fieldDataTypes.get(i)));
+ return this;
+ }
+
+ /**
+ * Adopts the given field names and field data types as physical columns of the schema.
+ *
+ * <p>This method internally calls {@link #column(String, DataType)} for each field, which
+ *
+ * <p>This method internally calls {@link #column(String, DataType)} for each field, which
Review Comment:
The JavaDoc has a duplicated line. The sentence "This method internally
calls {@link #column(String, DataType)} for each field, which" appears twice on
lines 346 and 348. One of these duplicate lines should be removed.
```suggestion
```
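The behavior this JavaDoc documents can be sketched as follows. `fromRowType` forwards only names and types to `column(String, DataType)`, so any IDs on the incoming fields are replaced. `SchemaBuilderSketch` is hypothetical, and it assumes `column` allocates the next ID by incrementing the `highestFieldId` counter; that allocation rule is not shown in this hunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical mini builder; NOT Fluss's Schema.Builder. */
final class SchemaBuilderSketch {

    static final class Column {
        final String name;
        final int fieldId;

        Column(String name, int fieldId) {
            this.name = name;
            this.fieldId = fieldId;
        }
    }

    // Highest field ID handed out so far; -1 so the first column gets 0 (assumption).
    private AtomicInteger highestFieldId = new AtomicInteger(-1);
    private final List<Column> columns = new ArrayList<>();

    SchemaBuilderSketch highestFieldId(int highestFieldId) {
        this.highestFieldId = new AtomicInteger(highestFieldId);
        return this;
    }

    // Every new column takes the next ID from the counter (assumed allocation rule).
    SchemaBuilderSketch column(String name) {
        columns.add(new Column(name, highestFieldId.incrementAndGet()));
        return this;
    }

    // Mirrors the documented fromRowType behavior: only names reach column(),
    // so whatever IDs the incoming fields carry are dropped and replaced.
    SchemaBuilderSketch fromFieldsDiscardingIds(List<Column> fields) {
        for (Column field : fields) {
            column(field.name);
        }
        return this;
    }

    List<Column> build() {
        return new ArrayList<>(columns);
    }
}
```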
##########
fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java:
##########
@@ -137,10 +143,90 @@ protected String[] expectedJsons() {
"{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"nullable\":false,\"precision\":3}",
"{\"type\":\"ARRAY\",\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
"{\"type\":\"ARRAY\",\"nullable\":false,\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
- "{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
- "{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
- "{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
- "{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
+ "{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\"},\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\"},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":0},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}",
+ "{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":0},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}"
};
}
+
+ @Test
+ void testCompatibilityFromJsonLackOfFieldId() {
+ String[] jsonLackOfFieldId =
+ new String[] {
+ "{\"type\":\"BOOLEAN\"}",
+ "{\"type\":\"BOOLEAN\",\"nullable\":false}",
+ "{\"type\":\"TINYINT\"}",
+ "{\"type\":\"TINYINT\",\"nullable\":false}",
+ "{\"type\":\"SMALLINT\"}",
+ "{\"type\":\"SMALLINT\",\"nullable\":false}",
+ "{\"type\":\"INTEGER\"}",
+ "{\"type\":\"INTEGER\",\"nullable\":false}",
+ "{\"type\":\"BIGINT\"}",
+ "{\"type\":\"BIGINT\",\"nullable\":false}",
+ "{\"type\":\"FLOAT\"}",
+ "{\"type\":\"FLOAT\",\"nullable\":false}",
+ "{\"type\":\"DOUBLE\"}",
+ "{\"type\":\"DOUBLE\",\"nullable\":false}",
+ "{\"type\":\"DECIMAL\",\"precision\":10,\"scale\":0}",
+ "{\"type\":\"DECIMAL\",\"nullable\":false,\"precision\":10,\"scale\":0}",
+ "{\"type\":\"DECIMAL\",\"precision\":15,\"scale\":5}",
+ "{\"type\":\"DECIMAL\",\"nullable\":false,\"precision\":15,\"scale\":5}",
+ "{\"type\":\"CHAR\",\"length\":1}",
+ "{\"type\":\"CHAR\",\"nullable\":false,\"length\":1}",
+ "{\"type\":\"CHAR\",\"length\":5}",
+ "{\"type\":\"CHAR\",\"nullable\":false,\"length\":5}",
+ "{\"type\":\"STRING\"}",
+ "{\"type\":\"STRING\",\"nullable\":false}",
+ "{\"type\":\"BINARY\",\"length\":1}",
+ "{\"type\":\"BINARY\",\"nullable\":false,\"length\":1}",
+ "{\"type\":\"BINARY\",\"length\":100}",
+ "{\"type\":\"BINARY\",\"nullable\":false,\"length\":100}",
+ "{\"type\":\"BYTES\"}",
+ "{\"type\":\"BYTES\",\"nullable\":false}",
+ "{\"type\":\"DATE\"}",
+ "{\"type\":\"DATE\",\"nullable\":false}",
+ "{\"type\":\"TIME_WITHOUT_TIME_ZONE\",\"precision\":0}",
+ "{\"type\":\"TIME_WITHOUT_TIME_ZONE\",\"nullable\":false,\"precision\":0}",
+ "{\"type\":\"TIME_WITHOUT_TIME_ZONE\",\"precision\":3}",
+ "{\"type\":\"TIME_WITHOUT_TIME_ZONE\",\"nullable\":false,\"precision\":3}",
+ "{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6}",
+ "{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"nullable\":false,\"precision\":6}",
+ "{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":3}",
+ "{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"nullable\":false,\"precision\":3}",
+ "{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"precision\":6}",
+ "{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"nullable\":false,\"precision\":6}",
+ "{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"precision\":3}",
+ "{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"nullable\":false,\"precision\":3}",
+ "{\"type\":\"ARRAY\",\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"ARRAY\",\"nullable\":false,\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\"},\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\"},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
Review Comment:
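Both MAP strings in the compatibility test need fixing as well: the first one closes `key_type` right after "BIGINT", which makes it invalid JSON, and the non-nullable one parses but lacks `"nullable":false` on its key type. The non-nullable MAP should be: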
"{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}"
```suggestion
"{\"type\":\"ARRAY\",\"nullable\":false,\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
"{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
"{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
```
##########
fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java:
##########
@@ -420,7 +465,7 @@ private List<Object[]> doProjection(
private List<Object[]> doProjection(
long tableId,
- short schemaId,
+ int schemaId,
Review Comment:
The parameter type for schemaId changed from 'short' to 'int', but this
appears to be a broader change than necessary for adding field_id support for
nested rows. Consider whether this type change is intentional and whether it
should be part of this PR. If it's an independent change, it might be better
suited for a separate PR to keep changes focused.
##########
fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java:
##########
@@ -137,10 +143,90 @@ protected String[] expectedJsons() {
"{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"nullable\":false,\"precision\":3}",
"{\"type\":\"ARRAY\",\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
"{\"type\":\"ARRAY\",\"nullable\":false,\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
- "{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
- "{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
- "{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
- "{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
+ "{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\"},\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\"},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
+ "{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":0},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}",
+ "{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":0},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}"
Review Comment:
The first MAP string has misplaced braces: `key_type` is closed right after "BIGINT", and the following `"nullable":false}` then closes the top-level object, leaving `,"value_type":{...}}` dangling and the string invalid JSON. This should be:
"{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}"
```suggestion
"{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
"{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\"},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
"{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":0},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}",
"{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":0},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}"
```
--
This is an automated message from the Apache Git Service.