stevenzwu commented on code in PR #15329:
URL: https://github.com/apache/iceberg/pull/15329#discussion_r2813898110


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java:
##########
@@ -29,9 +30,10 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
+@Internal
 abstract class FlinkSchemaVisitor<T> {
 
-  static <T> T visit(RowType flinkType, Schema schema, FlinkSchemaVisitor<T> visitor) {
+  public static <T> T visit(RowType flinkType, Schema schema, FlinkSchemaVisitor<T> visitor) {

Review Comment:
   is this `public` change required?



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java:
##########
@@ -94,24 +96,29 @@ private static <T> T visitRecord(
     List<LogicalType> fieldTypes = Lists.newArrayListWithExpectedSize(fieldSize);
     List<Types.NestedField> nestedFields = struct.fields();
 
-    for (int i = 0; i < fieldSize; i++) {
-      Types.NestedField iField = nestedFields.get(i);
-      int fieldIndex = rowType.getFieldIndex(iField.name());
-      Preconditions.checkArgument(
-          fieldIndex >= 0, "NestedField: %s is not found in flink RowType: %s", iField, rowType);
+    visitor.beforeStruct(struct.asStructType());
 
-      LogicalType fieldFlinkType = rowType.getTypeAt(fieldIndex);
+    try {

Review Comment:
   why do we need to switch the visit impl to try-finally? if there is an exception, the visit would just fail anyway. is it important to call the `after` methods even then?
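To illustrate the question: a minimal, self-contained sketch (hypothetical names, not the Iceberg visitor API) of the before/after hook pattern being discussed. If a visitor keeps per-struct state such as a path stack, the try-finally keeps `before`/`after` calls paired even when a field visit throws; without it, a failure leaves the stack unbalanced.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

abstract class StructVisitor {
  void beforeStruct(String name) {}

  void afterStruct(String name) {}

  abstract void field(String name);
}

class PathTrackingVisitor extends StructVisitor {
  final Deque<String> path = new ArrayDeque<>();

  @Override
  void beforeStruct(String name) {
    path.push(name); // pushed on entry...
  }

  @Override
  void afterStruct(String name) {
    path.pop(); // ...must be popped on exit, even if a field visit throws
  }

  @Override
  void field(String name) {
    if (name.startsWith("bad")) {
      throw new IllegalStateException("cannot visit " + name);
    }
  }
}

public class VisitorDemo {
  static void visitStruct(String struct, List<String> fields, StructVisitor visitor) {
    visitor.beforeStruct(struct);
    try {
      for (String f : fields) {
        visitor.field(f);
      }
    } finally {
      // Runs on both success and failure, keeping before/after calls paired.
      visitor.afterStruct(struct);
    }
  }

  public static void main(String[] args) {
    PathTrackingVisitor visitor = new PathTrackingVisitor();
    try {
      visitStruct("root", List.of("id", "badField"), visitor);
    } catch (IllegalStateException e) {
      // With the finally in place, the stack is empty again after the failure.
      System.out.println("stack depth after failure: " + visitor.path.size());
    }
  }
}
```

Whether this matters here depends on whether any concrete visitor in the PR actually keeps such state; if none does, the plain fail-fast behavior would be equivalent.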



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java:
##########
@@ -112,23 +104,23 @@ private CloseableIterable<RowData> newIterable(
     if (task.isDataTask()) {
       throw new UnsupportedOperationException("Cannot read data task.");
     } else {
-      switch (task.file().format()) {
-        case PARQUET:
-          iter = newParquetIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        case AVRO:
-          iter = newAvroIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        case ORC:
-          iter = newOrcIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        default:
-          throw new UnsupportedOperationException(
-              "Cannot read unknown format: " + task.file().format());
+      ReadBuilder<RowData, RowType> builder =
+          FormatModelRegistry.readBuilder(
+              task.file().format(), RowData.class, inputFilesDecryptor.getInputFile(task));
+
+      if (nameMapping != null) {
+        builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
       }
+
+      iter =
+          builder
+              .project(schema)
+              .idToConstant(idToConstant)
+              .split(task.start(), task.length())
+              .caseSensitive(caseSensitive)
+              .filter(task.residual())
+              .reuseContainers()

Review Comment:
   the old `newOrcIterable` method doesn't set this boolean flag, since the ORC `ReadBuilder` doesn't support it. I forget how the new API handles that difference
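One common way a unified builder API absorbs format-specific options is to make the option a no-op by default and let only the formats that honor it override it. A hedged sketch of that pattern (hypothetical classes, not Iceberg's actual `FormatModelRegistry` builders):

```java
// Base builder: accepts the call but ignores it, so callers can configure
// every format uniformly without checking which options each one supports.
class ReadBuilder {
  ReadBuilder reuseContainers() {
    return this; // default: no-op
  }

  boolean reusingContainers() {
    return false;
  }
}

class ParquetReadBuilder extends ReadBuilder {
  private boolean reuse = false;

  @Override
  ReadBuilder reuseContainers() {
    this.reuse = true; // Parquet honors the flag
    return this;
  }

  @Override
  boolean reusingContainers() {
    return reuse;
  }
}

class OrcReadBuilder extends ReadBuilder {
  // Inherits the no-op: this format manages its own row batches in the sketch.
}

public class BuilderDemo {
  public static void main(String[] args) {
    System.out.println(new ParquetReadBuilder().reuseContainers().reusingContainers()); // true
    System.out.println(new OrcReadBuilder().reuseContainers().reusingContainers()); // false
  }
}
```

If the real builders instead throw for unsupported options, the caller would need a per-format check before calling `reuseContainers()`; the no-op default just moves that decision into the builder.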



##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java:
##########
@@ -319,11 +321,32 @@ public void testTableWithTargetFileSize() throws Exception {
   public void testPromotedFlinkDataType() throws Exception {
     Schema iSchema =
         new Schema(
+            Types.NestedField.required(

Review Comment:
   can you help me understand the purpose of expanded types in this test?
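For context on what "promoted" likely means in this test (my reading, not the test author's): Iceberg schema evolution permits widening a column's primitive type, e.g. `int` to `long` and `float` to `double`, so the writer must accept Flink rows whose field types are narrower than what the Iceberg schema declares. A toy sketch of that rule (hypothetical helper, not Iceberg code):

```java
import java.util.Map;

public class PromotionDemo {
  // Illustrative widening pairs; Iceberg's schema evolution additionally
  // allows decimal precision to grow, which this string map does not capture.
  static final Map<String, String> ALLOWED_PROMOTIONS =
      Map.of(
          "int", "long",
          "float", "double");

  static boolean isPromotable(String from, String to) {
    return to.equals(ALLOWED_PROMOTIONS.get(from));
  }

  public static void main(String[] args) {
    System.out.println(isPromotable("int", "long")); // widening: allowed
    System.out.println(isPromotable("long", "int")); // narrowing: not allowed
  }
}
```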



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

