flyrain commented on a change in pull request #3400: URL: https://github.com/apache/iceberg/pull/3400#discussion_r739407786
########## File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java ########## @@ -104,30 +144,100 @@ } } - if (splitSize != null) { - scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString()); + for (Expression filter : filterExpressions()) { + scan = scan.filter(filter); } - if (splitLookback != null) { - scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString()); + try (CloseableIterable<FileScanTask> filesIterable = scan.planFiles()) { + this.files = Lists.newArrayList(filesIterable); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close table scan: " + scan, e); } + } - if (splitOpenFileCost != null) { - scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString()); - } + return files; + } - for (Expression filter : filterExpressions()) { - scan = scan.filter(filter); + @Override + protected List<CombinedScanTask> tasks() { + if (tasks == null) { + CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles( + CloseableIterable.withNoopClose(files()), + splitSize); + CloseableIterable<CombinedScanTask> scanTasks = TableScanUtil.planTasks( + splitFiles, splitSize, + splitLookback, splitOpenFileCost); + tasks = Lists.newArrayList(scanTasks); + } + + return tasks; + } + + @Override + public NamedReference[] filterAttributes() { + Set<Integer> partitionFieldSourceIds = Sets.newHashSet(); + + for (Integer specId : specIds()) { + PartitionSpec spec = table().specs().get(specId); + for (PartitionField field : spec.fields()) { + partitionFieldSourceIds.add(field.sourceId()); } + } - try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) { - this.tasks = Lists.newArrayList(tasksIterable); - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to close table scan: %s", scan); + Map<Integer, String> nameById = TypeUtil.indexQuotedNameById(table().schema().asStruct()); + return partitionFieldSourceIds.stream() + .filter(fieldId -> expectedSchema().findField(fieldId) != null) + .map(fieldId -> org.apache.spark.sql.connector.expressions.Expressions.column(nameById.get(fieldId))) + .toArray(NamedReference[]::new); + } + + @Override + public void filter(Filter[] filters) { + Expression runtimeFilterExpr = Expressions.alwaysTrue(); + + for (Filter filter : filters) { + Expression expr = SparkFilters.convert(filter); + if (expr != null) { + try { + Binder.bind(expectedSchema().asStruct(), expr, caseSensitive()); + runtimeFilterExpr = Expressions.and(runtimeFilterExpr, expr); + } catch (ValidationException e) { + LOG.error("Failed to bind {} to expected schema, skipping runtime filter", expr, e); + } + } else { + LOG.warn("Unsupported runtime filter {}", filter); } } Review comment: A minor suggestion: may extract the code above into a method like `filtersToExpressions`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org