This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 008d1731cb Spark 3.5: Propagate snapshot properties in compaction (#9449)
008d1731cb is described below
commit 008d1731cb0d9c2dfbcab640633651dc920953ea
Author: advancedxy <[email protected]>
AuthorDate: Fri Jan 19 03:35:48 2024 +0800
Spark 3.5: Propagate snapshot properties in compaction (#9449)
---
.../actions/RewriteDataFilesCommitManager.java | 14 +++++++++
.../RewritePositionDeletesCommitManager.java | 10 +++++++
.../actions/BaseSnapshotUpdateSparkAction.java | 5 ++++
.../spark/actions/RewriteDataFilesSparkAction.java | 3 +-
.../RewritePositionDeleteFilesSparkAction.java | 2 +-
.../spark/actions/TestRewriteDataFilesAction.java | 18 ++++++++++++
.../TestRewritePositionDeleteFilesAction.java | 34 ++++++++++++++++++++++
7 files changed, 84 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
index 265b5c5c27..7f89db467d 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
@@ -18,12 +18,14 @@
*/
package org.apache.iceberg.actions;
+import java.util.Map;
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
@@ -36,6 +38,7 @@ public class RewriteDataFilesCommitManager {
private final Table table;
private final long startingSnapshotId;
private final boolean useStartingSequenceNumber;
+ private final Map<String, String> snapshotProperties;
// constructor used for testing
public RewriteDataFilesCommitManager(Table table) {
@@ -48,9 +51,18 @@ public class RewriteDataFilesCommitManager {
public RewriteDataFilesCommitManager(
Table table, long startingSnapshotId, boolean useStartingSequenceNumber)
{
+ this(table, startingSnapshotId, useStartingSequenceNumber,
ImmutableMap.of());
+ }
+
+ public RewriteDataFilesCommitManager(
+ Table table,
+ long startingSnapshotId,
+ boolean useStartingSequenceNumber,
+ Map<String, String> snapshotProperties) {
this.table = table;
this.startingSnapshotId = startingSnapshotId;
this.useStartingSequenceNumber = useStartingSequenceNumber;
+ this.snapshotProperties = snapshotProperties;
}
/**
@@ -75,6 +87,8 @@ public class RewriteDataFilesCommitManager {
rewrite.rewriteFiles(rewrittenDataFiles, addedDataFiles);
}
+ snapshotProperties.forEach(rewrite::set);
+
rewrite.commit();
}
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
index c55532692e..01b2f7528e 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.actions;
+import java.util.Map;
import java.util.Set;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DeleteFile;
@@ -25,6 +26,7 @@ import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,10 +41,16 @@ public class RewritePositionDeletesCommitManager {
private final Table table;
private final long startingSnapshotId;
+ private final Map<String, String> snapshotProperties;
public RewritePositionDeletesCommitManager(Table table) {
+ this(table, ImmutableMap.of());
+ }
+
+ public RewritePositionDeletesCommitManager(Table table, Map<String, String>
snapshotProperties) {
this.table = table;
this.startingSnapshotId = table.currentSnapshot().snapshotId();
+ this.snapshotProperties = snapshotProperties;
}
/**
@@ -64,6 +72,8 @@ public class RewritePositionDeletesCommitManager {
}
}
+ snapshotProperties.forEach(rewriteFiles::set);
+
rewriteFiles.commit();
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
index 77debe1e58..b69b80a8d3 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.actions;
import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.spark.sql.SparkSession;
@@ -39,4 +40,8 @@ abstract class BaseSnapshotUpdateSparkAction<ThisT> extends BaseSparkAction<This
summary.forEach(update::set);
update.commit();
}
+
+ protected Map<String, String> commitSummary() {
+ return ImmutableMap.copyOf(summary);
+ }
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 6b5628a1f4..a2a585db78 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -257,7 +257,8 @@ public class RewriteDataFilesSparkAction
@VisibleForTesting
RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
- return new RewriteDataFilesCommitManager(table, startingSnapshotId,
useStartingSequenceNumber);
+ return new RewriteDataFilesCommitManager(
+ table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
}
private Result doExecute(
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
index f3dfd2dcc3..539f6de920 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
@@ -215,7 +215,7 @@ public class RewritePositionDeleteFilesSparkAction
}
private RewritePositionDeletesCommitManager commitManager() {
- return new RewritePositionDeletesCommitManager(table);
+ return new RewritePositionDeletesCommitManager(table, commitSummary());
}
private Result doExecute(
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index d0d22e46ff..82b32f2ce0 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -60,6 +60,7 @@ import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -1445,6 +1446,23 @@ public class TestRewriteDataFilesAction extends TestBase {
assertThat(actual).as("Number of files order should not be
ascending").isNotEqualTo(expected);
}
+ @Test
+ public void testSnapshotProperty() {
+ Table table = createTable(4);
+ Result ignored = basicRewrite(table).snapshotProperty("key",
"value").execute();
+ assertThat(table.currentSnapshot().summary())
+ .containsAllEntriesOf(ImmutableMap.of("key", "value"));
+ // make sure internal produced properties are not lost
+ String[] commitMetricsKeys =
+ new String[] {
+ SnapshotSummary.ADDED_FILES_PROP,
+ SnapshotSummary.DELETED_FILES_PROP,
+ SnapshotSummary.TOTAL_DATA_FILES_PROP,
+ SnapshotSummary.CHANGED_PARTITION_COUNT_PROP
+ };
+
assertThat(table.currentSnapshot().summary()).containsKeys(commitMetricsKeys);
+ }
+
private Stream<RewriteFileGroup> toGroupStream(Table table,
RewriteDataFilesSparkAction rewrite) {
rewrite.validateAndInitOptions();
StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index 7c55ff82df..89c44dbfcc 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -613,6 +614,39 @@ public class TestRewritePositionDeleteFilesAction extends CatalogTestBase {
assertEquals("Rows must match", expectedRecords, actualRecords);
}
+ @TestTemplate
+ public void testSnapshotProperty() throws Exception {
+ Table table = createTableUnpartitioned(2, SCALE);
+ List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+ writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
+ assertThat(dataFiles).hasSize(2);
+
+ List<DeleteFile> deleteFiles = deleteFiles(table);
+ assertThat(deleteFiles).hasSize(2);
+
+ Result ignored =
+ SparkActions.get(spark)
+ .rewritePositionDeletes(table)
+ .snapshotProperty("key", "value")
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .execute();
+ assertThat(table.currentSnapshot().summary())
+ .containsAllEntriesOf(ImmutableMap.of("key", "value"));
+
+ // make sure internal produced properties are not lost
+ String[] commitMetricsKeys =
+ new String[] {
+ SnapshotSummary.ADDED_DELETE_FILES_PROP,
+ SnapshotSummary.ADDED_POS_DELETES_PROP,
+ SnapshotSummary.CHANGED_PARTITION_COUNT_PROP,
+ SnapshotSummary.REMOVED_DELETE_FILES_PROP,
+ SnapshotSummary.REMOVED_POS_DELETES_PROP,
+ SnapshotSummary.TOTAL_DATA_FILES_PROP,
+ SnapshotSummary.TOTAL_DELETE_FILES_PROP,
+ };
+
assertThat(table.currentSnapshot().summary()).containsKeys(commitMetricsKeys);
+ }
+
private Table createTablePartitioned(int partitions, int files, int
numRecords) {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
Table table =