flyrain commented on code in PR #5248:
URL: https://github.com/apache/iceberg/pull/5248#discussion_r925946053


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java:
##########
@@ -60,35 +64,54 @@
  *
  * @param <T> is the Java class returned by this reader whose objects contain one or more rows.
  */
-abstract class BaseDataReader<T> implements Closeable {
+abstract class BaseDataReader<T, CST extends ContentScanTask<?>, G extends ScanTaskGroup<CST>>
+    implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(BaseDataReader.class);
 
   private final Table table;
-  private final Iterator<FileScanTask> tasks;
+  private final Iterator<CST> tasks;
   private final Map<String, InputFile> inputFiles;
 
   private CloseableIterator<T> currentIterator;
   private T current = null;
-  private FileScanTask currentTask = null;
+  private CST currentTask = null;
 
-  BaseDataReader(Table table, CombinedScanTask task) {
+  BaseDataReader(Table table, G task) {
     this.table = table;
-    this.tasks = task.files().iterator();
+    this.tasks = task.tasks().iterator();
+    this.inputFiles = inputFiles(task);
+    this.currentIterator = CloseableIterator.empty();
+  }
+
+  private Map<String, InputFile> inputFiles(G task) {
     Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
-    task.files().stream()
-        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+    Stream<ContentFile> dataFileStream = task.tasks().stream()
+        .flatMap(contentScanTask -> {
+          Stream<ContentFile> stream = Stream.of(contentScanTask.file());
+          if (contentScanTask.isFileScanTask()) {
+            stream = Stream.concat(stream, contentScanTask.asFileScanTask().deletes().stream());
+          } else if (contentScanTask instanceof AddedRowsScanTask) {
+            stream = Stream.concat(stream, ((AddedRowsScanTask) contentScanTask).deletes().stream());
+          } else if (contentScanTask instanceof DeletedDataFileScanTask) {
+            stream = Stream.concat(stream, ((DeletedDataFileScanTask) contentScanTask).existingDeletes().stream());
+          } else if (contentScanTask instanceof DeletedRowsScanTask) {
+            stream = Stream.concat(stream, ((DeletedRowsScanTask) contentScanTask).addedDeletes().stream());
+            stream = Stream.concat(stream, ((DeletedRowsScanTask) contentScanTask).existingDeletes().stream());
+          }

Review Comment:
   I'm OK with these solutions. Adding them to `ScanTask` also makes sense.
   For the `inputFiles` map in `BaseDataReader`, we don't actually need to know
whether a file is a `DataFile` or a `DeleteFile`. But to avoid `?` in the public
APIs, we still need two methods, one for `DataFile` and another for
`DeleteFile`. I'm wondering whether we can remove the type parameter from
`interface ContentFile<F>` so that we can use plain `ContentFile` in this use case.
   Here is another use case for unifying `DataFile` and `DeleteFile`:
https://github.com/apache/iceberg/pull/4142#pullrequestreview-1042227343
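
   To make the trade-off concrete, here is a rough sketch. It uses simplified stand-in interfaces rather than the real Iceberg classes, and `ContentFileSketch`, `SimpleDataFile`, `SimpleDeleteFile`, and `allPaths` are hypothetical names used only for illustration. As long as `ContentFile` carries a type parameter, a single stream that mixes data files and delete files has to be typed with a wildcard:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Simplified stand-ins for illustration only; NOT the real Iceberg interfaces.
class ContentFileSketch {
  interface ContentFile<F> {
    CharSequence path();
  }

  interface DataFile extends ContentFile<DataFile> {}

  interface DeleteFile extends ContentFile<DeleteFile> {}

  // Hypothetical minimal implementations so the sketch can run on its own.
  static final class SimpleDataFile implements DataFile {
    private final String path;

    SimpleDataFile(String path) {
      this.path = path;
    }

    @Override
    public CharSequence path() {
      return path;
    }
  }

  static final class SimpleDeleteFile implements DeleteFile {
    private final String path;

    SimpleDeleteFile(String path) {
      this.path = path;
    }

    @Override
    public CharSequence path() {
      return path;
    }
  }

  // Collects the paths of a data file and its delete files in one pass.
  // The reader only needs path(), so it does not care which kind each file is,
  // but the element type still has to be the wildcard ContentFile<?>.
  static List<String> allPaths(DataFile dataFile, List<DeleteFile> deletes) {
    Stream<ContentFile<?>> files = Stream.concat(Stream.of(dataFile), deletes.stream());
    return files.map(file -> file.path().toString()).collect(Collectors.toList());
  }
}
```

   Without the type parameter, the stream could be declared as `Stream<ContentFile>` and a single method could accept either kind of file, with no wildcard and no raw type in the signature.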



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

