RussellSpitzer commented on code in PR #7585:
URL: https://github.com/apache/iceberg/pull/7585#discussion_r1204806850
##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -1384,6 +1384,104 @@ public void testRewriteJobOrderFilesDesc() {
Assert.assertNotEquals("Number of files order should not be ascending",
actual, expected);
}
+ @Test
+ public void testZOrderSortPartitionEvolution() {
+ int originalFiles = 20;
+ Table table = createTable(originalFiles);
+ shouldHaveLastCommitUnsorted(table, "c2");
+ shouldHaveFiles(table, originalFiles);
+
+ table
+ .updateSpec()
+ .addField(Expressions.bucket("c1", 2))
+ .addField(Expressions.bucket("c2", 2))
+ .commit();
+
+ long dataSizeBefore = testDataSize(table);
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .zOrder("c2", "c3")
+ .option(
+ SortStrategy.MAX_FILE_SIZE_BYTES,
+ Integer.toString((averageFileSize(table) / 2) + 2))
+ // Divide files in 2
+ .option(
+ RewriteDataFiles.TARGET_FILE_SIZE_BYTES,
+ Integer.toString(averageFileSize(table) / 2))
+ .option(SortStrategy.MIN_INPUT_FILES, "1")
+ .execute();
+
+ Assert.assertEquals("Should have 1 fileGroups", 1,
result.rewriteResults().size());
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+ }
+
+ @Test
+ public void testRewriteWithDifferentOutputSpecIds() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+
+ // simulate multiple partition specs with different commit
+ table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+ table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();
+
+ performRewriteAndAssertForAllTableSpecs(table, "bin-pack");
+ performRewriteAndAssertForAllTableSpecs(table, "sort");
+ performRewriteAndAssertForAllTableSpecs(table, "zOrder");
+ }
+
+ private void performRewriteAndAssertForAllTableSpecs(Table table, String
strategy) {
+ assertThat(table.specs()).hasSize(3);
+
+ table
+ .specs()
+ .entrySet()
+ .forEach(
+ specEntry -> {
+ long dataSize = testDataSize(table);
+ long count = currentData().size();
+
+ RewriteDataFiles.Result result =
+ executeRewriteStrategy(table, specEntry.getKey(), strategy);
+ assertThat(dataSize).isEqualTo(result.rewrittenBytesCount());
+
+ long postRewriteCount = currentData().size();
+ assertThat(postRewriteCount).isEqualTo(count);
+
+ assertSpecIdFromDataFiles(specEntry, currentDataFiles(table));
+ });
+ }
+
+ private RewriteDataFiles.Result executeRewriteStrategy(
+ Table table, Integer outputSpecId, String strategy) {
+
+ RewriteDataFiles rewriteDataFiles =
+ basicRewrite(table)
+ .option(SparkWriteOptions.OUTPUT_SPEC_ID,
String.valueOf(outputSpecId))
+ .option(BinPackStrategy.REWRITE_ALL, "true");
+
+ RewriteDataFiles.Result result = null;
+ if (strategy.equals("bin-pack")) {
+ result = rewriteDataFiles.binPack().execute();
+ } else if (strategy.equals("sort")) {
+ result =
+ rewriteDataFiles
+
.sort(SortOrder.builderFor(table.schema()).asc("c2").asc("c3").build())
+ .execute();
+ } else if (strategy.equals("zOrder")) {
+ result = rewriteDataFiles.zOrder("c2", "c3").execute();
+ }
+ return result;
+ }
+
+ private void assertSpecIdFromDataFiles(
Review Comment:
I'm not sure what this function is meant to check; it probably needs a rename and a
clearly defined purpose. At the moment it seems to check whether a single spec id is
used for any data file in a list of data files, whereas we probably want a function
that just verifies that only a single partition spec id is used.
The function argument is also confusing: why do we take "specEntry" when we don't
use the PartitionSpec?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]