flyrain commented on code in PR #5248:
URL: https://github.com/apache/iceberg/pull/5248#discussion_r918178977
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java:
##########
@@ -60,35 +64,54 @@
*
* @param <T> is the Java class returned by this reader whose objects contain one or more rows.
*/
-abstract class BaseDataReader<T> implements Closeable {
+abstract class BaseDataReader<T, CST extends ContentScanTask<?>, G extends ScanTaskGroup<CST>>
+ implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BaseDataReader.class);
private final Table table;
- private final Iterator<FileScanTask> tasks;
+ private final Iterator<CST> tasks;
private final Map<String, InputFile> inputFiles;
private CloseableIterator<T> currentIterator;
private T current = null;
- private FileScanTask currentTask = null;
+ private CST currentTask = null;
- BaseDataReader(Table table, CombinedScanTask task) {
+ BaseDataReader(Table table, G task) {
this.table = table;
- this.tasks = task.files().iterator();
+ this.tasks = task.tasks().iterator();
+ this.inputFiles = inputFiles(task);
+ this.currentIterator = CloseableIterator.empty();
+ }
+
+ private Map<String, InputFile> inputFiles(G task) {
Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
- task.files().stream()
- .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+ Stream<ContentFile> dataFileStream = task.tasks().stream()
+ .flatMap(contentScanTask -> {
+ Stream<ContentFile> stream = Stream.of(contentScanTask.file());
+ if (contentScanTask.isFileScanTask()) {
+ stream = Stream.concat(stream, contentScanTask.asFileScanTask().deletes().stream());
+ } else if (contentScanTask instanceof AddedRowsScanTask) {
+ stream = Stream.concat(stream, ((AddedRowsScanTask) contentScanTask).deletes().stream());
+ } else if (contentScanTask instanceof DeletedDataFileScanTask) {
+ stream = Stream.concat(stream, ((DeletedDataFileScanTask) contentScanTask).existingDeletes().stream());
+ } else if (contentScanTask instanceof DeletedRowsScanTask) {
+ stream = Stream.concat(stream, ((DeletedRowsScanTask) contentScanTask).addedDeletes().stream());
+ stream = Stream.concat(stream, ((DeletedRowsScanTask) contentScanTask).existingDeletes().stream());
+ }
Review Comment:
I'm not happy with this downcasting. To eliminate it, we can add a method to
`ContentScanTask` that returns all related content files. For example:
1. In `FileScanTask`, the method returns the data file as well as all delete files.
2. In `DeletedRowsScanTask`, the method returns the data file, the `addedDeletes` files, and the `existingDeletes` files.
We could name the method `allContentFiles()`, `relatedContentFiles()`, or something similar.
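A minimal sketch of the proposal, assuming simplified, hypothetical stand-ins for Iceberg's `ContentFile`, `DataFile`, `DeleteFile`, and scan-task types (the real interfaces are generic and carry much more metadata). `relatedContentFiles()` is one of the names suggested above; the default implementation that returns only the task's own file, and the `relatedPaths` helper, are assumptions of this sketch rather than part of the proposal. Each task type reports its own related files, so the caller can flatten them without `instanceof` checks or casts.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RelatedContentFilesSketch {

  // Hypothetical stand-in for Iceberg's ContentFile: only the file path is modeled.
  interface ContentFile {
    String path();
  }

  static final class DataFile implements ContentFile {
    private final String path;
    DataFile(String path) { this.path = path; }
    @Override public String path() { return path; }
  }

  static final class DeleteFile implements ContentFile {
    private final String path;
    DeleteFile(String path) { this.path = path; }
    @Override public String path() { return path; }
  }

  // Hypothetical ContentScanTask with the proposed method. The default covers
  // task types that have no delete files: only the task's own file is returned.
  interface ContentScanTask {
    ContentFile file();

    default Stream<ContentFile> relatedContentFiles() {
      return Stream.of(file());
    }
  }

  // Modeled on FileScanTask: the data file plus all of its delete files.
  static final class FileScanTask implements ContentScanTask {
    private final DataFile file;
    private final List<DeleteFile> deletes;

    FileScanTask(DataFile file, List<DeleteFile> deletes) {
      this.file = file;
      this.deletes = deletes;
    }

    @Override public DataFile file() { return file; }

    @Override
    public Stream<ContentFile> relatedContentFiles() {
      return Stream.concat(Stream.of(file), deletes.stream());
    }
  }

  // Modeled on DeletedRowsScanTask: the data file plus added and existing deletes.
  static final class DeletedRowsScanTask implements ContentScanTask {
    private final DataFile file;
    private final List<DeleteFile> addedDeletes;
    private final List<DeleteFile> existingDeletes;

    DeletedRowsScanTask(DataFile file, List<DeleteFile> addedDeletes, List<DeleteFile> existingDeletes) {
      this.file = file;
      this.addedDeletes = addedDeletes;
      this.existingDeletes = existingDeletes;
    }

    @Override public DataFile file() { return file; }

    @Override
    public Stream<ContentFile> relatedContentFiles() {
      return Stream.concat(
          Stream.of(file), Stream.concat(addedDeletes.stream(), existingDeletes.stream()));
    }
  }

  // Caller side, analogous to BaseDataReader.inputFiles(...): no instanceof checks or casts.
  static List<String> relatedPaths(List<? extends ContentScanTask> tasks) {
    return tasks.stream()
        .flatMap(ContentScanTask::relatedContentFiles)
        .map(ContentFile::path)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<ContentScanTask> tasks = Arrays.asList(
        new FileScanTask(new DataFile("data-1.parquet"),
            Collections.singletonList(new DeleteFile("pos-del-1.parquet"))),
        new DeletedRowsScanTask(new DataFile("data-2.parquet"),
            Collections.singletonList(new DeleteFile("added-del.parquet")),
            Collections.singletonList(new DeleteFile("existing-del.parquet"))));
    // prints [data-1.parquet, pos-del-1.parquet, data-2.parquet, added-del.parquet, existing-del.parquet]
    System.out.println(relatedPaths(tasks));
  }
}
```

Overriding the method per task type keeps the knowledge of which delete files belong to a task inside that task type, so in this sketch a new scan task type only needs its own override and the reader-side loop stays unchanged.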
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.