This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 82cedbbb19 API, Core: Enable removing rewritten delete files in RowDelta (#11166)
82cedbbb19 is described below

commit 82cedbbb1935db6a765d9029df4e54c38e95bedc
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Sep 19 13:38:03 2024 -0700

    API, Core: Enable removing rewritten delete files in RowDelta (#11166)
---
 api/src/main/java/org/apache/iceberg/RowDelta.java |  11 ++
 .../iceberg/ReplaceDeleteFilesBenchmark.java       | 130 ++++++++++++++++++
 .../main/java/org/apache/iceberg/BaseRowDelta.java |   6 +
 .../test/java/org/apache/iceberg/TestRowDelta.java | 147 +++++++++++++++++++++
 4 files changed, 294 insertions(+)

diff --git a/api/src/main/java/org/apache/iceberg/RowDelta.java b/api/src/main/java/org/apache/iceberg/RowDelta.java
index 624f6c15d2..a5e3fa477b 100644
--- a/api/src/main/java/org/apache/iceberg/RowDelta.java
+++ b/api/src/main/java/org/apache/iceberg/RowDelta.java
@@ -46,6 +46,17 @@ public interface RowDelta extends SnapshotUpdate<RowDelta> {
    */
   RowDelta addDeletes(DeleteFile deletes);
 
+  /**
+   * Removes a rewritten {@link DeleteFile} from the table.
+   *
+   * <p>A caller replacing an existing delete file can remove it with this method and add its
+   * replacement with {@link #addDeletes(DeleteFile)} in the same operation.
+   *
+   * <p>The default implementation does not support removal and always throws.
+   *
+   * @param deletes a delete file that can be removed from the table
+   * @return this for method chaining
+   * @throws UnsupportedOperationException if this implementation does not support removing deletes
+   */
+  default RowDelta removeDeletes(DeleteFile deletes) {
+    String implName = getClass().getName();
+    throw new UnsupportedOperationException(implName + " does not implement removeDeletes");
+  }
+
   /**
    * Set the snapshot ID used in any reads for this operation.
    *
diff --git a/core/src/jmh/java/org/apache/iceberg/ReplaceDeleteFilesBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ReplaceDeleteFilesBenchmark.java
new file mode 100644
index 0000000000..a899b870a9
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/ReplaceDeleteFilesBenchmark.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of replacing delete files in the table.
+ *
+ * <p>Setup commits {@code numFiles} data files, each with one position delete file. Every
+ * benchmark invocation commits a single {@link RowDelta} that removes all of those delete files
+ * and adds the same number of freshly generated ones. After each iteration the table is rolled
+ * back to the setup snapshot, so every warmup and measurement iteration replaces the same live
+ * delete files instead of later iterations operating on an already-rewritten table.
+ *
+ * <p>To run this benchmark: <code>
+ *   ./gradlew :iceberg-core:jmh
+ *       -PjmhIncludeRegex=ReplaceDeleteFilesBenchmark
+ *       -PjmhOutputPath=benchmark/replace-delete-files-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
+public class ReplaceDeleteFilesBenchmark {
+
+  private static final String TABLE_IDENT = "tbl";
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "int_col", Types.IntegerType.get()),
+          required(2, "long_col", Types.LongType.get()),
+          required(3, "decimal_col", Types.DecimalType.of(10, 10)),
+          required(4, "date_col", Types.DateType.get()),
+          required(5, "timestamp_col", Types.TimestampType.withoutZone()),
+          required(6, "timestamp_tz_col", Types.TimestampType.withZone()),
+          required(7, "str_col", Types.StringType.get()));
+  private static final PartitionSpec SPEC = PartitionSpec.unpartitioned();
+  private static final HadoopTables TABLES = new HadoopTables();
+
+  private Table table;
+  // delete files committed during setup; each iteration removes all of them
+  private List<DeleteFile> deleteFiles;
+  // replacement delete files generated (but not committed) during setup; each iteration adds them
+  private List<DeleteFile> pendingDeleteFiles;
+  // snapshot committed by setup; every iteration must start from this table state
+  private long baseSnapshotId;
+
+  @Param({"50000", "100000", "500000", "1000000", "2500000"})
+  private int numFiles;
+
+  @Setup
+  public void setupBenchmark() {
+    initTable();
+    initFiles();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    dropTable();
+  }
+
+  /**
+   * Rolls the table back to the setup snapshot after each iteration.
+   *
+   * <p>Without this, only the first iteration would remove live delete files; later iterations
+   * would try to remove files that were already removed and re-add files that are already live.
+   * Running it as an iteration-level teardown keeps the rollback out of the measured time.
+   */
+  @TearDown(Level.Iteration)
+  public void resetTable() {
+    if (table.currentSnapshot().snapshotId() != baseSnapshotId) {
+      table.manageSnapshots().rollbackTo(baseSnapshotId).commit();
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void replaceDeleteFiles() {
+    RowDelta rowDelta = table.newRowDelta();
+    deleteFiles.forEach(rowDelta::removeDeletes);
+    pendingDeleteFiles.forEach(rowDelta::addDeletes);
+    rowDelta.commit();
+  }
+
+  private void initTable() {
+    this.table = TABLES.create(SCHEMA, SPEC, TABLE_IDENT);
+  }
+
+  private void dropTable() {
+    TABLES.dropTable(TABLE_IDENT);
+  }
+
+  /**
+   * Commits one data file and one position delete file per ordinal, generates (without committing)
+   * a replacement position delete file for each data file, and records the resulting snapshot.
+   */
+  private void initFiles() {
+    List<DeleteFile> generatedDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);
+    List<DeleteFile> generatedPendingDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);
+
+    RowDelta rowDelta = table.newRowDelta();
+
+    for (int ordinal = 0; ordinal < numFiles; ordinal++) {
+      DataFile dataFile = FileGenerationUtil.generateDataFile(table, null);
+      rowDelta.addRows(dataFile);
+
+      DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
+      rowDelta.addDeletes(deleteFile);
+      generatedDeleteFiles.add(deleteFile);
+
+      DeleteFile pendingDeleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
+      generatedPendingDeleteFiles.add(pendingDeleteFile);
+    }
+
+    rowDelta.commit();
+
+    this.deleteFiles = generatedDeleteFiles;
+    this.pendingDeleteFiles = generatedPendingDeleteFiles;
+    this.baseSnapshotId = table.currentSnapshot().snapshotId();
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
index 42fd17f032..85c2269ee5 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
@@ -62,6 +62,12 @@ class BaseRowDelta extends MergingSnapshotProducer<RowDelta> implements RowDelta
     return this;
   }
 
+  /**
+   * Removes the given delete file in the snapshot produced by this operation by delegating to the
+   * inherited {@code delete} method.
+   *
+   * @param deletes the delete file to remove
+   * @return this for method chaining
+   */
+  @Override
+  public RowDelta removeDeletes(DeleteFile deletes) {
+    this.delete(deletes);
+    return this;
+  }
+
   @Override
   public RowDelta validateFromSnapshot(long snapshotId) {
     this.startingSnapshotId = snapshotId;
diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
index a2a043e630..1d67e48a2c 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -1409,4 +1409,151 @@ public class TestRowDelta extends V2TableTestBase {
         .isInstanceOf(ValidationException.class)
         .hasMessageStartingWith("Found new conflicting delete files");
   }
+
+  /**
+   * Verifies that a row delta can replace a committed delete file: the resulting snapshot is a
+   * DELETE, the replacement delete file is ADDED, the replaced one is recorded as DELETED, and the
+   * data file is still reported as added by the base snapshot.
+   */
+  @TestTemplate
+  public void testRewrittenDeleteFiles() {
+    // base state: one data file plus one delete file in the same partition
+    DataFile dataFile = newDataFile("data_bucket=0");
+    DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile);
+    Snapshot baseSnapshot = commit(table, baseRowDelta, branch);
+    assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE);
+
+    // replace the committed delete file with a new one in a single row delta
+    DeleteFile newDeleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .removeDeletes(deleteFile)
+            .addDeletes(newDeleteFile)
+            .validateFromSnapshot(baseSnapshot.snapshotId());
+    Snapshot snapshot = commit(table, rowDelta, branch);
+    // the replacement touches only delete files and is expected to be reported as a DELETE
+    assertThat(snapshot.operation()).isEqualTo(DataOperations.DELETE);
+
+    // the data file is still reported as ADDED by the base snapshot
+    List<ManifestFile> dataManifests = snapshot.dataManifests(table.io());
+    assertThat(dataManifests).hasSize(1);
+    validateManifest(
+        dataManifests.get(0),
+        dataSeqs(1L),
+        fileSeqs(1L),
+        ids(baseSnapshot.snapshotId()),
+        files(dataFile),
+        statuses(Status.ADDED));
+
+    // one delete manifest holds the new delete file (seq 2), the other marks the old one DELETED
+    List<ManifestFile> deleteManifests = snapshot.deleteManifests(table.io());
+    assertThat(deleteManifests).hasSize(2);
+    validateDeleteManifest(
+        deleteManifests.get(0),
+        dataSeqs(2L),
+        fileSeqs(2L),
+        ids(snapshot.snapshotId()),
+        files(newDeleteFile),
+        statuses(Status.ADDED));
+    validateDeleteManifest(
+        deleteManifests.get(1),
+        dataSeqs(1L),
+        fileSeqs(1L),
+        ids(snapshot.snapshotId()),
+        files(deleteFile),
+        statuses(Status.DELETED));
+  }
+
+  /**
+   * Verifies that two DELETE operations, both validated from the same base snapshot, can each
+   * replace the same delete file: the second commit succeeds and both replacement delete files are
+   * reported as ADDED in the final snapshot.
+   */
+  @TestTemplate
+  public void testConcurrentDeletesRewriteSameDeleteFile() {
+    DataFile dataFile = newDataFile("data_bucket=0");
+    DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile);
+    Snapshot baseSnapshot = commit(table, baseRowDelta, branch);
+    assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE);
+
+    // commit the first DELETE operation that replaces `deleteFile`
+    DeleteFile newDeleteFile1 = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta delete1 =
+        table
+            .newRowDelta()
+            .addDeletes(newDeleteFile1)
+            .removeDeletes(deleteFile)
+            .validateFromSnapshot(baseSnapshot.snapshotId())
+            .validateNoConflictingDataFiles();
+    Snapshot snapshot1 = commit(table, delete1, branch);
+    assertThat(snapshot1.operation()).isEqualTo(DataOperations.DELETE);
+    assertThat(snapshot1.sequenceNumber()).isEqualTo(2L);
+
+    // commit the second DELETE operation that replaces `deleteFile`; it is validated from the
+    // base snapshot, simulating a writer that started before the first DELETE committed
+    DeleteFile newDeleteFile2 = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta delete2 =
+        table
+            .newRowDelta()
+            .addDeletes(newDeleteFile2)
+            .removeDeletes(deleteFile)
+            .validateFromSnapshot(baseSnapshot.snapshotId())
+            .validateNoConflictingDataFiles();
+    Snapshot snapshot2 = commit(table, delete2, branch);
+    assertThat(snapshot2.operation()).isEqualTo(DataOperations.DELETE);
+    assertThat(snapshot2.sequenceNumber()).isEqualTo(3L);
+
+    // the data file is still reported as ADDED by the base snapshot
+    List<ManifestFile> dataManifests = snapshot2.dataManifests(table.io());
+    assertThat(dataManifests).hasSize(1);
+    validateManifest(
+        dataManifests.get(0),
+        dataSeqs(1L),
+        fileSeqs(1L),
+        ids(baseSnapshot.snapshotId()),
+        files(dataFile),
+        statuses(Status.ADDED));
+
+    // verify both new delete files have been added
+    List<ManifestFile> deleteManifests = snapshot2.deleteManifests(table.io());
+    assertThat(deleteManifests).hasSize(2);
+    validateDeleteManifest(
+        deleteManifests.get(0),
+        dataSeqs(3L),
+        fileSeqs(3L),
+        ids(snapshot2.snapshotId()),
+        files(newDeleteFile2),
+        statuses(Status.ADDED));
+    validateDeleteManifest(
+        deleteManifests.get(1),
+        dataSeqs(2L),
+        fileSeqs(2L),
+        ids(snapshot1.snapshotId()),
+        files(newDeleteFile1),
+        statuses(Status.ADDED));
+  }
+
+  /**
+   * Verifies that a MERGE-style row delta (adding a data file and a delete file while replacing an
+   * existing delete file) fails conflict validation when a concurrent DELETE has already replaced
+   * the same delete file.
+   */
+  @TestTemplate
+  public void testConcurrentMergeRewriteSameDeleteFile() {
+    DataFile dataFile = newDataFile("data_bucket=0");
+    DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile);
+    Snapshot baseSnapshot = commit(table, baseRowDelta, branch);
+    assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE);
+
+    // commit a DELETE operation that replaces `deleteFile`
+    DeleteFile newDeleteFile1 = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta delete =
+        table
+            .newRowDelta()
+            .addDeletes(newDeleteFile1)
+            .removeDeletes(deleteFile)
+            .validateFromSnapshot(baseSnapshot.snapshotId())
+            .validateNoConflictingDataFiles();
+    commit(table, delete, branch);
+
+    // attempt to commit a MERGE operation that replaces `deleteFile`; it is validated from the
+    // base snapshot, so the DELETE above counts as a concurrent change
+    DataFile newDataFile2 = newDataFile("data_bucket=0");
+    DeleteFile newDeleteFile2 = newDeleteFile(dataFile.specId(), "data_bucket=0");
+    RowDelta merge =
+        table
+            .newRowDelta()
+            .addRows(newDataFile2)
+            .addDeletes(newDeleteFile2)
+            .removeDeletes(deleteFile)
+            .validateFromSnapshot(baseSnapshot.snapshotId())
+            .validateNoConflictingDataFiles()
+            .validateNoConflictingDeleteFiles();
+
+    // MERGE must fail as DELETE could have deleted more positions
+    assertThatThrownBy(() -> commit(table, merge, branch))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageStartingWith("Found new conflicting delete files that can apply");
+  }
 }

Reply via email to