szehon-ho commented on code in PR #4637:
URL: https://github.com/apache/iceberg/pull/4637#discussion_r865134889


##########
core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java:
##########
@@ -550,6 +550,119 @@ public void testDeleteFilesTableSelection() throws 
IOException {
     Assert.assertEquals(expected, scan.schema().asStruct());
   }
 
+  /**
+   * Checks partitions-table scan planning after a partition field is added to the spec.
+   *
+   * <p>Adds {@code id} to the spec, appends one data file for each of two new partition keys,
+   * then asserts how many scan tasks are planned for a filter on {@code partition.id} and for
+   * a filter on {@code partition.data_bucket}.
+   */
+  @Test
+  public void testPartitionSpecEvolutionAdditive() {
+    preparePartitionedTable();
+
+    // Change spec by adding a partition field on id
+    table.updateSpec()
+        .addField("id")
+        .commit();
+    PartitionSpec newSpec = table.spec();
+
+    // Add two data files with new spec
+    PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
+    data10Key.set(0, 0); // data=0
+    data10Key.set(1, 10); // id=10
+    DataFile data10 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-10.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data10Key)
+        .build();
+    PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
+    data11Key.set(0, 1); // data=1
+    data11Key.set(1, 11); // id=11
+    DataFile data11 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-11.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data11Key)
+        .build();
+
+    // Each file is appended in its own commit
+    table.newFastAppend().appendFile(data10).commit();
+    table.newFastAppend().appendFile(data11).commit();
+
+    Table metadataTable = new PartitionsTable(table.ops(), table);
+    Expression filter = Expressions.and(
+        Expressions.equal("partition.id", 10),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = metadataTable.newScan().filter(filter);
+    CloseableIterable<FileScanTask> tasks = PartitionsTable.planFiles((StaticTableScan) scan);
+
+    // Four data files of old spec, one new data file of new spec
+    Assert.assertEquals(5, Iterables.size(tasks));
+
+    filter = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    scan = metadataTable.newScan().filter(filter);
+    tasks = PartitionsTable.planFiles((StaticTableScan) scan);
+
+    // 1 original data file written by old spec, plus 1 new data file written by new spec
+    Assert.assertEquals(2, Iterables.size(tasks));
+  }
+
+  @Test
+  public void testPartitionSpecEvolutionRemoval() {
+    preparePartitionedTable();
+
+    // Remove partition field
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .addField("id")
+        .commit();
+    PartitionSpec newSpec = table.spec();
+
+    // Add two data files with new spec
+    // Partition Fields are replaced in V1 with void and actually removed in V2
+    int partIndex = (formatVersion == 1) ? 1 : 0;
+    PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
+    data10Key.set(partIndex, 10);
+    DataFile data10 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-10.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data10Key)
+        .build();
+    PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
+    data11Key.set(partIndex, 11);
+    DataFile data11 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-11.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data11Key)
+        .build();
+
+    table.newFastAppend().appendFile(data10).commit();
+    table.newFastAppend().appendFile(data11).commit();
+
+    Table metadataTable = new PartitionsTable(table.ops(), table);
+    Expression filter = Expressions.and(
+        Expressions.equal("partition.id", 10),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = metadataTable.newScan().filter(filter);
+    CloseableIterable<FileScanTask> tasks = 
PartitionsTable.planFiles((StaticTableScan) scan);
+
+    // Four original files of original spec, one data file written by new spec
+    Assert.assertEquals(5, Iterables.size(tasks));
+
+    filter = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    scan = metadataTable.newScan().filter(filter);
+    tasks = PartitionsTable.planFiles((StaticTableScan) scan);
+
+    if (formatVersion == 1) {
+      // 1 original data file written by old spec
+      Assert.assertEquals(1, Iterables.size(tasks));
+    } else {
+      // V2 drops the partition field so it is not used in the planning, though 
data is still filtered out later
+      // 1 original data/delete files written by old spec, plus both of new 
data file/delete file written by new spec
+      Assert.assertEquals(3, Iterables.size(tasks));

Review Comment:
   I tried to clarify the comment, can you see if it makes sense?
   
   It's a bit confusing, but the background is that this occurs when a query 
filters on a dropped partition field (data).  The correct behavior, because 
this is the partitions table, is that only partitions of the old spec are 
returned; partitions of the new spec, which have no data value, should not be 
returned.
   
   In V1, new files are written with void transform for the dropped field 
(data=null), so the predicate pushdown can filter them out early.
   
   In V2 new files do not write any values for data, so predicate pushdown 
cannot filter them out.
   
   However, they are filtered out later by Spark data filtering, because the 
partition values are normalized to Partitioning.partitionType (the union of all 
specs), and data is filled in as 'null' when returning to Spark.  That was done 
in https://github.com/apache/iceberg/pull/4560.  This is shown in the new test 
added in TestMetadataTablesWithPartitionEvolution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to