gemini-code-assist[bot] commented on code in PR #38752:
URL: https://github.com/apache/beam/pull/38752#discussion_r3329071708


##########
sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java:
##########
@@ -83,9 +126,351 @@ public ReadRows withConfig(Map<String, String> config) {
 
     @Override
     public PCollection<Row> expand(PBegin input) {
-      // TODO(https://github.com/apache/beam/issues/38551): Implement 
expansion for
-      // Delta Lake ReadRows
-      throw new UnsupportedOperationException("Not implemented yet.");
+      if (getTablePath() == null) {
+        throw new IllegalArgumentException("Table path must be set.");
+      }
+
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, getTablePath());
+
+      try {
+        Snapshot snapshot = buildSnapshot(engine, table);
+        Scan scan = snapshot.getScanBuilder(engine).build();
+        StructType readSchema = scan.getSchema(engine);
+        org.apache.beam.sdk.schemas.Schema beamSchema = 
inferBeamSchema(readSchema);
+
+        List<DeltaFileDescriptor> fileDescriptors = 
buildFileDescriptors(engine, scan);
+
+        return input
+            .apply("CreateFileDescriptors", Create.of(fileDescriptors)
+                .withCoder(SerializableCoder.of(DeltaFileDescriptor.class)))
+            .apply("ReadFile", ParDo.of(new ReadFileFn(beamSchema)))
+            .setRowSchema(beamSchema);
+
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to read Delta table: " + 
getTablePath(), e);
+      }
+    }
+
+    private Snapshot buildSnapshot(Engine engine, Table table) throws 
Exception {
+      if (getVersion() != null) {
+        return table.getSnapshotAsOfVersion(engine, getVersion());
+      } else if (getTimestamp() != null) {
+        long epochMillis = 
org.joda.time.Instant.parse(getTimestamp()).getMillis();
+        return table.getSnapshotAsOfTimestamp(engine, epochMillis);
+      } else {
+        return table.getLatestSnapshot(engine);
+      }
+    }
+
+    private List<DeltaFileDescriptor> buildFileDescriptors(Engine engine, Scan 
scan) throws Exception {
+      List<DeltaFileDescriptor> descriptors = new ArrayList<>();
+      try (CloseableIterator<FilteredColumnarBatch> scanFileIter = 
scan.getScanFiles(engine)) {
+        while (scanFileIter.hasNext()) {
+          FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
+          try (CloseableIterator<io.delta.kernel.data.Row> scanFileRows = 
scanFilesBatch.getRows()) {
+            while (scanFileRows.hasNext()) {
+              io.delta.kernel.data.Row scanFileRow = scanFileRows.next();
+              FileStatus fileStatus = 
InternalScanFileUtils.getAddFileStatus(scanFileRow);
+              descriptors.add(new DeltaFileDescriptor(
+                  getTablePath(),
+                  fileStatus.getPath(),
+                  fileStatus.getSize(),
+                  fileStatus.getModificationTime(),
+                  getHadoopConfig(),
+                  getVersion(),
+                  getTimestamp()
+              ));
+            }
+          }
+        }
+      }
+      return descriptors;
+    }
+  }
+
+  public static class DeltaFileDescriptor implements Serializable {
+    private final String tablePath;
+    private final String filePath;
+    private final long fileSize;
+    private final long modificationTime;
+    private final @Nullable Map<String, String> hadoopConfig;
+    private final @Nullable Long version;
+    private final @Nullable String timestamp;
+
+    public DeltaFileDescriptor(
+        String tablePath,
+        String filePath,
+        long fileSize,
+        long modificationTime,
+        @Nullable Map<String, String> hadoopConfig,
+        @Nullable Long version,
+        @Nullable String timestamp) {
+      this.tablePath = tablePath;
+      this.filePath = filePath;
+      this.fileSize = fileSize;
+      this.modificationTime = modificationTime;
+      this.hadoopConfig = hadoopConfig;
+      this.version = version;
+      this.timestamp = timestamp;
+    }
+
+    public String getTablePath() {
+      return tablePath;
+    }
+
+    public String getFilePath() {
+      return filePath;
+    }
+
+    public long getFileSize() {
+      return fileSize;
+    }
+
+    public long getModificationTime() {
+      return modificationTime;
+    }
+
+    public @Nullable Map<String, String> getHadoopConfig() {
+      return hadoopConfig;
+    }
+
+    public @Nullable Long getVersion() {
+      return version;
+    }
+
+    public @Nullable String getTimestamp() {
+      return timestamp;
+    }
+  }
+
+  public static class ReadFileFn extends DoFn<DeltaFileDescriptor, Row> {
+    private final org.apache.beam.sdk.schemas.Schema beamSchema;
+
+    public ReadFileFn(org.apache.beam.sdk.schemas.Schema beamSchema) {
+      this.beamSchema = beamSchema;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      DeltaFileDescriptor desc = c.element();
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(desc.getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, desc.getTablePath());

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Creating a Hadoop `Configuration` and a Delta `Engine` inside 
`processElement` for every single file is highly inefficient as these are 
expensive operations.
   
   Instead, pass the Hadoop configuration map to the `ReadFileFn` constructor, 
initialize the `Engine` once in `@Setup`, and store it in a `transient` field 
so that it is reused across every element and bundle processed by that `DoFn` 
instance.



##########
sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java:
##########
@@ -83,9 +126,351 @@ public ReadRows withConfig(Map<String, String> config) {
 
     @Override
     public PCollection<Row> expand(PBegin input) {
-      // TODO(https://github.com/apache/beam/issues/38551): Implement 
expansion for
-      // Delta Lake ReadRows
-      throw new UnsupportedOperationException("Not implemented yet.");
+      if (getTablePath() == null) {
+        throw new IllegalArgumentException("Table path must be set.");
+      }
+
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, getTablePath());
+
+      try {
+        Snapshot snapshot = buildSnapshot(engine, table);
+        Scan scan = snapshot.getScanBuilder(engine).build();
+        StructType readSchema = scan.getSchema(engine);
+        org.apache.beam.sdk.schemas.Schema beamSchema = 
inferBeamSchema(readSchema);
+
+        List<DeltaFileDescriptor> fileDescriptors = 
buildFileDescriptors(engine, scan);
+
+        return input
+            .apply("CreateFileDescriptors", Create.of(fileDescriptors)
+                .withCoder(SerializableCoder.of(DeltaFileDescriptor.class)))
+            .apply("ReadFile", ParDo.of(new ReadFileFn(beamSchema)))
+            .setRowSchema(beamSchema);
+
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to read Delta table: " + 
getTablePath(), e);
+      }
+    }
+
+    private Snapshot buildSnapshot(Engine engine, Table table) throws 
Exception {
+      if (getVersion() != null) {
+        return table.getSnapshotAsOfVersion(engine, getVersion());
+      } else if (getTimestamp() != null) {
+        long epochMillis = 
org.joda.time.Instant.parse(getTimestamp()).getMillis();
+        return table.getSnapshotAsOfTimestamp(engine, epochMillis);
+      } else {
+        return table.getLatestSnapshot(engine);
+      }
+    }
+
+    private List<DeltaFileDescriptor> buildFileDescriptors(Engine engine, Scan 
scan) throws Exception {
+      List<DeltaFileDescriptor> descriptors = new ArrayList<>();
+      try (CloseableIterator<FilteredColumnarBatch> scanFileIter = 
scan.getScanFiles(engine)) {
+        while (scanFileIter.hasNext()) {
+          FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
+          try (CloseableIterator<io.delta.kernel.data.Row> scanFileRows = 
scanFilesBatch.getRows()) {
+            while (scanFileRows.hasNext()) {
+              io.delta.kernel.data.Row scanFileRow = scanFileRows.next();
+              FileStatus fileStatus = 
InternalScanFileUtils.getAddFileStatus(scanFileRow);
+              descriptors.add(new DeltaFileDescriptor(
+                  getTablePath(),
+                  fileStatus.getPath(),
+                  fileStatus.getSize(),
+                  fileStatus.getModificationTime(),
+                  getHadoopConfig(),
+                  getVersion(),
+                  getTimestamp()
+              ));
+            }
+          }
+        }
+      }
+      return descriptors;
+    }
+  }
+
+  public static class DeltaFileDescriptor implements Serializable {
+    private final String tablePath;
+    private final String filePath;
+    private final long fileSize;
+    private final long modificationTime;
+    private final @Nullable Map<String, String> hadoopConfig;
+    private final @Nullable Long version;
+    private final @Nullable String timestamp;
+
+    public DeltaFileDescriptor(
+        String tablePath,
+        String filePath,
+        long fileSize,
+        long modificationTime,
+        @Nullable Map<String, String> hadoopConfig,
+        @Nullable Long version,
+        @Nullable String timestamp) {
+      this.tablePath = tablePath;
+      this.filePath = filePath;
+      this.fileSize = fileSize;
+      this.modificationTime = modificationTime;
+      this.hadoopConfig = hadoopConfig;
+      this.version = version;
+      this.timestamp = timestamp;
+    }
+
+    public String getTablePath() {
+      return tablePath;
+    }
+
+    public String getFilePath() {
+      return filePath;
+    }
+
+    public long getFileSize() {
+      return fileSize;
+    }
+
+    public long getModificationTime() {
+      return modificationTime;
+    }
+
+    public @Nullable Map<String, String> getHadoopConfig() {
+      return hadoopConfig;
+    }
+
+    public @Nullable Long getVersion() {
+      return version;
+    }
+
+    public @Nullable String getTimestamp() {
+      return timestamp;
+    }
+  }
+
+  public static class ReadFileFn extends DoFn<DeltaFileDescriptor, Row> {
+    private final org.apache.beam.sdk.schemas.Schema beamSchema;
+
+    public ReadFileFn(org.apache.beam.sdk.schemas.Schema beamSchema) {
+      this.beamSchema = beamSchema;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      DeltaFileDescriptor desc = c.element();
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(desc.getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, desc.getTablePath());
+
+      Snapshot snapshot;
+      if (desc.getVersion() != null) {
+        snapshot = table.getSnapshotAsOfVersion(engine, desc.getVersion());
+      } else if (desc.getTimestamp() != null) {
+        long epochMillis = 
org.joda.time.Instant.parse(desc.getTimestamp()).getMillis();
+        snapshot = table.getSnapshotAsOfTimestamp(engine, epochMillis);
+      } else {
+        snapshot = table.getLatestSnapshot(engine);
+      }
+
+      Scan scan = snapshot.getScanBuilder(engine).build();
+      io.delta.kernel.data.Row scanState = scan.getScanState(engine);
+
+      try (CloseableIterator<FilteredColumnarBatch> scanFileIter = 
scan.getScanFiles(engine)) {
+        while (scanFileIter.hasNext()) {
+          FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
+          try (CloseableIterator<io.delta.kernel.data.Row> scanFileRows = 
scanFilesBatch.getRows()) {
+            while (scanFileRows.hasNext()) {
+              io.delta.kernel.data.Row scanFileRow = scanFileRows.next();
+              FileStatus fileStatus = 
InternalScanFileUtils.getAddFileStatus(scanFileRow);
+              if (fileStatus.getPath().equals(desc.getFilePath())) {
+                StructType physicalReadSchema = 
ScanStateRow.getPhysicalDataReadSchema(scanState);
+                CloseableIterator<ColumnarBatch> physicalDataIter =
+                    engine.getParquetHandler().readParquetFiles(
+                        singletonCloseableIterator(fileStatus),
+                        physicalReadSchema,
+                        Optional.empty()).map(FilteredColumnarBatch::getData);
+                try (
+                    CloseableIterator<FilteredColumnarBatch> transformedData =
+                        Scan.transformPhysicalData(
+                            engine,
+                            scanState,
+                            scanFileRow,
+                            physicalDataIter)) {
+                  while (transformedData.hasNext()) {
+                    FilteredColumnarBatch filteredData = 
transformedData.next();
+                    try (CloseableIterator<io.delta.kernel.data.Row> rows = 
filteredData.getRows()) {
+                      while (rows.hasNext()) {
+                        io.delta.kernel.data.Row row = rows.next();
+                        c.output(convertKernelRowToBeamRow(row, beamSchema));
+                      }
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(
+      @Nullable Map<String, String> configMap) {
+    org.apache.hadoop.conf.Configuration config = new 
org.apache.hadoop.conf.Configuration();
+    if (configMap != null) {
+      for (Map.Entry<String, String> entry : configMap.entrySet()) {
+        config.set(entry.getKey(), entry.getValue());
+      }
+    }
+    return config;
+  }
+
+  private static org.apache.beam.sdk.schemas.Schema inferBeamSchema(StructType 
structType) {
+    org.apache.beam.sdk.schemas.Schema.Builder builder = 
org.apache.beam.sdk.schemas.Schema.builder();
+    for (StructField field : structType.fields()) {
+      builder.addField(field.getName(), toBeamFieldType(field.getDataType()));
+    }
+    return builder.build();
+  }
+
+  private static org.apache.beam.sdk.schemas.Schema.FieldType 
toBeamFieldType(DataType dataType) {
+    if (dataType instanceof IntegerType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.INT32;
+    } else if (dataType instanceof LongType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
+    } else if (dataType instanceof StringType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
+    } else if (dataType instanceof DoubleType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE;
+    } else if (dataType instanceof FloatType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT;
+    } else if (dataType instanceof BooleanType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN;
+    } else if (dataType instanceof ShortType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.INT16;
+    } else if (dataType instanceof ByteType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.BYTE;
+    } else if (dataType instanceof BinaryType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.BYTES;
+    } else if (dataType instanceof DecimalType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.DECIMAL;
+    } else if (dataType instanceof TimestampType || dataType instanceof 
DateType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME;
+    } else if (dataType instanceof StructType) {
+      return 
org.apache.beam.sdk.schemas.Schema.FieldType.row(inferBeamSchema((StructType) 
dataType));
+    } else if (dataType instanceof ArrayType) {
+      return 
org.apache.beam.sdk.schemas.Schema.FieldType.array(toBeamFieldType(((ArrayType) 
dataType).getElementType()));
+    } else if (dataType instanceof MapType) {
+      MapType mapType = (MapType) dataType;
+      return org.apache.beam.sdk.schemas.Schema.FieldType.map(
+          toBeamFieldType(mapType.getKeyType()),
+          toBeamFieldType(mapType.getValueType()));
+    } else {
+      throw new IllegalArgumentException("Unsupported Delta type: " + 
dataType);
+    }
+  }
+
+  private static Row convertKernelRowToBeamRow(
+      io.delta.kernel.data.Row deltaRow, org.apache.beam.sdk.schemas.Schema 
beamSchema) {
+    List<Object> values = new ArrayList<>();
+    StructType structType = deltaRow.getSchema();
+    for (int i = 0; i < structType.length(); i++) {
+      if (deltaRow.isNullAt(i)) {
+        values.add(null);
+      } else {
+        DataType dataType = structType.at(i).getDataType();
+        values.add(convertValue(deltaRow, i, dataType));
+      }
+    }
+    return Row.withSchema(beamSchema).addValues(values).build();
+  }
+
+  private static Object convertValue(io.delta.kernel.data.Row deltaRow, int 
ordinal, DataType dataType) {
+    if (deltaRow.isNullAt(ordinal)) {
+      return null;
+    }
+    if (dataType instanceof IntegerType) {
+      return deltaRow.getInt(ordinal);
+    } else if (dataType instanceof LongType) {
+      return deltaRow.getLong(ordinal);
+    } else if (dataType instanceof StringType) {
+      return deltaRow.getString(ordinal);
+    } else if (dataType instanceof DoubleType) {
+      return deltaRow.getDouble(ordinal);
+    } else if (dataType instanceof FloatType) {
+      return deltaRow.getFloat(ordinal);
+    } else if (dataType instanceof BooleanType) {
+      return deltaRow.getBoolean(ordinal);
+    } else if (dataType instanceof ShortType) {
+      return deltaRow.getShort(ordinal);
+    } else if (dataType instanceof ByteType) {
+      return deltaRow.getByte(ordinal);
+    } else if (dataType instanceof BinaryType) {
+      return deltaRow.getBinary(ordinal);
+    } else if (dataType instanceof DecimalType) {
+      return deltaRow.getDecimal(ordinal);
+    } else if (dataType instanceof TimestampType) {
+      long micros = deltaRow.getLong(ordinal);
+      return org.joda.time.Instant.ofEpochMilli(micros / 1000);
+    } else if (dataType instanceof DateType) {
+      int days = deltaRow.getInt(ordinal);
+      return org.joda.time.Instant.ofEpochMilli(days * 24L * 60 * 60 * 1000);
+    } else if (dataType instanceof StructType) {
+      io.delta.kernel.data.Row structRow = deltaRow.getStruct(ordinal);
+      return convertKernelRowToBeamRow(structRow, inferBeamSchema((StructType) 
dataType));
+    } else if (dataType instanceof ArrayType) {
+      ArrayValue arrayVal = deltaRow.getArray(ordinal);
+      int size = arrayVal.getSize();
+      List<Object> list = new ArrayList<>(size);
+      DataType elemType = ((ArrayType) dataType).getElementType();
+      ColumnVector vec = arrayVal.getElements();
+      for (int j = 0; j < size; j++) {
+        if (vec.isNullAt(j)) {
+          list.add(null);
+        } else {
+          list.add(convertVectorValue(vec, j, elemType));
+        }
+      }
+      return list;
+    } else if (dataType instanceof MapType) {
+      MapValue mapVal = deltaRow.getMap(ordinal);
+      int size = mapVal.getSize();
+      Map<Object, Object> map = new HashMap<>(size);
+      DataType keyType = ((MapType) dataType).getKeyType();
+      DataType valueType = ((MapType) dataType).getValueType();
+      ColumnVector keysVec = mapVal.getKeys();
+      ColumnVector valuesVec = mapVal.getValues();
+      for (int j = 0; j < size; j++) {
+        Object key = convertVectorValue(keysVec, j, keyType);
+        Object val = valuesVec.isNullAt(j) ? null : 
convertVectorValue(valuesVec, j, valueType);
+        map.put(key, val);
+      }
+      return map;
+    } else {
+      return deltaRow.toString();
+    }
+  }
+
+  private static Object convertVectorValue(ColumnVector vec, int rowId, 
DataType dataType) {
+    if (dataType instanceof IntegerType) {
+      return vec.getInt(rowId);
+    } else if (dataType instanceof LongType) {
+      return vec.getLong(rowId);
+    } else if (dataType instanceof StringType) {
+      return vec.getString(rowId);
+    } else if (dataType instanceof DoubleType) {
+      return vec.getDouble(rowId);
+    } else if (dataType instanceof FloatType) {
+      return vec.getFloat(rowId);
+    } else if (dataType instanceof BooleanType) {
+      return vec.getBoolean(rowId);
+    } else if (dataType instanceof ShortType) {
+      return vec.getShort(rowId);
+    } else if (dataType instanceof ByteType) {
+      return vec.getByte(rowId);
+    } else if (dataType instanceof BinaryType) {
+      return vec.getBinary(rowId);
+    } else if (dataType instanceof DecimalType) {
+      return vec.getDecimal(rowId);
+    } else {
+      return vec.toString();
     }
   }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The current implementation of `convertVectorValue` does not handle 
`TimestampType` or `DateType`, nor nested types such as `StructType`, 
`ArrayType`, or `MapType`. If any of these types appears inside an array or 
map, the method falls back to `vec.toString()`, which returns an internal class 
name or debug string instead of the actual value, leading to correctness bugs.
   
   Update `convertVectorValue` to recursively handle all supported Delta types, 
similar to how `convertValue` is implemented.
   
   ```java
     private static Object convertVectorValue(ColumnVector vec, int rowId, 
DataType dataType) {
       if (vec.isNullAt(rowId)) {
         return null;
       }
       if (dataType instanceof IntegerType) {
         return vec.getInt(rowId);
       } else if (dataType instanceof LongType) {
         return vec.getLong(rowId);
       } else if (dataType instanceof StringType) {
         return vec.getString(rowId);
       } else if (dataType instanceof DoubleType) {
         return vec.getDouble(rowId);
       } else if (dataType instanceof FloatType) {
         return vec.getFloat(rowId);
       } else if (dataType instanceof BooleanType) {
         return vec.getBoolean(rowId);
       } else if (dataType instanceof ShortType) {
         return vec.getShort(rowId);
       } else if (dataType instanceof ByteType) {
         return vec.getByte(rowId);
       } else if (dataType instanceof BinaryType) {
         return vec.getBinary(rowId);
       } else if (dataType instanceof DecimalType) {
         return vec.getDecimal(rowId);
       } else if (dataType instanceof TimestampType) {
         long micros = vec.getLong(rowId);
         return org.joda.time.Instant.ofEpochMilli(micros / 1000);
       } else if (dataType instanceof DateType) {
         int days = vec.getInt(rowId);
         return org.joda.time.Instant.ofEpochMilli(days * 24L * 60 * 60 * 1000);
       } else if (dataType instanceof StructType) {
         io.delta.kernel.data.Row structRow = vec.getStruct(rowId);
         return convertKernelRowToBeamRow(structRow, 
inferBeamSchema((StructType) dataType));
       } else if (dataType instanceof ArrayType) {
         ArrayValue arrayVal = vec.getArray(rowId);
         int size = arrayVal.getSize();
         List<Object> list = new ArrayList<>(size);
         DataType elemType = ((ArrayType) dataType).getElementType();
         ColumnVector elementsVec = arrayVal.getElements();
         for (int j = 0; j < size; j++) {
           if (elementsVec.isNullAt(j)) {
             list.add(null);
           } else {
             list.add(convertVectorValue(elementsVec, j, elemType));
           }
         }
         return list;
       } else if (dataType instanceof MapType) {
         MapValue mapVal = vec.getMap(rowId);
         int size = mapVal.getSize();
         Map<Object, Object> map = new HashMap<>(size);
         DataType keyType = ((MapType) dataType).getKeyType();
         DataType valueType = ((MapType) dataType).getValueType();
         ColumnVector keysVec = mapVal.getKeys();
         ColumnVector valuesVec = mapVal.getValues();
         for (int j = 0; j < size; j++) {
           Object key = convertVectorValue(keysVec, j, keyType);
           Object val = valuesVec.isNullAt(j) ? null : 
convertVectorValue(valuesVec, j, valueType);
           map.put(key, val);
         }
         return map;
       } else {
         return vec.toString();
       }
     }
   ```



##########
sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java:
##########
@@ -83,9 +126,351 @@ public ReadRows withConfig(Map<String, String> config) {
 
     @Override
     public PCollection<Row> expand(PBegin input) {
-      // TODO(https://github.com/apache/beam/issues/38551): Implement 
expansion for
-      // Delta Lake ReadRows
-      throw new UnsupportedOperationException("Not implemented yet.");
+      if (getTablePath() == null) {
+        throw new IllegalArgumentException("Table path must be set.");
+      }
+
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, getTablePath());
+
+      try {
+        Snapshot snapshot = buildSnapshot(engine, table);
+        Scan scan = snapshot.getScanBuilder(engine).build();
+        StructType readSchema = scan.getSchema(engine);
+        org.apache.beam.sdk.schemas.Schema beamSchema = 
inferBeamSchema(readSchema);
+
+        List<DeltaFileDescriptor> fileDescriptors = 
buildFileDescriptors(engine, scan);
+
+        return input
+            .apply("CreateFileDescriptors", Create.of(fileDescriptors)
+                .withCoder(SerializableCoder.of(DeltaFileDescriptor.class)))
+            .apply("ReadFile", ParDo.of(new ReadFileFn(beamSchema)))
+            .setRowSchema(beamSchema);
+
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to read Delta table: " + 
getTablePath(), e);
+      }
+    }
+
+    private Snapshot buildSnapshot(Engine engine, Table table) throws 
Exception {
+      if (getVersion() != null) {
+        return table.getSnapshotAsOfVersion(engine, getVersion());
+      } else if (getTimestamp() != null) {
+        long epochMillis = 
org.joda.time.Instant.parse(getTimestamp()).getMillis();
+        return table.getSnapshotAsOfTimestamp(engine, epochMillis);
+      } else {
+        return table.getLatestSnapshot(engine);
+      }
+    }
+
+    private List<DeltaFileDescriptor> buildFileDescriptors(Engine engine, Scan 
scan) throws Exception {
+      List<DeltaFileDescriptor> descriptors = new ArrayList<>();
+      try (CloseableIterator<FilteredColumnarBatch> scanFileIter = 
scan.getScanFiles(engine)) {
+        while (scanFileIter.hasNext()) {
+          FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
+          try (CloseableIterator<io.delta.kernel.data.Row> scanFileRows = 
scanFilesBatch.getRows()) {
+            while (scanFileRows.hasNext()) {
+              io.delta.kernel.data.Row scanFileRow = scanFileRows.next();
+              FileStatus fileStatus = 
InternalScanFileUtils.getAddFileStatus(scanFileRow);
+              descriptors.add(new DeltaFileDescriptor(
+                  getTablePath(),
+                  fileStatus.getPath(),
+                  fileStatus.getSize(),
+                  fileStatus.getModificationTime(),
+                  getHadoopConfig(),
+                  getVersion(),
+                  getTimestamp()
+              ));
+            }
+          }
+        }
+      }
+      return descriptors;
+    }
+  }
+
+  public static class DeltaFileDescriptor implements Serializable {
+    private final String tablePath;
+    private final String filePath;
+    private final long fileSize;
+    private final long modificationTime;
+    private final @Nullable Map<String, String> hadoopConfig;
+    private final @Nullable Long version;
+    private final @Nullable String timestamp;
+
+    public DeltaFileDescriptor(
+        String tablePath,
+        String filePath,
+        long fileSize,
+        long modificationTime,
+        @Nullable Map<String, String> hadoopConfig,
+        @Nullable Long version,
+        @Nullable String timestamp) {
+      this.tablePath = tablePath;
+      this.filePath = filePath;
+      this.fileSize = fileSize;
+      this.modificationTime = modificationTime;
+      this.hadoopConfig = hadoopConfig;
+      this.version = version;
+      this.timestamp = timestamp;
+    }
+
+    public String getTablePath() {
+      return tablePath;
+    }
+
+    public String getFilePath() {
+      return filePath;
+    }
+
+    public long getFileSize() {
+      return fileSize;
+    }
+
+    public long getModificationTime() {
+      return modificationTime;
+    }
+
+    public @Nullable Map<String, String> getHadoopConfig() {
+      return hadoopConfig;
+    }
+
+    public @Nullable Long getVersion() {
+      return version;
+    }
+
+    public @Nullable String getTimestamp() {
+      return timestamp;
+    }
+  }
+
+  public static class ReadFileFn extends DoFn<DeltaFileDescriptor, Row> {
+    private final org.apache.beam.sdk.schemas.Schema beamSchema;
+
+    public ReadFileFn(org.apache.beam.sdk.schemas.Schema beamSchema) {
+      this.beamSchema = beamSchema;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      DeltaFileDescriptor desc = c.element();
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(desc.getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, desc.getTablePath());
+
+      Snapshot snapshot;
+      if (desc.getVersion() != null) {
+        snapshot = table.getSnapshotAsOfVersion(engine, desc.getVersion());
+      } else if (desc.getTimestamp() != null) {
+        long epochMillis = 
org.joda.time.Instant.parse(desc.getTimestamp()).getMillis();
+        snapshot = table.getSnapshotAsOfTimestamp(engine, epochMillis);
+      } else {
+        snapshot = table.getLatestSnapshot(engine);
+      }
+
+      Scan scan = snapshot.getScanBuilder(engine).build();
+      io.delta.kernel.data.Row scanState = scan.getScanState(engine);
+
+      try (CloseableIterator<FilteredColumnarBatch> scanFileIter = 
scan.getScanFiles(engine)) {
+        while (scanFileIter.hasNext()) {
+          FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
+          try (CloseableIterator<io.delta.kernel.data.Row> scanFileRows = 
scanFilesBatch.getRows()) {
+            while (scanFileRows.hasNext()) {
+              io.delta.kernel.data.Row scanFileRow = scanFileRows.next();
+              FileStatus fileStatus = 
InternalScanFileUtils.getAddFileStatus(scanFileRow);
+              if (fileStatus.getPath().equals(desc.getFilePath())) {

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   Re-listing all scan files in the table for *every* file descriptor processed 
by `ReadFileFn` leads to $O(N^2)$ complexity, where $N$ is the number of files 
in the Delta table. For large tables, this will cause severe performance 
degradation and potential timeouts on workers.
   
   Consider refactoring the reading logic to avoid re-listing all files on 
every element. For example, you could serialize the necessary metadata of 
`scanFileRow` directly into `DeltaFileDescriptor` and reconstruct a lightweight 
`Row` wrapper on the worker, or group the file reads so that a single worker 
task can process multiple files from the same scan iterator without re-listing.



##########
sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java:
##########
@@ -83,9 +126,351 @@ public ReadRows withConfig(Map<String, String> config) {
 
     @Override
     public PCollection<Row> expand(PBegin input) {
-      // TODO(https://github.com/apache/beam/issues/38551): Implement 
expansion for
-      // Delta Lake ReadRows
-      throw new UnsupportedOperationException("Not implemented yet.");
+      if (getTablePath() == null) {
+        throw new IllegalArgumentException("Table path must be set.");
+      }
+
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, getTablePath());
+
+      try {
+        Snapshot snapshot = buildSnapshot(engine, table);
+        Scan scan = snapshot.getScanBuilder(engine).build();
+        StructType readSchema = scan.getSchema(engine);
+        org.apache.beam.sdk.schemas.Schema beamSchema = 
inferBeamSchema(readSchema);
+
+        List<DeltaFileDescriptor> fileDescriptors = 
buildFileDescriptors(engine, scan);
+
+        return input
+            .apply("CreateFileDescriptors", Create.of(fileDescriptors)
+                .withCoder(SerializableCoder.of(DeltaFileDescriptor.class)))

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using `SerializableCoder` for pipeline elements is highly discouraged in 
Apache Beam as it is slow and produces large serialized payloads. Since 
`DeltaFileDescriptor` is a simple POJO, you can annotate it with 
`@DefaultSchema(JavaFieldSchema.class)` (and, because its fields are `final`, 
annotate its constructor with `@SchemaCreate`) and let Beam automatically infer 
and use a highly optimized `SchemaCoder` instead.
   
   ```java
           return input
               .apply("CreateFileDescriptors", Create.of(fileDescriptors))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to