This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 5eb45117fa Core: Add writer for unordered position deletes (#7692)
5eb45117fa is described below

commit 5eb45117fae738ff087e38aaf6b961b6a9cd38ec
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu May 25 08:05:50 2023 -0700

    Core: Add writer for unordered position deletes (#7692)
---
 .../iceberg/deletes/PositionDeleteWriter.java      |   7 ++
 .../deletes/SortingPositionOnlyDeleteWriter.java   | 111 +++++++++++++++++++++
 .../iceberg/io/FanoutPositionOnlyDeleteWriter.java |  79 +++++++++++++++
 .../apache/iceberg/io/TestPartitioningWriters.java |  95 ++++++++++++++++++
 .../iceberg/spark/source/WritersBenchmark.java     |  86 +++++++++++++++-
 5 files changed, 375 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java 
b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
index 36a8949c75..d3e01bcd04 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
@@ -32,6 +32,13 @@ import org.apache.iceberg.io.FileWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.util.CharSequenceSet;
 
+/**
+ * A position delete writer that can handle deletes ordered by file and 
position.
+ *
+ * <p>This writer does not keep track of seen deletes and assumes all incoming 
records are ordered
+ * by file and position as required by the spec. If there is no external 
process to order the
+ * records, consider using {@link SortingPositionOnlyDeleteWriter} instead.
+ */
 public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, 
DeleteWriteResult> {
   private final FileAppender<StructLike> appender;
   private final FileFormat format;
diff --git 
a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
 
b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
new file mode 100644
index 0000000000..728eff097e
--- /dev/null
+++ 
b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.FileWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.CharSequenceWrapper;
+import org.roaringbitmap.longlong.PeekableLongIterator;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+/**
+ * A position delete writer that is capable of handling unordered deletes 
without rows.
+ *
+ * <p>This writer keeps an in-memory bitmap of deleted positions per each seen 
data file and flushes
+ * the result into a file when closed. This enables writing position delete 
files when the incoming
+ * records are not ordered by file and position as required by the spec. If 
the incoming deletes are
+ * ordered by an external process, use {@link PositionDeleteWriter} instead.
+ *
+ * <p>Note this writer stores only positions. It does not store deleted 
records.
+ */
+public class SortingPositionOnlyDeleteWriter<T>
+    implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
+
+  private final FileWriter<PositionDelete<T>, DeleteWriteResult> writer;
+  private final Map<CharSequenceWrapper, Roaring64Bitmap> positionsByPath;
+  private final CharSequenceWrapper pathWrapper;
+  private DeleteWriteResult result = null;
+
+  public SortingPositionOnlyDeleteWriter(FileWriter<PositionDelete<T>, 
DeleteWriteResult> writer) {
+    this.writer = writer;
+    this.positionsByPath = Maps.newHashMap();
+    this.pathWrapper = CharSequenceWrapper.wrap(null);
+  }
+
+  @Override
+  public void write(PositionDelete<T> positionDelete) {
+    CharSequence path = positionDelete.path();
+    long position = positionDelete.pos();
+    Roaring64Bitmap positions = positionsByPath.get(pathWrapper.set(path));
+    if (positions != null) {
+      positions.add(position);
+    } else {
+      positions = new Roaring64Bitmap();
+      positions.add(position);
+      positionsByPath.put(CharSequenceWrapper.wrap(path), positions);
+    }
+  }
+
+  @Override
+  public long length() {
+    return writer.length();
+  }
+
+  @Override
+  public DeleteWriteResult result() {
+    return result;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (result == null) {
+      this.result = writeDeletes();
+    }
+  }
+
+  private DeleteWriteResult writeDeletes() throws IOException {
+    try {
+      PositionDelete<T> positionDelete = PositionDelete.create();
+      for (CharSequenceWrapper path : sortedPaths()) {
+        // the iterator provides values in ascending sorted order
+        PeekableLongIterator positions = 
positionsByPath.get(path).getLongIterator();
+        while (positions.hasNext()) {
+          long position = positions.next();
+          writer.write(positionDelete.set(path.get(), position, null /* no row 
*/));
+        }
+      }
+    } finally {
+      writer.close();
+    }
+
+    return writer.result();
+  }
+
+  private List<CharSequenceWrapper> sortedPaths() {
+    List<CharSequenceWrapper> paths = 
Lists.newArrayList(positionsByPath.keySet());
+    paths.sort(Comparators.charSequences());
+    return paths;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java 
b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
new file mode 100644
index 0000000000..15c1cc6bb4
--- /dev/null
+++ 
b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.SortingPositionOnlyDeleteWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.CharSequenceSet;
+
+/**
+ * A position delete writer capable of writing to multiple specs and 
partitions if the incoming
+ * stream of deletes is not ordered. If the incoming records are ordered by an 
external process, use
+ * {@link ClusteredPositionDeleteWriter} instead.
+ *
+ * <p>Note this writer stores only positions. It does not store deleted 
records.
+ */
+public class FanoutPositionOnlyDeleteWriter<T>
+    extends FanoutWriter<PositionDelete<T>, DeleteWriteResult> {
+
+  private final FileWriterFactory<T> writerFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSizeInBytes;
+  private final List<DeleteFile> deleteFiles;
+  private final CharSequenceSet referencedDataFiles;
+
+  public FanoutPositionOnlyDeleteWriter(
+      FileWriterFactory<T> writerFactory,
+      OutputFileFactory fileFactory,
+      FileIO io,
+      long targetFileSizeInBytes) {
+    this.writerFactory = writerFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.deleteFiles = Lists.newArrayList();
+    this.referencedDataFiles = CharSequenceSet.empty();
+  }
+
+  @Override
+  protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(
+      PartitionSpec spec, StructLike partition) {
+    FileWriter<PositionDelete<T>, DeleteWriteResult> delegate =
+        new RollingPositionDeleteWriter<>(
+            writerFactory, fileFactory, io, targetFileSizeInBytes, spec, 
partition);
+    return new SortingPositionOnlyDeleteWriter<>(delegate);
+  }
+
+  @Override
+  protected void addResult(DeleteWriteResult result) {
+    deleteFiles.addAll(result.deleteFiles());
+    referencedDataFiles.addAll(result.referencedDataFiles());
+  }
+
+  @Override
+  protected DeleteWriteResult aggregatedResult() {
+    return new DeleteWriteResult(deleteFiles, referencedDataFiles);
+  }
+}
diff --git 
a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java 
b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
index 3de86e4ff4..32a8c8d2cc 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
@@ -462,4 +462,99 @@ public abstract class TestPartitioningWriters<T> extends 
WriterTestBase<T> {
             toRow(1, "aaa"), toRow(2, "aaa"), toRow(3, "bbb"), toRow(4, 
"bbb"), toRow(5, "ccc"));
     Assert.assertEquals("Records should match", toSet(expectedRows), 
actualRowSet("*"));
   }
+
+  @Test
+  public void testFanoutPositionOnlyDeleteWriterNoRecords() throws IOException 
{
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    FanoutPositionOnlyDeleteWriter<T> writer =
+        new FanoutPositionOnlyDeleteWriter<>(
+            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+
+    writer.close();
+    Assert.assertEquals(0, writer.result().deleteFiles().size());
+    Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+    Assert.assertFalse(writer.result().referencesDataFiles());
+
+    writer.close();
+    Assert.assertEquals(0, writer.result().deleteFiles().size());
+    Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+    Assert.assertFalse(writer.result().referencesDataFiles());
+  }
+
+  @Test
+  public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecords() throws 
IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    // add an unpartitioned data file
+    ImmutableList<T> rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, 
"aaa"), toRow(11, "aaa"));
+    DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, 
table.spec(), null);
+    table.newFastAppend().appendFile(dataFile1).commit();
+
+    // partition by bucket
+    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+
+    // add a data file partitioned by bucket
+    ImmutableList<T> rows2 = ImmutableList.of(toRow(3, "bbb"), toRow(4, 
"bbb"), toRow(12, "bbb"));
+    DataFile dataFile2 =
+        writeData(
+            writerFactory, fileFactory, rows2, table.spec(), 
partitionKey(table.spec(), "bbb"));
+    table.newFastAppend().appendFile(dataFile2).commit();
+
+    // partition by data
+    table
+        .updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    // add a data file partitioned by data
+    ImmutableList<T> rows3 = ImmutableList.of(toRow(5, "ccc"), toRow(13, 
"ccc"));
+    DataFile dataFile3 =
+        writeData(
+            writerFactory, fileFactory, rows3, table.spec(), 
partitionKey(table.spec(), "ccc"));
+    table.newFastAppend().appendFile(dataFile3).commit();
+
+    FanoutPositionOnlyDeleteWriter<T> writer =
+        new FanoutPositionOnlyDeleteWriter<>(
+            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+
+    PartitionSpec unpartitionedSpec = table.specs().get(0);
+    PartitionSpec bucketSpec = table.specs().get(1);
+    PartitionSpec identitySpec = table.specs().get(2);
+
+    writer.write(positionDelete(dataFile1.path(), 1L, null), 
unpartitionedSpec, null);
+    writer.write(
+        positionDelete(dataFile2.path(), 1L, null), bucketSpec, 
partitionKey(bucketSpec, "bbb"));
+    writer.write(
+        positionDelete(dataFile2.path(), 0L, null), bucketSpec, 
partitionKey(bucketSpec, "bbb"));
+    writer.write(
+        positionDelete(dataFile3.path(), 1L, null),
+        identitySpec,
+        partitionKey(identitySpec, "ccc"));
+    writer.write(
+        positionDelete(dataFile3.path(), 2L, null),
+        identitySpec,
+        partitionKey(identitySpec, "ccc"));
+    writer.write(positionDelete(dataFile1.path(), 0L, null), 
unpartitionedSpec, null);
+    writer.write(
+        positionDelete(dataFile3.path(), 0L, null),
+        identitySpec,
+        partitionKey(identitySpec, "ccc"));
+    writer.write(positionDelete(dataFile1.path(), 2L, null), 
unpartitionedSpec, null);
+
+    writer.close();
+
+    DeleteWriteResult result = writer.result();
+    Assert.assertEquals("Must be 3 delete files", 3, 
result.deleteFiles().size());
+    Assert.assertEquals(
+        "Must reference 3 data files", 3, 
writer.result().referencedDataFiles().size());
+    Assert.assertTrue("Must reference data files", 
writer.result().referencesDataFiles());
+
+    RowDelta rowDelta = table.newRowDelta();
+    result.deleteFiles().forEach(rowDelta::addDeletes);
+    rowDelta.commit();
+
+    List<T> expectedRows = ImmutableList.of(toRow(12, "bbb"));
+    Assert.assertEquals("Records should match", toSet(expectedRows), 
actualRowSet("*"));
+  }
 }
