szehon-ho commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1147018350


##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row 
convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, 
partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan 
scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = 
Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize 
partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, 
normalizedPartitionType));
+
+    // parallelize the manifest read and

Review Comment:
   This comment is incomplete. Let's just remove it, as I don't see much value in it.



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row 
convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, 
partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan 
scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting

Review Comment:
   Could we consolidate these two VisibleForTesting methods?  Would just
   exposing planDataFiles() work for those tests?  I don't really like exposing
   the Partition inner class.



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row 
convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, 
partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan 
scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = 
Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize 
partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, 
normalizedPartitionType));
+
+    // parallelize the manifest read and
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, 
normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, 
normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    // read list of data and delete manifests from current snapshot obtained 
via scan
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = 
transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> 
evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    // hardcoded to avoid scan stats column on partition table
+                    .select(BaseScan.SCAN_COLUMNS));
+
+    return (scan.planExecutor() != null)

Review Comment:
   Nit: do we need extra () here?
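
   For reference, a minimal sketch of why the parentheses are optional: in Java, `!=` binds tighter than the conditional operator `?:`, so both forms parse the same way. The class and method names below are made up for illustration and are not from the PR.

```java
// Hypothetical stand-in for the choice made at the end of planDataFiles(...):
// pick a strategy depending on whether an executor was supplied.
class TernaryParens {
  // Form used in the PR: condition wrapped in parentheses.
  static String withParens(Object executor) {
    return (executor != null) ? "parallel" : "serial";
  }

  // Same expression without the parentheses. Because != has higher
  // precedence than ?:, this parses exactly like the version above.
  static String withoutParens(Object executor) {
    return executor != null ? "parallel" : "serial";
  }

  public static void main(String[] args) {
    Object executor = new Object();
    System.out.println(withParens(executor).equals(withoutParens(executor)));
    System.out.println(withParens(null).equals(withoutParens(null)));
  }
}
```

   Either form is fine semantically, so this is purely a style question.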



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row 
convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, 
partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan 
scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = 
Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize 
partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution

Review Comment:
   Nit: can we add a blank line before this?
   
   Also, if we keep the comment, we can just use something shorter, like:
```handle partition evolution```



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row 
convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, 
partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan 
scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = 
Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize 
partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, 
normalizedPartitionType));
+
+    // parallelize the manifest read and
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, 
normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, 
normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    // read list of data and delete manifests from current snapshot obtained 
via scan

Review Comment:
   Comment seems wrong, it's only data files.  I would suggest removing it,
   as it's pretty obvious what it's doing.



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row 
convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, 
partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan 
scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = 
Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize 
partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, 
normalizedPartitionType));
+
+    // parallelize the manifest read and
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, 
normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, 
normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    // read list of data and delete manifests from current snapshot obtained 
via scan
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = 
transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> 
evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    // hardcoded to avoid scan stats column on partition table

Review Comment:
   Nit: I'd also vote to remove this comment. I'm not sure this line in
   particular needs more explanation than the other ones.
   
   If needed, maybe we can put it after the select, like:
   
   ```.select(SCAN_COLUMNS); // don't select stats columns```



##########
core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java:
##########
@@ -79,24 +80,19 @@ protected void validateTaskScanResiduals(TableScan scan, 
boolean ignoreResiduals
     }
   }
 
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int partValue) {
-    validateIncludesPartitionScan(tasks, 0, partValue);
-  }
-
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int position, int partValue) {
+  protected void validatePartition(
+      Iterable<PartitionsTable.Partition> parts, int position, int 
partitionValue) {

Review Comment:
   I'd actually prefer this to take CloseableIterable<DataFile>, as Partition
   seems like a somewhat internal class and shouldn't be used here.  I think the
   check would be the same, unless I'm mistaken?
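
   Roughly what I have in mind, as a self-contained sketch. `FakeDataFile` is a hypothetical stand-in for a data file with an integer partition tuple, and `includesPartition` is an invented helper name. Neither is an Iceberg API, and the real helper would read the value from `DataFile.partition()` instead.

```java
import java.util.Arrays;
import java.util.List;

class ValidatePartitionSketch {
  // Hypothetical stand-in for a data file; only carries its partition tuple.
  static final class FakeDataFile {
    final int[] partition;

    FakeDataFile(int... partition) {
      this.partition = partition;
    }
  }

  // Returns true if any file has partitionValue at the given tuple position.
  // The check works directly on files, so no Partition object is needed.
  static boolean includesPartition(
      Iterable<FakeDataFile> files, int position, int partitionValue) {
    for (FakeDataFile file : files) {
      if (file.partition[position] == partitionValue) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<FakeDataFile> files =
        Arrays.asList(new FakeDataFile(0), new FakeDataFile(1), new FakeDataFile(2));
    System.out.println(includesPartition(files, 0, 1));
    System.out.println(includesPartition(files, 0, 5));
  }
}
```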



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -238,5 +241,9 @@ void update(DataFile file) {
       this.dataFileCount += 1;
       this.specId = file.specId();
     }
+
+    StructLike key() {

Review Comment:
   We can remove this if we decide to make the tests use `planDataFiles`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

