szehon-ho commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1159072818


##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -96,30 +94,72 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);

Review Comment:
   Nit: there are more blank lines here than is usual in the Iceberg code base. 
I would just move this line up, as it's also initializing a variable, the same 
as 'normalizedPositionsBySpec'.



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -156,43 +196,6 @@ private static PartitionData normalizePartition(
     return normalizedPartition;
   }
 
-  @VisibleForTesting
-  static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {

Review Comment:
   Thought about keeping this method as deprecated, but VisibleForTesting 
implies it's effectively a private method that should not be used outside unit 
tests.  So I'm OK with removing it.



##########
core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java:
##########
@@ -79,24 +80,19 @@ protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals
     }
   }
 
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int partValue) {
-    validateIncludesPartitionScan(tasks, 0, partValue);
-  }
-
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int position, int partValue) {
+  protected void validatePartition(

Review Comment:
   Can we keep the other method that delegated to this one with 0 as the 
default?  Having 0 as the default made the calls a bit easier to read.  Just a 
minor style preference.



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -96,30 +94,72 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
+
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
+
       int[] normalizedPositions =
           normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
+              dataFile.specId(),
               specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    .select(BaseScan.SCAN_COLUMNS)); // don't select stats columns
+
+    return new ParallelIterable<>(tasks, scan.planExecutor());
+  }
+
   /**
-   * Builds an array of the field position of positions in the normalized partition type indexed by
-   * field position in the original partition type
+   * Builds an integer array to map the partition field from original to normalized where index of

Review Comment:
   I think this is a bit of a run-on and is missing some sentence breaks.  What 
do you think about the following?
   ```
   Builds an integer array for a specific partition type to map its partitions to the final normalized type.
   
   The array represents fields in the original partition type, with the index being the field's index in the original partition type, and the value being the field's index in the normalized partition type.
   
   @param table Iceberg table
   @param specId id of the specified partition type
   ...
   ```
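
To make the index/value relationship in the suggested Javadoc concrete, here is a minimal sketch. It is not the PartitionsTable implementation: the class `NormalizedPositionsSketch`, both method signatures, and the use of plain field names in place of Iceberg partition types are assumptions made purely for illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in: partition fields are plain strings, not Iceberg types. */
class NormalizedPositionsSketch {

  /**
   * Returns an array where the index is a field's position in the original
   * partition type and the value is its position in the normalized type.
   */
  static int[] normalizedPositions(List<String> originalFields, List<String> normalizedFields) {
    int[] positions = new int[originalFields.size()];
    for (int i = 0; i < originalFields.size(); i++) {
      int normalizedPos = normalizedFields.indexOf(originalFields.get(i));
      if (normalizedPos < 0) {
        throw new IllegalArgumentException(
            "Field not in normalized type: " + originalFields.get(i));
      }
      positions[i] = normalizedPos;
    }
    return positions;
  }

  /** Copies each original value into its normalized slot; unmapped slots stay null. */
  static Object[] normalize(Object[] original, int[] positions, int normalizedSize) {
    Object[] normalized = new Object[normalizedSize];
    for (int i = 0; i < original.length; i++) {
      normalized[positions[i]] = original[i];
    }
    return normalized;
  }
}
```

For example, with an original type of `[b]` and a normalized type of `[a, b, c]`, the array is `[1]`: the single original value lands in slot 1 of the normalized tuple and the other slots stay null.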



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -96,30 +94,72 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
+
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
+

Review Comment:
   Same, I would keep the original structure and remove the newlines from the 
for block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