diff --git 
a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
 
b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index 13ff034e4b..094fd58443 100644
--- 
a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++ 
b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -22,9 +22,11 @@ import static 
org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
@@ -37,8 +39,8 @@ import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.ClusteredDataWriter;
 import org.apache.iceberg.io.ClusteredEqualityDeleteWriter;
 import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
-import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.io.FanoutDataWriter;
+import org.apache.iceberg.io.FanoutPositionOnlyDeleteWriter;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
@@ -52,7 +54,9 @@ import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.TearDown;
@@ -62,6 +66,9 @@ import org.openjdk.jmh.infra.Blackhole;
 public abstract class WritersBenchmark extends IcebergSourceBenchmark {
 
   private static final int NUM_ROWS = 2500000;
+  private static final int NUM_DATA_FILES_PER_POSITION_DELETE_FILE = 100;
+  private static final int NUM_DELETED_POSITIONS_PER_DATA_FILE = 50_000;
+  private static final int DELETE_POSITION_STEP = 10;
   private static final long TARGET_FILE_SIZE_IN_BYTES = 50L * 1024 * 1024;
 
   private static final Schema SCHEMA =
@@ -76,6 +83,7 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
   private Iterable<InternalRow> rows;
   private Iterable<InternalRow> positionDeleteRows;
+  private Iterable<InternalRow> shuffledPositionDeleteRows;
   private PartitionSpec unpartitionedSpec;
   private PartitionSpec partitionedSpec;
 
@@ -93,14 +101,32 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
             row -> 
transform.bind(Types.IntegerType.get()).apply(row.getInt(1))));
     this.rows = data;
 
