durgaprasadml commented on code in PR #38750:
URL: https://github.com/apache/beam/pull/38750#discussion_r3328574401


##########
sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java:
##########
@@ -83,9 +126,351 @@ public ReadRows withConfig(Map<String, String> config) {
 
     @Override
     public PCollection<Row> expand(PBegin input) {
-      // TODO(https://github.com/apache/beam/issues/38551): Implement 
expansion for
-      // Delta Lake ReadRows
-      throw new UnsupportedOperationException("Not implemented yet.");
+      if (getTablePath() == null) {
+        throw new IllegalArgumentException("Table path must be set.");
+      }
+
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, getTablePath());
+
+      try {
+        Snapshot snapshot = buildSnapshot(engine, table);
+        Scan scan = snapshot.getScanBuilder(engine).build();
+        StructType readSchema = scan.getSchema(engine);
+        org.apache.beam.sdk.schemas.Schema beamSchema = 
inferBeamSchema(readSchema);
+
+        List<DeltaFileDescriptor> fileDescriptors = 
buildFileDescriptors(engine, scan);
+
+        return input
+            .apply("CreateFileDescriptors", Create.of(fileDescriptors)
+                .withCoder(SerializableCoder.of(DeltaFileDescriptor.class)))
+            .apply("ReadFile", ParDo.of(new ReadFileFn(beamSchema)))
+            .setRowSchema(beamSchema);
+
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to read Delta table: " + 
getTablePath(), e);
+      }
+    }
+
+    private Snapshot buildSnapshot(Engine engine, Table table) throws 
Exception {
+      if (getVersion() != null) {
+        return table.getSnapshotAsOfVersion(engine, getVersion());
+      } else if (getTimestamp() != null) {
+        long epochMillis = 
org.joda.time.Instant.parse(getTimestamp()).getMillis();
+        return table.getSnapshotAsOfTimestamp(engine, epochMillis);
+      } else {
+        return table.getLatestSnapshot(engine);
+      }
+    }
+
+    private List<DeltaFileDescriptor> buildFileDescriptors(Engine engine, Scan 
scan) throws Exception {
+      List<DeltaFileDescriptor> descriptors = new ArrayList<>();
+      try (CloseableIterator<FilteredColumnarBatch> scanFileIter = 
scan.getScanFiles(engine)) {
+        while (scanFileIter.hasNext()) {
+          FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
+          try (CloseableIterator<io.delta.kernel.data.Row> scanFileRows = 
scanFilesBatch.getRows()) {
+            while (scanFileRows.hasNext()) {
+              io.delta.kernel.data.Row scanFileRow = scanFileRows.next();
+              FileStatus fileStatus = 
InternalScanFileUtils.getAddFileStatus(scanFileRow);
+              descriptors.add(new DeltaFileDescriptor(
+                  getTablePath(),
+                  fileStatus.getPath(),
+                  fileStatus.getSize(),
+                  fileStatus.getModificationTime(),
+                  getHadoopConfig(),
+                  getVersion(),
+                  getTimestamp()
+              ));
+            }
+          }
+        }
+      }
+      return descriptors;
+    }
+  }
+
+  public static class DeltaFileDescriptor implements Serializable {
+    private final String tablePath;
+    private final String filePath;
+    private final long fileSize;
+    private final long modificationTime;
+    private final @Nullable Map<String, String> hadoopConfig;
+    private final @Nullable Long version;
+    private final @Nullable String timestamp;
+
+    public DeltaFileDescriptor(
+        String tablePath,
+        String filePath,
+        long fileSize,
+        long modificationTime,
+        @Nullable Map<String, String> hadoopConfig,
+        @Nullable Long version,
+        @Nullable String timestamp) {
+      this.tablePath = tablePath;
+      this.filePath = filePath;
+      this.fileSize = fileSize;
+      this.modificationTime = modificationTime;
+      this.hadoopConfig = hadoopConfig;
+      this.version = version;
+      this.timestamp = timestamp;
+    }
+
+    public String getTablePath() {
+      return tablePath;
+    }
+
+    public String getFilePath() {
+      return filePath;
+    }
+
+    public long getFileSize() {
+      return fileSize;
+    }
+
+    public long getModificationTime() {
+      return modificationTime;
+    }
+
+    public @Nullable Map<String, String> getHadoopConfig() {
+      return hadoopConfig;
+    }
+
+    public @Nullable Long getVersion() {
+      return version;
+    }
+
+    public @Nullable String getTimestamp() {
+      return timestamp;
+    }
+  }
+
+  public static class ReadFileFn extends DoFn<DeltaFileDescriptor, Row> {
+    private final org.apache.beam.sdk.schemas.Schema beamSchema;
+
+    public ReadFileFn(org.apache.beam.sdk.schemas.Schema beamSchema) {
+      this.beamSchema = beamSchema;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      DeltaFileDescriptor desc = c.element();
+      org.apache.hadoop.conf.Configuration hadoopConfig = 
getHadoopConfiguration(desc.getHadoopConfig());
+      Engine engine = DefaultEngine.create(hadoopConfig);
+      Table table = Table.forPath(engine, desc.getTablePath());
+
+      Snapshot snapshot;
+      if (desc.getVersion() != null) {
+        snapshot = table.getSnapshotAsOfVersion(engine, desc.getVersion());
+      } else if (desc.getTimestamp() != null) {
+        long epochMillis = 
org.joda.time.Instant.parse(desc.getTimestamp()).getMillis();
+        snapshot = table.getSnapshotAsOfTimestamp(engine, epochMillis);
+      } else {
+        snapshot = table.getLatestSnapshot(engine);
+      }
+
+      Scan scan = snapshot.getScanBuilder(engine).build();
+      io.delta.kernel.data.Row scanState = scan.getScanState(engine);
+
+      try (CloseableIterator<FilteredColumnarBatch> scanFileIter = 
scan.getScanFiles(engine)) {
+        while (scanFileIter.hasNext()) {
+          FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
+          try (CloseableIterator<io.delta.kernel.data.Row> scanFileRows = 
scanFilesBatch.getRows()) {
+            while (scanFileRows.hasNext()) {
+              io.delta.kernel.data.Row scanFileRow = scanFileRows.next();
+              FileStatus fileStatus = 
InternalScanFileUtils.getAddFileStatus(scanFileRow);
+              if (fileStatus.getPath().equals(desc.getFilePath())) {
+                StructType physicalReadSchema = 
ScanStateRow.getPhysicalDataReadSchema(scanState);
+                CloseableIterator<ColumnarBatch> physicalDataIter =
+                    engine.getParquetHandler().readParquetFiles(
+                        singletonCloseableIterator(fileStatus),
+                        physicalReadSchema,
+                        Optional.empty()).map(FilteredColumnarBatch::getData);
+                try (
+                    CloseableIterator<FilteredColumnarBatch> transformedData =
+                        Scan.transformPhysicalData(
+                            engine,
+                            scanState,
+                            scanFileRow,
+                            physicalDataIter)) {
+                  while (transformedData.hasNext()) {
+                    FilteredColumnarBatch filteredData = 
transformedData.next();
+                    try (CloseableIterator<io.delta.kernel.data.Row> rows = 
filteredData.getRows()) {
+                      while (rows.hasNext()) {
+                        io.delta.kernel.data.Row row = rows.next();
+                        c.output(convertKernelRowToBeamRow(row, beamSchema));
+                      }
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(
+      @Nullable Map<String, String> configMap) {
+    org.apache.hadoop.conf.Configuration config = new 
org.apache.hadoop.conf.Configuration();
+    if (configMap != null) {
+      for (Map.Entry<String, String> entry : configMap.entrySet()) {
+        config.set(entry.getKey(), entry.getValue());
+      }
+    }
+    return config;
+  }
+
+  private static org.apache.beam.sdk.schemas.Schema inferBeamSchema(StructType 
structType) {
+    org.apache.beam.sdk.schemas.Schema.Builder builder = 
org.apache.beam.sdk.schemas.Schema.builder();
+    for (StructField field : structType.fields()) {
+      builder.addField(field.getName(), toBeamFieldType(field.getDataType()));
+    }
+    return builder.build();
+  }

Review Comment:
   Thanks for the detailed review and the thoughtful feedback.
   
   You’re absolutely right about the architectural concerns: `expand()` plans 
the scan during pipeline construction, and `ReadFileFn` re-scans the Delta log 
(re-enumerating every scan file) for each file descriptor, which makes the read 
O(N²) in the number of files. I’ll rework the implementation to move scan 
planning out of pipeline construction and into runtime, and avoid repeated scan 
enumeration by carrying the required metadata directly within the file 
descriptor/state.
   
   I’ll also address:
   - nested complex type handling in vector conversion
   - schema nullability preservation
   - avoiding repeated nested schema inference during row conversion
   
   I appreciate the detailed suggestions and example improvements — they’re 
very helpful.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to