This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 99b1d0ee4c Spark 3.5: Add Support for Providing output-spec-id During
Rewrite Datafiles
99b1d0ee4c is described below
commit 99b1d0ee4c8fb67d457c4d7a03065e4cb63af435
Author: Himadri Pal <[email protected]>
AuthorDate: Wed Mar 6 21:58:54 2024 -0800
Spark 3.5: Add Support for Providing output-spec-id During Rewrite Datafiles
---
.../apache/iceberg/actions/RewriteDataFiles.java | 9 ++
.../iceberg/actions/SizeBasedFileRewriter.java | 22 ++++
.../spark/actions/RewriteDataFilesSparkAction.java | 3 +-
.../spark/actions/SparkBinPackDataRewriter.java | 3 +-
.../spark/actions/SparkShufflingDataRewriter.java | 20 +++-
.../spark/actions/SparkZOrderDataRewriter.java | 16 +++
.../spark/actions/TestRewriteDataFilesAction.java | 126 +++++++++++++++++++++
7 files changed, 194 insertions(+), 5 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index 854b099351..40dc24318c 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -119,6 +119,15 @@ public interface RewriteDataFiles
String REWRITE_JOB_ORDER_DEFAULT = RewriteJobOrder.NONE.orderName();
+ /**
+ * The partition specification ID to be used for rewritten files.
+ *
+ * <p>The output-spec-id is used by the file rewriter during the rewrite
operation to identify the
+ * specific output partition spec. Data will be reorganized during the
rewrite to align with the
+ * output partitioning. Defaults to the current table specification.
+ */
+ String OUTPUT_SPEC_ID = "output-spec-id";
+
/**
* Choose BINPACK as a strategy for this rewrite operation
*
diff --git
a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
index cf98c5266a..fb3c27220c 100644
--- a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
+++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -111,6 +112,8 @@ public abstract class SizeBasedFileRewriter<T extends
ContentScanTask<F>, F exte
private boolean rewriteAll;
private long maxGroupSize;
+ private int outputSpecId;
+
protected SizeBasedFileRewriter(Table table) {
this.table = table;
}
@@ -146,6 +149,7 @@ public abstract class SizeBasedFileRewriter<T extends
ContentScanTask<F>, F exte
this.minInputFiles = minInputFiles(options);
this.rewriteAll = rewriteAll(options);
this.maxGroupSize = maxGroupSize(options);
+ this.outputSpecId = outputSpecId(options);
if (rewriteAll) {
LOG.info("Configured to rewrite all provided files in table {}",
table.name());
@@ -258,6 +262,24 @@ public abstract class SizeBasedFileRewriter<T extends
ContentScanTask<F>, F exte
return (long) (targetFileSize + ((maxFileSize - targetFileSize) * 0.5));
}
+ protected PartitionSpec outputSpec() {
+ return table.specs().get(outputSpecId);
+ }
+
+ protected int outputSpecId() {
+ return outputSpecId;
+ }
+
+ private int outputSpecId(Map<String, String> options) {
+ int specId =
+ PropertyUtil.propertyAsInt(options, RewriteDataFiles.OUTPUT_SPEC_ID,
table.spec().specId());
+ Preconditions.checkArgument(
+ table.specs().containsKey(specId),
+ "Cannot use output spec id %s because the table does not contain a
reference to this spec-id.",
+ specId);
+ return specId;
+ }
+
private Map<String, Long> sizeThresholds(Map<String, String> options) {
long target =
PropertyUtil.propertyAsLong(options, TARGET_FILE_SIZE_BYTES,
defaultTargetFileSize());
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index a2a585db78..bf1a901dbd 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -81,7 +81,8 @@ public class RewriteDataFilesSparkAction
PARTIAL_PROGRESS_MAX_COMMITS,
TARGET_FILE_SIZE_BYTES,
USE_STARTING_SEQUENCE_NUMBER,
- REWRITE_JOB_ORDER);
+ REWRITE_JOB_ORDER,
+ OUTPUT_SPEC_ID);
private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
index 9a96f44ebd..d256bf2794 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
@@ -58,13 +58,14 @@ class SparkBinPackDataRewriter extends
SparkSizeBasedDataRewriter {
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.DISTRIBUTION_MODE,
distributionMode(group).modeName())
+ .option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId())
.mode("append")
.save(groupId);
}
// invoke a shuffle if the original spec does not match the output spec
private DistributionMode distributionMode(List<FileScanTask> group) {
- boolean requiresRepartition = !group.get(0).spec().equals(table().spec());
+ boolean requiresRepartition = !group.get(0).spec().equals(outputSpec());
return requiresRepartition ? DistributionMode.RANGE :
DistributionMode.NONE;
}
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
index c9c962526e..ce572c6486 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
@@ -23,6 +23,8 @@ import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -86,6 +88,16 @@ abstract class SparkShufflingDataRewriter extends
SparkSizeBasedDataRewriter {
protected abstract org.apache.iceberg.SortOrder sortOrder();
+ /**
+ * Retrieves and returns the schema for the rewrite using the current table
schema.
+ *
+ * <p>The schema includes all columns required for correctly sorting the table.
This may include
+ * additional computed columns which are not written to the table but are
used for sorting.
+ */
+ protected Schema sortSchema() {
+ return table().schema();
+ }
+
protected abstract Dataset<Row> sortedDF(
Dataset<Row> df, Function<Dataset<Row>, Dataset<Row>> sortFunc);
@@ -122,6 +134,7 @@ abstract class SparkShufflingDataRewriter extends
SparkSizeBasedDataRewriter {
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+ .option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId())
.mode("append")
.save(groupId);
}
@@ -152,11 +165,12 @@ abstract class SparkShufflingDataRewriter extends
SparkSizeBasedDataRewriter {
}
private org.apache.iceberg.SortOrder outputSortOrder(List<FileScanTask>
group) {
- boolean includePartitionColumns =
!group.get(0).spec().equals(table().spec());
- if (includePartitionColumns) {
+ PartitionSpec spec = outputSpec();
+ boolean requiresRepartitioning = !group.get(0).spec().equals(spec);
+ if (requiresRepartitioning) {
// build in the requirement for partition sorting into our sort order
// as the original spec for this group does not match the output spec
- return SortOrderUtil.buildSortOrder(table(), sortOrder());
+ return SortOrderUtil.buildSortOrder(sortSchema(), spec, sortOrder());
} else {
return sortOrder();
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
index 9a618661fe..cc4fb78ebd 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkUtil;
@@ -108,6 +109,21 @@ class SparkZOrderDataRewriter extends
SparkShufflingDataRewriter {
return Z_SORT_ORDER;
}
+ /**
+ * Overrides the sortSchema method to include columns from Z_SCHEMA.
+ *
+ * <p>This method generates a new Schema object which consists of columns
from the original table
+ * schema and Z_SCHEMA.
+ */
+ @Override
+ protected Schema sortSchema() {
+ return new Schema(
+ new ImmutableList.Builder<Types.NestedField>()
+ .addAll(table().schema().columns())
+ .addAll(Z_SCHEMA.columns())
+ .build());
+ }
+
@Override
protected Dataset<Row> sortedDF(Dataset<Row> df, Function<Dataset<Row>,
Dataset<Row>> sortFunc) {
Dataset<Row> zValueDF = df.withColumn(Z_COLUMN, zValue(df));
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 82b32f2ce0..500092c044 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -55,6 +55,7 @@ import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.RowDelta;
@@ -108,6 +109,7 @@ import org.apache.iceberg.util.StructLikeMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.internal.SQLConf;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -1463,6 +1465,130 @@ public class TestRewriteDataFilesAction extends
TestBase {
assertThat(table.currentSnapshot().summary()).containsKeys(commitMetricsKeys);
}
+ @Test
+ public void testBinPackRewriterWithSpecificUnpartitionedOutputSpec() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ int outputSpecId = table.spec().specId();
+ table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+
+ long dataSizeBefore = testDataSize(table);
+ long count = currentData().size();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(outputSpecId))
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .binPack()
+ .execute();
+
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+ assertThat(currentData().size()).isEqualTo(count);
+ shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+ }
+
+ @Test
+ public void testBinPackRewriterWithSpecificOutputSpec() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+ int outputSpecId = table.spec().specId();
+ table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();
+
+ long dataSizeBefore = testDataSize(table);
+ long count = currentData().size();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(outputSpecId))
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .binPack()
+ .execute();
+
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+ assertThat(currentData().size()).isEqualTo(count);
+ shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+ }
+
+ @Test
+ public void testBinpackRewriteWithInvalidOutputSpecId() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ Assertions.assertThatThrownBy(
+ () ->
+ actions()
+ .rewriteDataFiles(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(1234))
+ .binPack()
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot use output spec id 1234 because the table does not contain
a reference to this spec-id.");
+ }
+
+ @Test
+ public void testSortRewriterWithSpecificOutputSpecId() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+ int outputSpecId = table.spec().specId();
+ table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();
+
+ long dataSizeBefore = testDataSize(table);
+ long count = currentData().size();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(outputSpecId))
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+
.sort(SortOrder.builderFor(table.schema()).asc("c2").asc("c3").build())
+ .execute();
+
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+ assertThat(currentData().size()).isEqualTo(count);
+ shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+ }
+
+ @Test
+ public void testZOrderRewriteWithSpecificOutputSpecId() {
+ Table table = createTable(10);
+ shouldHaveFiles(table, 10);
+ table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
+ int outputSpecId = table.spec().specId();
+ table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();
+
+ long dataSizeBefore = testDataSize(table);
+ long count = currentData().size();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(RewriteDataFiles.OUTPUT_SPEC_ID,
String.valueOf(outputSpecId))
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .zOrder("c2", "c3")
+ .execute();
+
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+ assertThat(currentData().size()).isEqualTo(count);
+ shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
+ }
+
+ protected void shouldRewriteDataFilesWithPartitionSpec(Table table, int
outputSpecId) {
+ List<DataFile> rewrittenFiles = currentDataFiles(table);
+ assertThat(rewrittenFiles).allMatch(file -> file.specId() == outputSpecId);
+ assertThat(rewrittenFiles)
+ .allMatch(
+ file ->
+ ((PartitionData) file.partition())
+ .getPartitionType()
+ .equals(table.specs().get(outputSpecId).partitionType()));
+ }
+
+ protected List<DataFile> currentDataFiles(Table table) {
+ return Streams.stream(table.newScan().planFiles())
+ .map(FileScanTask::file)
+ .collect(Collectors.toList());
+ }
+
private Stream<RewriteFileGroup> toGroupStream(Table table,
RewriteDataFilesSparkAction rewrite) {
rewrite.validateAndInitOptions();
StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =