rdblue commented on a change in pull request #3400:
URL: https://github.com/apache/iceberg/pull/3400#discussion_r740531075
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
##########
@@ -74,60 +101,158 @@
       throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
     }
 
-    // look for split behavior overrides in options
-    this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null);
-    this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null);
-    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null);
+    this.splitSize = table instanceof BaseMetadataTable ? readConf.metadataSplitSize() : readConf.splitSize();
+    this.splitLookback = readConf.splitLookback();
+    this.splitOpenFileCost = readConf.splitOpenFileCost();
+    this.runtimeFilterExpressions = Lists.newArrayList();
+  }
+
+  private Set<Integer> specIds() {
+    if (specIds == null) {
+      Set<Integer> specIdSet = Sets.newHashSet();
+      for (FileScanTask file : files()) {
+        specIdSet.add(file.spec().specId());
+      }
+      this.specIds = specIdSet;
+    }
+
+    return specIds;
+  }
+
+  private List<FileScanTask> files() {
+    if (files == null) {
+      this.files = planFiles();
+    }
+
+    return files;
+  }
+
+  private List<FileScanTask> planFiles() {
+    TableScan scan = table()
+        .newScan()
+        .option(TableProperties.SPLIT_SIZE, String.valueOf(splitSize))
+        .option(TableProperties.SPLIT_LOOKBACK, String.valueOf(splitLookback))
+        .option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(splitOpenFileCost))
+        .caseSensitive(caseSensitive())
+        .project(expectedSchema());
+
+    if (snapshotId != null) {
+      scan = scan.useSnapshot(snapshotId);
+    }
+
+    if (asOfTimestamp != null) {
+      scan = scan.asOfTime(asOfTimestamp);
+    }
+
+    if (startSnapshotId != null) {
+      if (endSnapshotId != null) {
+        scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
+      } else {
+        scan = scan.appendsAfter(startSnapshotId);
+      }
+    }
+
+    for (Expression filter : filterExpressions()) {
+      scan = scan.filter(filter);
+    }
+
+    try (CloseableIterable<FileScanTask> filesIterable = scan.planFiles()) {
+      return Lists.newArrayList(filesIterable);
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close table scan: " + scan, e);
+    }
   }
 
   @Override
   protected List<CombinedScanTask> tasks() {
     if (tasks == null) {
-      TableScan scan = table()
-          .newScan()
-          .caseSensitive(caseSensitive())
-          .project(expectedSchema());
+      CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles(
+          CloseableIterable.withNoopClose(files()),
+          splitSize);
+      CloseableIterable<CombinedScanTask> scanTasks = TableScanUtil.planTasks(
+          splitFiles, splitSize,
+          splitLookback, splitOpenFileCost);
+      tasks = Lists.newArrayList(scanTasks);
+    }
 
-      if (snapshotId != null) {
-        scan = scan.useSnapshot(snapshotId);
-      }
+    return tasks;
+  }
 
-      if (asOfTimestamp != null) {
-        scan = scan.asOfTime(asOfTimestamp);
-      }
+  @Override
+  public NamedReference[] filterAttributes() {
+    Set<Integer> partitionFieldSourceIds = Sets.newHashSet();
 
-      if (startSnapshotId != null) {
-        if (endSnapshotId != null) {
-          scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
-        } else {
-          scan = scan.appendsAfter(startSnapshotId);
-        }
+    for (Integer specId : specIds()) {
+      PartitionSpec spec = table().specs().get(specId);
+      for (PartitionField field : spec.fields()) {
+        partitionFieldSourceIds.add(field.sourceId());
       }
+    }
 
-      if (splitSize != null) {
-        scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
-      }
+    Map<Integer, String> quotedNameById = TypeUtil.indexQuotedNameById(
+        expectedSchema().asStruct(),
+        name -> String.format("`%s`", name.replace("`", "``")));
 
-      if (splitLookback != null) {
-        scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
-      }
+    return partitionFieldSourceIds.stream()
+        .filter(fieldId -> expectedSchema().findField(fieldId) != null)
+        .map(fieldId -> Spark3Util.toNamedReference(quotedNameById.get(fieldId)))
+        .toArray(NamedReference[]::new);
+  }
 
-      if (splitOpenFileCost != null) {
-        scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
+  @Override
+  public void filter(Filter[] filters) {
+    Expression runtimeFilterExpr = convertRuntimeFilters(filters);
+
+    if (runtimeFilterExpr != Expressions.alwaysTrue()) {
+      Map<Integer, Evaluator> evaluatorsBySpecId = Maps.newHashMap();
+
+      for (Integer specId : specIds()) {
+        PartitionSpec spec = table().specs().get(specId);
+        Expression inclusiveExpr = Projections.inclusive(spec).project(runtimeFilterExpr);
+        Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr);
+        evaluatorsBySpecId.put(specId, inclusive);
      }
 
-      for (Expression filter : filterExpressions()) {
-        scan = scan.filter(filter);
+      LOG.info("Trying to filter {} files using runtime filter {}", files().size(), runtimeFilterExpr);
+
+      List<FileScanTask> filteredFiles = files().stream()
+          .filter(file -> {
+            Evaluator evaluator = evaluatorsBySpecId.get(file.spec().specId());
+            return evaluator.eval(file.file().partition());
+          })
+          .collect(Collectors.toList());
+
+      LOG.info("{}/{} files matched runtime filter {}", filteredFiles.size(), files().size(), runtimeFilterExpr);

Review comment:
   I agree, but I think that's something that we can possibly add later. I'm not sure how to make data appear in the Spark UI from data sources.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org
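On the reviewer's question about surfacing numbers in the Spark UI: since Spark 3.2, DataSource V2 scans can publish custom metrics that the SQL UI aggregates and displays, via org.apache.spark.sql.connector.metric.CustomMetric and CustomTaskMetric. The sketch below is only a minimal illustration of that mechanism and is not part of this PR; the metric name "skippedDataFiles" and both classes are hypothetical, and this task-side reporting alone would not cover the driver-side planning counts that the LOG.info calls above record.

    // Hedged sketch, assuming the Spark 3.2 DSv2 metric interfaces.
    // The metric name "skippedDataFiles" and these classes are hypothetical.
    import org.apache.spark.sql.connector.metric.CustomMetric;
    import org.apache.spark.sql.connector.metric.CustomTaskMetric;

    // Driver-side definition of the metric shown in the SQL UI.
    class SkippedDataFilesMetric implements CustomMetric {
      @Override
      public String name() {
        return "skippedDataFiles";
      }

      @Override
      public String description() {
        return "number of data files skipped by runtime filtering";
      }

      // Spark calls this on the driver to combine per-task values into the
      // single value displayed in the UI; here the task values are summed.
      @Override
      public String aggregateTaskMetrics(long[] taskMetrics) {
        long sum = 0L;
        for (long taskMetric : taskMetrics) {
          sum += taskMetric;
        }
        return String.valueOf(sum);
      }
    }

    // Per-task value that a reader would report for this metric.
    class SkippedDataFilesTaskMetric implements CustomTaskMetric {
      private final long value;

      SkippedDataFilesTaskMetric(long value) {
        this.value = value;
      }

      @Override
      public String name() {
        return "skippedDataFiles";
      }

      @Override
      public long value() {
        return value;
      }
    }

A scan would advertise the metric by overriding Scan.supportedCustomMetrics() to return new CustomMetric[] { new SkippedDataFilesMetric() }, and each PartitionReader would report its count from currentMetricsValues(); whether that is the right place to hook in the runtime-filter statistics is an open question, per the comment above.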