szehon-ho commented on code in PR #7190: URL: https://github.com/apache/iceberg/pull/7190#discussion_r1147018350
########## core/src/main/java/org/apache/iceberg/PartitionsTable.java: ########## @@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) { partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount); } - private static Iterable<Partition> partitions(Table table, StaticTableScan scan) { - CloseableIterable<FileScanTask> tasks = planFiles(scan); + @VisibleForTesting + static Iterable<Partition> partitions(Table table, StaticTableScan scan) { Types.StructType normalizedPartitionType = Partitioning.partitionType(table); PartitionMap partitions = new PartitionMap(); // cache a position map needed by each partition spec to normalize partitions to final schema Map<Integer, int[]> normalizedPositionsBySpec = Maps.newHashMapWithExpectedSize(table.specs().size()); + // logic to handle the partition evolution + int[] normalizedPositions = + normalizedPositionsBySpec.computeIfAbsent( + table.spec().specId(), + specId -> normalizedPositions(table, specId, normalizedPartitionType)); + + // parallelize the manifest read and Review Comment: Comment is incomplete, let's just remove it as I don't see that much value. ########## core/src/main/java/org/apache/iceberg/PartitionsTable.java: ########## @@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) { partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount); } - private static Iterable<Partition> partitions(Table table, StaticTableScan scan) { - CloseableIterable<FileScanTask> tasks = planFiles(scan); + @VisibleForTesting Review Comment: Could we consolidate these two VisibleForTesting methods? Would just exposing dataFiles() work for those tests? I don't really like exposing the Partition inner class. 
########## core/src/main/java/org/apache/iceberg/PartitionsTable.java: ########## @@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) { partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount); } - private static Iterable<Partition> partitions(Table table, StaticTableScan scan) { - CloseableIterable<FileScanTask> tasks = planFiles(scan); + @VisibleForTesting + static Iterable<Partition> partitions(Table table, StaticTableScan scan) { Types.StructType normalizedPartitionType = Partitioning.partitionType(table); PartitionMap partitions = new PartitionMap(); // cache a position map needed by each partition spec to normalize partitions to final schema Map<Integer, int[]> normalizedPositionsBySpec = Maps.newHashMapWithExpectedSize(table.specs().size()); + // logic to handle the partition evolution + int[] normalizedPositions = + normalizedPositionsBySpec.computeIfAbsent( + table.spec().specId(), + specId -> normalizedPositions(table, specId, normalizedPartitionType)); + + // parallelize the manifest read and + CloseableIterable<DataFile> datafiles = planDataFiles(scan); - for (FileScanTask task : tasks) { - PartitionData original = (PartitionData) task.file().partition(); - int[] normalizedPositions = - normalizedPositionsBySpec.computeIfAbsent( - task.spec().specId(), - specId -> normalizedPositions(table, specId, normalizedPartitionType)); + for (DataFile dataFile : datafiles) { + PartitionData original = (PartitionData) dataFile.partition(); PartitionData normalized = normalizePartition(original, normalizedPartitionType, normalizedPositions); - partitions.get(normalized).update(task.file()); + partitions.get(normalized).update(dataFile); } + return partitions.all(); } + @VisibleForTesting + static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) { + Table table = scan.table(); + Snapshot snapshot = scan.snapshot(); + + // read list of data and delete manifests from current snapshot 
obtained via scan + CloseableIterable<ManifestFile> dataManifests = + CloseableIterable.withNoopClose(snapshot.dataManifests(table.io())); + + LoadingCache<Integer, ManifestEvaluator> evalCache = + Caffeine.newBuilder() + .build( + specId -> { + PartitionSpec spec = table.specs().get(specId); + PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec); + return ManifestEvaluator.forRowFilter( + scan.filter(), transformedSpec, scan.isCaseSensitive()); + }); + + CloseableIterable<ManifestFile> filteredManifests = + CloseableIterable.filter( + dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest)); + + Iterable<CloseableIterable<DataFile>> tasks = + CloseableIterable.transform( + filteredManifests, + manifest -> + ManifestFiles.read(manifest, table.io(), table.specs()) + .caseSensitive(scan.isCaseSensitive()) + // hardcoded to avoid scan stats column on partition table + .select(BaseScan.SCAN_COLUMNS)); + + return (scan.planExecutor() != null) Review Comment: Nit: do we need extra () here? ########## core/src/main/java/org/apache/iceberg/PartitionsTable.java: ########## @@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) { partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount); } - private static Iterable<Partition> partitions(Table table, StaticTableScan scan) { - CloseableIterable<FileScanTask> tasks = planFiles(scan); + @VisibleForTesting + static Iterable<Partition> partitions(Table table, StaticTableScan scan) { Types.StructType normalizedPartitionType = Partitioning.partitionType(table); PartitionMap partitions = new PartitionMap(); // cache a position map needed by each partition spec to normalize partitions to final schema Map<Integer, int[]> normalizedPositionsBySpec = Maps.newHashMapWithExpectedSize(table.specs().size()); + // logic to handle the partition evolution Review Comment: Nit: can we add one line before this? 
And also, if we keep comment, we can just do something shorter like : ```handle partition evolution``` ########## core/src/main/java/org/apache/iceberg/PartitionsTable.java: ########## @@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) { partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount); } - private static Iterable<Partition> partitions(Table table, StaticTableScan scan) { - CloseableIterable<FileScanTask> tasks = planFiles(scan); + @VisibleForTesting + static Iterable<Partition> partitions(Table table, StaticTableScan scan) { Types.StructType normalizedPartitionType = Partitioning.partitionType(table); PartitionMap partitions = new PartitionMap(); // cache a position map needed by each partition spec to normalize partitions to final schema Map<Integer, int[]> normalizedPositionsBySpec = Maps.newHashMapWithExpectedSize(table.specs().size()); + // logic to handle the partition evolution + int[] normalizedPositions = + normalizedPositionsBySpec.computeIfAbsent( + table.spec().specId(), + specId -> normalizedPositions(table, specId, normalizedPartitionType)); + + // parallelize the manifest read and + CloseableIterable<DataFile> datafiles = planDataFiles(scan); - for (FileScanTask task : tasks) { - PartitionData original = (PartitionData) task.file().partition(); - int[] normalizedPositions = - normalizedPositionsBySpec.computeIfAbsent( - task.spec().specId(), - specId -> normalizedPositions(table, specId, normalizedPartitionType)); + for (DataFile dataFile : datafiles) { + PartitionData original = (PartitionData) dataFile.partition(); PartitionData normalized = normalizePartition(original, normalizedPartitionType, normalizedPositions); - partitions.get(normalized).update(task.file()); + partitions.get(normalized).update(dataFile); } + return partitions.all(); } + @VisibleForTesting + static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) { + Table table = scan.table(); + 
Snapshot snapshot = scan.snapshot(); + + // read list of data and delete manifests from current snapshot obtained via scan Review Comment: Comment seems wrong, it's only data files. I would suggest to remove it, as it's pretty obvious what it's doing? ########## core/src/main/java/org/apache/iceberg/PartitionsTable.java: ########## @@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) { partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount); } - private static Iterable<Partition> partitions(Table table, StaticTableScan scan) { - CloseableIterable<FileScanTask> tasks = planFiles(scan); + @VisibleForTesting + static Iterable<Partition> partitions(Table table, StaticTableScan scan) { Types.StructType normalizedPartitionType = Partitioning.partitionType(table); PartitionMap partitions = new PartitionMap(); // cache a position map needed by each partition spec to normalize partitions to final schema Map<Integer, int[]> normalizedPositionsBySpec = Maps.newHashMapWithExpectedSize(table.specs().size()); + // logic to handle the partition evolution + int[] normalizedPositions = + normalizedPositionsBySpec.computeIfAbsent( + table.spec().specId(), + specId -> normalizedPositions(table, specId, normalizedPartitionType)); + + // parallelize the manifest read and + CloseableIterable<DataFile> datafiles = planDataFiles(scan); - for (FileScanTask task : tasks) { - PartitionData original = (PartitionData) task.file().partition(); - int[] normalizedPositions = - normalizedPositionsBySpec.computeIfAbsent( - task.spec().specId(), - specId -> normalizedPositions(table, specId, normalizedPartitionType)); + for (DataFile dataFile : datafiles) { + PartitionData original = (PartitionData) dataFile.partition(); PartitionData normalized = normalizePartition(original, normalizedPartitionType, normalizedPositions); - partitions.get(normalized).update(task.file()); + partitions.get(normalized).update(dataFile); } + return 
partitions.all(); } + @VisibleForTesting + static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) { + Table table = scan.table(); + Snapshot snapshot = scan.snapshot(); + + // read list of data and delete manifests from current snapshot obtained via scan + CloseableIterable<ManifestFile> dataManifests = + CloseableIterable.withNoopClose(snapshot.dataManifests(table.io())); + + LoadingCache<Integer, ManifestEvaluator> evalCache = + Caffeine.newBuilder() + .build( + specId -> { + PartitionSpec spec = table.specs().get(specId); + PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec); + return ManifestEvaluator.forRowFilter( + scan.filter(), transformedSpec, scan.isCaseSensitive()); + }); + + CloseableIterable<ManifestFile> filteredManifests = + CloseableIterable.filter( + dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest)); + + Iterable<CloseableIterable<DataFile>> tasks = + CloseableIterable.transform( + filteredManifests, + manifest -> + ManifestFiles.read(manifest, table.io(), table.specs()) + .caseSensitive(scan.isCaseSensitive()) + // hardcoded to avoid scan stats column on partition table Review Comment: Nit: I'd also vote to remove this comment, not sure if this line in particular needs explanation over other ones. 
If needed maybe we can put it after the select like ```.select(SCAN_COLUMNS); // don't select stats columns``` ########## core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java: ########## @@ -79,24 +80,19 @@ protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals } } - protected void validateIncludesPartitionScan( - CloseableIterable<FileScanTask> tasks, int partValue) { - validateIncludesPartitionScan(tasks, 0, partValue); - } - - protected void validateIncludesPartitionScan( - CloseableIterable<FileScanTask> tasks, int position, int partValue) { + protected void validatePartition( + Iterable<PartitionsTable.Partition> parts, int position, int partitionValue) { Review Comment: I'd actually prefer this to be CloseableIterable<DataFile>, as Partition seems a bit of an internal class and shouldn't be used. I think it would be the same, unless I'm mistaken? ########## core/src/main/java/org/apache/iceberg/PartitionsTable.java: ########## @@ -238,5 +241,9 @@ void update(DataFile file) { this.dataFileCount += 1; this.specId = file.specId(); } + + StructLike key() { Review Comment: Can remove, if we decide to make tests use 'planDataFiles' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org