aokolnychyi commented on a change in pull request #3400: URL: https://github.com/apache/iceberg/pull/3400#discussion_r739427391
########## File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java ########## @@ -104,30 +144,100 @@ } } - if (splitSize != null) { - scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString()); + for (Expression filter : filterExpressions()) { + scan = scan.filter(filter); } - if (splitLookback != null) { - scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString()); + try (CloseableIterable<FileScanTask> filesIterable = scan.planFiles()) { + this.files = Lists.newArrayList(filesIterable); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close table scan: " + scan, e); } + } - if (splitOpenFileCost != null) { - scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString()); - } + return files; + } - for (Expression filter : filterExpressions()) { - scan = scan.filter(filter); + @Override + protected List<CombinedScanTask> tasks() { + if (tasks == null) { + CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles( + CloseableIterable.withNoopClose(files()), + splitSize); + CloseableIterable<CombinedScanTask> scanTasks = TableScanUtil.planTasks( + splitFiles, splitSize, + splitLookback, splitOpenFileCost); + tasks = Lists.newArrayList(scanTasks); + } + + return tasks; + } + + @Override + public NamedReference[] filterAttributes() { + Set<Integer> partitionFieldSourceIds = Sets.newHashSet(); + + for (Integer specId : specIds()) { + PartitionSpec spec = table().specs().get(specId); + for (PartitionField field : spec.fields()) { + partitionFieldSourceIds.add(field.sourceId()); } + } - try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) { - this.tasks = Lists.newArrayList(tasksIterable); - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to close table scan: %s", scan); + Map<Integer, String> nameById = TypeUtil.indexQuotedNameById(table().schema().asStruct()); + return partitionFieldSourceIds.stream() + .filter(fieldId -> expectedSchema().findField(fieldId) != null) + .map(fieldId -> org.apache.spark.sql.connector.expressions.Expressions.column(nameById.get(fieldId))) + .toArray(NamedReference[]::new); + } + + @Override + public void filter(Filter[] filters) { + Expression runtimeFilterExpr = Expressions.alwaysTrue(); + + for (Filter filter : filters) { + Expression expr = SparkFilters.convert(filter); + if (expr != null) { + try { + Binder.bind(expectedSchema().asStruct(), expr, caseSensitive()); + runtimeFilterExpr = Expressions.and(runtimeFilterExpr, expr); + } catch (ValidationException e) { + LOG.error("Failed to bind {} to expected schema, skipping runtime filter", expr, e); + } + } else { + LOG.warn("Unsupported runtime filter {}", filter); Review comment: Since runtime filtering is an optimization that is optional, I'd say we better don't fail the query. ########## File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java ########## @@ -104,30 +144,100 @@ } } - if (splitSize != null) { - scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString()); + for (Expression filter : filterExpressions()) { + scan = scan.filter(filter); } - if (splitLookback != null) { - scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString()); + try (CloseableIterable<FileScanTask> filesIterable = scan.planFiles()) { + this.files = Lists.newArrayList(filesIterable); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close table scan: " + scan, e); } + } - if (splitOpenFileCost != null) { - scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString()); - } + return files; + } - for (Expression filter : filterExpressions()) { - scan = scan.filter(filter); + @Override + protected List<CombinedScanTask> tasks() { + if (tasks == null) { + CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles( + CloseableIterable.withNoopClose(files()), + splitSize); + CloseableIterable<CombinedScanTask> scanTasks = TableScanUtil.planTasks( + splitFiles, splitSize, + splitLookback, splitOpenFileCost); + tasks = Lists.newArrayList(scanTasks); + } + + return tasks; + } + + @Override + public NamedReference[] filterAttributes() { + Set<Integer> partitionFieldSourceIds = Sets.newHashSet(); + + for (Integer specId : specIds()) { + PartitionSpec spec = table().specs().get(specId); + for (PartitionField field : spec.fields()) { + partitionFieldSourceIds.add(field.sourceId()); } + } - try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) { - this.tasks = Lists.newArrayList(tasksIterable); - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to close table scan: %s", scan); + Map<Integer, String> nameById = TypeUtil.indexQuotedNameById(table().schema().asStruct()); + return partitionFieldSourceIds.stream() + .filter(fieldId -> expectedSchema().findField(fieldId) != null) + .map(fieldId -> org.apache.spark.sql.connector.expressions.Expressions.column(nameById.get(fieldId))) + .toArray(NamedReference[]::new); + } + + @Override + public void filter(Filter[] filters) { + Expression runtimeFilterExpr = Expressions.alwaysTrue(); + + for (Filter filter : filters) { + Expression expr = SparkFilters.convert(filter); + if (expr != null) { + try { + Binder.bind(expectedSchema().asStruct(), expr, caseSensitive()); + runtimeFilterExpr = Expressions.and(runtimeFilterExpr, expr); + } catch (ValidationException e) { + LOG.error("Failed to bind {} to expected schema, skipping runtime filter", expr, e); + } + } else { + LOG.warn("Unsupported runtime filter {}", filter); } } - return tasks; + if (runtimeFilterExpr != Expressions.alwaysTrue()) { + Map<Integer, Evaluator> evaluatorsBySpecId = Maps.newHashMap(); + + for (Integer specId : specIds()) { + PartitionSpec spec = table().specs().get(specId); + Expression inclusiveExpr = Projections.inclusive(spec).project(runtimeFilterExpr); + Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr); + evaluatorsBySpecId.put(specId, inclusive); + } + + LOG.info("Trying to filter {} files using runtime filter {}", files().size(), runtimeFilterExpr); + + List<FileScanTask> filteredFiles = files().stream() + .filter(file -> { + Evaluator evaluator = evaluatorsBySpecId.get(file.spec().specId()); + return evaluator.eval(file.file().partition()); + }) + .collect(Collectors.toList()); + + LOG.info("{} files matched runtime filter {}", filteredFiles.size(), runtimeFilterExpr); Review comment: Fixed. ########## File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java ########## @@ -104,30 +144,100 @@ } } - if (splitSize != null) { - scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString()); + for (Expression filter : filterExpressions()) { + scan = scan.filter(filter); } - if (splitLookback != null) { - scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString()); + try (CloseableIterable<FileScanTask> filesIterable = scan.planFiles()) { + this.files = Lists.newArrayList(filesIterable); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close table scan: " + scan, e); } + } - if (splitOpenFileCost != null) { - scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString()); - } + return files; + } - for (Expression filter : filterExpressions()) { - scan = scan.filter(filter); + @Override + protected List<CombinedScanTask> tasks() { + if (tasks == null) { + CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles( + CloseableIterable.withNoopClose(files()), + splitSize); + CloseableIterable<CombinedScanTask> scanTasks = TableScanUtil.planTasks( + splitFiles, splitSize, + splitLookback, splitOpenFileCost); + tasks = Lists.newArrayList(scanTasks); + } + + return tasks; + } + + @Override + public NamedReference[] filterAttributes() { + Set<Integer> partitionFieldSourceIds = Sets.newHashSet(); + + for (Integer specId : specIds()) { + PartitionSpec spec = table().specs().get(specId); + for (PartitionField field : spec.fields()) { + partitionFieldSourceIds.add(field.sourceId()); } + } - try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) { - this.tasks = Lists.newArrayList(tasksIterable); - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to close table scan: %s", scan); + Map<Integer, String> nameById = TypeUtil.indexQuotedNameById(table().schema().asStruct()); + return partitionFieldSourceIds.stream() + .filter(fieldId -> expectedSchema().findField(fieldId) != null) + .map(fieldId -> org.apache.spark.sql.connector.expressions.Expressions.column(nameById.get(fieldId))) + .toArray(NamedReference[]::new); + } + + @Override + public void filter(Filter[] filters) { + Expression runtimeFilterExpr = Expressions.alwaysTrue(); + + for (Filter filter : filters) { + Expression expr = SparkFilters.convert(filter); + if (expr != null) { + try { + Binder.bind(expectedSchema().asStruct(), expr, caseSensitive()); + runtimeFilterExpr = Expressions.and(runtimeFilterExpr, expr); + } catch (ValidationException e) { + LOG.error("Failed to bind {} to expected schema, skipping runtime filter", expr, e); + } + } else { + LOG.warn("Unsupported runtime filter {}", filter); } } - return tasks; + if (runtimeFilterExpr != Expressions.alwaysTrue()) { + Map<Integer, Evaluator> evaluatorsBySpecId = Maps.newHashMap(); + + for (Integer specId : specIds()) { + PartitionSpec spec = table().specs().get(specId); + Expression inclusiveExpr = Projections.inclusive(spec).project(runtimeFilterExpr); + Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr); + evaluatorsBySpecId.put(specId, inclusive); + } + + LOG.info("Trying to filter {} files using runtime filter {}", files().size(), runtimeFilterExpr); + + List<FileScanTask> filteredFiles = files().stream() + .filter(file -> { + Evaluator evaluator = evaluatorsBySpecId.get(file.spec().specId()); + return evaluator.eval(file.file().partition()); + }) + .collect(Collectors.toList()); + + LOG.info("{} files matched runtime filter {}", filteredFiles.size(), runtimeFilterExpr); + + if (filteredFiles.size() < files().size()) { Review comment: Added a comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org