-    this.positionDeleteRows =
-        RandomData.generateSpark(DeleteSchemaUtil.pathPosSchema(), NUM_ROWS, 
0L);
+    this.positionDeleteRows = generatePositionDeletes(false /* no shuffle */);
+    this.shuffledPositionDeleteRows = generatePositionDeletes(true /* shuffle 
*/);
 
     this.unpartitionedSpec = table().specs().get(0);
     Preconditions.checkArgument(unpartitionedSpec.isUnpartitioned());
     this.partitionedSpec = table().specs().get(1);
   }
 
+  private Iterable<InternalRow> generatePositionDeletes(boolean shuffle) {
+    int numDeletes = NUM_DATA_FILES_PER_POSITION_DELETE_FILE * 
NUM_DELETED_POSITIONS_PER_DATA_FILE;
+    List<InternalRow> deletes = Lists.newArrayListWithExpectedSize(numDeletes);
+
+    for (int pathIndex = 0; pathIndex < 
NUM_DATA_FILES_PER_POSITION_DELETE_FILE; pathIndex++) {
+      UTF8String path = UTF8String.fromString("path/to/position/delete/file/" 
+ UUID.randomUUID());
+      for (long pos = 0; pos < NUM_DELETED_POSITIONS_PER_DATA_FILE; pos++) {
+        deletes.add(new GenericInternalRow(new Object[] {path, pos * 
DELETE_POSITION_STEP}));
+      }
+    }
+
+    if (shuffle) {
+      Collections.shuffle(deletes);
+    }
+
+    return deletes;
+  }
+
   @TearDown
   public void tearDownBenchmark() throws IOException {
     tearDownSpark();
@@ -363,6 +389,60 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
     blackhole.consume(writer);
   }
 
