flyrain commented on a change in pull request #3400:
URL: https://github.com/apache/iceberg/pull/3400#discussion_r739445342
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
##########
@@ -104,30 +144,100 @@
       }
     }
 
-    if (splitSize != null) {
-      scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
+    for (Expression filter : filterExpressions()) {
+      scan = scan.filter(filter);
     }
 
-    if (splitLookback != null) {
-      scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
+    try (CloseableIterable<FileScanTask> filesIterable = scan.planFiles()) {
+      this.files = Lists.newArrayList(filesIterable);
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close table scan: " + scan, e);
     }
+  }
 
-    if (splitOpenFileCost != null) {
-      scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
-    }
+    return files;
+  }
 
-    for (Expression filter : filterExpressions()) {
-      scan = scan.filter(filter);
+  @Override
+  protected List<CombinedScanTask> tasks() {
+    if (tasks == null) {
+      CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles(
+          CloseableIterable.withNoopClose(files()),
+          splitSize);
+      CloseableIterable<CombinedScanTask> scanTasks = TableScanUtil.planTasks(
+          splitFiles, splitSize,
+          splitLookback, splitOpenFileCost);
+      tasks = Lists.newArrayList(scanTasks);
+    }
+
+    return tasks;
+  }
+
+  @Override
+  public NamedReference[] filterAttributes() {
+    Set<Integer> partitionFieldSourceIds = Sets.newHashSet();
+
+    for (Integer specId : specIds()) {
+      PartitionSpec spec = table().specs().get(specId);
+      for (PartitionField field : spec.fields()) {
+        partitionFieldSourceIds.add(field.sourceId());
       }
+    }
 
-      try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
-        this.tasks = Lists.newArrayList(tasksIterable);
-      } catch (IOException e) {
-        throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
+    Map<Integer, String> nameById = TypeUtil.indexQuotedNameById(table().schema().asStruct());
+    return partitionFieldSourceIds.stream()
+        .filter(fieldId -> expectedSchema().findField(fieldId) != null)
+        .map(fieldId -> org.apache.spark.sql.connector.expressions.Expressions.column(nameById.get(fieldId)))
+        .toArray(NamedReference[]::new);
+  }
+
+  @Override
+  public void filter(Filter[] filters) {
+    Expression runtimeFilterExpr = Expressions.alwaysTrue();
+
+    for (Filter filter : filters) {
+      Expression expr = SparkFilters.convert(filter);
+      if (expr != null) {
+        try {
+          Binder.bind(expectedSchema().asStruct(), expr, caseSensitive());
+          runtimeFilterExpr = Expressions.and(runtimeFilterExpr, expr);

Review comment:
   Can we use the expression returned by `Binder.bind()` as the input of this line?
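   A minimal sketch of the change being suggested, assuming `Binder.bind()` returns the bound `Expression` (the variable name `boundExpr` is hypothetical, not from this PR):

   ```java
   // Keep the bound expression from Binder.bind() and AND it into the
   // runtime filter, instead of binding only for validation and then
   // reusing the unbound expr.
   Expression boundExpr = Binder.bind(expectedSchema().asStruct(), expr, caseSensitive());
   runtimeFilterExpr = Expressions.and(runtimeFilterExpr, boundExpr);
   ```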