This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c13c0c94da Spark 3.4, 3.3: Support output-spec-id in rewrite data
files (#9901) (Backport #9803)
c13c0c94da is described below
commit c13c0c94daf9f6abc34cb109a1a4fd15c8a47a04
Author: Himadri Pal <[email protected]>
AuthorDate: Fri Mar 8 23:29:09 2024 -0800
Spark 3.4, 3.3: Support output-spec-id in rewrite data
files (#9901) (Backport #9803)
Co-authored-by: hpal <[email protected]>
---
.../spark/actions/RewriteDataFilesSparkAction.java | 4 +-
.../spark/actions/SparkBinPackDataRewriter.java | 3 +-
.../spark/actions/SparkShufflingDataRewriter.java | 22 +++-
.../spark/actions/SparkSortDataRewriter.java | 5 +
.../spark/actions/SparkZOrderDataRewriter.java | 21 ++++
.../spark/actions/TestRewriteDataFilesAction.java | 126 +++++++++++++++++++++
.../spark/actions/RewriteDataFilesSparkAction.java | 3 +-
.../spark/actions/SparkBinPackDataRewriter.java | 3 +-
.../spark/actions/SparkShufflingDataRewriter.java | 20 +++-
.../spark/actions/SparkZOrderDataRewriter.java | 16 +++
.../spark/actions/TestRewriteDataFilesAction.java | 125 ++++++++++++++++++++
11 files changed, 337 insertions(+), 11 deletions(-)
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index a5a69dea95..ae547e2063 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -80,8 +80,8 @@ public class RewriteDataFilesSparkAction
PARTIAL_PROGRESS_MAX_COMMITS,
TARGET_FILE_SIZE_BYTES,
USE_STARTING_SEQUENCE_NUMBER,
- REWRITE_JOB_ORDER);
-
+ REWRITE_JOB_ORDER,
+ OUTPUT_SPEC_ID);
private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
index 9a96f44ebd..d256bf2794 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
@@ -58,13 +58,14 @@ class SparkBinPackDataRewriter extends
SparkSizeBasedDataRewriter {
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.DISTRIBUTION_MODE,
distributionMode(group).modeName())
+ .option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId())
.mode("append")
.save(groupId);
}
// invoke a shuffle if the original spec does not match the output spec
private DistributionMode distributionMode(List<FileScanTask> group) {
- boolean requiresRepartition = !group.get(0).spec().equals(table().spec());
+ boolean requiresRepartition = !group.get(0).spec().equals(outputSpec());
return requiresRepartition ? DistributionMode.RANGE :
DistributionMode.NONE;
}
}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
index 1add6383c6..63f2c88d6b 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
@@ -22,6 +22,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -61,6 +63,18 @@ abstract class SparkShufflingDataRewriter extends
SparkSizeBasedDataRewriter {
super(spark, table);
}
+ protected abstract org.apache.iceberg.SortOrder sortOrder();
+
+ /**
+ * Retrieves and returns the schema for the rewrite using the current table
schema.
+ *
+ * <p>The schema with all columns required for correctly sorting the table.
This may include
+ * additional computed columns which are not written to the table but are
used for sorting.
+ */
+ protected Schema sortSchema() {
+ return table().schema();
+ }
+
protected abstract Dataset<Row> sortedDF(Dataset<Row> df, List<FileScanTask>
group);
@Override
@@ -97,6 +111,7 @@ abstract class SparkShufflingDataRewriter extends
SparkSizeBasedDataRewriter {
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+ .option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId())
.mode("append")
.save(groupId);
}
@@ -113,11 +128,12 @@ abstract class SparkShufflingDataRewriter extends
SparkSizeBasedDataRewriter {
protected org.apache.iceberg.SortOrder outputSortOrder(
List<FileScanTask> group, org.apache.iceberg.SortOrder sortOrder) {
- boolean includePartitionColumns =
!group.get(0).spec().equals(table().spec());
- if (includePartitionColumns) {
+ PartitionSpec spec = outputSpec();
+ boolean requiresRepartitioning = !group.get(0).spec().equals(spec);
+ if (requiresRepartitioning) {
// build in the requirement for partition sorting into our sort order
// as the original spec for this group does not match the output spec
- return SortOrderUtil.buildSortOrder(table(), sortOrder);
+ return SortOrderUtil.buildSortOrder(sortSchema(), spec, sortOrder());
} else {
return sortOrder;
}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java
index 4615f3cebc..621e73bc60 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java
@@ -53,6 +53,11 @@ class SparkSortDataRewriter extends
SparkShufflingDataRewriter {
return "SORT";
}
+ @Override
+ protected SortOrder sortOrder() {
+ return sortOrder;
+ }
+
@Override
protected Dataset<Row> sortedDF(Dataset<Row> df, List<FileScanTask> group) {
return sort(df, outputSortOrder(group, sortOrder));
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
index 138e126f0a..52b3f9e158 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkUtil;
@@ -103,6 +104,26 @@ class SparkZOrderDataRewriter extends
SparkShufflingDataRewriter {
this.varLengthContribution = varLengthContribution(options);
}
+ @Override
+ protected SortOrder sortOrder() {
+ return Z_SORT_ORDER;
+ }
+
+ /**
+ * Overrides the sortSchema method to include columns from Z_SCHEMA.
+ *
+ * <p>This method generates a new Schema object which consists of columns
from the original table
+ * schema and Z_SCHEMA.
+ */
+ @Override
+ protected Schema sortSchema() {
+ return new Schema(
+ new ImmutableList.Builder<Types.NestedField>()
+ .addAll(table().schema().columns())
+ .addAll(Z_SCHEMA.columns())
+ .build());
+ }
+
@Override
protected Dataset<Row> sortedDF(Dataset<Row> df, List<FileScanTask> group) {
Dataset<Row> zValueDF = df.withColumn(Z_COLUMN, zValue(df));
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index f17e8c6ed8..cfa1c9da95 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -54,6 +54,7 @@ import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.RowDelta;
@@ -105,6 +106,7 @@ import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -1384,6 +1386,130 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Assert.assertNotEquals("Number of files order should not be ascending",
actual, expected);
}
+ @Test
+ public void testBinPackRewriterWithSpecificUnparitionedOutputSpec() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ int outputSpecId = table.spec().specId();
+ table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+
+ long dataSizeBefore = testDataSize(table);
+ long count = currentData().size();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(outputSpecId))
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .binPack()
+ .execute();
+
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+ assertThat(currentData().size()).isEqualTo(count);
+ shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+ }
+
+ @Test
+ public void testBinPackRewriterWithSpecificOutputSpec() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+ int outputSpecId = table.spec().specId();
+ table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();
+
+ long dataSizeBefore = testDataSize(table);
+ long count = currentData().size();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(outputSpecId))
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .binPack()
+ .execute();
+
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+ assertThat(currentData().size()).isEqualTo(count);
+ shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+ }
+
+ @Test
+ public void testBinpackRewriteWithInvalidOutputSpecId() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ Assertions.assertThatThrownBy(
+ () ->
+ actions()
+ .rewriteDataFiles(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(1234))
+ .binPack()
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot use output spec id 1234 because the table does not contain
a reference to this spec-id.");
+ }
+
+ @Test
+ public void testSortRewriterWithSpecificOutputSpecId() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+ int outputSpecId = table.spec().specId();
+ table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();
+
+ long dataSizeBefore = testDataSize(table);
+ long count = currentData().size();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(outputSpecId))
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+
.sort(SortOrder.builderFor(table.schema()).asc("c2").asc("c3").build())
+ .execute();
+
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+ assertThat(currentData().size()).isEqualTo(count);
+ shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+ }
+
+ @Test
+ public void testZOrderRewriteWithSpecificOutputSpecId() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+ int outputSpecId = table.spec().specId();
+ table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();
+
+ long dataSizeBefore = testDataSize(table);
+ long count = currentData().size();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(outputSpecId))
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .zOrder("c2", "c3")
+ .execute();
+
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+ assertThat(currentData().size()).isEqualTo(count);
+ shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+ }
+
+ protected void shouldRewriteDataFilesWithPartitionSpec(Table table, int
outputSpecId) {
+ List<DataFile> rewrittenFiles = currentDataFiles(table);
+ assertThat(rewrittenFiles).allMatch(file -> file.specId() == outputSpecId);
+ assertThat(rewrittenFiles)
+ .allMatch(
+ file ->
+ ((PartitionData) file.partition())
+ .getPartitionType()
+ .equals(table.specs().get(outputSpecId).partitionType()));
+ }
+
+ protected List<DataFile> currentDataFiles(Table table) {
+ return Streams.stream(table.newScan().planFiles())
+ .map(FileScanTask::file)
+ .collect(Collectors.toList());
+ }
+
private Stream<RewriteFileGroup> toGroupStream(Table table,
RewriteDataFilesSparkAction rewrite) {
rewrite.validateAndInitOptions();
StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 6b5628a1f4..7c516b9675 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -81,7 +81,8 @@ public class RewriteDataFilesSparkAction
PARTIAL_PROGRESS_MAX_COMMITS,
TARGET_FILE_SIZE_BYTES,
USE_STARTING_SEQUENCE_NUMBER,
- REWRITE_JOB_ORDER);
+ REWRITE_JOB_ORDER,
+ OUTPUT_SPEC_ID);
private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
index 9a96f44ebd..d256bf2794 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
@@ -58,13 +58,14 @@ class SparkBinPackDataRewriter extends
SparkSizeBasedDataRewriter {
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.DISTRIBUTION_MODE,
distributionMode(group).modeName())
+ .option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId())
.mode("append")
.save(groupId);
}
// invoke a shuffle if the original spec does not match the output spec
private DistributionMode distributionMode(List<FileScanTask> group) {
- boolean requiresRepartition = !group.get(0).spec().equals(table().spec());
+ boolean requiresRepartition = !group.get(0).spec().equals(outputSpec());
return requiresRepartition ? DistributionMode.RANGE :
DistributionMode.NONE;
}
}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
index c9c962526e..ce572c6486 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
@@ -23,6 +23,8 @@ import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -86,6 +88,16 @@ abstract class SparkShufflingDataRewriter extends
SparkSizeBasedDataRewriter {
protected abstract org.apache.iceberg.SortOrder sortOrder();
+ /**
+ * Retrieves and returns the schema for the rewrite using the current table
schema.
+ *
+ * <p>The schema with all columns required for correctly sorting the table.
This may include
+ * additional computed columns which are not written to the table but are
used for sorting.
+ */
+ protected Schema sortSchema() {
+ return table().schema();
+ }
+
protected abstract Dataset<Row> sortedDF(
Dataset<Row> df, Function<Dataset<Row>, Dataset<Row>> sortFunc);
@@ -122,6 +134,7 @@ abstract class SparkShufflingDataRewriter extends
SparkSizeBasedDataRewriter {
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+ .option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId())
.mode("append")
.save(groupId);
}
@@ -152,11 +165,12 @@ abstract class SparkShufflingDataRewriter extends
SparkSizeBasedDataRewriter {
}
private org.apache.iceberg.SortOrder outputSortOrder(List<FileScanTask>
group) {
- boolean includePartitionColumns =
!group.get(0).spec().equals(table().spec());
- if (includePartitionColumns) {
+ PartitionSpec spec = outputSpec();
+ boolean requiresRepartitioning = !group.get(0).spec().equals(spec);
+ if (requiresRepartitioning) {
// build in the requirement for partition sorting into our sort order
// as the original spec for this group does not match the output spec
- return SortOrderUtil.buildSortOrder(table(), sortOrder());
+ return SortOrderUtil.buildSortOrder(sortSchema(), spec, sortOrder());
} else {
return sortOrder();
}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
index 9a618661fe..cc4fb78ebd 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkUtil;
@@ -108,6 +109,21 @@ class SparkZOrderDataRewriter extends
SparkShufflingDataRewriter {
return Z_SORT_ORDER;
}
+ /**
+ * Overrides the sortSchema method to include columns from Z_SCHEMA.
+ *
+ * <p>This method generates a new Schema object which consists of columns
from the original table
+ * schema and Z_SCHEMA.
+ */
+ @Override
+ protected Schema sortSchema() {
+ return new Schema(
+ new ImmutableList.Builder<Types.NestedField>()
+ .addAll(table().schema().columns())
+ .addAll(Z_SCHEMA.columns())
+ .build());
+ }
+
@Override
protected Dataset<Row> sortedDF(Dataset<Row> df, Function<Dataset<Row>,
Dataset<Row>> sortFunc) {
Dataset<Row> zValueDF = df.withColumn(Z_COLUMN, zValue(df));
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index bfffa65acc..0eea495323 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -53,6 +53,7 @@ import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.RowDelta;
@@ -1426,6 +1427,130 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Assert.assertNotEquals("Number of files order should not be ascending",
actual, expected);
}
+ @Test
+ public void testBinPackRewriterWithSpecificUnparitionedOutputSpec() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ int outputSpecId = table.spec().specId();
+ table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+
+ long dataSizeBefore = testDataSize(table);
+ long count = currentData().size();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(outputSpecId))
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .binPack()
+ .execute();
+
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+ assertThat(currentData().size()).isEqualTo(count);
+ shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+ }
+
+ @Test
+ public void testBinPackRewriterWithSpecificOutputSpec() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+ int outputSpecId = table.spec().specId();
+ table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();
+
+ long dataSizeBefore = testDataSize(table);
+ long count = currentData().size();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(outputSpecId))
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .binPack()
+ .execute();
+
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+ assertThat(currentData().size()).isEqualTo(count);
+ shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+ }
+
+ @Test
+ public void testBinpackRewriteWithInvalidOutputSpecId() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ Assertions.assertThatThrownBy(
+ () ->
+ actions()
+ .rewriteDataFiles(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(1234))
+ .binPack()
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot use output spec id 1234 because the table does not contain
a reference to this spec-id.");
+ }
+
+ @Test
+ public void testSortRewriterWithSpecificOutputSpecId() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+ int outputSpecId = table.spec().specId();
+ table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();
+
+ long dataSizeBefore = testDataSize(table);
+ long count = currentData().size();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(outputSpecId))
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+
.sort(SortOrder.builderFor(table.schema()).asc("c2").asc("c3").build())
+ .execute();
+
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+ assertThat(currentData().size()).isEqualTo(count);
+ shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+ }
+
+ @Test
+ public void testZOrderRewriteWithSpecificOutputSpecId() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+ int outputSpecId = table.spec().specId();
+ table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();
+
+ long dataSizeBefore = testDataSize(table);
+ long count = currentData().size();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(outputSpecId))
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .zOrder("c2", "c3")
+ .execute();
+
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+ assertThat(currentData().size()).isEqualTo(count);
+ shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+ }
+
+ protected void shouldRewriteDataFilesWithPartitionSpec(Table table, int
outputSpecId) {
+ List<DataFile> rewrittenFiles = currentDataFiles(table);
+ assertThat(rewrittenFiles).allMatch(file -> file.specId() == outputSpecId);
+ assertThat(rewrittenFiles)
+ .allMatch(
+ file ->
+ ((PartitionData) file.partition())
+ .getPartitionType()
+ .equals(table.specs().get(outputSpecId).partitionType()));
+ }
+
+ protected List<DataFile> currentDataFiles(Table table) {
+ return Streams.stream(table.newScan().planFiles())
+ .map(FileScanTask::file)
+ .collect(Collectors.toList());
+ }
+
private Stream<RewriteFileGroup> toGroupStream(Table table,
RewriteDataFilesSparkAction rewrite) {
rewrite.validateAndInitOptions();
StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =