This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new a8ec43d975 Core, Spark 3.5: Remove dangling deletes as part of
RewriteDataFilesAction (#9724)
a8ec43d975 is described below
commit a8ec43d975d8d3bbacb6880b487208b00cb7361f
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Tue Oct 22 15:28:57 2024 -0700
Core, Spark 3.5: Remove dangling deletes as part of RewriteDataFilesAction
(#9724)
---
.../apache/iceberg/actions/ActionsProvider.java | 6 +
.../iceberg/actions/RemoveDanglingDeleteFiles.java | 35 ++
.../apache/iceberg/actions/RewriteDataFiles.java | 16 +
.../actions/BaseRemoveDanglingDeleteFiles.java | 33 ++
.../iceberg/actions/BaseRewriteDataFiles.java | 6 +
.../org/apache/iceberg/spark/SparkContentFile.java | 7 +-
.../actions/RemoveDanglingDeletesSparkAction.java | 179 +++++++++
.../spark/actions/RewriteDataFilesSparkAction.java | 34 +-
.../apache/iceberg/spark/actions/SparkActions.java | 6 +
.../actions/TestRemoveDanglingDeleteAction.java | 447 +++++++++++++++++++++
.../spark/actions/TestRewriteDataFilesAction.java | 215 +++++++++-
.../TestRewritePositionDeleteFilesAction.java | 3 +-
12 files changed, 974 insertions(+), 13 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
index bcc77b25d6..61750d83fc 100644
--- a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
+++ b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
@@ -82,4 +82,10 @@ public interface ActionsProvider {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement rewriteTablePath");
}
+
+  /**
+   * Instantiates an action that removes dangling delete files from the current snapshot of a table.
+   *
+   * @param table the table whose dangling delete files should be removed
+   * @return an action that removes dangling delete files
+   * @throws UnsupportedOperationException always, in this default implementation
+   */
+  default RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) {
+    String message = this.getClass().getName() + " does not implement removeDanglingDeleteFiles";
+    throw new UnsupportedOperationException(message);
+  }
}
diff --git
a/api/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFiles.java
b/api/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFiles.java
new file mode 100644
index 0000000000..b0ef0d5e35
--- /dev/null
+++
b/api/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFiles.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import org.apache.iceberg.DeleteFile;
+
+/**
+ * An action that removes dangling delete files from the current snapshot. A delete file is dangling
+ * if its deletes no longer apply to any live data file.
+ */
+public interface RemoveDanglingDeleteFiles
+    extends Action<RemoveDanglingDeleteFiles, RemoveDanglingDeleteFiles.Result> {
+
+  /** The result of removing dangling delete files. */
+  interface Result {
+    /** Returns the delete files that were removed; empty when nothing was removed. */
+    Iterable<DeleteFile> removedDeleteFiles();
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index f6ef402708..589b901774 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -106,6 +106,18 @@ public interface RewriteDataFiles
boolean USE_STARTING_SEQUENCE_NUMBER_DEFAULT = true;
+ /**
+ * Option to remove dangling delete files from the current snapshot after compaction. A delete
+ * file is considered dangling if it does not apply to any live data files.
+ *
+ * <p>Both equality and position dangling delete files will be removed.
+ *
+ * <p>Defaults to false.
+ */
+ String REMOVE_DANGLING_DELETES = "remove-dangling-deletes";
+
+ /** Default value of {@link #REMOVE_DANGLING_DELETES}: dangling delete files are kept. */
+ boolean REMOVE_DANGLING_DELETES_DEFAULT = false;
+
/**
* Forces the rewrite job order based on the value.
*
@@ -216,6 +228,10 @@ public interface RewriteDataFiles
default int failedDataFilesCount() {
return
rewriteFailures().stream().mapToInt(FileGroupFailureResult::dataFilesCount).sum();
}
+
+ /**
+ * Returns the number of dangling delete files removed as part of this rewrite.
+ *
+ * <p>This default implementation reports 0.
+ */
+ default int removedDeleteFilesCount() {
+ return 0;
+ }
}
/**
diff --git
a/core/src/main/java/org/apache/iceberg/actions/BaseRemoveDanglingDeleteFiles.java
b/core/src/main/java/org/apache/iceberg/actions/BaseRemoveDanglingDeleteFiles.java
new file mode 100644
index 0000000000..3b5ce9e79a
--- /dev/null
+++
b/core/src/main/java/org/apache/iceberg/actions/BaseRemoveDanglingDeleteFiles.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import org.immutables.value.Value;
+
[email protected]
+@SuppressWarnings("ImmutablesStyle")
[email protected](
+ typeImmutableEnclosing = "ImmutableRemoveDanglingDeleteFiles",
+ visibilityString = "PUBLIC",
+ builderVisibilityString = "PUBLIC")
+interface BaseRemoveDanglingDeleteFiles extends RemoveDanglingDeleteFiles {
+
+ @Value.Immutable
+ interface Result extends RemoveDanglingDeleteFiles.Result {}
+}
diff --git
a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFiles.java
b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFiles.java
index 953439484a..2faa1f1b75 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFiles.java
@@ -55,6 +55,12 @@ interface BaseRewriteDataFiles extends RewriteDataFiles {
return RewriteDataFiles.Result.super.rewrittenBytesCount();
}
+ // Declared as an Immutables @Value.Default so the generated builder can set this count
+ // explicitly, falling back to the RewriteDataFiles.Result default when it is not set.
+ @Override
+ @Value.Default
+ default int removedDeleteFilesCount() {
+ return RewriteDataFiles.Result.super.removedDeleteFilesCount();
+ }
+
@Override
@Value.Default
default int failedDataFilesCount() {
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
index f756c4cde0..99586f2503 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
@@ -52,6 +52,7 @@ public abstract class SparkContentFile<F> implements
ContentFile<F> {
private final int keyMetadataPosition;
private final int splitOffsetsPosition;
private final int sortOrderIdPosition;
+ private final int fileSpecIdPosition;
private final int equalityIdsPosition;
private final Type lowerBoundsType;
private final Type upperBoundsType;
@@ -100,6 +101,7 @@ public abstract class SparkContentFile<F> implements
ContentFile<F> {
this.keyMetadataPosition = positions.get(DataFile.KEY_METADATA.name());
this.splitOffsetsPosition = positions.get(DataFile.SPLIT_OFFSETS.name());
this.sortOrderIdPosition = positions.get(DataFile.SORT_ORDER_ID.name());
+ this.fileSpecIdPosition = positions.get(DataFile.SPEC_ID.name());
this.equalityIdsPosition = positions.get(DataFile.EQUALITY_IDS.name());
}
@@ -120,7 +122,10 @@ public abstract class SparkContentFile<F> implements
ContentFile<F> {
@Override
public int specId() {
- return -1;
+ if (wrapped.isNullAt(fileSpecIdPosition)) {
+ return -1;
+ }
+ return wrapped.getAs(fileSpecIdPosition);
}
@Override
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java
new file mode 100644
index 0000000000..bbf65f58e1
--- /dev/null
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.min;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ImmutableRemoveDanglingDeleteFiles;
+import org.apache.iceberg.actions.RemoveDanglingDeleteFiles;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDeleteFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An action that removes dangling delete files from the current snapshot. A delete file is dangling
+ * if its deletes no longer apply to any live data file.
+ *
+ * <p>The following dangling delete files are removed:
+ *
+ * <ul>
+ *   <li>Position delete files whose data sequence number is less than the minimum data sequence
+ *       number of the live data files in the same spec and partition
+ *   <li>Equality delete files whose data sequence number is less than or equal to the minimum data
+ *       sequence number of the live data files in the same spec and partition
+ *   <li>Equality delete files written with an unpartitioned spec, which apply to data files in
+ *       every spec and partition, whose data sequence number is less than or equal to the minimum
+ *       data sequence number of all live data files in the table
+ *   <li>Any other delete file whose spec and partition contain no live data files, and every delete
+ *       file when the table has no live data files at all
+ * </ul>
+ */
+class RemoveDanglingDeletesSparkAction
+    extends BaseSnapshotUpdateSparkAction<RemoveDanglingDeletesSparkAction>
+    implements RemoveDanglingDeleteFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RemoveDanglingDeletesSparkAction.class);
+  private final Table table;
+
+  protected RemoveDanglingDeletesSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  protected RemoveDanglingDeletesSparkAction self() {
+    return this;
+  }
+
+  /**
+   * Finds and removes dangling delete files in a single commit.
+   *
+   * @return the removed delete files; empty for tables with a single unpartitioned spec
+   */
+  @Override
+  public Result execute() {
+    if (table.specs().size() == 1 && table.spec().isUnpartitioned()) {
+      // ManifestFilterManager already performs this table-wide delete on each commit
+      return ImmutableRemoveDanglingDeleteFiles.Result.builder()
+          .removedDeleteFiles(Collections.emptyList())
+          .build();
+    }
+
+    String desc = String.format("Removing dangling delete files in %s", table.name());
+    JobGroupInfo info = newJobGroupInfo("REMOVE-DELETES", desc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  /** Removes the dangling delete files in one rewrite commit, skipping the commit if none exist. */
+  Result doExecute() {
+    RewriteFiles rewriteFiles = table.newRewrite();
+    List<DeleteFile> danglingDeletes = findDanglingDeletes();
+    for (DeleteFile deleteFile : danglingDeletes) {
+      LOG.debug("Removing dangling delete file {}", deleteFile.path());
+      rewriteFiles.deleteFile(deleteFile);
+    }
+
+    if (!danglingDeletes.isEmpty()) {
+      commit(rewriteFiles);
+    }
+
+    return ImmutableRemoveDanglingDeleteFiles.Result.builder()
+        .removedDeleteFiles(danglingDeletes)
+        .build();
+  }
+
+  /**
+   * Dangling delete files can be identified with the following steps:
+   *
+   * <ol>
+   *   <li>Group live data files by spec and partition and find the minimum data sequence number in
+   *       each group.
+   *   <li>Left outer join live delete files with the grouped data files on spec and partition.
+   *   <li>Find dangling deletes by comparing each delete file's sequence number to its partition's
+   *       minimum data sequence number, or to the table-wide minimum for global equality deletes.
+   *   <li>Collect the resulting rows to the driver and wrap them as delete files with {@link
+   *       SparkDeleteFile}.
+   * </ol>
+   */
+  private List<DeleteFile> findDanglingDeletes() {
+    Dataset<Row> liveDataEntries =
+        loadMetadataTable(table, MetadataTableType.ENTRIES)
+            // find live data files
+            .filter("data_file.content == 0 AND status < 2");
+
+    Dataset<Row> minSequenceNumberByPartition =
+        liveDataEntries
+            .selectExpr(
+                "data_file.partition as partition",
+                "data_file.spec_id as spec_id",
+                "sequence_number")
+            .groupBy("partition", "spec_id")
+            .agg(min("sequence_number"))
+            .toDF("grouped_partition", "grouped_spec_id", "min_data_sequence_number");
+
+    Dataset<Row> deleteEntries =
+        loadMetadataTable(table, MetadataTableType.ENTRIES)
+            // find live delete files
+            .filter("data_file.content != 0 AND status < 2");
+
+    Column joinOnPartition =
+        deleteEntries
+            .col("data_file.spec_id")
+            .equalTo(minSequenceNumberByPartition.col("grouped_spec_id"))
+            .and(
+                deleteEntries
+                    .col("data_file.partition")
+                    .equalTo(minSequenceNumberByPartition.col("grouped_partition")));
+
+    Dataset<Row> danglingDeletes =
+        deleteEntries
+            .join(minSequenceNumberByPartition, joinOnPartition, "left")
+            .filter(danglingDeletesFilter(liveDataEntries))
+            .select("data_file.*");
+
+    // Both types are the same for every row, so compute them once.
+    StructType sparkFileType = danglingDeletes.schema();
+    Types.StructType combinedFileType = DataFile.getType(Partitioning.partitionType(table));
+    return danglingDeletes.collectAsList().stream()
+        // map on driver because SparkDeleteFile is not serializable
+        .map(row -> deleteFileWrapper(combinedFileType, sparkFileType, row))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Builds the predicate that selects dangling rows of the delete-file / min-sequence-number join.
+   *
+   * <p>Equality deletes written with an unpartitioned spec are global: they apply to live data
+   * files in every spec and partition. They are therefore compared against the table-wide minimum
+   * data sequence number instead of the (usually empty) unpartitioned group.
+   *
+   * @param liveDataEntries entries of live data files
+   * @return the filter selecting dangling delete files
+   */
+  private Column danglingDeletesFilter(Dataset<Row> liveDataEntries) {
+    Column danglingWithinPartition =
+        col("min_data_sequence_number")
+            // delete files without any live data files in the same spec and partition
+            .isNull()
+            // position delete files without any applicable data files in the partition
+            .or(
+                col("data_file.content")
+                    .equalTo(1)
+                    .and(col("sequence_number").$less(col("min_data_sequence_number"))))
+            // equality delete files without any applicable data files in the partition
+            .or(
+                col("data_file.content")
+                    .equalTo(2)
+                    .and(col("sequence_number").$less$eq(col("min_data_sequence_number"))));
+
+    List<Integer> unpartitionedSpecIds =
+        table.specs().values().stream()
+            .filter(spec -> spec.isUnpartitioned())
+            .map(spec -> spec.specId())
+            .collect(Collectors.toList());
+    if (unpartitionedSpecIds.isEmpty()) {
+      // no spec can hold global equality deletes
+      return danglingWithinPartition;
+    }
+
+    Row minRow = liveDataEntries.agg(min("sequence_number")).head();
+    if (minRow.isNullAt(0)) {
+      // No live data files at all: every joined row has a null minimum, so the partition-scoped
+      // predicate already marks every delete file as dangling.
+      return danglingWithinPartition;
+    }
+
+    long minDataSequenceNumber = minRow.getLong(0);
+    Column isGlobalEqualityDelete =
+        col("data_file.content")
+            .equalTo(2)
+            .and(col("data_file.spec_id").isin(unpartitionedSpecIds.toArray()));
+    return isGlobalEqualityDelete
+        .and(col("sequence_number").$less$eq(minDataSequenceNumber))
+        .or(isGlobalEqualityDelete.equalTo(false).and(danglingWithinPartition));
+  }
+
+  /** Wraps a collected delete-file row, projecting it onto the partition type of its own spec. */
+  private DeleteFile deleteFileWrapper(
+      Types.StructType combinedFileType, StructType sparkFileType, Row row) {
+    int specId = row.getInt(row.fieldIndex("spec_id"));
+    Types.StructType projection = DataFile.getType(table.specs().get(specId).partitionType());
+    return new SparkDeleteFile(combinedFileType, projection, sparkFileType).wrap(row);
+  }
+}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index d33e5e5408..4e381a7bd3 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.FileRewriter;
import org.apache.iceberg.actions.ImmutableRewriteDataFiles;
+import org.apache.iceberg.actions.ImmutableRewriteDataFiles.Result.Builder;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteFileGroup;
@@ -53,6 +54,7 @@ import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTest
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
@@ -83,7 +85,8 @@ public class RewriteDataFilesSparkAction
TARGET_FILE_SIZE_BYTES,
USE_STARTING_SEQUENCE_NUMBER,
REWRITE_JOB_ORDER,
- OUTPUT_SPEC_ID);
+ OUTPUT_SPEC_ID,
+ REMOVE_DANGLING_DELETES);
private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
@@ -95,6 +98,7 @@ public class RewriteDataFilesSparkAction
private int maxCommits;
private int maxFailedCommits;
private boolean partialProgressEnabled;
+ private boolean removeDanglingDeletes;
private boolean useStartingSequenceNumber;
private RewriteJobOrder rewriteJobOrder;
private FileRewriter<FileScanTask, DataFile> rewriter = null;
@@ -175,11 +179,18 @@ public class RewriteDataFilesSparkAction
Stream<RewriteFileGroup> groupStream = toGroupStream(ctx,
fileGroupsByPartition);
- if (partialProgressEnabled) {
- return doExecuteWithPartialProgress(ctx, groupStream,
commitManager(startingSnapshotId));
- } else {
- return doExecute(ctx, groupStream, commitManager(startingSnapshotId));
+ Builder resultBuilder =
+ partialProgressEnabled
+ ? doExecuteWithPartialProgress(ctx, groupStream,
commitManager(startingSnapshotId))
+ : doExecute(ctx, groupStream, commitManager(startingSnapshotId));
+
+ if (removeDanglingDeletes) {
+ RemoveDanglingDeletesSparkAction action =
+ new RemoveDanglingDeletesSparkAction(spark(), table);
+ int removedCount = Iterables.size(action.execute().removedDeleteFiles());
+ resultBuilder.removedDeleteFilesCount(removedCount);
}
+ return resultBuilder.build();
}
StructLikeMap<List<List<FileScanTask>>> planFileGroups(long
startingSnapshotId) {
@@ -264,7 +275,7 @@ public class RewriteDataFilesSparkAction
table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
}
- private Result doExecute(
+ private Builder doExecute(
RewriteExecutionContext ctx,
Stream<RewriteFileGroup> groupStream,
RewriteDataFilesCommitManager commitManager) {
@@ -326,10 +337,10 @@ public class RewriteDataFilesSparkAction
List<FileGroupRewriteResult> rewriteResults =
rewrittenGroups.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList());
- return
ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build();
+ return
ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults);
}
- private Result doExecuteWithPartialProgress(
+ private Builder doExecuteWithPartialProgress(
RewriteExecutionContext ctx,
Stream<RewriteFileGroup> groupStream,
RewriteDataFilesCommitManager commitManager) {
@@ -386,8 +397,7 @@ public class RewriteDataFilesSparkAction
return ImmutableRewriteDataFiles.Result.builder()
.rewriteResults(toRewriteResults(commitService.results()))
- .rewriteFailures(rewriteFailures)
- .build();
+ .rewriteFailures(rewriteFailures);
}
Stream<RewriteFileGroup> toGroupStream(
@@ -456,6 +466,10 @@ public class RewriteDataFilesSparkAction
PropertyUtil.propertyAsBoolean(
options(), USE_STARTING_SEQUENCE_NUMBER,
USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
+ removeDanglingDeletes =
+ PropertyUtil.propertyAsBoolean(
+ options(), REMOVE_DANGLING_DELETES,
REMOVE_DANGLING_DELETES_DEFAULT);
+
rewriteJobOrder =
RewriteJobOrder.fromName(
PropertyUtil.propertyAsString(options(), REWRITE_JOB_ORDER,
REWRITE_JOB_ORDER_DEFAULT));
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index f845386d30..ba9fa2e7b4 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.actions;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ActionsProvider;
import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.RemoveDanglingDeleteFiles;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
import org.apache.spark.sql.SparkSession;
@@ -102,4 +103,9 @@ public class SparkActions implements ActionsProvider {
public ComputeTableStats computeTableStats(Table table) {
return new ComputeTableStatsSparkAction(spark, table);
}
+
+ /** Returns a Spark action that removes dangling delete files from the given table. */
+ @Override
+ public RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) {
+ return new RemoveDanglingDeletesSparkAction(spark, table);
+ }
}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
new file mode 100644
index 0000000000..e15b2fb217
--- /dev/null
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RemoveDanglingDeleteFiles;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.TestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Encoders;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import scala.Tuple2;
+
+public class TestRemoveDanglingDeleteAction extends TestBase {
+
+ // Tables are created and dropped by location through HadoopTables.
+ private static final HadoopTables TABLES = new HadoopTables(new
Configuration());
+ // Three optional string columns; the partitioned tests partition on c1.
+ private static final Schema SCHEMA =
+ new Schema(
+ optional(1, "c1", Types.StringType.get()),
+ optional(2, "c2", Types.StringType.get()),
+ optional(3, "c3", Types.StringType.get()));
+
+ // Identity partitioning on c1, so fixture partition paths look like "c1=a".
+ private static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+
+  // Shared fixtures for the partitioned table. Every file is 10 bytes with one record, and its
+  // partition value is set through the partition path.
+  // NOTE(review): FILE_A2, FILE_B2, FILE_C2 and FILE_D2 reuse the paths of FILE_A, FILE_B, FILE_C
+  // and FILE_D, so a table holding both ends up with two live entries for one path. Expected
+  // entry lists are sorted by file path, so renaming these files changes the expected ordering.
+  static final DataFile FILE_A = dataFile("/path/to/data-a.parquet", "c1=a");
+  static final DataFile FILE_A2 = dataFile("/path/to/data-a.parquet", "c1=a");
+  static final DataFile FILE_B = dataFile("/path/to/data-b.parquet", "c1=b");
+  static final DataFile FILE_B2 = dataFile("/path/to/data-b.parquet", "c1=b");
+  static final DataFile FILE_C = dataFile("/path/to/data-c.parquet", "c1=c");
+  static final DataFile FILE_C2 = dataFile("/path/to/data-c.parquet", "c1=c");
+  static final DataFile FILE_D = dataFile("/path/to/data-d.parquet", "c1=d");
+  static final DataFile FILE_D2 = dataFile("/path/to/data-d.parquet", "c1=d");
+  static final DeleteFile FILE_A_POS_DELETES =
+      positionDeleteFile("/path/to/data-a-pos-deletes.parquet", "c1=a");
+  static final DeleteFile FILE_A2_POS_DELETES =
+      positionDeleteFile("/path/to/data-a2-pos-deletes.parquet", "c1=a");
+  static final DeleteFile FILE_A_EQ_DELETES =
+      equalityDeleteFile("/path/to/data-a-eq-deletes.parquet", "c1=a");
+  static final DeleteFile FILE_A2_EQ_DELETES =
+      equalityDeleteFile("/path/to/data-a2-eq-deletes.parquet", "c1=a");
+  static final DeleteFile FILE_B_POS_DELETES =
+      positionDeleteFile("/path/to/data-b-pos-deletes.parquet", "c1=b");
+  static final DeleteFile FILE_B2_POS_DELETES =
+      positionDeleteFile("/path/to/data-b2-pos-deletes.parquet", "c1=b");
+  static final DeleteFile FILE_B_EQ_DELETES =
+      equalityDeleteFile("/path/to/data-b-eq-deletes.parquet", "c1=b");
+  static final DeleteFile FILE_B2_EQ_DELETES =
+      equalityDeleteFile("/path/to/data-b2-eq-deletes.parquet", "c1=b");
+
+  /** Builds a one-record, 10-byte data file of {@link #SPEC}. */
+  private static DataFile dataFile(String path, String partitionPath) {
+    return DataFiles.builder(SPEC)
+        .withPath(path)
+        .withFileSizeInBytes(10)
+        .withPartitionPath(partitionPath)
+        .withRecordCount(1)
+        .build();
+  }
+
+  /** Builds a one-record, 10-byte position delete file of {@link #SPEC}. */
+  private static DeleteFile positionDeleteFile(String path, String partitionPath) {
+    return FileMetadata.deleteFileBuilder(SPEC)
+        .ofPositionDeletes()
+        .withPath(path)
+        .withFileSizeInBytes(10)
+        .withPartitionPath(partitionPath)
+        .withRecordCount(1)
+        .build();
+  }
+
+  /** Builds a one-record, 10-byte equality delete file of {@link #SPEC}. */
+  private static DeleteFile equalityDeleteFile(String path, String partitionPath) {
+    return FileMetadata.deleteFileBuilder(SPEC)
+        .ofEqualityDeletes()
+        .withPath(path)
+        .withFileSizeInBytes(10)
+        .withPartitionPath(partitionPath)
+        .withRecordCount(1)
+        .build();
+  }
+
+  // Fixtures for the unpartitioned table: one data file plus one delete file of each kind.
+  static final DataFile FILE_UNPARTITIONED =
+      DataFiles.builder(PartitionSpec.unpartitioned())
+          .withPath("/path/to/data-unpartitioned.parquet")
+          .withFileSizeInBytes(10)
+          .withRecordCount(1)
+          .build();
+  // A position delete file, matching its name and path.
+  static final DeleteFile FILE_UNPARTITIONED_POS_DELETE =
+      FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
+          .ofPositionDeletes()
+          .withPath("/path/to/data-unpartitioned-pos-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withRecordCount(1)
+          .build();
+  static final DeleteFile FILE_UNPARTITIONED_EQ_DELETE =
+      FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
+          .ofEqualityDeletes()
+          .withPath("/path/to/data-unpartitioned-eq-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withRecordCount(1)
+          .build();
+
+  @TempDir private Path temp;
+
+  private String tableLocation = null;
+  private Table table;
+
+  /** Points {@code tableLocation} at a "junit" directory inside the JUnit temp directory. */
+  @BeforeEach
+  public void before() throws Exception {
+    this.tableLocation = temp.resolve("junit").toFile().toURI().toString();
+  }
+
+  /** Drops the table stored at {@code tableLocation}. */
+  @AfterEach
+  public void after() {
+    TABLES.dropTable(tableLocation);
+  }
+
+  /** Creates a format v2 table partitioned by identity(c1) at {@code tableLocation}. */
+  private void setupPartitionedTable() {
+    this.table = createV2Table(SPEC);
+  }
+
+  /** Creates an unpartitioned format v2 table at {@code tableLocation}. */
+  private void setupUnpartitionedTable() {
+    this.table = createV2Table(PartitionSpec.unpartitioned());
+  }
+
+  /** Creates a format v2 table with {@link #SCHEMA} and the given spec at the test location. */
+  private Table createV2Table(PartitionSpec spec) {
+    return TABLES.create(
+        SCHEMA, spec, ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"), tableLocation);
+  }
+
+ /**
+ * Deletes in a partition whose live data files all have a higher sequence number are removed,
+ * while deletes in partitions with older live data files are kept.
+ */
+ @Test
+ public void testPartitionedDeletesWithLesserSeqNo() {
+ setupPartitionedTable();
+
+ // Add data files to partitions b, c and d (sequence number 1)
+
table.newAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit();
+
+ // Add position and equality deletes to partitions a and b (sequence number 2)
+ table
+ .newRowDelta()
+ .addDeletes(FILE_A_POS_DELETES)
+ .addDeletes(FILE_A2_POS_DELETES)
+ .addDeletes(FILE_B_POS_DELETES)
+ .addDeletes(FILE_B2_POS_DELETES)
+ .addDeletes(FILE_A_EQ_DELETES)
+ .addDeletes(FILE_A2_EQ_DELETES)
+ .addDeletes(FILE_B_EQ_DELETES)
+ .addDeletes(FILE_B2_EQ_DELETES)
+ .commit();
+
+ // Add more data files to every partition, including a (sequence number 3)
+ table
+ .newAppend()
+ .appendFile(FILE_A2)
+ .appendFile(FILE_B2)
+ .appendFile(FILE_C2)
+ .appendFile(FILE_D2)
+ .commit();
+
+ List<Tuple2<Long, String>> actual =
+ spark
+ .read()
+ .format("iceberg")
+ .load(tableLocation + "#entries")
+ .select("sequence_number", "data_file.file_path")
+ .sort("sequence_number", "data_file.file_path")
+ .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING()))
+ .collectAsList();
+ List<Tuple2<Long, String>> expected =
+ ImmutableList.of(
+ Tuple2.apply(1L, FILE_B.path().toString()),
+ Tuple2.apply(1L, FILE_C.path().toString()),
+ Tuple2.apply(1L, FILE_D.path().toString()),
+ Tuple2.apply(2L, FILE_A_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A2_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A2_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B2_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()),
+ Tuple2.apply(3L, FILE_A2.path().toString()),
+ Tuple2.apply(3L, FILE_B2.path().toString()),
+ Tuple2.apply(3L, FILE_C2.path().toString()),
+ Tuple2.apply(3L, FILE_D2.path().toString()));
+ assertThat(actual).isEqualTo(expected);
+
+ RemoveDanglingDeleteFiles.Result result =
+ SparkActions.get().removeDanglingDeleteFiles(table).execute();
+
+ // All delete files of partition c1=a should be removed: the only live data file there,
+ // FILE_A2, has sequence number 3, so none of the sequence-number-2 deletes applies to it.
+ // Partition c1=b keeps its deletes because FILE_B (sequence number 1) is still live.
+
+ Set<CharSequence> removedDeleteFiles =
+ StreamSupport.stream(result.removedDeleteFiles().spliterator(), false)
+ .map(DeleteFile::path)
+ .collect(Collectors.toSet());
+ assertThat(removedDeleteFiles)
+ .as("Expected 4 delete files removed")
+ .hasSize(4)
+ .containsExactlyInAnyOrder(
+ FILE_A_POS_DELETES.path(),
+ FILE_A2_POS_DELETES.path(),
+ FILE_A_EQ_DELETES.path(),
+ FILE_A2_EQ_DELETES.path());
+
+ List<Tuple2<Long, String>> actualAfter =
+ spark
+ .read()
+ .format("iceberg")
+ .load(tableLocation + "#entries")
+ .filter("status < 2") // live files
+ .select("sequence_number", "data_file.file_path")
+ .sort("sequence_number", "data_file.file_path")
+ .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING()))
+ .collectAsList();
+ List<Tuple2<Long, String>> expectedAfter =
+ ImmutableList.of(
+ Tuple2.apply(1L, FILE_B.path().toString()),
+ Tuple2.apply(1L, FILE_C.path().toString()),
+ Tuple2.apply(1L, FILE_D.path().toString()),
+ Tuple2.apply(2L, FILE_B_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B2_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()),
+ Tuple2.apply(3L, FILE_A2.path().toString()),
+ Tuple2.apply(3L, FILE_B2.path().toString()),
+ Tuple2.apply(3L, FILE_C2.path().toString()),
+ Tuple2.apply(3L, FILE_D2.path().toString()));
+ assertThat(actualAfter).isEqualTo(expectedAfter);
+ }
+
+ /**
+ * Partition B's only data file (FILE_B2) and its deletes are committed together, so all share
+ * sequence number 2. Only partition B's equality deletes are expected to be removed; its position
+ * deletes and every partition A delete (FILE_A is at sequence number 1) are kept.
+ */
+ @Test
+ public void testPartitionedDeletesWithEqSeqNo() {
+ setupPartitionedTable();
+
+ // Add Data Files (sequence number 1): FILE_A, FILE_C and FILE_D
+
table.newAppend().appendFile(FILE_A).appendFile(FILE_C).appendFile(FILE_D).commit();
+
+ // Add more Data Files together with EQ and POS deletes in one commit (sequence number 2)
+ table
+ .newRowDelta()
+ .addRows(FILE_A2)
+ .addRows(FILE_B2)
+ .addRows(FILE_C2)
+ .addRows(FILE_D2)
+ .addDeletes(FILE_A_POS_DELETES)
+ .addDeletes(FILE_A2_POS_DELETES)
+ .addDeletes(FILE_A_EQ_DELETES)
+ .addDeletes(FILE_A2_EQ_DELETES)
+ .addDeletes(FILE_B_POS_DELETES)
+ .addDeletes(FILE_B2_POS_DELETES)
+ .addDeletes(FILE_B_EQ_DELETES)
+ .addDeletes(FILE_B2_EQ_DELETES)
+ .commit();
+
+ // Check every manifest entry and its sequence number before running the action
+ List<Tuple2<Long, String>> actual =
+ spark
+ .read()
+ .format("iceberg")
+ .load(tableLocation + "#entries")
+ .select("sequence_number", "data_file.file_path")
+ .sort("sequence_number", "data_file.file_path")
+ .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING()))
+ .collectAsList();
+ List<Tuple2<Long, String>> expected =
+ ImmutableList.of(
+ Tuple2.apply(1L, FILE_A.path().toString()),
+ Tuple2.apply(1L, FILE_C.path().toString()),
+ Tuple2.apply(1L, FILE_D.path().toString()),
+ Tuple2.apply(2L, FILE_A_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A2.path().toString()),
+ Tuple2.apply(2L, FILE_A2_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A2_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B2.path().toString()),
+ Tuple2.apply(2L, FILE_B2_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_C2.path().toString()),
+ Tuple2.apply(2L, FILE_D2.path().toString()));
+ assertThat(actual).isEqualTo(expected);
+
+ RemoveDanglingDeleteFiles.Result result =
+ SparkActions.get().removeDanglingDeleteFiles(table).execute();
+
+ // Eq Delete files of the FILE B partition should be removed
+ // because there are no data files in partition with a lesser sequence
number
+ Set<CharSequence> removedDeleteFiles =
+ StreamSupport.stream(result.removedDeleteFiles().spliterator(), false)
+ .map(DeleteFile::path)
+ .collect(Collectors.toSet());
+ assertThat(removedDeleteFiles)
+ .as("Expected two delete files removed")
+ .hasSize(2)
+ .containsExactlyInAnyOrder(FILE_B_EQ_DELETES.path(),
FILE_B2_EQ_DELETES.path());
+
+ // Partition B's position deletes survive even though FILE_B2 has the same sequence number
+ List<Tuple2<Long, String>> actualAfter =
+ spark
+ .read()
+ .format("iceberg")
+ .load(tableLocation + "#entries")
+ .filter("status < 2") // live files
+ .select("sequence_number", "data_file.file_path")
+ .sort("sequence_number", "data_file.file_path")
+ .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING()))
+ .collectAsList();
+ List<Tuple2<Long, String>> expectedAfter =
+ ImmutableList.of(
+ Tuple2.apply(1L, FILE_A.path().toString()),
+ Tuple2.apply(1L, FILE_C.path().toString()),
+ Tuple2.apply(1L, FILE_D.path().toString()),
+ Tuple2.apply(2L, FILE_A_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A2.path().toString()),
+ Tuple2.apply(2L, FILE_A2_EQ_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_A2_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_B2.path().toString()),
+ Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()),
+ Tuple2.apply(2L, FILE_C2.path().toString()),
+ Tuple2.apply(2L, FILE_D2.path().toString()));
+ assertThat(actualAfter).isEqualTo(expectedAfter);
+ }
+
+ /**
+ * Deletes are committed before the table's only data file, but the action is expected to be a
+ * no-op for unpartitioned tables and remove nothing.
+ */
+ @Test
+ public void testUnpartitionedTable() {
+ setupUnpartitionedTable();
+
+ // Commit the deletes first, then the single data file
+ table
+ .newRowDelta()
+ .addDeletes(FILE_UNPARTITIONED_POS_DELETE)
+ .addDeletes(FILE_UNPARTITIONED_EQ_DELETE)
+ .commit();
+ table.newAppend().appendFile(FILE_UNPARTITIONED).commit();
+
+ RemoveDanglingDeleteFiles.Result result =
+ SparkActions.get().removeDanglingDeleteFiles(table).execute();
+ assertThat(result.removedDeleteFiles()).as("No-op for unpartitioned
tables").isEmpty();
+ }
+}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index b67ee87c7d..2de83f8b35 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -24,6 +24,7 @@ import static org.apache.iceberg.types.Types.NestedField.required;
import static org.apache.spark.sql.functions.current_date;
import static org.apache.spark.sql.functions.date_add;
import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.min;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
@@ -56,6 +57,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.RowDelta;
@@ -73,7 +75,9 @@ import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.actions.SizeBasedDataRewriter;
import org.apache.iceberg.actions.SizeBasedFileRewriter;
import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
@@ -86,6 +90,7 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -105,9 +110,11 @@ import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.jupiter.api.BeforeAll;
@@ -128,6 +135,8 @@ public class TestRewriteDataFilesAction extends TestBase {
optional(2, "c2", Types.StringType.get()),
optional(3, "c3", Types.StringType.get()));
+ // Identity partitioning on c1; the partition-evolution dangling-delete tests start from this spec.
+ private static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+
@TempDir private Path temp;
private final FileRewriteCoordinator coordinator =
FileRewriteCoordinator.get();
@@ -336,6 +345,125 @@ public class TestRewriteDataFilesAction extends TestBase {
assertThat(actualRecords).as("7 rows are removed").hasSize(total - 7);
}
+ /**
+ * Equality deletes written under the original spec (identity on c1) are expected to become
+ * dangling once the spec evolves to (c1, c3) and the c1=1 data files are rewritten with
+ * REMOVE_DANGLING_DELETES: both equality delete files should be removed and the rows unchanged.
+ */
+ @Test
+ public void testRemoveDangledEqualityDeletesPartitionEvolution() {
+ Table table =
+ TABLES.create(
+ SCHEMA,
+ SPEC,
+ Collections.singletonMap(TableProperties.FORMAT_VERSION, "2"),
+ tableLocation);
+
+ // data seq = 1 & 2: two separate appends that together write 4 files in c1=1 and c1=0
+ List<ThreeColumnRecord> records1 =
+ Lists.newArrayList(
+ new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1,
"BBBBBBBBBB", "BBBB"));
+ writeRecords(records1);
+ List<ThreeColumnRecord> records2 =
+ Lists.newArrayList(
+ new ThreeColumnRecord(0, "CCCCCCCCCC", "CCCC"),
+ new ThreeColumnRecord(0, "DDDDDDDDDD", "DDDD"));
+ writeRecords(records2);
+ table.refresh();
+ shouldHaveFiles(table, 4);
+
+ // data seq = 3 & 4: equality deletes on c3 for c1=1 (has data) and c1=2 (no data yet)
+ writeEqDeleteRecord(table, "c1", 1, "c3", "AAAA");
+ writeEqDeleteRecord(table, "c1", 2, "c3", "CCCC");
+ table.refresh();
+ Set<DeleteFile> existingDeletes = TestHelpers.deleteFiles(table);
+ assertThat(existingDeletes)
+ .as("Only one equality delete c1=1 is used in query planning")
+ .hasSize(1);
+
+ // partition evolution: add c3 as a second partition field (does not add a snapshot)
+ table.refresh();
+ table.updateSpec().addField(Expressions.ref("c3")).commit();
+
+ // data seq = 5: write 2 new data files (c1=1 and c1=2) under the evolved spec
+ List<ThreeColumnRecord> records3 =
+ Lists.newArrayList(
+ new ThreeColumnRecord(1, "A", "CCCC"), new ThreeColumnRecord(2,
"D", "DDDD"));
+ writeRecords(records3);
+
+ List<Object[]> originalData = currentData();
+
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .filter(Expressions.equal("c1", 1))
+ .option(RewriteDataFiles.REMOVE_DANGLING_DELETES, "true")
+ .execute();
+
+ existingDeletes = TestHelpers.deleteFiles(table);
+ assertThat(existingDeletes).as("Shall pruned dangling deletes after
rewrite").hasSize(0);
+
+ assertThat(result)
+ .extracting(
+ Result::addedDataFilesCount,
+ Result::rewrittenDataFilesCount,
+ Result::removedDeleteFilesCount)
+ .as("Should compact 3 data files into 2 and remove both dangled
equality delete file")
+ .containsExactly(2, 3, 2);
+ shouldHaveMinSequenceNumberInPartition(table, "data_file.partition.c1 ==
1", 5);
+
+ List<Object[]> postRewriteData = currentData();
+ assertEquals("We shouldn't have changed the data", originalData,
postRewriteData);
+
+ // 5 commits above plus 2 from the action (presumably the rewrite and the dangling-delete removal)
+ shouldHaveSnapshots(table, 7);
+ shouldHaveFiles(table, 5);
+ }
+
+ /**
+ * Position-delete variant: a position delete committed before the spec evolves to (c1, c3) is
+ * expected to be removed as dangling when the c1=1 data files are rewritten, leaving
+ * total-position-deletes at 0 and the table's rows unchanged.
+ */
+ @Test
+ public void testRemoveDangledPositionDeletesPartitionEvolution() {
+ Table table =
+ TABLES.create(
+ SCHEMA,
+ SPEC,
+ Collections.singletonMap(TableProperties.FORMAT_VERSION, "2"),
+ tableLocation);
+
+ // data seq = 1, write 4 files in 2 partitions
+ writeRecords(2, 2, 2);
+ List<DataFile> dataFilesBefore = TestHelpers.dataFiles(table, null);
+ shouldHaveFiles(table, 4);
+
+ // data seq = 2, write 1 position delete file for a data file in c1=1
+ table
+ .newRowDelta()
+ .addDeletes(writePosDeletesToFile(table, dataFilesBefore.get(3),
1).get(0))
+ .commit();
+
+ // partition evolution: add c3 as a second partition field
+ table.updateSpec().addField(Expressions.ref("c3")).commit();
+
+ // data seq = 3, write 1 new data file under the evolved spec
+ // NOTE(review): the rewrite below only touches 2 c1=1 files, which suggests this file is
+ // not in c1=1 - confirm against writeRecords(int, int, int)
+ writeRecords(1, 1, 1);
+ shouldHaveFiles(table, 5);
+ List<Object[]> expectedRecords = currentData();
+
+ Result result =
+ actions()
+ .rewriteDataFiles(table)
+ .filter(Expressions.equal("c1", 1))
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .option(RewriteDataFiles.REMOVE_DANGLING_DELETES, "true")
+ .execute();
+
+ assertThat(result)
+ .extracting(
+ Result::addedDataFilesCount,
+ Result::rewrittenDataFilesCount,
+ Result::removedDeleteFilesCount)
+ .as("Should rewrite 2 data files into 1 and remove 1 dangled position
delete file")
+ .containsExactly(1, 2, 1);
+ shouldHaveMinSequenceNumberInPartition(table, "data_file.partition.c1 ==
1", 3);
+
+ // 3 commits above plus 2 from the action (presumably the rewrite and the dangling-delete removal)
+ shouldHaveSnapshots(table, 5);
+
assertThat(table.currentSnapshot().summary().get("total-position-deletes")).isEqualTo("0");
+ assertEquals("Rows must match", expectedRecords, currentData());
+ }
+
@Test
public void testBinPackWithDeleteAllData() {
Map<String, String> options = Maps.newHashMap();
@@ -1697,6 +1825,21 @@ public class TestRewriteDataFilesAction extends TestBase {
assertThat(numFiles).as("Did not have the expected number of
files").isEqualTo(numExpected);
}
+  /**
+   * Asserts that the smallest sequence number among live manifest entries matching {@code
+   * partitionFilter} equals {@code expected}.
+   *
+   * @param table table whose ENTRIES metadata table is scanned
+   * @param partitionFilter Spark SQL filter over the entries table, e.g. {@code
+   *     "data_file.partition.c1 == 1"}
+   * @param expected expected minimum sequence number
+   * @return the observed minimum sequence number
+   */
+  protected long shouldHaveMinSequenceNumberInPartition(
+      Table table, String partitionFilter, long expected) {
+    // min() yields null when no live entry matches the filter; keep the value boxed so that case
+    // fails the assertion below with a readable message instead of an NPE from unboxing.
+    Long actual =
+        SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES)
+            .filter("status != 2") // skip entries with status 2 (not live)
+            .filter(partitionFilter)
+            .agg(min("sequence_number"))
+            .as(Encoders.LONG())
+            .collectAsList()
+            .get(0);
+    assertThat(actual).as("Did not have the expected min sequence number").isEqualTo(expected);
+    // Non-null here: the assertion above only passes when actual equals the primitive expected.
+    return actual;
+  }
+
protected void shouldHaveSnapshots(Table table, int expectedSnapshots) {
table.refresh();
int actualSnapshots = Iterables.size(table.snapshots());
@@ -1893,6 +2036,11 @@ public class TestRewriteDataFilesAction extends TestBase {
.getAsDouble();
}
+  /** Converts {@code records} into a DataFrame and writes it through {@code writeDF}. */
+  private void writeRecords(List<ThreeColumnRecord> records) {
+    writeDF(spark.createDataFrame(records, ThreeColumnRecord.class));
+  }
+
+ // Overload that delegates to writeRecords(files, numRecords, 0).
private void writeRecords(int files, int numRecords) {
writeRecords(files, numRecords, 0);
}
@@ -1946,7 +2094,10 @@ public class TestRewriteDataFilesAction extends TestBase {
table
.io()
.newOutputFile(
-
table.locationProvider().newDataLocation(UUID.randomUUID().toString()));
+ table
+ .locationProvider()
+ .newDataLocation(
+
FileFormat.PARQUET.addExtension(UUID.randomUUID().toString())));
EncryptedOutputFile encryptedOutputFile =
EncryptedFiles.encryptedOutput(outputFile,
EncryptionKeyMetadata.EMPTY);
@@ -1972,6 +2123,68 @@ public class TestRewriteDataFilesAction extends TestBase {
return results;
}
+  /**
+   * Commits an equality delete file holding the single row {@code delCol = delVal}, written into
+   * the partition selected by {@code partCol = partVal}.
+   *
+   * @param table table to write to and commit against
+   * @param partCol column whose value selects the target partition
+   * @param partVal value of {@code partCol} for the target partition
+   * @param delCol column used as the only equality field of the delete
+   * @param delVal value of {@code delCol} to delete
+   */
+  private void writeEqDeleteRecord(
+      Table table, String partCol, Object partVal, String delCol, Object delVal) {
+    List<Integer> equalityFieldIds =
+        Lists.newArrayList(table.schema().findField(delCol).fieldId());
+    Schema eqDeleteRowSchema = table.schema().select(delCol);
+    // Build the partition source row against the full table schema: the PartitionKey is created
+    // with table.schema() as its input schema, so a projected row would only line up when
+    // partCol happens to be the first column (and would break for any extra spec field).
+    Record partitionRecord =
+        GenericRecord.create(table.schema()).copy(ImmutableMap.of(partCol, partVal));
+    Record record = GenericRecord.create(eqDeleteRowSchema).copy(ImmutableMap.of(delCol, delVal));
+    writeEqDeleteRecord(table, equalityFieldIds, partitionRecord, eqDeleteRowSchema, record);
+  }
+
+  /**
+   * Writes {@code deleteRecord} into a new Parquet equality delete file and commits it to {@code
+   * table} in a row delta.
+   *
+   * @param table table whose current schema and spec are used for the delete file
+   * @param equalityFieldIds ids of the fields the delete matches rows on
+   * @param partitionRecord row used to compute the partition of the delete file
+   * @param eqDeleteRowSchema schema of the rows stored in the delete file
+   * @param deleteRecord the single delete row to write
+   */
+  private void writeEqDeleteRecord(
+      Table table,
+      List<Integer> equalityFieldIds,
+      Record partitionRecord,
+      Schema eqDeleteRowSchema,
+      Record deleteRecord) {
+    OutputFileFactory fileFactory =
+        OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PARQUET).build();
+    // The trailing null is the position-delete row schema, which equality deletes do not need.
+    GenericAppenderFactory appenderFactory =
+        new GenericAppenderFactory(
+            table.schema(),
+            table.spec(),
+            ArrayUtil.toIntArray(equalityFieldIds),
+            eqDeleteRowSchema,
+            null);
+
+    // Compute the partition key once so the file location and the delete file metadata are
+    // derived from the same key.
+    PartitionKey partitionKey = createPartitionKey(table, partitionRecord);
+    EncryptedOutputFile file = createEncryptedOutputFile(partitionKey, fileFactory);
+    EqualityDeleteWriter<Record> eqDeleteWriter =
+        appenderFactory.newEqDeleteWriter(file, FileFormat.PARQUET, partitionKey);
+
+    try (EqualityDeleteWriter<Record> closingWriter = eqDeleteWriter) {
+      closingWriter.write(deleteRecord);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    // The delete file is read from the writer only after the try block has closed it.
+    table.newRowDelta().addDeletes(eqDeleteWriter.toDeleteFile()).commit();
+  }
+
+  /**
+   * Computes the partition key of {@code record} under the table's current spec. The key is built
+   * against {@code table.schema()}, so {@code record} is expected to follow that schema.
+   *
+   * @return the partition key, or {@code null} when the current spec is unpartitioned
+   */
+  private PartitionKey createPartitionKey(Table table, Record record) {
+    PartitionSpec spec = table.spec();
+    PartitionKey key = null;
+    if (!spec.isUnpartitioned()) {
+      key = new PartitionKey(spec, table.schema());
+      key.partition(record);
+    }
+    return key;
+  }
+
+  /**
+   * Returns a new output file from {@code fileFactory}, passing {@code partition} along when it is
+   * non-null.
+   */
+  private EncryptedOutputFile createEncryptedOutputFile(
+      PartitionKey partition, OutputFileFactory fileFactory) {
+    return partition != null
+        ? fileFactory.newOutputFile(partition)
+        : fileFactory.newOutputFile();
+  }
+
+ // Shorthand for SparkActions.get().
private SparkActions actions() {
return SparkActions.get();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index 37b6cd86fb..8547f9753f 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -862,6 +862,7 @@ public class TestRewritePositionDeleteFilesAction extends CatalogTestBase {
files.stream().collect(Collectors.groupingBy(ContentFile::partition));
List<DeleteFile> deleteFiles =
Lists.newArrayListWithCapacity(deleteFilesPerPartition *
filesByPartition.size());
+ String suffix = String.format(".%s",
FileFormat.PARQUET.name().toLowerCase());
for (Map.Entry<StructLike, List<DataFile>> filesByPartitionEntry :
filesByPartition.entrySet()) {
@@ -886,7 +887,7 @@ public class TestRewritePositionDeleteFilesAction extends CatalogTestBase {
if (counter == deleteFileSize) {
// Dump to file and reset variables
OutputFile output =
- Files.localOutput(File.createTempFile("junit", null,
temp.toFile()));
+ Files.localOutput(File.createTempFile("junit", suffix,
temp.toFile()));
deleteFiles.add(FileHelpers.writeDeleteFile(table, output,
partition, deletes).first());
counter = 0;
deletes.clear();