aokolnychyi commented on code in PR #4520:
URL: https://github.com/apache/iceberg/pull/4520#discussion_r846537774


##########
core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java:
##########
@@ -63,6 +64,7 @@ public TestMetadataTableFilters(MetadataTableType type, int 
formatVersion) {
   @Override
   public void setupTable() throws Exception {
     super.setupTable();
+    table.updateProperties().set(TableProperties.MANIFEST_MERGE_ENABLED, 
"false").commit();

Review Comment:
   nit: can we split this into multiple lines like the invocation below?
   
   ```
   table.updateProperties()
       .set(..)
       .commit();
   ```



##########
core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java:
##########
@@ -252,6 +254,310 @@ public void testPlanTasks() {
     validateCombinedScanTasks(tasks, 0);
   }
 
+  @Test
+  public void testPartitionSpecEvolutionRemovalV1() {
+    Assume.assumeTrue(formatVersion == 1);
+
+    // Change spec and add two data and delete files each
+    table.updateSpec().removeField(Expressions.bucket("data", 16))
+        .addField("id").commit();
+    PartitionSpec newSpec = table.spec();
+
+    // Add two data files and two delete files with new spec
+    PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
+    data10Key.set(1, 10);
+    DataFile data10 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-10.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data10Key)
+        .build();
+    PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
+    data10Key.set(1, 11);
+    DataFile data11 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-11.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data11Key)
+        .build();
+
+    DeleteFile delete10 = FileMetadata.deleteFileBuilder(newSpec)
+        .ofPositionDeletes()
+        .withPath("/path/to/data-10-deletes.parquet")
+        .withFileSizeInBytes(10)
+        .withPartition(data10Key)
+        .withRecordCount(1)
+        .build();
+    DeleteFile delete11 = FileMetadata.deleteFileBuilder(newSpec)
+        .ofPositionDeletes()
+        .withPath("/path/to/data-11-deletes.parquet")
+        .withFileSizeInBytes(10)
+        .withPartition(data11Key)
+        .withRecordCount(1)
+        .build();
+
+    table.newFastAppend().appendFile(data10).commit();
+    table.newFastAppend().appendFile(data11).commit();
+
+    if (formatVersion == 2) {
+      table.newRowDelta().addDeletes(delete10).commit();
+      table.newRowDelta().addDeletes(delete11).commit();
+    }
+
+    if (type.equals(MetadataTableType.ALL_DATA_FILES)) {
+      // Clear all files from current snapshot to test whether 'all' Files 
tables scans previous files
+      
table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();  // 
Moves file entries to DELETED state
+      
table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();  // 
Removes all entries
+      Assert.assertEquals("Current snapshot should be made empty",
+          0, table.currentSnapshot().allManifests().size());
+    }
+
+    Table metadataTable = createMetadataTable();
+    Expression filter = Expressions.and(
+        Expressions.equal("partition.id", 10),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = metadataTable.newScan().filter(filter);
+    CloseableIterable<FileScanTask> tasks = scan.planFiles();
+
+    // All 4 original data/delete files written by old spec, plus one data 
file/delete file written by new spec
+    Assert.assertEquals(expectedScanTaskCount(5), Iterables.size(tasks));
+
+    filter = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    scan = metadataTable.newScan().filter(filter);
+    tasks = scan.planFiles();
+
+    // 1 original data/delete files written by old spec (V1 filters out new 
specs which don't have this value)
+    Assert.assertEquals(expectedScanTaskCount(1), Iterables.size(tasks));
+  }
+
+  @Test
+  public void testPartitionSpecEvolutionRemovalV2() {
+    Assume.assumeTrue(formatVersion == 2);
+
+    // Change spec and add two data and delete files each
+    table.updateSpec().removeField(Expressions.bucket("data", 16))

Review Comment:
   nit: same as above



##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -92,7 +93,16 @@ public TableScan appendsAfter(long fromSnapshotId) {
     protected CloseableIterable<FileScanTask> planFiles(TableOperations ops, 
Snapshot snapshot, Expression rowFilter,
                                                         boolean 
ignoreResiduals, boolean caseSensitive,
                                                         boolean colStats) {
-      CloseableIterable<ManifestFile> filtered = filterManifests(manifests(), 
rowFilter, caseSensitive);
+      Map<Integer, PartitionSpec> specsById = table().specs();
+
+      LoadingCache<Integer, ManifestEvaluator> evalCache = 
Caffeine.newBuilder().build(specId -> {
+        PartitionSpec spec = specsById.get(specId);
+        PartitionSpec transformedSpec = transformSpec(fileSchema, spec);
+        return ManifestEvaluator.forRowFilter(rowFilter, transformedSpec, 
caseSensitive);
+      });
+
+      CloseableIterable<ManifestFile> filtered = 
CloseableIterable.filter(manifests(),

Review Comment:
   nit: shall we call it `filteredManifests`?



##########
core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java:
##########
@@ -252,6 +254,310 @@ public void testPlanTasks() {
     validateCombinedScanTasks(tasks, 0);
   }
 
+  @Test
+  public void testPartitionSpecEvolutionRemovalV1() {
+    Assume.assumeTrue(formatVersion == 1);
+
+    // Change spec and add two data and delete files each
+    table.updateSpec().removeField(Expressions.bucket("data", 16))

Review Comment:
   nit: same here, I think it would be more readable to have each call on a 
separate line.



##########
core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java:
##########
@@ -252,6 +254,310 @@ public void testPlanTasks() {
     validateCombinedScanTasks(tasks, 0);
   }
 
+  @Test
+  public void testPartitionSpecEvolutionRemovalV1() {
+    Assume.assumeTrue(formatVersion == 1);
+
+    // Change spec and add two data and delete files each
+    table.updateSpec().removeField(Expressions.bucket("data", 16))
+        .addField("id").commit();
+    PartitionSpec newSpec = table.spec();
+
+    // Add two data files and two delete files with new spec
+    PartitionKey data10Key = new PartitionKey(newSpec, table.schema());

Review Comment:
   An alternative way to define partitioning is to use `withPartitionPath` and 
specify a string.
   I don't have a preference, just FYI.



##########
core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java:
##########
@@ -252,6 +254,310 @@ public void testPlanTasks() {
     validateCombinedScanTasks(tasks, 0);
   }
 
+  @Test
+  public void testPartitionSpecEvolutionRemovalV1() {
+    Assume.assumeTrue(formatVersion == 1);
+
+    // Change spec and add two data and delete files each
+    table.updateSpec().removeField(Expressions.bucket("data", 16))
+        .addField("id").commit();
+    PartitionSpec newSpec = table.spec();
+
+    // Add two data files and two delete files with new spec
+    PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
+    data10Key.set(1, 10);
+    DataFile data10 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-10.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data10Key)
+        .build();
+    PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
+    data10Key.set(1, 11);
+    DataFile data11 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-11.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data11Key)
+        .build();
+
+    DeleteFile delete10 = FileMetadata.deleteFileBuilder(newSpec)
+        .ofPositionDeletes()
+        .withPath("/path/to/data-10-deletes.parquet")
+        .withFileSizeInBytes(10)
+        .withPartition(data10Key)
+        .withRecordCount(1)
+        .build();
+    DeleteFile delete11 = FileMetadata.deleteFileBuilder(newSpec)
+        .ofPositionDeletes()
+        .withPath("/path/to/data-11-deletes.parquet")
+        .withFileSizeInBytes(10)
+        .withPartition(data11Key)
+        .withRecordCount(1)
+        .build();
+
+    table.newFastAppend().appendFile(data10).commit();
+    table.newFastAppend().appendFile(data11).commit();
+
+    if (formatVersion == 2) {
+      table.newRowDelta().addDeletes(delete10).commit();
+      table.newRowDelta().addDeletes(delete11).commit();
+    }
+
+    if (type.equals(MetadataTableType.ALL_DATA_FILES)) {
+      // Clear all files from current snapshot to test whether 'all' Files 
tables scans previous files
+      
table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();  // 
Moves file entries to DELETED state
+      
table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();  // 
Removes all entries
+      Assert.assertEquals("Current snapshot should be made empty",
+          0, table.currentSnapshot().allManifests().size());
+    }
+
+    Table metadataTable = createMetadataTable();
+    Expression filter = Expressions.and(
+        Expressions.equal("partition.id", 10),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = metadataTable.newScan().filter(filter);
+    CloseableIterable<FileScanTask> tasks = scan.planFiles();
+
+    // All 4 original data/delete files written by old spec, plus one data 
file/delete file written by new spec
+    Assert.assertEquals(expectedScanTaskCount(5), Iterables.size(tasks));
+
+    filter = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    scan = metadataTable.newScan().filter(filter);
+    tasks = scan.planFiles();
+
+    // 1 original data/delete files written by old spec (V1 filters out new 
specs which don't have this value)
+    Assert.assertEquals(expectedScanTaskCount(1), Iterables.size(tasks));
+  }
+
+  @Test
+  public void testPartitionSpecEvolutionRemovalV2() {
+    Assume.assumeTrue(formatVersion == 2);
+
+    // Change spec and add two data and delete files each
+    table.updateSpec().removeField(Expressions.bucket("data", 16))
+        .addField("id").commit();
+    PartitionSpec newSpec = table.spec();
+
+    // Add two data files and two delete files with new spec
+    DataFile data10 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-10.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartitionPath("id=10")
+        .build();
+    DataFile data11 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-11.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartitionPath("id=11")
+        .build();
+
+    DeleteFile delete10 = FileMetadata.deleteFileBuilder(newSpec)
+        .ofPositionDeletes()
+        .withPath("/path/to/data-10-deletes.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("id=10")
+        .withRecordCount(1)
+        .build();
+    DeleteFile delete11 = FileMetadata.deleteFileBuilder(newSpec)
+        .ofPositionDeletes()
+        .withPath("/path/to/data-11-deletes.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("id=11")
+        .withRecordCount(1)
+        .build();
+
+    table.newFastAppend().appendFile(data10).commit();
+    table.newFastAppend().appendFile(data11).commit();
+
+    if (formatVersion == 2) {
+      table.newRowDelta().addDeletes(delete10).commit();
+      table.newRowDelta().addDeletes(delete11).commit();
+    }
+
+    if (type.equals(MetadataTableType.ALL_DATA_FILES)) {
+      // Clear all files from current snapshot to test whether 'all' Files 
tables scans previous files
+      
table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();  // 
Moves file entries to DELETED state
+      
table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();  // 
Removes all entries
+      Assert.assertEquals("Current snapshot should be made empty",
+          0, table.currentSnapshot().allManifests().size());
+    }
+
+    Table metadataTable = createMetadataTable();
+    Expression filter = Expressions.and(
+        Expressions.equal("partition.id", 10),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = metadataTable.newScan().filter(filter);
+    CloseableIterable<FileScanTask> tasks = scan.planFiles();
+
+    // All 4 original data/delete files written by old spec, plus one new data 
file/delete file written by new spec
+    Assert.assertEquals(expectedScanTaskCount(5), Iterables.size(tasks));
+
+    filter = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    scan = metadataTable.newScan().filter(filter);
+    tasks = scan.planFiles();
+
+    // 1 original data/delete files written by old spec, plus both of new data 
file/delete file written by new spec
+    Assert.assertEquals(expectedScanTaskCount(3), Iterables.size(tasks));
+  }
+
+  @Test
+  public void testPartitionSpecEvolutionAdditiveV1() {

Review Comment:
   some of the above comments to earlier tests apply to this one too



##########
core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java:
##########
@@ -252,6 +254,310 @@ public void testPlanTasks() {
     validateCombinedScanTasks(tasks, 0);
   }
 
+  @Test
+  public void testPartitionSpecEvolutionRemovalV1() {

Review Comment:
   nit: some comments in this test refer to delete files which cannot be part 
of v1 tables



##########
core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java:
##########
@@ -252,6 +254,310 @@ public void testPlanTasks() {
     validateCombinedScanTasks(tasks, 0);
   }
 
+  @Test
+  public void testPartitionSpecEvolutionRemovalV1() {
+    Assume.assumeTrue(formatVersion == 1);
+
+    // Change spec and add two data and delete files each
+    table.updateSpec().removeField(Expressions.bucket("data", 16))
+        .addField("id").commit();
+    PartitionSpec newSpec = table.spec();
+
+    // Add two data files and two delete files with new spec
+    PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
+    data10Key.set(1, 10);
+    DataFile data10 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-10.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data10Key)
+        .build();
+    PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
+    data10Key.set(1, 11);
+    DataFile data11 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-11.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data11Key)
+        .build();
+
+    DeleteFile delete10 = FileMetadata.deleteFileBuilder(newSpec)

Review Comment:
   I don't think you need delete files for v1 tests.



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java:
##########
@@ -92,17 +105,11 @@
                 "cache-enabled", "false" // Spark will delete tables using v1, 
leaving the cache out of sync
             ),
             AVRO,
-            formatVersion()
+            1

Review Comment:
   Shall we test v2 for Avro as well?



##########
core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java:
##########
@@ -252,6 +254,310 @@ public void testPlanTasks() {
     validateCombinedScanTasks(tasks, 0);
   }
 
+  @Test
+  public void testPartitionSpecEvolutionRemovalV1() {
+    Assume.assumeTrue(formatVersion == 1);
+
+    // Change spec and add two data and delete files each
+    table.updateSpec().removeField(Expressions.bucket("data", 16))
+        .addField("id").commit();
+    PartitionSpec newSpec = table.spec();
+
+    // Add two data files and two delete files with new spec
+    PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
+    data10Key.set(1, 10);
+    DataFile data10 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-10.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data10Key)
+        .build();
+    PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
+    data10Key.set(1, 11);
+    DataFile data11 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-11.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data11Key)
+        .build();
+
+    DeleteFile delete10 = FileMetadata.deleteFileBuilder(newSpec)
+        .ofPositionDeletes()
+        .withPath("/path/to/data-10-deletes.parquet")
+        .withFileSizeInBytes(10)
+        .withPartition(data10Key)
+        .withRecordCount(1)
+        .build();
+    DeleteFile delete11 = FileMetadata.deleteFileBuilder(newSpec)
+        .ofPositionDeletes()
+        .withPath("/path/to/data-11-deletes.parquet")
+        .withFileSizeInBytes(10)
+        .withPartition(data11Key)
+        .withRecordCount(1)
+        .build();
+
+    table.newFastAppend().appendFile(data10).commit();
+    table.newFastAppend().appendFile(data11).commit();
+
+    if (formatVersion == 2) {

Review Comment:
   nit: do we need this branch? looks like the test is for v1.



##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -92,7 +93,16 @@ public TableScan appendsAfter(long fromSnapshotId) {
     protected CloseableIterable<FileScanTask> planFiles(TableOperations ops, 
Snapshot snapshot, Expression rowFilter,
                                                         boolean 
ignoreResiduals, boolean caseSensitive,
                                                         boolean colStats) {
-      CloseableIterable<ManifestFile> filtered = filterManifests(manifests(), 
rowFilter, caseSensitive);
+      Map<Integer, PartitionSpec> specsById = table().specs();

Review Comment:
   In a few other places, we copy specs into a new map via `Maps.newHashMap` to 
make sure Java collections are used. This is done to avoid any potential Kryo 
issues as `ScanTasks` are serializable. Maybe, we should do the same here.
   
   At the end of the method, we call `ops.io(), ops.current().specsById()` to 
construct a task. In theory, we could pass `Table` object to `ManifestReadTask` 
and then access `FileIO` via `table.io()` and copy specs into a Java map in the 
constructor. Up to you. At least, no need to call `ops.current().specsById()` 
as we already have specs.



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java:
##########
@@ -189,6 +196,109 @@ public void testFilesMetadataTable() throws 
ParseException {
     }
   }
 
+  @Test
+  public void testFilesMetadataTableFilter() throws ParseException {
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) 
USING iceberg " +
+        "TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName);
+    initTable();
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    // verify the metadata tables while the current spec is still unpartitioned
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      Dataset<Row> df = loadMetadataTable(tableType);
+      Assert.assertTrue("Partition must be skipped", 
df.schema().getFieldIndex("partition").isEmpty());
+    }
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateSpec()
+        .addField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    // verify the metadata tables after adding the first partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row("d2")),
+          "STRUCT<data:STRING>",
+          tableType,
+          "partition.data = 'd2'");
+    }
+
+    table.updateSpec()
+        .addField("category")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    Dataset<Row> result = loadMetadataTable(ALL_ENTRIES);
+    result.collectAsList().forEach(f -> System.out.println(f));

Review Comment:
   Do we need this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to