+  @Benchmark
+  @Threads(1)
+  public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole 
blackhole) throws IOException {
+    FileIO io = table().io();
+
+    OutputFileFactory fileFactory = newFileFactory();
+    SparkFileWriterFactory writerFactory =
+        
SparkFileWriterFactory.builderFor(table()).dataFileFormat(fileFormat()).build();
+
+    FanoutPositionOnlyDeleteWriter<InternalRow> writer =
+        new FanoutPositionOnlyDeleteWriter<>(
+            writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+
+    PositionDelete<InternalRow> positionDelete = PositionDelete.create();
+    try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer) 
{
+      for (InternalRow row : positionDeleteRows) {
+        String path = row.getString(0);
+        long pos = row.getLong(1);
+        positionDelete.set(path, pos, null);
+        closeableWriter.write(positionDelete, unpartitionedSpec, null);
+      }
+    }
+
+    blackhole.consume(writer);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole 
blackhole)
+      throws IOException {
+
+    FileIO io = table().io();
+
+    OutputFileFactory fileFactory = newFileFactory();
+    SparkFileWriterFactory writerFactory =
+        
SparkFileWriterFactory.builderFor(table()).dataFileFormat(fileFormat()).build();
+
+    FanoutPositionOnlyDeleteWriter<InternalRow> writer =
+        new FanoutPositionOnlyDeleteWriter<>(
+            writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+
+    PositionDelete<InternalRow> positionDelete = PositionDelete.create();
+    try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer) 
{
+      for (InternalRow row : shuffledPositionDeleteRows) {
+        String path = row.getString(0);
+        long pos = row.getLong(1);
+        positionDelete.set(path, pos, null);
+        closeableWriter.write(positionDelete, unpartitionedSpec, null);
+      }
+    }
+
+    blackhole.consume(writer);
+  }
+
   private OutputFileFactory newFileFactory() {
     return OutputFileFactory.builderFor(table(), 1, 
1).format(fileFormat()).build();
   }

Reply via email to