szehon-ho commented on code in PR #4520:
URL: https://github.com/apache/iceberg/pull/4520#discussion_r846556675
##########
core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java:
##########
@@ -252,6 +254,310 @@ public void testPlanTasks() {
validateCombinedScanTasks(tasks, 0);
}
+ @Test
+ public void testPartitionSpecEvolutionRemovalV1() {
+ Assume.assumeTrue(formatVersion == 1);
+
+ // Change spec and add two data and delete files each
+ table.updateSpec().removeField(Expressions.bucket("data", 16))
+ .addField("id").commit();
+ PartitionSpec newSpec = table.spec();
+
+ // Add two data files and two delete files with new spec
+ PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
Review Comment:
Yeah, I couldn't get it to work because the void transform is there. I didn't
try that hard, though.
##########
core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java:
##########
@@ -252,6 +254,310 @@ public void testPlanTasks() {
validateCombinedScanTasks(tasks, 0);
}
+ @Test
+ public void testPartitionSpecEvolutionRemovalV1() {
+ Assume.assumeTrue(formatVersion == 1);
+
+ // Change spec and add two data and delete files each
+ table.updateSpec().removeField(Expressions.bucket("data", 16))
+ .addField("id").commit();
+ PartitionSpec newSpec = table.spec();
+
+ // Add two data files and two delete files with new spec
+ PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
+ data10Key.set(1, 10);
+ DataFile data10 = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-10.parquet")
+ .withRecordCount(10)
+ .withFileSizeInBytes(10)
+ .withPartition(data10Key)
+ .build();
+ PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
+ data10Key.set(1, 11);
+ DataFile data11 = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-11.parquet")
+ .withRecordCount(10)
+ .withFileSizeInBytes(10)
+ .withPartition(data11Key)
+ .build();
+
+ DeleteFile delete10 = FileMetadata.deleteFileBuilder(newSpec)
+ .ofPositionDeletes()
+ .withPath("/path/to/data-10-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartition(data10Key)
+ .withRecordCount(1)
+ .build();
+ DeleteFile delete11 = FileMetadata.deleteFileBuilder(newSpec)
+ .ofPositionDeletes()
+ .withPath("/path/to/data-11-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartition(data11Key)
+ .withRecordCount(1)
+ .build();
+
+ table.newFastAppend().appendFile(data10).commit();
+ table.newFastAppend().appendFile(data11).commit();
+
+ if (formatVersion == 2) {
Review Comment:
Good catch
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java:
##########
@@ -92,17 +105,11 @@
"cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync
),
AVRO,
- formatVersion()
+ 1
Review Comment:
Yeah, I was trying not to increase the test time (that seems to have been the
original concern), but I added it.
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java:
##########
@@ -189,6 +196,109 @@ public void testFilesMetadataTable() throws ParseException {
}
}
+ @Test
+ public void testFilesMetadataTableFilter() throws ParseException {
+ sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg " +
+ "TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName);
+ initTable();
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+ // verify the metadata tables while the current spec is still unpartitioned
+ for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+ Dataset<Row> df = loadMetadataTable(tableType);
+ Assert.assertTrue("Partition must be skipped", df.schema().getFieldIndex("partition").isEmpty());
+ }
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.updateSpec()
+ .addField("data")
+ .commit();
+ sql("REFRESH TABLE %s", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+ // verify the metadata tables after adding the first partition column
+ for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+ assertPartitions(
+ ImmutableList.of(row("d2")),
+ "STRUCT<data:STRING>",
+ tableType,
+ "partition.data = 'd2'");
+ }
+
+ table.updateSpec()
+ .addField("category")
+ .commit();
+ sql("REFRESH TABLE %s", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+ Dataset<Row> result = loadMetadataTable(ALL_ENTRIES);
+ result.collectAsList().forEach(f -> System.out.println(f));
Review Comment:
Yeah, sorry, that's a leftover debug statement.
##########
core/src/main/java/org/apache/iceberg/BaseFilesTable.java:
##########
@@ -92,7 +93,16 @@ public TableScan appendsAfter(long fromSnapshotId) {
protected CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot, Expression rowFilter,
boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
- CloseableIterable<ManifestFile> filtered = filterManifests(manifests(), rowFilter, caseSensitive);
+ Map<Integer, PartitionSpec> specsById = table().specs();
Review Comment:
Hm, I'm not sure I completely follow. This specsById is only used to look up a
cache before the ManifestReadTask is returned, right?
So I didn't change this line, but I changed the ManifestReadTask constructor to
take the table and call Maps.newHashMap there. Let me know if that's what you wanted.
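
For illustration only, the constructor change described above might look roughly like the sketch below. Everything in it is a stand-in so that it compiles on its own: `Spec`, `TableLike`, and `ReadTask` are hypothetical substitutes for `PartitionSpec`, `Table`, and `ManifestReadTask`, and `new HashMap<>(...)` stands in for Guava's `Maps.newHashMap`. It is not the code from the PR.

```java
import java.util.HashMap;
import java.util.Map;

public class ManifestReadTaskSketch {
  // Hypothetical stand-in for org.apache.iceberg.PartitionSpec.
  static final class Spec {
    final int specId;

    Spec(int specId) {
      this.specId = specId;
    }
  }

  // Hypothetical stand-in for org.apache.iceberg.Table; only specs() is modeled.
  interface TableLike {
    Map<Integer, Spec> specs();
  }

  // Hypothetical stand-in for ManifestReadTask.
  static final class ReadTask {
    private final Map<Integer, Spec> specsById;

    ReadTask(TableLike table) {
      // Copy the table's specs into a plain HashMap inside the constructor
      // (assumption: the real code would call Maps.newHashMap(table.specs())).
      this.specsById = new HashMap<>(table.specs());
    }

    Spec spec(int specId) {
      return specsById.get(specId);
    }
  }

  public static void main(String[] args) {
    Map<Integer, Spec> specs = new HashMap<>();
    specs.put(0, new Spec(0));
    TableLike table = () -> specs;

    ReadTask task = new ReadTask(table);
    specs.put(1, new Spec(1)); // added after the task copied the map

    System.out.println(task.spec(0).specId);   // prints 0
    System.out.println(task.spec(1) == null);  // prints true
  }
}
```

With this shape the caller passes only the table, and the task holds its own copy of the spec map, so later changes to the table's map are not visible to the task.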
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]