szehon-ho commented on a change in pull request #3400: URL: https://github.com/apache/iceberg/pull/3400#discussion_r739592859
########## File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java ########## @@ -104,30 +144,100 @@ } } - if (splitSize != null) { - scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString()); + for (Expression filter : filterExpressions()) { + scan = scan.filter(filter); } - if (splitLookback != null) { - scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString()); + try (CloseableIterable<FileScanTask> filesIterable = scan.planFiles()) { + this.files = Lists.newArrayList(filesIterable); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close table scan: " + scan, e); } + } - if (splitOpenFileCost != null) { - scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString()); - } + return files; + } - for (Expression filter : filterExpressions()) { - scan = scan.filter(filter); + @Override + protected List<CombinedScanTask> tasks() { + if (tasks == null) { + CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles( + CloseableIterable.withNoopClose(files()), + splitSize); + CloseableIterable<CombinedScanTask> scanTasks = TableScanUtil.planTasks( + splitFiles, splitSize, + splitLookback, splitOpenFileCost); + tasks = Lists.newArrayList(scanTasks); + } + + return tasks; + } + + @Override + public NamedReference[] filterAttributes() { + Set<Integer> partitionFieldSourceIds = Sets.newHashSet(); + + for (Integer specId : specIds()) { + PartitionSpec spec = table().specs().get(specId); + for (PartitionField field : spec.fields()) { + partitionFieldSourceIds.add(field.sourceId()); } + } - try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) { - this.tasks = Lists.newArrayList(tasksIterable); - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to close table scan: %s", scan); + Map<Integer, String> nameById = TypeUtil.indexQuotedNameById(table().schema().asStruct()); Review comment: So I was more asking 
about table().schema().asStruct(): that is only the current schema, so won't it produce the wrong map in this case? We are looking through all historical partition specs, some of which may have been created at a time when the partition field had a different name. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org