This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 82cedbbb19 API, Core: Enable removing rewritten delete files in RowDelta (#11166)
82cedbbb19 is described below
commit 82cedbbb1935db6a765d9029df4e54c38e95bedc
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Sep 19 13:38:03 2024 -0700
API, Core: Enable removing rewritten delete files in RowDelta (#11166)
---
api/src/main/java/org/apache/iceberg/RowDelta.java | 11 ++
.../iceberg/ReplaceDeleteFilesBenchmark.java | 130 ++++++++++++++++++
.../main/java/org/apache/iceberg/BaseRowDelta.java | 6 +
.../test/java/org/apache/iceberg/TestRowDelta.java | 147 +++++++++++++++++++++
4 files changed, 294 insertions(+)
diff --git a/api/src/main/java/org/apache/iceberg/RowDelta.java b/api/src/main/java/org/apache/iceberg/RowDelta.java
index 624f6c15d2..a5e3fa477b 100644
--- a/api/src/main/java/org/apache/iceberg/RowDelta.java
+++ b/api/src/main/java/org/apache/iceberg/RowDelta.java
@@ -46,6 +46,17 @@ public interface RowDelta extends SnapshotUpdate<RowDelta> {
*/
RowDelta addDeletes(DeleteFile deletes);
+ /**
+ * Removes a rewritten {@link DeleteFile} from the table.
+ *
+ * @param deletes a delete file that can be removed from the table
+ * @return this for method chaining
+ */
+ default RowDelta removeDeletes(DeleteFile deletes) {
+ throw new UnsupportedOperationException(
+ getClass().getName() + " does not implement removeDeletes");
+ }
+
/**
* Set the snapshot ID used in any reads for this operation.
*
diff --git a/core/src/jmh/java/org/apache/iceberg/ReplaceDeleteFilesBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ReplaceDeleteFilesBenchmark.java
new file mode 100644
index 0000000000..a899b870a9
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/ReplaceDeleteFilesBenchmark.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of replacing delete files in the table.
+ *
+ * <p>To run this benchmark: <code>
+ * ./gradlew :iceberg-core:jmh
+ * -PjmhIncludeRegex=ReplaceDeleteFilesBenchmark
+ * -PjmhOutputPath=benchmark/replace-delete-files-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
+public class ReplaceDeleteFilesBenchmark {
+
+ private static final String TABLE_IDENT = "tbl";
+ private static final Schema SCHEMA =
+ new Schema(
+ required(1, "int_col", Types.IntegerType.get()),
+ required(2, "long_col", Types.LongType.get()),
+ required(3, "decimal_col", Types.DecimalType.of(10, 10)),
+ required(4, "date_col", Types.DateType.get()),
+ required(5, "timestamp_col", Types.TimestampType.withoutZone()),
+ required(6, "timestamp_tz_col", Types.TimestampType.withZone()),
+ required(7, "str_col", Types.StringType.get()));
+ private static final PartitionSpec SPEC = PartitionSpec.unpartitioned();
+ private static final HadoopTables TABLES = new HadoopTables();
+
+ private Table table;
+ private List<DeleteFile> deleteFiles;
+ private List<DeleteFile> pendingDeleteFiles;
+
+ @Param({"50000", "100000", "500000", "1000000", "2500000"})
+ private int numFiles;
+
+ @Setup
+ public void setupBenchmark() {
+ initTable();
+ initFiles();
+ }
+
+ @TearDown
+ public void tearDownBenchmark() {
+ dropTable();
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void replaceDeleteFiles() {
+ RowDelta rowDelta = table.newRowDelta();
+ deleteFiles.forEach(rowDelta::removeDeletes);
+ pendingDeleteFiles.forEach(rowDelta::addDeletes);
+ rowDelta.commit();
+ }
+
+ private void initTable() {
+ this.table = TABLES.create(SCHEMA, SPEC, TABLE_IDENT);
+ }
+
+ private void dropTable() {
+ TABLES.dropTable(TABLE_IDENT);
+ }
+
+ private void initFiles() {
+ List<DeleteFile> generatedDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);
+ List<DeleteFile> generatedPendingDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);
+
+ RowDelta rowDelta = table.newRowDelta();
+
+ for (int ordinal = 0; ordinal < numFiles; ordinal++) {
+ DataFile dataFile = FileGenerationUtil.generateDataFile(table, null);
+ rowDelta.addRows(dataFile);
+
+ DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
+ rowDelta.addDeletes(deleteFile);
+ generatedDeleteFiles.add(deleteFile);
+
+ DeleteFile pendingDeleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
+ generatedPendingDeleteFiles.add(pendingDeleteFile);
+ }
+
+ rowDelta.commit();
+
+ this.deleteFiles = generatedDeleteFiles;
+ this.pendingDeleteFiles = generatedPendingDeleteFiles;
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
index 42fd17f032..85c2269ee5 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
@@ -62,6 +62,12 @@ class BaseRowDelta extends MergingSnapshotProducer<RowDelta> implements RowDelta
return this;
}
+ @Override
+ public RowDelta removeDeletes(DeleteFile deletes) {
+ delete(deletes);
+ return this;
+ }
+
@Override
public RowDelta validateFromSnapshot(long snapshotId) {
this.startingSnapshotId = snapshotId;
diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
index a2a043e630..1d67e48a2c 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -1409,4 +1409,151 @@ public class TestRowDelta extends V2TableTestBase {
.isInstanceOf(ValidationException.class)
.hasMessageStartingWith("Found new conflicting delete files");
}
+
+ @TestTemplate
+ public void testRewrittenDeleteFiles() {
+ DataFile dataFile = newDataFile("data_bucket=0");
+ DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
+ RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile);
+ Snapshot baseSnapshot = commit(table, baseRowDelta, branch);
+ assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE);
+
+ DeleteFile newDeleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
+ RowDelta rowDelta =
+ table
+ .newRowDelta()
+ .removeDeletes(deleteFile)
+ .addDeletes(newDeleteFile)
+ .validateFromSnapshot(baseSnapshot.snapshotId());
+ Snapshot snapshot = commit(table, rowDelta, branch);
+ assertThat(snapshot.operation()).isEqualTo(DataOperations.DELETE);
+
+ List<ManifestFile> dataManifests = snapshot.dataManifests(table.io());
+ assertThat(dataManifests).hasSize(1);
+ validateManifest(
+ dataManifests.get(0),
+ dataSeqs(1L),
+ fileSeqs(1L),
+ ids(baseSnapshot.snapshotId()),
+ files(dataFile),
+ statuses(Status.ADDED));
+
+ List<ManifestFile> deleteManifests = snapshot.deleteManifests(table.io());
+ assertThat(deleteManifests).hasSize(2);
+ validateDeleteManifest(
+ deleteManifests.get(0),
+ dataSeqs(2L),
+ fileSeqs(2L),
+ ids(snapshot.snapshotId()),
+ files(newDeleteFile),
+ statuses(Status.ADDED));
+ validateDeleteManifest(
+ deleteManifests.get(1),
+ dataSeqs(1L),
+ fileSeqs(1L),
+ ids(snapshot.snapshotId()),
+ files(deleteFile),
+ statuses(Status.DELETED));
+ }
+
+ @TestTemplate
+ public void testConcurrentDeletesRewriteSameDeleteFile() {
+ DataFile dataFile = newDataFile("data_bucket=0");
+ DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
+ RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile);
+ Snapshot baseSnapshot = commit(table, baseRowDelta, branch);
+ assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE);
+
+ // commit the first DELETE operation that replaces `deleteFile`
+ DeleteFile newDeleteFile1 = newDeleteFile(dataFile.specId(), "data_bucket=0");
+ RowDelta delete1 =
+ table
+ .newRowDelta()
+ .addDeletes(newDeleteFile1)
+ .removeDeletes(deleteFile)
+ .validateFromSnapshot(baseSnapshot.snapshotId())
+ .validateNoConflictingDataFiles();
+ Snapshot snapshot1 = commit(table, delete1, branch);
+ assertThat(snapshot1.operation()).isEqualTo(DataOperations.DELETE);
+ assertThat(snapshot1.sequenceNumber()).isEqualTo(2L);
+
+ // commit the second DELETE operation that replaces `deleteFile`
+ DeleteFile newDeleteFile2 = newDeleteFile(dataFile.specId(), "data_bucket=0");
+ RowDelta delete2 =
+ table
+ .newRowDelta()
+ .addDeletes(newDeleteFile2)
+ .removeDeletes(deleteFile)
+ .validateFromSnapshot(baseSnapshot.snapshotId())
+ .validateNoConflictingDataFiles();
+ Snapshot snapshot2 = commit(table, delete2, branch);
+ assertThat(snapshot2.operation()).isEqualTo(DataOperations.DELETE);
+ assertThat(snapshot2.sequenceNumber()).isEqualTo(3L);
+
+ List<ManifestFile> dataManifests = snapshot2.dataManifests(table.io());
+ assertThat(dataManifests).hasSize(1);
+ validateManifest(
+ dataManifests.get(0),
+ dataSeqs(1L),
+ fileSeqs(1L),
+ ids(baseSnapshot.snapshotId()),
+ files(dataFile),
+ statuses(Status.ADDED));
+
+ // verify both new delete files have been added
+ List<ManifestFile> deleteManifests = snapshot2.deleteManifests(table.io());
+ assertThat(deleteManifests).hasSize(2);
+ validateDeleteManifest(
+ deleteManifests.get(0),
+ dataSeqs(3L),
+ fileSeqs(3L),
+ ids(snapshot2.snapshotId()),
+ files(newDeleteFile2),
+ statuses(Status.ADDED));
+ validateDeleteManifest(
+ deleteManifests.get(1),
+ dataSeqs(2L),
+ fileSeqs(2L),
+ ids(snapshot1.snapshotId()),
+ files(newDeleteFile1),
+ statuses(Status.ADDED));
+ }
+
+ @TestTemplate
+ public void testConcurrentMergeRewriteSameDeleteFile() {
+ DataFile dataFile = newDataFile("data_bucket=0");
+ DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
+ RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile);
+ Snapshot baseSnapshot = commit(table, baseRowDelta, branch);
+ assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE);
+
+ // commit a DELETE operation that replaces `deleteFile`
+ DeleteFile newDeleteFile1 = newDeleteFile(dataFile.specId(), "data_bucket=0");
+ RowDelta delete =
+ table
+ .newRowDelta()
+ .addDeletes(newDeleteFile1)
+ .removeDeletes(deleteFile)
+ .validateFromSnapshot(baseSnapshot.snapshotId())
+ .validateNoConflictingDataFiles();
+ commit(table, delete, branch);
+
+ // attempt to commit a MERGE operation that replaces `deleteFile`
+ DataFile newDataFile2 = newDataFile("data_bucket=0");
+ DeleteFile newDeleteFile2 = newDeleteFile(dataFile.specId(), "data_bucket=0");
+ RowDelta merge =
+ table
+ .newRowDelta()
+ .addRows(newDataFile2)
+ .addDeletes(newDeleteFile2)
+ .removeDeletes(deleteFile)
+ .validateFromSnapshot(baseSnapshot.snapshotId())
+ .validateNoConflictingDataFiles()
+ .validateNoConflictingDeleteFiles();
+
+ // MERGE must fail as DELETE could have deleted more positions
+ assertThatThrownBy(() -> commit(table, merge, branch))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageStartingWith("Found new conflicting delete files that can apply");
+ }
}