gemini-code-assist[bot] commented on code in PR #38750:
URL: https://github.com/apache/beam/pull/38750#discussion_r3328562189


##########
sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java:
##########
@@ -83,9 +126,351 @@ public ReadRows withConfig(Map<String, String> config) {
 
     @Override
     public PCollection<Row> expand(PBegin input) {
-      // TODO(https://github.com/apache/beam/issues/38551): Implement 
expansion for
-      // Delta Lake ReadRows
-      throw new UnsupportedOperationException("Not implemented yet.");
+      if (getTablePath() == null) {
+        throw new IllegalArgumentException("Table path must be set.");
+      }
+
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, getTablePath());
+
+      try {
+        Snapshot snapshot = buildSnapshot(engine, table);
+        Scan scan = snapshot.getScanBuilder(engine).build();
+        StructType readSchema = scan.getSchema(engine);
+        org.apache.beam.sdk.schemas.Schema beamSchema = 
inferBeamSchema(readSchema);
+
+        List<DeltaFileDescriptor> fileDescriptors = 
buildFileDescriptors(engine, scan);
+
+        return input
+            .apply("CreateFileDescriptors", Create.of(fileDescriptors)
+                .withCoder(SerializableCoder.of(DeltaFileDescriptor.class)))
+            .apply("ReadFile", ParDo.of(new ReadFileFn(beamSchema)))
+            .setRowSchema(beamSchema);
+
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to read Delta table: " + 
getTablePath(), e);
+      }
+    }
+
+    private Snapshot buildSnapshot(Engine engine, Table table) throws 
Exception {
+      if (getVersion() != null) {
+        return table.getSnapshotAsOfVersion(engine, getVersion());
+      } else if (getTimestamp() != null) {
+        long epochMillis = 
org.joda.time.Instant.parse(getTimestamp()).getMillis();
+        return table.getSnapshotAsOfTimestamp(engine, epochMillis);
+      } else {
+        return table.getLatestSnapshot(engine);
+      }
+    }
+
+    private List<DeltaFileDescriptor> buildFileDescriptors(Engine engine, Scan 
scan) throws Exception {
+      List<DeltaFileDescriptor> descriptors = new ArrayList<>();
+      try (CloseableIterator<FilteredColumnarBatch> scanFileIter = 
scan.getScanFiles(engine)) {
+        while (scanFileIter.hasNext()) {
+          FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
+          try (CloseableIterator<io.delta.kernel.data.Row> scanFileRows = 
scanFilesBatch.getRows()) {
+            while (scanFileRows.hasNext()) {
+              io.delta.kernel.data.Row scanFileRow = scanFileRows.next();
+              FileStatus fileStatus = 
InternalScanFileUtils.getAddFileStatus(scanFileRow);
+              descriptors.add(new DeltaFileDescriptor(
+                  getTablePath(),
+                  fileStatus.getPath(),
+                  fileStatus.getSize(),
+                  fileStatus.getModificationTime(),
+                  getHadoopConfig(),
+                  getVersion(),
+                  getTimestamp()
+              ));
+            }
+          }
+        }
+      }
+      return descriptors;
+    }
+  }
+
+  public static class DeltaFileDescriptor implements Serializable {
+    private final String tablePath;
+    private final String filePath;
+    private final long fileSize;
+    private final long modificationTime;
+    private final @Nullable Map<String, String> hadoopConfig;
+    private final @Nullable Long version;
+    private final @Nullable String timestamp;
+
+    public DeltaFileDescriptor(
+        String tablePath,
+        String filePath,
+        long fileSize,
+        long modificationTime,
+        @Nullable Map<String, String> hadoopConfig,
+        @Nullable Long version,
+        @Nullable String timestamp) {
+      this.tablePath = tablePath;
+      this.filePath = filePath;
+      this.fileSize = fileSize;
+      this.modificationTime = modificationTime;
+      this.hadoopConfig = hadoopConfig;
+      this.version = version;
+      this.timestamp = timestamp;
+    }
+
+    public String getTablePath() {
+      return tablePath;
+    }
+
+    public String getFilePath() {
+      return filePath;
+    }
+
+    public long getFileSize() {
+      return fileSize;
+    }
+
+    public long getModificationTime() {
+      return modificationTime;
+    }
+
+    public @Nullable Map<String, String> getHadoopConfig() {
+      return hadoopConfig;
+    }
+
+    public @Nullable Long getVersion() {
+      return version;
+    }
+
+    public @Nullable String getTimestamp() {
+      return timestamp;
+    }
+  }
+
+  public static class ReadFileFn extends DoFn<DeltaFileDescriptor, Row> {
+    private final org.apache.beam.sdk.schemas.Schema beamSchema;
+
+    public ReadFileFn(org.apache.beam.sdk.schemas.Schema beamSchema) {
+      this.beamSchema = beamSchema;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      DeltaFileDescriptor desc = c.element();
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(desc.getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, desc.getTablePath());
+
+      Snapshot snapshot;
+      if (desc.getVersion() != null) {
+        snapshot = table.getSnapshotAsOfVersion(engine, desc.getVersion());
+      } else if (desc.getTimestamp() != null) {
+        long epochMillis = 
org.joda.time.Instant.parse(desc.getTimestamp()).getMillis();
+        snapshot = table.getSnapshotAsOfTimestamp(engine, epochMillis);
+      } else {
+        snapshot = table.getLatestSnapshot(engine);
+      }
+
+      Scan scan = snapshot.getScanBuilder(engine).build();
+      io.delta.kernel.data.Row scanState = scan.getScanState(engine);
+
+      try (CloseableIterator<FilteredColumnarBatch> scanFileIter = 
scan.getScanFiles(engine)) {
+        while (scanFileIter.hasNext()) {
+          FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
+          try (CloseableIterator<io.delta.kernel.data.Row> scanFileRows = 
scanFilesBatch.getRows()) {
+            while (scanFileRows.hasNext()) {
+              io.delta.kernel.data.Row scanFileRow = scanFileRows.next();
+              FileStatus fileStatus = 
InternalScanFileUtils.getAddFileStatus(scanFileRow);
+              if (fileStatus.getPath().equals(desc.getFilePath())) {
+                StructType physicalReadSchema = 
ScanStateRow.getPhysicalDataReadSchema(scanState);
+                CloseableIterator<ColumnarBatch> physicalDataIter =
+                    engine.getParquetHandler().readParquetFiles(

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   In `ReadFileFn.processElement`, every single `DeltaFileDescriptor` rebuilds 
the snapshot and re-scans the entire list of files in the Delta table 
(`scan.getScanFiles(engine)`) just to find the one matching file status, and the 
loop keeps iterating even after the match has been found. If a table has $N$ 
files, this results in $O(N^2)$ work across the cluster and replays the Delta log 
$N$ times, which will cause severe performance degradation on larger tables. 
Consider serializing the `scanFileRow` (it is also what 
`Scan.transformPhysicalData` needs) or passing the necessary metadata directly in 
`DeltaFileDescriptor`, so that each element can be read without re-scanning the 
log.



##########
sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java:
##########
@@ -83,9 +126,351 @@ public ReadRows withConfig(Map<String, String> config) {
 
     @Override
     public PCollection<Row> expand(PBegin input) {
-      // TODO(https://github.com/apache/beam/issues/38551): Implement 
expansion for
-      // Delta Lake ReadRows
-      throw new UnsupportedOperationException("Not implemented yet.");
+      if (getTablePath() == null) {
+        throw new IllegalArgumentException("Table path must be set.");
+      }
+
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, getTablePath());
+
+      try {
+        Snapshot snapshot = buildSnapshot(engine, table);
+        Scan scan = snapshot.getScanBuilder(engine).build();
+        StructType readSchema = scan.getSchema(engine);
+        org.apache.beam.sdk.schemas.Schema beamSchema = 
inferBeamSchema(readSchema);
+
+        List<DeltaFileDescriptor> fileDescriptors = 
buildFileDescriptors(engine, scan);
+
+        return input
+            .apply("CreateFileDescriptors", Create.of(fileDescriptors)
+                .withCoder(SerializableCoder.of(DeltaFileDescriptor.class)))
+            .apply("ReadFile", ParDo.of(new ReadFileFn(beamSchema)))

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
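   The Delta table scan is planned and the file list is built during pipeline 
construction (inside `expand`), and the resulting list is embedded in the 
submitted pipeline via `Create.of`. This is a major architectural issue because 
the list of files is frozen at construction time. Worse, when neither a version 
nor a timestamp is set, `ReadFileFn` re-resolves the *latest* snapshot at 
runtime, so the planned file list and the snapshot the workers actually read can 
disagree. Files removed from the table after construction (e.g. by compaction) no 
longer match anything and are silently skipped. Files added after construction 
are never read. Additionally, the client machine submitting the pipeline must 
have access to the Delta table and enough memory to plan the scan. Consider 
deferring scan planning to runtime (e.g., inside a `DoFn` or using a Splittable 
`DoFn`). At a minimum, record the resolved snapshot version in each 
`DeltaFileDescriptor` so that every worker reads the same snapshot that was 
planned.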



##########
sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java:
##########
@@ -83,9 +126,351 @@ public ReadRows withConfig(Map<String, String> config) {
 
     @Override
     public PCollection<Row> expand(PBegin input) {
-      // TODO(https://github.com/apache/beam/issues/38551): Implement 
expansion for
-      // Delta Lake ReadRows
-      throw new UnsupportedOperationException("Not implemented yet.");
+      if (getTablePath() == null) {
+        throw new IllegalArgumentException("Table path must be set.");
+      }
+
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, getTablePath());
+
+      try {
+        Snapshot snapshot = buildSnapshot(engine, table);
+        Scan scan = snapshot.getScanBuilder(engine).build();
+        StructType readSchema = scan.getSchema(engine);
+        org.apache.beam.sdk.schemas.Schema beamSchema = 
inferBeamSchema(readSchema);
+
+        List<DeltaFileDescriptor> fileDescriptors = 
buildFileDescriptors(engine, scan);
+
+        return input
+            .apply("CreateFileDescriptors", Create.of(fileDescriptors)
+                .withCoder(SerializableCoder.of(DeltaFileDescriptor.class)))
+            .apply("ReadFile", ParDo.of(new ReadFileFn(beamSchema)))
+            .setRowSchema(beamSchema);
+
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to read Delta table: " + 
getTablePath(), e);
+      }
+    }
+
+    private Snapshot buildSnapshot(Engine engine, Table table) throws 
Exception {
+      if (getVersion() != null) {
+        return table.getSnapshotAsOfVersion(engine, getVersion());
+      } else if (getTimestamp() != null) {
+        long epochMillis = 
org.joda.time.Instant.parse(getTimestamp()).getMillis();
+        return table.getSnapshotAsOfTimestamp(engine, epochMillis);
+      } else {
+        return table.getLatestSnapshot(engine);
+      }
+    }
+
+    private List<DeltaFileDescriptor> buildFileDescriptors(Engine engine, Scan 
scan) throws Exception {
+      List<DeltaFileDescriptor> descriptors = new ArrayList<>();
+      try (CloseableIterator<FilteredColumnarBatch> scanFileIter = 
scan.getScanFiles(engine)) {
+        while (scanFileIter.hasNext()) {
+          FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
+          try (CloseableIterator<io.delta.kernel.data.Row> scanFileRows = 
scanFilesBatch.getRows()) {
+            while (scanFileRows.hasNext()) {
+              io.delta.kernel.data.Row scanFileRow = scanFileRows.next();
+              FileStatus fileStatus = 
InternalScanFileUtils.getAddFileStatus(scanFileRow);
+              descriptors.add(new DeltaFileDescriptor(
+                  getTablePath(),
+                  fileStatus.getPath(),
+                  fileStatus.getSize(),
+                  fileStatus.getModificationTime(),
+                  getHadoopConfig(),
+                  getVersion(),
+                  getTimestamp()
+              ));
+            }
+          }
+        }
+      }
+      return descriptors;
+    }
+  }
+
+  public static class DeltaFileDescriptor implements Serializable {
+    private final String tablePath;
+    private final String filePath;
+    private final long fileSize;
+    private final long modificationTime;
+    private final @Nullable Map<String, String> hadoopConfig;
+    private final @Nullable Long version;
+    private final @Nullable String timestamp;
+
+    public DeltaFileDescriptor(
+        String tablePath,
+        String filePath,
+        long fileSize,
+        long modificationTime,
+        @Nullable Map<String, String> hadoopConfig,
+        @Nullable Long version,
+        @Nullable String timestamp) {
+      this.tablePath = tablePath;
+      this.filePath = filePath;
+      this.fileSize = fileSize;
+      this.modificationTime = modificationTime;
+      this.hadoopConfig = hadoopConfig;
+      this.version = version;
+      this.timestamp = timestamp;
+    }
+
+    public String getTablePath() {
+      return tablePath;
+    }
+
+    public String getFilePath() {
+      return filePath;
+    }
+
+    public long getFileSize() {
+      return fileSize;
+    }
+
+    public long getModificationTime() {
+      return modificationTime;
+    }
+
+    public @Nullable Map<String, String> getHadoopConfig() {
+      return hadoopConfig;
+    }
+
+    public @Nullable Long getVersion() {
+      return version;
+    }
+
+    public @Nullable String getTimestamp() {
+      return timestamp;
+    }
+  }
+
+  public static class ReadFileFn extends DoFn<DeltaFileDescriptor, Row> {
+    private final org.apache.beam.sdk.schemas.Schema beamSchema;
+
+    public ReadFileFn(org.apache.beam.sdk.schemas.Schema beamSchema) {
+      this.beamSchema = beamSchema;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      DeltaFileDescriptor desc = c.element();
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(desc.getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, desc.getTablePath());
+
+      Snapshot snapshot;
+      if (desc.getVersion() != null) {
+        snapshot = table.getSnapshotAsOfVersion(engine, desc.getVersion());
+      } else if (desc.getTimestamp() != null) {
+        long epochMillis = 
org.joda.time.Instant.parse(desc.getTimestamp()).getMillis();
+        snapshot = table.getSnapshotAsOfTimestamp(engine, epochMillis);
+      } else {
+        snapshot = table.getLatestSnapshot(engine);
+      }
+
+      Scan scan = snapshot.getScanBuilder(engine).build();
+      io.delta.kernel.data.Row scanState = scan.getScanState(engine);
+
+      try (CloseableIterator<FilteredColumnarBatch> scanFileIter = 
scan.getScanFiles(engine)) {
+        while (scanFileIter.hasNext()) {
+          FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
+          try (CloseableIterator<io.delta.kernel.data.Row> scanFileRows = 
scanFilesBatch.getRows()) {
+            while (scanFileRows.hasNext()) {
+              io.delta.kernel.data.Row scanFileRow = scanFileRows.next();
+              FileStatus fileStatus = 
InternalScanFileUtils.getAddFileStatus(scanFileRow);
+              if (fileStatus.getPath().equals(desc.getFilePath())) {
+                StructType physicalReadSchema = 
ScanStateRow.getPhysicalDataReadSchema(scanState);
+                CloseableIterator<ColumnarBatch> physicalDataIter =
+                    engine.getParquetHandler().readParquetFiles(
+                        singletonCloseableIterator(fileStatus),
+                        physicalReadSchema,
+                        Optional.empty()).map(FilteredColumnarBatch::getData);
+                try (
+                    CloseableIterator<FilteredColumnarBatch> transformedData =
+                        Scan.transformPhysicalData(
+                            engine,
+                            scanState,
+                            scanFileRow,
+                            physicalDataIter)) {
+                  while (transformedData.hasNext()) {
+                    FilteredColumnarBatch filteredData = 
transformedData.next();
+                    try (CloseableIterator<io.delta.kernel.data.Row> rows = 
filteredData.getRows()) {
+                      while (rows.hasNext()) {
+                        io.delta.kernel.data.Row row = rows.next();
+                        c.output(convertKernelRowToBeamRow(row, beamSchema));
+                      }
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(
+      @Nullable Map<String, String> configMap) {
+    org.apache.hadoop.conf.Configuration config = new 
org.apache.hadoop.conf.Configuration();
+    if (configMap != null) {
+      for (Map.Entry<String, String> entry : configMap.entrySet()) {
+        config.set(entry.getKey(), entry.getValue());
+      }
+    }
+    return config;
+  }
+
+  private static org.apache.beam.sdk.schemas.Schema inferBeamSchema(StructType 
structType) {
+    org.apache.beam.sdk.schemas.Schema.Builder builder = 
org.apache.beam.sdk.schemas.Schema.builder();
+    for (StructField field : structType.fields()) {
+      builder.addField(field.getName(), toBeamFieldType(field.getDataType()));
+    }
+    return builder.build();
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
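   The `inferBeamSchema` method does not preserve the nullability of the Delta 
fields. `Schema.Builder.addField(name, type)` with the `FieldType` constants 
returned by `toBeamFieldType` declares every field non-nullable. However, 
`convertKernelRowToBeamRow` emits `null` for null Delta values, so any row with a 
null in a nullable column will fail Beam's row validation. Explicitly set the 
nullability of each Beam field based on `field.isNullable()`, as suggested below. 
Array elements and map values, which the conversion code can also set to `null`, 
need the same treatment in `toBeamFieldType`.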
   
   ```suggestion
     private static org.apache.beam.sdk.schemas.Schema 
inferBeamSchema(StructType structType) {
       org.apache.beam.sdk.schemas.Schema.Builder builder = 
org.apache.beam.sdk.schemas.Schema.builder();
       for (StructField field : structType.fields()) {
         
builder.addField(org.apache.beam.sdk.schemas.Schema.Field.of(field.getName(), 
toBeamFieldType(field.getDataType()))
             .withNullable(field.isNullable()));
       }
       return builder.build();
     }
   ```



##########
sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java:
##########
@@ -83,9 +126,351 @@ public ReadRows withConfig(Map<String, String> config) {
 
     @Override
     public PCollection<Row> expand(PBegin input) {
-      // TODO(https://github.com/apache/beam/issues/38551): Implement 
expansion for
-      // Delta Lake ReadRows
-      throw new UnsupportedOperationException("Not implemented yet.");
+      if (getTablePath() == null) {
+        throw new IllegalArgumentException("Table path must be set.");
+      }
+
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, getTablePath());
+
+      try {
+        Snapshot snapshot = buildSnapshot(engine, table);
+        Scan scan = snapshot.getScanBuilder(engine).build();
+        StructType readSchema = scan.getSchema(engine);
+        org.apache.beam.sdk.schemas.Schema beamSchema = 
inferBeamSchema(readSchema);
+
+        List<DeltaFileDescriptor> fileDescriptors = 
buildFileDescriptors(engine, scan);
+
+        return input
+            .apply("CreateFileDescriptors", Create.of(fileDescriptors)
+                .withCoder(SerializableCoder.of(DeltaFileDescriptor.class)))
+            .apply("ReadFile", ParDo.of(new ReadFileFn(beamSchema)))
+            .setRowSchema(beamSchema);
+
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to read Delta table: " + 
getTablePath(), e);
+      }
+    }
+
+    private Snapshot buildSnapshot(Engine engine, Table table) throws 
Exception {
+      if (getVersion() != null) {
+        return table.getSnapshotAsOfVersion(engine, getVersion());
+      } else if (getTimestamp() != null) {
+        long epochMillis = 
org.joda.time.Instant.parse(getTimestamp()).getMillis();
+        return table.getSnapshotAsOfTimestamp(engine, epochMillis);
+      } else {
+        return table.getLatestSnapshot(engine);
+      }
+    }
+
+    private List<DeltaFileDescriptor> buildFileDescriptors(Engine engine, Scan 
scan) throws Exception {
+      List<DeltaFileDescriptor> descriptors = new ArrayList<>();
+      try (CloseableIterator<FilteredColumnarBatch> scanFileIter = 
scan.getScanFiles(engine)) {
+        while (scanFileIter.hasNext()) {
+          FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
+          try (CloseableIterator<io.delta.kernel.data.Row> scanFileRows = 
scanFilesBatch.getRows()) {
+            while (scanFileRows.hasNext()) {
+              io.delta.kernel.data.Row scanFileRow = scanFileRows.next();
+              FileStatus fileStatus = 
InternalScanFileUtils.getAddFileStatus(scanFileRow);
+              descriptors.add(new DeltaFileDescriptor(
+                  getTablePath(),
+                  fileStatus.getPath(),
+                  fileStatus.getSize(),
+                  fileStatus.getModificationTime(),
+                  getHadoopConfig(),
+                  getVersion(),
+                  getTimestamp()
+              ));
+            }
+          }
+        }
+      }
+      return descriptors;
+    }
+  }
+
+  public static class DeltaFileDescriptor implements Serializable {
+    private final String tablePath;
+    private final String filePath;
+    private final long fileSize;
+    private final long modificationTime;
+    private final @Nullable Map<String, String> hadoopConfig;
+    private final @Nullable Long version;
+    private final @Nullable String timestamp;
+
+    public DeltaFileDescriptor(
+        String tablePath,
+        String filePath,
+        long fileSize,
+        long modificationTime,
+        @Nullable Map<String, String> hadoopConfig,
+        @Nullable Long version,
+        @Nullable String timestamp) {
+      this.tablePath = tablePath;
+      this.filePath = filePath;
+      this.fileSize = fileSize;
+      this.modificationTime = modificationTime;
+      this.hadoopConfig = hadoopConfig;
+      this.version = version;
+      this.timestamp = timestamp;
+    }
+
+    public String getTablePath() {
+      return tablePath;
+    }
+
+    public String getFilePath() {
+      return filePath;
+    }
+
+    public long getFileSize() {
+      return fileSize;
+    }
+
+    public long getModificationTime() {
+      return modificationTime;
+    }
+
+    public @Nullable Map<String, String> getHadoopConfig() {
+      return hadoopConfig;
+    }
+
+    public @Nullable Long getVersion() {
+      return version;
+    }
+
+    public @Nullable String getTimestamp() {
+      return timestamp;
+    }
+  }
+
+  public static class ReadFileFn extends DoFn<DeltaFileDescriptor, Row> {
+    private final org.apache.beam.sdk.schemas.Schema beamSchema;
+
+    public ReadFileFn(org.apache.beam.sdk.schemas.Schema beamSchema) {
+      this.beamSchema = beamSchema;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      DeltaFileDescriptor desc = c.element();
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(desc.getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, desc.getTablePath());
+
+      Snapshot snapshot;
+      if (desc.getVersion() != null) {
+        snapshot = table.getSnapshotAsOfVersion(engine, desc.getVersion());
+      } else if (desc.getTimestamp() != null) {
+        long epochMillis = 
org.joda.time.Instant.parse(desc.getTimestamp()).getMillis();
+        snapshot = table.getSnapshotAsOfTimestamp(engine, epochMillis);
+      } else {
+        snapshot = table.getLatestSnapshot(engine);
+      }
+
+      Scan scan = snapshot.getScanBuilder(engine).build();
+      io.delta.kernel.data.Row scanState = scan.getScanState(engine);
+
+      try (CloseableIterator<FilteredColumnarBatch> scanFileIter = 
scan.getScanFiles(engine)) {
+        while (scanFileIter.hasNext()) {
+          FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
+          try (CloseableIterator<io.delta.kernel.data.Row> scanFileRows = 
scanFilesBatch.getRows()) {
+            while (scanFileRows.hasNext()) {
+              io.delta.kernel.data.Row scanFileRow = scanFileRows.next();
+              FileStatus fileStatus = 
InternalScanFileUtils.getAddFileStatus(scanFileRow);
+              if (fileStatus.getPath().equals(desc.getFilePath())) {
+                StructType physicalReadSchema = 
ScanStateRow.getPhysicalDataReadSchema(scanState);
+                CloseableIterator<ColumnarBatch> physicalDataIter =
+                    engine.getParquetHandler().readParquetFiles(
+                        singletonCloseableIterator(fileStatus),
+                        physicalReadSchema,
+                        Optional.empty()).map(FilteredColumnarBatch::getData);
+                try (
+                    CloseableIterator<FilteredColumnarBatch> transformedData =
+                        Scan.transformPhysicalData(
+                            engine,
+                            scanState,
+                            scanFileRow,
+                            physicalDataIter)) {
+                  while (transformedData.hasNext()) {
+                    FilteredColumnarBatch filteredData = 
transformedData.next();
+                    try (CloseableIterator<io.delta.kernel.data.Row> rows = 
filteredData.getRows()) {
+                      while (rows.hasNext()) {
+                        io.delta.kernel.data.Row row = rows.next();
+                        c.output(convertKernelRowToBeamRow(row, beamSchema));
+                      }
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(
+      @Nullable Map<String, String> configMap) {
+    org.apache.hadoop.conf.Configuration config = new 
org.apache.hadoop.conf.Configuration();
+    if (configMap != null) {
+      for (Map.Entry<String, String> entry : configMap.entrySet()) {
+        config.set(entry.getKey(), entry.getValue());
+      }
+    }
+    return config;
+  }
+
+  private static org.apache.beam.sdk.schemas.Schema inferBeamSchema(StructType 
structType) {
+    org.apache.beam.sdk.schemas.Schema.Builder builder = 
org.apache.beam.sdk.schemas.Schema.builder();
+    for (StructField field : structType.fields()) {
+      builder.addField(field.getName(), toBeamFieldType(field.getDataType()));
+    }
+    return builder.build();
+  }
+
+  private static org.apache.beam.sdk.schemas.Schema.FieldType 
toBeamFieldType(DataType dataType) {
+    if (dataType instanceof IntegerType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.INT32;
+    } else if (dataType instanceof LongType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
+    } else if (dataType instanceof StringType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
+    } else if (dataType instanceof DoubleType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE;
+    } else if (dataType instanceof FloatType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT;
+    } else if (dataType instanceof BooleanType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN;
+    } else if (dataType instanceof ShortType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.INT16;
+    } else if (dataType instanceof ByteType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.BYTE;
+    } else if (dataType instanceof BinaryType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.BYTES;
+    } else if (dataType instanceof DecimalType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.DECIMAL;
+    } else if (dataType instanceof TimestampType || dataType instanceof 
DateType) {
+      return org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME;
+    } else if (dataType instanceof StructType) {
+      return 
org.apache.beam.sdk.schemas.Schema.FieldType.row(inferBeamSchema((StructType) 
dataType));
+    } else if (dataType instanceof ArrayType) {
+      return 
org.apache.beam.sdk.schemas.Schema.FieldType.array(toBeamFieldType(((ArrayType) 
dataType).getElementType()));
+    } else if (dataType instanceof MapType) {
+      MapType mapType = (MapType) dataType;
+      return org.apache.beam.sdk.schemas.Schema.FieldType.map(
+          toBeamFieldType(mapType.getKeyType()),
+          toBeamFieldType(mapType.getValueType()));
+    } else {
+      throw new IllegalArgumentException("Unsupported Delta type: " + 
dataType);
+    }
+  }
+
+  private static Row convertKernelRowToBeamRow(
+      io.delta.kernel.data.Row deltaRow, org.apache.beam.sdk.schemas.Schema 
beamSchema) {
+    List<Object> values = new ArrayList<>();
+    StructType structType = deltaRow.getSchema();
+    for (int i = 0; i < structType.length(); i++) {
+      if (deltaRow.isNullAt(i)) {
+        values.add(null);
+      } else {
+        DataType dataType = structType.at(i).getDataType();
+        values.add(convertValue(deltaRow, i, dataType));
+      }
+    }
+    return Row.withSchema(beamSchema).addValues(values).build();
+  }
+
+  private static Object convertValue(io.delta.kernel.data.Row deltaRow, int 
ordinal, DataType dataType) {
+    if (deltaRow.isNullAt(ordinal)) {
+      return null;
+    }
+    if (dataType instanceof IntegerType) {
+      return deltaRow.getInt(ordinal);
+    } else if (dataType instanceof LongType) {
+      return deltaRow.getLong(ordinal);
+    } else if (dataType instanceof StringType) {
+      return deltaRow.getString(ordinal);
+    } else if (dataType instanceof DoubleType) {
+      return deltaRow.getDouble(ordinal);
+    } else if (dataType instanceof FloatType) {
+      return deltaRow.getFloat(ordinal);
+    } else if (dataType instanceof BooleanType) {
+      return deltaRow.getBoolean(ordinal);
+    } else if (dataType instanceof ShortType) {
+      return deltaRow.getShort(ordinal);
+    } else if (dataType instanceof ByteType) {
+      return deltaRow.getByte(ordinal);
+    } else if (dataType instanceof BinaryType) {
+      return deltaRow.getBinary(ordinal);
+    } else if (dataType instanceof DecimalType) {
+      return deltaRow.getDecimal(ordinal);
+    } else if (dataType instanceof TimestampType) {
+      long micros = deltaRow.getLong(ordinal);
+      return org.joda.time.Instant.ofEpochMilli(micros / 1000);
+    } else if (dataType instanceof DateType) {
+      int days = deltaRow.getInt(ordinal);
+      return org.joda.time.Instant.ofEpochMilli(days * 24L * 60 * 60 * 1000);
+    } else if (dataType instanceof StructType) {
+      io.delta.kernel.data.Row structRow = deltaRow.getStruct(ordinal);
+      return convertKernelRowToBeamRow(structRow, inferBeamSchema((StructType) 
dataType));
+    } else if (dataType instanceof ArrayType) {
+      ArrayValue arrayVal = deltaRow.getArray(ordinal);
+      int size = arrayVal.getSize();
+      List<Object> list = new ArrayList<>(size);
+      DataType elemType = ((ArrayType) dataType).getElementType();
+      ColumnVector vec = arrayVal.getElements();
+      for (int j = 0; j < size; j++) {
+        if (vec.isNullAt(j)) {
+          list.add(null);
+        } else {
+          list.add(convertVectorValue(vec, j, elemType));
+        }
+      }
+      return list;
+    } else if (dataType instanceof MapType) {
+      MapValue mapVal = deltaRow.getMap(ordinal);
+      int size = mapVal.getSize();
+      Map<Object, Object> map = new HashMap<>(size);
+      DataType keyType = ((MapType) dataType).getKeyType();
+      DataType valueType = ((MapType) dataType).getValueType();
+      ColumnVector keysVec = mapVal.getKeys();
+      ColumnVector valuesVec = mapVal.getValues();
+      for (int j = 0; j < size; j++) {
+        Object key = convertVectorValue(keysVec, j, keyType);
+        Object val = valuesVec.isNullAt(j) ? null : 
convertVectorValue(valuesVec, j, valueType);
+        map.put(key, val);
+      }
+      return map;
+    } else {
+      return deltaRow.toString();
+    }
+  }
+
+  private static Object convertVectorValue(ColumnVector vec, int rowId, 
DataType dataType) {
+    if (dataType instanceof IntegerType) {
+      return vec.getInt(rowId);
+    } else if (dataType instanceof LongType) {
+      return vec.getLong(rowId);
+    } else if (dataType instanceof StringType) {
+      return vec.getString(rowId);
+    } else if (dataType instanceof DoubleType) {
+      return vec.getDouble(rowId);
+    } else if (dataType instanceof FloatType) {
+      return vec.getFloat(rowId);
+    } else if (dataType instanceof BooleanType) {
+      return vec.getBoolean(rowId);
+    } else if (dataType instanceof ShortType) {
+      return vec.getShort(rowId);
+    } else if (dataType instanceof ByteType) {
+      return vec.getByte(rowId);
+    } else if (dataType instanceof BinaryType) {
+      return vec.getBinary(rowId);
+    } else if (dataType instanceof DecimalType) {
+      return vec.getDecimal(rowId);
+    } else {
+      return vec.toString();
     }
   }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The current implementation of `convertVectorValue` is missing support for
several key data types, including `TimestampType`, `DateType`, `StructType`,
`ArrayType`, and `MapType`. If any of these types is nested within an array or
map, the conversion falls back to `vec.toString()`, silently emitting the column
vector's string representation instead of the element's value. Additionally,
`convertValue` re-infers the Beam schema for nested structs on every single row,
which is a major performance bottleneck. We can solve both issues by passing the
corresponding Beam `FieldType` down through the conversion methods and
implementing the missing types in `convertVectorValue`.
   
   ```java
     /**
      * Builds a Beam {@link Row} conforming to {@code beamSchema} from a Delta Kernel row.
      *
      * <p>Columns are matched by position: column {@code i} of the Delta row feeds field {@code i}
      * of {@code beamSchema}. Null columns are emitted as {@code null}.
      *
      * @param deltaRow the Delta Kernel row to convert
      * @param beamSchema the target Beam schema, positionally aligned with the row's Delta schema
      * @return the converted Beam row
      */
     private static Row convertKernelRowToBeamRow(
         io.delta.kernel.data.Row deltaRow, org.apache.beam.sdk.schemas.Schema beamSchema) {
       StructType deltaSchema = deltaRow.getSchema();
       int fieldCount = deltaSchema.length();
       List<Object> fieldValues = new ArrayList<>(fieldCount);
       for (int ordinal = 0; ordinal < fieldCount; ordinal++) {
         Object converted = null;
         if (!deltaRow.isNullAt(ordinal)) {
           DataType columnType = deltaSchema.at(ordinal).getDataType();
           converted =
               convertValue(
                   deltaRow, ordinal, columnType, beamSchema.getField(ordinal).getType());
         }
         fieldValues.add(converted);
       }
       return Row.withSchema(beamSchema).addValues(fieldValues).build();
     }
   
     /**
      * Converts the value at {@code ordinal} of a Delta Kernel row into the Java value expected by
      * the matching Beam field.
      *
      * <p>Timestamps are read as microseconds since the epoch and dates as days since the epoch;
      * both are returned as Joda {@code Instant}s. Nested structs, arrays and maps are converted
      * recursively using the row, element, key and value types carried by {@code beamFieldType},
      * so this method performs no schema inference.
      *
      * @param deltaRow the Delta Kernel row to read from
      * @param ordinal the column index within {@code deltaRow}
      * @param dataType the Delta type of that column
      * @param beamFieldType the Beam type of the corresponding field; must mirror {@code dataType}
      * @return the converted value, or {@code null} for a null column
      */
     private static Object convertValue(
         io.delta.kernel.data.Row deltaRow,
         int ordinal,
         DataType dataType,
         org.apache.beam.sdk.schemas.Schema.FieldType beamFieldType) {
       if (deltaRow.isNullAt(ordinal)) {
         return null;
       }
       if (dataType instanceof IntegerType) {
         return deltaRow.getInt(ordinal);
       } else if (dataType instanceof LongType) {
         return deltaRow.getLong(ordinal);
       } else if (dataType instanceof StringType) {
         return deltaRow.getString(ordinal);
       } else if (dataType instanceof DoubleType) {
         return deltaRow.getDouble(ordinal);
       } else if (dataType instanceof FloatType) {
         return deltaRow.getFloat(ordinal);
       } else if (dataType instanceof BooleanType) {
         return deltaRow.getBoolean(ordinal);
       } else if (dataType instanceof ShortType) {
         return deltaRow.getShort(ordinal);
       } else if (dataType instanceof ByteType) {
         return deltaRow.getByte(ordinal);
       } else if (dataType instanceof BinaryType) {
         return deltaRow.getBinary(ordinal);
       } else if (dataType instanceof DecimalType) {
         return deltaRow.getDecimal(ordinal);
       } else if (dataType instanceof TimestampType) {
         // Floor rather than truncate: for pre-epoch values with a sub-millisecond part, plain
         // integer division rounds toward zero and would move the instant forward in time.
         long micros = deltaRow.getLong(ordinal);
         return org.joda.time.Instant.ofEpochMilli(Math.floorDiv(micros, 1000L));
       } else if (dataType instanceof DateType) {
         int days = deltaRow.getInt(ordinal);
         return org.joda.time.Instant.ofEpochMilli(days * 24L * 60 * 60 * 1000);
       } else if (dataType instanceof StructType) {
         io.delta.kernel.data.Row structRow = deltaRow.getStruct(ordinal);
         return convertKernelRowToBeamRow(structRow, beamFieldType.getRowSchema());
       } else if (dataType instanceof ArrayType) {
         ArrayValue arrayVal = deltaRow.getArray(ordinal);
         int size = arrayVal.getSize();
         List<Object> list = new ArrayList<>(size);
         DataType elemType = ((ArrayType) dataType).getElementType();
         org.apache.beam.sdk.schemas.Schema.FieldType elemFieldType =
             beamFieldType.getCollectionElementType();
         ColumnVector vec = arrayVal.getElements();
         for (int j = 0; j < size; j++) {
           list.add(convertVectorValue(vec, j, elemType, elemFieldType));
         }
         return list;
       } else if (dataType instanceof MapType) {
         MapValue mapVal = deltaRow.getMap(ordinal);
         int size = mapVal.getSize();
         Map<Object, Object> map = new HashMap<>(size);
         DataType keyType = ((MapType) dataType).getKeyType();
         DataType valueType = ((MapType) dataType).getValueType();
         org.apache.beam.sdk.schemas.Schema.FieldType keyFieldType = beamFieldType.getMapKeyType();
         org.apache.beam.sdk.schemas.Schema.FieldType valueFieldType =
             beamFieldType.getMapValueType();
         ColumnVector keysVec = mapVal.getKeys();
         ColumnVector valuesVec = mapVal.getValues();
         for (int j = 0; j < size; j++) {
           Object key = convertVectorValue(keysVec, j, keyType, keyFieldType);
           Object val = convertVectorValue(valuesVec, j, valueType, valueFieldType);
           map.put(key, val);
         }
         return map;
       } else {
         // NOTE(review): unsupported types fall back to the string form of the whole row, not of
         // this single column value; confirm this is intended.
         return deltaRow.toString();
       }
     }
   
     /**
      * Converts the entry at {@code rowId} of a Delta Kernel column vector into the Java value
      * expected by the matching Beam field, returning {@code null} for a null entry.
      *
      * <p>Timestamps (microseconds since the epoch) and dates (days since the epoch) become Joda
      * {@code Instant}s. Structs are assembled field by field from {@code vec.getChild(k)}, and
      * arrays and maps are converted recursively, using the types carried by
      * {@code beamFieldType}.
      *
      * <p>NOTE(review): an earlier draft read structs with {@code vec.getStruct(rowId)}, which does
      * not appear to be part of Delta Kernel's public {@code ColumnVector} interface; confirm
      * {@code getChild} against the kernel version in use.
      *
      * @param vec the column vector to read from
      * @param rowId the index of the entry within {@code vec}
      * @param dataType the Delta type of the vector's entries
      * @param beamFieldType the Beam type of the entries; must mirror {@code dataType}
      * @return the converted value, or {@code null} for a null entry
      */
     private static Object convertVectorValue(
         ColumnVector vec,
         int rowId,
         DataType dataType,
         org.apache.beam.sdk.schemas.Schema.FieldType beamFieldType) {
       if (vec.isNullAt(rowId)) {
         return null;
       }
       if (dataType instanceof IntegerType) {
         return vec.getInt(rowId);
       } else if (dataType instanceof LongType) {
         return vec.getLong(rowId);
       } else if (dataType instanceof StringType) {
         return vec.getString(rowId);
       } else if (dataType instanceof DoubleType) {
         return vec.getDouble(rowId);
       } else if (dataType instanceof FloatType) {
         return vec.getFloat(rowId);
       } else if (dataType instanceof BooleanType) {
         return vec.getBoolean(rowId);
       } else if (dataType instanceof ShortType) {
         return vec.getShort(rowId);
       } else if (dataType instanceof ByteType) {
         return vec.getByte(rowId);
       } else if (dataType instanceof BinaryType) {
         return vec.getBinary(rowId);
       } else if (dataType instanceof DecimalType) {
         return vec.getDecimal(rowId);
       } else if (dataType instanceof TimestampType) {
         // Floor rather than truncate so pre-epoch values do not move forward in time.
         long micros = vec.getLong(rowId);
         return org.joda.time.Instant.ofEpochMilli(Math.floorDiv(micros, 1000L));
       } else if (dataType instanceof DateType) {
         int days = vec.getInt(rowId);
         return org.joda.time.Instant.ofEpochMilli(days * 24L * 60 * 60 * 1000);
       } else if (dataType instanceof StructType) {
         StructType structType = (StructType) dataType;
         org.apache.beam.sdk.schemas.Schema rowSchema = beamFieldType.getRowSchema();
         List<Object> fieldValues = new ArrayList<>(structType.length());
         for (int k = 0; k < structType.length(); k++) {
           fieldValues.add(
               convertVectorValue(
                   vec.getChild(k),
                   rowId,
                   structType.at(k).getDataType(),
                   rowSchema.getField(k).getType()));
         }
         return Row.withSchema(rowSchema).addValues(fieldValues).build();
       } else if (dataType instanceof ArrayType) {
         ArrayValue arrayVal = vec.getArray(rowId);
         int size = arrayVal.getSize();
         List<Object> list = new ArrayList<>(size);
         DataType elemType = ((ArrayType) dataType).getElementType();
         org.apache.beam.sdk.schemas.Schema.FieldType elemFieldType =
             beamFieldType.getCollectionElementType();
         ColumnVector elementsVec = arrayVal.getElements();
         for (int j = 0; j < size; j++) {
           list.add(convertVectorValue(elementsVec, j, elemType, elemFieldType));
         }
         return list;
       } else if (dataType instanceof MapType) {
         MapValue mapVal = vec.getMap(rowId);
         int size = mapVal.getSize();
         Map<Object, Object> map = new HashMap<>(size);
         DataType keyType = ((MapType) dataType).getKeyType();
         DataType valueType = ((MapType) dataType).getValueType();
         org.apache.beam.sdk.schemas.Schema.FieldType keyFieldType = beamFieldType.getMapKeyType();
         org.apache.beam.sdk.schemas.Schema.FieldType valueFieldType =
             beamFieldType.getMapValueType();
         ColumnVector keysVec = mapVal.getKeys();
         ColumnVector valuesVec = mapVal.getValues();
         for (int j = 0; j < size; j++) {
           Object key = convertVectorValue(keysVec, j, keyType, keyFieldType);
           Object val = convertVectorValue(valuesVec, j, valueType, valueFieldType);
           map.put(key, val);
         }
         return map;
       } else {
         // NOTE(review): unsupported types fall back to the vector's string form, not the
         // element's value; confirm this is intended.
         return vec.toString();
       }
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to