wuchong commented on code in PR #2322:
URL: https://github.com/apache/fluss/pull/2322#discussion_r2670791386


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java:
##########
@@ -575,10 +578,7 @@ private void sanityCheck(RowType flussTableRowType, @Nullable int[] projectedFie
                         ? flussTableRowType.project(projectedFields)
                         // only read the output fields from source
                         : flussTableRowType.project(sourceOutputType.getFieldNames());
-        if (!sourceOutputType.copy(false).equals(tableRowType.copy(false))) {
-            // The default nullability of Flink row type and Fluss row type might be not the same,
-            // thus we need to compare the row type without nullability here.
-
+        if (!compareRowTypesIgnoreFieldId(tableRowType, sourceOutputType)) {

Review Comment:
   We don't need this complex logic if we ignore field id in `RowType#equals`.
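A minimal sketch of what ignoring the field ID in `equals` could look like, assuming a hypothetical, simplified `SketchField` class (not Fluss's actual `DataField`) whose type is a plain string. Because `fieldId` is excluded from both `equals()` and `hashCode()`, the row-type check reduces to a plain `List#equals`. The nullability handling done via `copy(false)` in the removed check is out of scope here.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for DataField: the type is a plain string, and the
// field ID is metadata that equals()/hashCode() deliberately ignore.
final class SketchField {
    final String name;
    final String type;
    final String description; // may be null
    final int fieldId;

    SketchField(String name, String type, String description, int fieldId) {
        this.name = name;
        this.type = type;
        this.description = description;
        this.fieldId = fieldId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SketchField)) {
            return false;
        }
        SketchField that = (SketchField) o;
        // fieldId is intentionally not compared.
        return name.equals(that.name)
                && type.equals(that.type)
                && Objects.equals(description, that.description);
    }

    @Override
    public int hashCode() {
        // Consistent with equals(): fieldId is excluded here as well.
        return Objects.hash(name, type, description);
    }
}

final class SanityCheckSketch {
    // With field IDs ignored by equals(), comparing row types is a plain list comparison.
    static boolean sameRowType(List<SketchField> a, List<SketchField> b) {
        return a.equals(b);
    }
}
```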



##########
fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java:
##########
@@ -291,18 +294,38 @@ private static DataType deserializeMap(JsonNode dataTypeNode) {
     private static DataType deserializeRow(JsonNode dataTypeNode) {
         final ArrayNode fieldNodes = (ArrayNode) dataTypeNode.get(FIELD_NAME_FIELDS);
         final List<DataField> fields = new ArrayList<>();
+
+        boolean hasFieldWithId = false;
+        boolean hasFieldWithoutId = false;
+        int autoFieldId = 0;
+
         for (JsonNode fieldNode : fieldNodes) {
             final String fieldName = fieldNode.get(FIELD_NAME_FIELD_NAME).asText();
             final DataType fieldType =
                     DataTypeJsonSerde.INSTANCE.deserialize(fieldNode.get(FIELD_NAME_FIELD_TYPE));
-            final String fieldDescription;
-            if (fieldNode.has(FIELD_NAME_FIELD_DESCRIPTION)) {
-                fieldDescription = fieldNode.get(FIELD_NAME_FIELD_DESCRIPTION).asText();
+            final String fieldDescription =
+                    fieldNode.has(FIELD_NAME_FIELD_DESCRIPTION)
+                            ? fieldNode.get(FIELD_NAME_FIELD_DESCRIPTION).asText()
+                            : null;
+
+            final int fieldId;
+            if (fieldNode.has(FIELD_NAME_FIELD_ID)) {
+                hasFieldWithId = true;
+                fieldId = fieldNode.get(FIELD_NAME_FIELD_ID).asInt();
             } else {
-                fieldDescription = null;
+                hasFieldWithoutId = true;
+                fieldId = autoFieldId++;

Review Comment:
   It seems the `autoFieldId` is not used. 



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java:
##########
@@ -592,4 +597,72 @@ void testExceptionsForComplexTypesUsage() {
                         "Bucket key column 'info' has unsupported data type ROW<`name` STRING, `age` INT>. "
                                 + "Currently, bucket key column does not support types: [ARRAY, MAP, ROW].");
     }
+
+    @Test
+    void testProjectionAndAddColumnInLogTable() throws Exception {
+        tEnv.executeSql(
+                "create table row_log_test ("
+                        + "id int, "
+                        + "simple_row row<a int, b string>, "
+                        + "nested_row row<x int, y row<z int, w string>, v string>, "
+                        + "array_of_rows array<row<a int, b string>>"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO row_log_test VALUES "
+                                + "(1, ROW(10, 'hello'), ROW(20, ROW(30, 'nested'), 'row1'), "
+                                + "ARRAY[ROW(1, 'a'), ROW(2, 'b')]), "
+                                + "(2, ROW(40, 'world'), ROW(50, ROW(60, 'test'), 'row2'), "
+                                + "ARRAY[ROW(3, 'c')])")
+                .await();
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql("select * from row_log_test").collect();
+        List<String> expectedRows =
+                Arrays.asList(
+                        "+I[1, +I[10, hello], +I[20, +I[30, nested], row1], [+I[1, a], +I[2, b]]]",
+                        "+I[2, +I[40, world], +I[50, +I[60, test], row2], [+I[3, c]]]");
+        assertResultsIgnoreOrder(rowIter, expectedRows, true);
+
+        // Currently, flink not supported push down nested row projection because
+        // FlinkTableSource.supportsNestedProjection returns false.
+        // Todo: support nested row projection pushdown in
+        // https://github.com/apache/fluss/issues/2311 later.
+        String s = tEnv.explainSql("select id, simple_row.a, nested_row.y.z from row_log_test");
+        assertThat(s)
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, row_log_test, project=[id, simple_row, nested_row]]], fields=[id, simple_row, nested_row])");
+        rowIter =
+                tEnv.executeSql("select id, simple_row.a, nested_row.y.z from row_log_test")

Review Comment:
   Can you add a primitive-type column `f` at the end of the schema and query `select id, simple_row.a, nested_row.y.z, f from xxx`? This verifies that projection pushdown works when there is a complex-type column in the middle of the schema.



##########
fluss-common/src/main/java/org/apache/fluss/types/RowType.java:
##########
@@ -172,23 +171,98 @@ public int hashCode() {
         return Objects.hash(super.hashCode(), fields);
     }
 
+    /**
+     * Compares this RowType with another RowType, ignoring field IDs.
+     *
+     * @param other the other RowType to compare with
+     * @return true if the RowTypes are equal ignoring field IDs, false otherwise
+     */
+    public boolean equalsIgnoreFieldId(RowType other) {

Review Comment:
   If we ignore field IDs in `equals()`, we can provide an `equalsWithFieldId` method for the cases that need to compare field IDs. Besides, we should use a `DataTypeVisitor` to implement the field-ID comparison, because there are nested fields.
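One way the visitor-based comparison could be structured, sketched over a hypothetical, simplified type model (`AtomicT`, `ArrayT`, `MapT`, `RowT`, `FieldT`, and `STypeVisitor` are illustrative stand-ins, not Fluss's `DataType` classes or its `DataTypeVisitor`). The visitor recurses into ARRAY elements, MAP keys and values, and ROW fields, so field IDs are compared at every nesting level; nullability and descriptions are omitted for brevity.

```java
import java.util.List;

// Hypothetical, simplified stand-ins for Fluss's DataType classes and DataTypeVisitor.
interface SType {
    <R> R accept(STypeVisitor<R> visitor);
}

interface STypeVisitor<R> {
    R visitAtomic(AtomicT t);
    R visitArray(ArrayT t);
    R visitMap(MapT t);
    R visitRow(RowT t);
}

final class AtomicT implements SType {
    final String name; // e.g. "INT", "STRING"
    AtomicT(String name) { this.name = name; }
    public <R> R accept(STypeVisitor<R> v) { return v.visitAtomic(this); }
}

final class ArrayT implements SType {
    final SType element;
    ArrayT(SType element) { this.element = element; }
    public <R> R accept(STypeVisitor<R> v) { return v.visitArray(this); }
}

final class MapT implements SType {
    final SType key;
    final SType value;
    MapT(SType key, SType value) { this.key = key; this.value = value; }
    public <R> R accept(STypeVisitor<R> v) { return v.visitMap(this); }
}

final class FieldT {
    final String name;
    final SType type;
    final int id;
    FieldT(String name, SType type, int id) { this.name = name; this.type = type; this.id = id; }
}

final class RowT implements SType {
    final List<FieldT> fields;
    RowT(List<FieldT> fields) { this.fields = fields; }
    public <R> R accept(STypeVisitor<R> v) { return v.visitRow(this); }
}

// Compares the visited type with `other`, including field IDs at every nesting level.
final class EqualsWithFieldId implements STypeVisitor<Boolean> {
    private final SType other;

    private EqualsWithFieldId(SType other) { this.other = other; }

    static boolean check(SType a, SType b) {
        return a.accept(new EqualsWithFieldId(b));
    }

    public Boolean visitAtomic(AtomicT t) {
        return other instanceof AtomicT && t.name.equals(((AtomicT) other).name);
    }

    public Boolean visitArray(ArrayT t) {
        return other instanceof ArrayT && check(t.element, ((ArrayT) other).element);
    }

    public Boolean visitMap(MapT t) {
        if (!(other instanceof MapT)) {
            return false;
        }
        MapT o = (MapT) other;
        return check(t.key, o.key) && check(t.value, o.value);
    }

    public Boolean visitRow(RowT t) {
        if (!(other instanceof RowT)) {
            return false;
        }
        List<FieldT> mine = t.fields;
        List<FieldT> theirs = ((RowT) other).fields;
        if (mine.size() != theirs.size()) {
            return false;
        }
        for (int i = 0; i < mine.size(); i++) {
            FieldT a = mine.get(i);
            FieldT b = theirs.get(i);
            // Unlike a field-ID-agnostic equals(), the ID is compared here.
            if (a.id != b.id || !a.name.equals(b.name) || !check(a.type, b.type)) {
                return false;
            }
        }
        return true;
    }
}
```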



##########
fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java:
##########
@@ -291,18 +294,38 @@ private static DataType deserializeMap(JsonNode dataTypeNode) {
     private static DataType deserializeRow(JsonNode dataTypeNode) {
         final ArrayNode fieldNodes = (ArrayNode) dataTypeNode.get(FIELD_NAME_FIELDS);
         final List<DataField> fields = new ArrayList<>();
+
+        boolean hasFieldWithId = false;
+        boolean hasFieldWithoutId = false;
+        int autoFieldId = 0;
+
         for (JsonNode fieldNode : fieldNodes) {
             final String fieldName = fieldNode.get(FIELD_NAME_FIELD_NAME).asText();
             final DataType fieldType =
                     DataTypeJsonSerde.INSTANCE.deserialize(fieldNode.get(FIELD_NAME_FIELD_TYPE));
-            final String fieldDescription;
-            if (fieldNode.has(FIELD_NAME_FIELD_DESCRIPTION)) {
-                fieldDescription = fieldNode.get(FIELD_NAME_FIELD_DESCRIPTION).asText();
+            final String fieldDescription =
+                    fieldNode.has(FIELD_NAME_FIELD_DESCRIPTION)
+                            ? fieldNode.get(FIELD_NAME_FIELD_DESCRIPTION).asText()
+                            : null;
+
+            final int fieldId;
+            if (fieldNode.has(FIELD_NAME_FIELD_ID)) {
+                hasFieldWithId = true;
+                fieldId = fieldNode.get(FIELD_NAME_FIELD_ID).asInt();
             } else {
-                fieldDescription = null;
+                hasFieldWithoutId = true;
+                fieldId = autoFieldId++;
             }
-            fields.add(new DataField(fieldName, fieldType, fieldDescription));
+
+            fields.add(new DataField(fieldName, fieldType, fieldDescription, fieldId));
         }
+
+        if (hasFieldWithId && hasFieldWithoutId) {
+            throw new TableException(
+                    "Field ID inconsistency detected in row type: "
+                            + "all fields must either have field IDs or none should have field IDs.");
+        }

Review Comment:
   I think we can remove this to simplify the logic. We can treat the field ID the same as the description (i.e., as metadata), because `RowType` itself doesn't use it, so `RowType` has no way to know whether the field IDs are correct. It's the `Schema` that uses the field IDs, so the `Schema` should validate them.
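A minimal sketch of reading the field ID the same way as the description, assuming plain `Map<String, Object>` objects stand in for Jackson `JsonNode`s and that the key names `name`, `description`, and `field_id` are illustrative only (the real constants live in `DataTypeJsonSerde`). An absent ID simply becomes `null`, and no consistency check happens here; that is left to `Schema`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: plain maps stand in for Jackson JsonNode objects, and the
// key names are illustrative, not the actual constants in DataTypeJsonSerde.
final class RowFieldSerdeSketch {
    static final String KEY_NAME = "name";
    static final String KEY_DESCRIPTION = "description";
    static final String KEY_FIELD_ID = "field_id";

    // Description and field ID are both optional metadata; null means "absent".
    static final class ParsedField {
        final String name;
        final String description;
        final Integer fieldId;

        ParsedField(String name, String description, Integer fieldId) {
            this.name = name;
            this.description = description;
            this.fieldId = fieldId;
        }
    }

    static List<ParsedField> deserializeFields(List<Map<String, Object>> fieldNodes) {
        List<ParsedField> fields = new ArrayList<>();
        for (Map<String, Object> node : fieldNodes) {
            String name = (String) node.get(KEY_NAME);
            // The field ID is read exactly like the description: taken if present,
            // null otherwise. No "all or none" check; Schema is expected to validate IDs.
            String description = (String) node.get(KEY_DESCRIPTION);
            Integer fieldId = (Integer) node.get(KEY_FIELD_ID);
            fields.add(new ParsedField(name, description, fieldId));
        }
        return fields;
    }
}
```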



##########
fluss-common/src/main/java/org/apache/fluss/types/RowType.java:
##########
@@ -172,23 +171,98 @@ public int hashCode() {
         return Objects.hash(super.hashCode(), fields);
     }
 
+    /**
+     * Compares this RowType with another RowType, ignoring field IDs.
+     *
+     * @param other the other RowType to compare with
+     * @return true if the RowTypes are equal ignoring field IDs, false otherwise
+     */
+    public boolean equalsIgnoreFieldId(RowType other) {
+        if (this == other) {
+            return true;
+        }
+        if (other == null) {
+            return false;
+        }
+        if (this.isNullable() != other.isNullable()) {
+            return false;
+        }
+        if (this.fields.size() != other.fields.size()) {
+            return false;
+        }
+        for (int i = 0; i < fields.size(); i++) {
+            DataField thisField = fields.get(i);
+            DataField otherField = other.fields.get(i);
+            if (!thisField.getName().equals(otherField.getName())) {
+                return false;
+            }
+            if (!thisField.getType().equals(otherField.getType())) {
+                return false;
+            }
+            if (!Objects.equals(thisField.getDescription(), otherField.getDescription())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     // --------------------------------------------------------------------------------------------
 
-    private static void validateFields(List<DataField> fields) {
+    private static List<DataField> validateFields(List<DataField> fields) {
+        // Validate field names
         final List<String> fieldNames =
                 fields.stream().map(DataField::getName).collect(Collectors.toList());
         if (fieldNames.stream().anyMatch(StringUtils::isNullOrWhitespaceOnly)) {
             throw new IllegalArgumentException(
                     "Field names must contain at least one non-whitespace character.");
         }
-        final Set<String> duplicates =
+        final Set<String> duplicateNames =
                 fieldNames.stream()
                         .filter(n -> Collections.frequency(fieldNames, n) > 1)
                         .collect(Collectors.toSet());
-        if (!duplicates.isEmpty()) {
+        if (!duplicateNames.isEmpty()) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Field names must be unique. Found duplicates: %s", duplicateNames));
+        }
+
+        // Validate and process field IDs
+        final List<Integer> fieldIds =
+                fields.stream().map(DataField::getFieldId).collect(Collectors.toList());

Review Comment:
   We can remove this validation, because `Schema` should check it, not `RowType`.



##########
fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java:
##########
@@ -728,4 +780,73 @@ public static RowType getKeyRowType(Schema schema, int[] keyIndexes) {
         }
         return new RowType(keyRowFields);
     }
+
+    /**
+     * Validates field IDs in the schema, including both top-level column IDs and nested field IDs.
+     *
+     * <p>This method performs the following checks:
+     *
+     * <ul>
+     *   <li>Ensures all top-level column IDs are unique
+     *   <li>Ensures all field IDs (including nested fields in ROW, ARRAY, MAP types) are globally
+     *       unique
+     *   <li>Verifies that the highest field ID is greater than or equal to all existing field IDs
+     * </ul>
+     *
+     * @param columns the list of columns to validate
+     * @param highestFieldId the highest field ID that should be greater than or equal to all field
+     *     IDs
+     * @throws IllegalStateException if any validation fails
+     */
+    private static void checkFieldIds(List<Column> columns, int highestFieldId) {
+
+        // Collect all field IDs (including nested fields) for validation
+        List<Integer> allFieldIds = collectAllFieldIds(columns);
+
+        // Validate all field IDs (including nested fields) are unique
+        long uniqueFieldIdsCount = allFieldIds.stream().distinct().count();
+        checkState(
+                uniqueFieldIdsCount == allFieldIds.size(),
+                "All field IDs (including nested fields) must be unique. Found %s unique IDs but expected %s.",

Review Comment:
   nit: It would be better to just print the `columns`, which should carry the field IDs, in the error message.
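A sketch of the suggested message, assuming a hypothetical `ColumnSketch` whose nested field IDs are pre-flattened into a list (the real code derives them from ROW/ARRAY/MAP types) and using plain `IllegalStateException` throws instead of `checkState`. The message prints the columns, whose `toString()` includes their IDs, so the offending ID can be located directly.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical column: nested field IDs are pre-flattened into a list instead of
// being collected from ROW/ARRAY/MAP types as the real Schema code does.
final class ColumnSketch {
    final String name;
    final int columnId;
    final List<Integer> nestedFieldIds;

    ColumnSketch(String name, int columnId, List<Integer> nestedFieldIds) {
        this.name = name;
        this.columnId = columnId;
        this.nestedFieldIds = nestedFieldIds;
    }

    @Override
    public String toString() {
        // Printing the IDs is what makes the error message actionable.
        return name + "(id=" + columnId + ", nested=" + nestedFieldIds + ")";
    }
}

final class FieldIdCheckSketch {
    static void checkFieldIds(List<ColumnSketch> columns, int highestFieldId) {
        // Collect top-level and nested IDs into one list.
        List<Integer> allFieldIds = new ArrayList<>();
        for (ColumnSketch c : columns) {
            allFieldIds.add(c.columnId);
            allFieldIds.addAll(c.nestedFieldIds);
        }
        Set<Integer> seen = new HashSet<>();
        for (int id : allFieldIds) {
            if (!seen.add(id)) {
                throw new IllegalStateException(String.format(
                        "All field IDs (including nested fields) must be unique, "
                                + "but ID %d is duplicated. Columns: %s", id, columns));
            }
            if (id > highestFieldId) {
                throw new IllegalStateException(String.format(
                        "Highest field ID %d is smaller than field ID %d. Columns: %s",
                        highestFieldId, id, columns));
            }
        }
    }
}
```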



##########
fluss-common/src/main/java/org/apache/fluss/types/RowType.java:
##########
@@ -229,18 +307,20 @@ public static class Builder {
         private final List<DataField> fields = new ArrayList<>();
 
         private final boolean isNullable;
+        private final AtomicInteger fieldId;

Review Comment:
   We don't need this because `Schema#column()` will reassign field ids. 
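To illustrate why a builder-side counter would be redundant, here is a sketch of schema-side reassignment. It assumes depth-first, pre-order numbering from a shared counter; the actual order used by `Schema#column()` is not shown in this review, and `IdField`/`ReassignSketch` are hypothetical. Whatever IDs the input carries are discarded, so IDs set earlier by a `RowType.Builder` would not survive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical field model: only ROW nesting is represented, via `children`.
final class IdField {
    final String name;
    final int id;
    final List<IdField> children;

    IdField(String name, int id, List<IdField> children) {
        this.name = name;
        this.id = id;
        this.children = children;
    }
}

final class ReassignSketch {
    // Assumed scheme: depth-first, pre-order numbering from a shared counter.
    // Incoming IDs (e.g. ones a RowType.Builder might have set) are discarded.
    static List<IdField> reassign(List<IdField> fields, AtomicInteger next) {
        List<IdField> out = new ArrayList<>();
        for (IdField f : fields) {
            int id = next.getAndIncrement(); // parent gets its ID before its children
            out.add(new IdField(f.name, id, reassign(f.children, next)));
        }
        return out;
    }
}
```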


