openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r839152573



##########
File path: 
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
##########
@@ -63,16 +109,48 @@ public static RowDataProjection create(RowType rowType, Types.StructType schema,
     return new RowDataProjection(rowType, schema, projectedSchema);
   }
 
-  private final RowData.FieldGetter[] getters;
-  private RowData rowData;
+  private RowDataProjection(RowType rowType, Types.StructType rowStruct, Types.StructType projectType,

Review comment:
       I see that the previous `RowDataProjection` already supports accessing 
the values of a nested projected schema. I think the only thing we need to do 
is convert the `int[][] projectedFieldIndexes` into a projected Iceberg 
`Schema`; then we can just call `RowDataProjection#create(RowType rowType, 
Types.StructType schema, Types.StructType projectedSchema)` to access those 
projected values.
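
To illustrate the conversion, here is a minimal, self-contained sketch. It 
deliberately does not use the Iceberg or Flink APIs: `Field` and `project` are 
hypothetical stand-ins, and the sketch assumes every index path is non-empty 
and valid for the schema. A real change would build an Iceberg 
`Types.StructType` instead of this toy tree.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class NestedProjectionSketch {

  // Hypothetical stand-in for a schema field; not an Iceberg or Flink class.
  static final class Field {
    final String name;
    final String type;          // primitive type name, or "struct"
    final List<Field> children; // empty for primitives

    Field(String name, String type, List<Field> children) {
      this.name = name;
      this.type = type;
      this.children = children;
    }

    static Field primitive(String name, String type) {
      return new Field(name, type, List.of());
    }

    static Field struct(String name, Field... children) {
      return new Field(name, "struct", List.of(children));
    }

    @Override
    public String toString() {
      if (children.isEmpty()) {
        return name + ": " + type;
      }
      return name + ": struct<"
          + children.stream().map(Field::toString).collect(Collectors.joining(", "))
          + ">";
    }
  }

  // Prunes the field tree so that only the fields reachable through the given
  // index paths remain. Assumes each path is non-empty and in range.
  static List<Field> project(List<Field> fields, int[][] paths) {
    // Group the remaining path tails by their first index, keeping first-seen order.
    Map<Integer, List<int[]>> tailsByHead = new LinkedHashMap<>();
    for (int[] path : paths) {
      int[] tail = Arrays.copyOfRange(path, 1, path.length);
      tailsByHead.computeIfAbsent(path[0], k -> new ArrayList<>()).add(tail);
    }

    List<Field> result = new ArrayList<>();
    for (Map.Entry<Integer, List<int[]>> entry : tailsByHead.entrySet()) {
      Field field = fields.get(entry.getKey());
      List<int[]> tails = entry.getValue();
      // An empty tail means the whole field (with all its children) was requested.
      boolean wholeField = tails.stream().anyMatch(t -> t.length == 0);
      if (wholeField || field.children.isEmpty()) {
        result.add(field);
      } else {
        List<Field> kept = project(field.children, tails.toArray(new int[0][]));
        result.add(new Field(field.name, field.type, kept));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Field> schema = List.of(
        Field.primitive("id", "bigint"),
        Field.struct("data", Field.primitive("f0", "bigint"), Field.primitive("f1", "bigint")));
    // Paths {0} and {1, 0} select id and data.f0.
    System.out.println(project(schema, new int[][] {{0}, {1, 0}}));
  }
}
```

With the paths `{0}` and `{1, 0}` the sketch keeps `id` and only `f0` under 
`data`, i.e. a subtree of the original schema.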

##########
File path: 
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -104,22 +105,15 @@ public void applyProjection(int[][] projectFields) {
         .tableLoader(loader)
         .properties(properties)
         .project(getProjectedSchema())
+        .projectedFields(projectedFields)
         .limit(limit)
         .filters(filters)
         .flinkConf(readableConfig)
         .build();
   }
 
   private TableSchema getProjectedSchema() {
-    if (projectedFields == null) {
-      return schema;
-    } else {
-      String[] fullNames = schema.getFieldNames();
-      DataType[] fullTypes = schema.getFieldDataTypes();
-      return TableSchema.builder().fields(
-          Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new),
-          Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
-    }
+    return projectedFields == null ? schema : projectSchema(schema, topProjectedFields);

Review comment:
       I still don't understand why the `projected schema` is the projected 
top-level schema rather than the projected nested schema. If the original 
schema is a big tree and every field is a node in that tree, then the 
`projected schema` should be a subtree of the original tree. 

##########
File path: 
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
##########
@@ -84,9 +162,51 @@ private RowDataProjection(RowType rowType, Types.StructType rowStruct, Types.Str
     }
   }
 
-  private static RowData.FieldGetter createFieldGetter(RowType rowType,
-                                                       int position,
-                                                       Types.NestedField rowField,
+  private static FieldGetter createFieldGetter(RowType rowType,

Review comment:
       Let's just revert these changes, per the comments above?

##########
File path: 
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -104,22 +105,15 @@ public void applyProjection(int[][] projectFields) {
         .tableLoader(loader)
         .properties(properties)
         .project(getProjectedSchema())
+        .projectedFields(projectedFields)
         .limit(limit)
         .filters(filters)
         .flinkConf(readableConfig)
         .build();
   }
 
   private TableSchema getProjectedSchema() {
-    if (projectedFields == null) {
-      return schema;
-    } else {
-      String[] fullNames = schema.getFieldNames();
-      DataType[] fullTypes = schema.getFieldDataTypes();
-      return TableSchema.builder().fields(
-          Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new),
-          Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
-    }
+    return projectedFields == null ? schema : projectSchema(schema, topProjectedFields);

Review comment:
       So why must we use the top-level projected schema? Will it introduce 
unnecessary extra fields when we do the actual row data projection? 
   
   For example, we have an original schema: 
   
   ```
   id: bigint
   data: struct {
       f0: bigint,
       f1: bigint
   }
   ```
   
   The fields required by the projection are `<id, data.f0>`. Here we will get 
`<id, data>` as the projected schema, but in fact we should get `<id, 
data.f0>`. 
   
   When we read a `RowData`, will we also return the value of `data.f1` to 
end users? 
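
The concern can be shown with a small, self-contained sketch. It models rows as 
nested `Object[]` values rather than Flink `RowData`, and both 
`projectTopLevel` and `projectNested` are hypothetical helpers written for this 
comment. It also assumes the nested projection returns one flat value per index 
path, which may differ from how the PR lays out its result.

```java
import java.util.Arrays;

final class RowProjectionSketch {

  // Top-level projection: copies whole top-level fields, so a selected struct
  // keeps all of its children.
  static Object[] projectTopLevel(Object[] row, int[] topIndexes) {
    Object[] out = new Object[topIndexes.length];
    for (int i = 0; i < topIndexes.length; i++) {
      out[i] = row[topIndexes[i]];
    }
    return out;
  }

  // Nested projection: follows each index path down to the requested value and
  // returns exactly one value per path (flat layout is an assumption here).
  static Object[] projectNested(Object[] row, int[][] paths) {
    Object[] out = new Object[paths.length];
    for (int i = 0; i < paths.length; i++) {
      Object value = row;
      for (int index : paths[i]) {
        value = ((Object[]) value)[index];
      }
      out[i] = value;
    }
    return out;
  }

  public static void main(String[] args) {
    // Row for the schema above: id = 1, data = {f0 = 10, f1 = 20}.
    Object[] row = {1L, new Object[] {10L, 20L}};

    // <id, data>: data.f1 is still carried along.
    System.out.println(Arrays.deepToString(projectTopLevel(row, new int[] {0, 1})));

    // <id, data.f0>: only the requested values remain.
    System.out.println(Arrays.deepToString(projectNested(row, new int[][] {{0}, {1, 0}})));
  }
}
```

In this sketch the top-level variant prints `[1, [10, 20]]`, so `data.f1` 
reaches the caller, while the nested variant prints `[1, 10]`.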
   
   

##########
File path: 
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -76,7 +81,7 @@ public IcebergTableSource(TableLoader loader, TableSchema schema, Map<String, St
   }
 
   private IcebergTableSource(TableLoader loader, TableSchema schema, Map<String, String> properties,
-                             int[] projectedFields, boolean isLimitPushDown,
+                             int[][] projectedFields, boolean isLimitPushDown,

Review comment:
       What's the value of `topProjectedFields` in this constructor?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


