This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 5eb45117fa Core: Add writer for unordered position deletes (#7692)
5eb45117fa is described below
commit 5eb45117fae738ff087e38aaf6b961b6a9cd38ec
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu May 25 08:05:50 2023 -0700
Core: Add writer for unordered position deletes (#7692)
---
.../iceberg/deletes/PositionDeleteWriter.java | 7 ++
.../deletes/SortingPositionOnlyDeleteWriter.java | 111 +++++++++++++++++++++
.../iceberg/io/FanoutPositionOnlyDeleteWriter.java | 79 +++++++++++++++
.../apache/iceberg/io/TestPartitioningWriters.java | 95 ++++++++++++++++++
.../iceberg/spark/source/WritersBenchmark.java | 86 +++++++++++++++-
5 files changed, 375 insertions(+), 3 deletions(-)
diff --git
a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
index 36a8949c75..d3e01bcd04 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
@@ -32,6 +32,13 @@ import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.CharSequenceSet;
+/**
+ * A position delete writer that can handle deletes ordered by file and
position.
+ *
+ * <p>This writer does not keep track of seen deletes and assumes all incoming
records are ordered
+ * by file and position as required by the spec. If there is no external
process to order the
+ * records, consider using {@link SortingPositionOnlyDeleteWriter} instead.
+ */
public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>,
DeleteWriteResult> {
private final FileAppender<StructLike> appender;
private final FileFormat format;
diff --git
a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
new file mode 100644
index 0000000000..728eff097e
--- /dev/null
+++
b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.FileWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.CharSequenceWrapper;
+import org.roaringbitmap.longlong.PeekableLongIterator;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+/**
+ * A position delete writer that is capable of handling unordered deletes
without rows.
+ *
+ * <p>This writer keeps an in-memory bitmap of deleted positions per each seen
data file and flushes
+ * the result into a file when closed. This enables writing position delete
files when the incoming
+ * records are not ordered by file and position as required by the spec. If
the incoming deletes are
+ * ordered by an external process, use {@link PositionDeleteWriter} instead.
+ *
+ * <p>Note this writer stores only positions. It does not store deleted
records.
+ */
+public class SortingPositionOnlyDeleteWriter<T>
+ implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
+
+ private final FileWriter<PositionDelete<T>, DeleteWriteResult> writer;
+ private final Map<CharSequenceWrapper, Roaring64Bitmap> positionsByPath;
+ private final CharSequenceWrapper pathWrapper;
+ private DeleteWriteResult result = null;
+
+ public SortingPositionOnlyDeleteWriter(FileWriter<PositionDelete<T>,
DeleteWriteResult> writer) {
+ this.writer = writer;
+ this.positionsByPath = Maps.newHashMap();
+ this.pathWrapper = CharSequenceWrapper.wrap(null);
+ }
+
+ @Override
+ public void write(PositionDelete<T> positionDelete) {
+ CharSequence path = positionDelete.path();
+ long position = positionDelete.pos();
+ Roaring64Bitmap positions = positionsByPath.get(pathWrapper.set(path));
+ if (positions != null) {
+ positions.add(position);
+ } else {
+ positions = new Roaring64Bitmap();
+ positions.add(position);
+ positionsByPath.put(CharSequenceWrapper.wrap(path), positions);
+ }
+ }
+
+ @Override
+ public long length() {
+ return writer.length();
+ }
+
+ @Override
+ public DeleteWriteResult result() {
+ return result;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (result == null) {
+ this.result = writeDeletes();
+ }
+ }
+
+ private DeleteWriteResult writeDeletes() throws IOException {
+ try {
+ PositionDelete<T> positionDelete = PositionDelete.create();
+ for (CharSequenceWrapper path : sortedPaths()) {
+ // the iterator provides values in ascending sorted order
+ PeekableLongIterator positions =
positionsByPath.get(path).getLongIterator();
+ while (positions.hasNext()) {
+ long position = positions.next();
+ writer.write(positionDelete.set(path.get(), position, null /* no row
*/));
+ }
+ }
+ } finally {
+ writer.close();
+ }
+
+ return writer.result();
+ }
+
+ private List<CharSequenceWrapper> sortedPaths() {
+ List<CharSequenceWrapper> paths =
Lists.newArrayList(positionsByPath.keySet());
+ paths.sort(Comparators.charSequences());
+ return paths;
+ }
+}
diff --git
a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
new file mode 100644
index 0000000000..15c1cc6bb4
--- /dev/null
+++
b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.SortingPositionOnlyDeleteWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.CharSequenceSet;
+
+/**
+ * A position delete writer capable of writing to multiple specs and
partitions if the incoming
+ * stream of deletes is not ordered. If the incoming records are ordered by an
external process, use
+ * {@link ClusteredPositionDeleteWriter} instead.
+ *
+ * <p>Note this writer stores only positions. It does not store deleted
records.
+ */
+public class FanoutPositionOnlyDeleteWriter<T>
+ extends FanoutWriter<PositionDelete<T>, DeleteWriteResult> {
+
+ private final FileWriterFactory<T> writerFactory;
+ private final OutputFileFactory fileFactory;
+ private final FileIO io;
+ private final long targetFileSizeInBytes;
+ private final List<DeleteFile> deleteFiles;
+ private final CharSequenceSet referencedDataFiles;
+
+ public FanoutPositionOnlyDeleteWriter(
+ FileWriterFactory<T> writerFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSizeInBytes) {
+ this.writerFactory = writerFactory;
+ this.fileFactory = fileFactory;
+ this.io = io;
+ this.targetFileSizeInBytes = targetFileSizeInBytes;
+ this.deleteFiles = Lists.newArrayList();
+ this.referencedDataFiles = CharSequenceSet.empty();
+ }
+
+ @Override
+ protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(
+ PartitionSpec spec, StructLike partition) {
+ FileWriter<PositionDelete<T>, DeleteWriteResult> delegate =
+ new RollingPositionDeleteWriter<>(
+ writerFactory, fileFactory, io, targetFileSizeInBytes, spec,
partition);
+ return new SortingPositionOnlyDeleteWriter<>(delegate);
+ }
+
+ @Override
+ protected void addResult(DeleteWriteResult result) {
+ deleteFiles.addAll(result.deleteFiles());
+ referencedDataFiles.addAll(result.referencedDataFiles());
+ }
+
+ @Override
+ protected DeleteWriteResult aggregatedResult() {
+ return new DeleteWriteResult(deleteFiles, referencedDataFiles);
+ }
+}
diff --git
a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
index 3de86e4ff4..32a8c8d2cc 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
@@ -462,4 +462,99 @@ public abstract class TestPartitioningWriters<T> extends
WriterTestBase<T> {
toRow(1, "aaa"), toRow(2, "aaa"), toRow(3, "bbb"), toRow(4,
"bbb"), toRow(5, "ccc"));
Assert.assertEquals("Records should match", toSet(expectedRows),
actualRowSet("*"));
}
+
+ @Test
+ public void testFanoutPositionOnlyDeleteWriterNoRecords() throws IOException
{
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+ FanoutPositionOnlyDeleteWriter<T> writer =
+ new FanoutPositionOnlyDeleteWriter<>(
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+
+ writer.close();
+ Assert.assertEquals(0, writer.result().deleteFiles().size());
+ Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+ Assert.assertFalse(writer.result().referencesDataFiles());
+
+ writer.close();
+ Assert.assertEquals(0, writer.result().deleteFiles().size());
+ Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+ Assert.assertFalse(writer.result().referencesDataFiles());
+ }
+
  @Test
  public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecords() throws IOException {
    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

    // add an unpartitioned data file
    ImmutableList<T> rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(11, "aaa"));
    DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null);
    table.newFastAppend().appendFile(dataFile1).commit();

    // partition by bucket
    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();

    // add a data file partitioned by bucket
    ImmutableList<T> rows2 = ImmutableList.of(toRow(3, "bbb"), toRow(4, "bbb"), toRow(12, "bbb"));
    DataFile dataFile2 =
        writeData(
            writerFactory, fileFactory, rows2, table.spec(), partitionKey(table.spec(), "bbb"));
    table.newFastAppend().appendFile(dataFile2).commit();

    // partition by data
    table
        .updateSpec()
        .removeField(Expressions.bucket("data", 16))
        .addField(Expressions.ref("data"))
        .commit();

    // add a data file partitioned by data
    ImmutableList<T> rows3 = ImmutableList.of(toRow(5, "ccc"), toRow(13, "ccc"));
    DataFile dataFile3 =
        writeData(
            writerFactory, fileFactory, rows3, table.spec(), partitionKey(table.spec(), "ccc"));
    table.newFastAppend().appendFile(dataFile3).commit();

    FanoutPositionOnlyDeleteWriter<T> writer =
        new FanoutPositionOnlyDeleteWriter<>(
            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);

    PartitionSpec unpartitionedSpec = table.specs().get(0);
    PartitionSpec bucketSpec = table.specs().get(1);
    PartitionSpec identitySpec = table.specs().get(2);

    // deliberately interleave files, specs, and positions: the fanout writer must accept
    // unordered deletes and still produce spec-compliant (sorted) delete files
    writer.write(positionDelete(dataFile1.path(), 1L, null), unpartitionedSpec, null);
    writer.write(
        positionDelete(dataFile2.path(), 1L, null), bucketSpec, partitionKey(bucketSpec, "bbb"));
    writer.write(
        positionDelete(dataFile2.path(), 0L, null), bucketSpec, partitionKey(bucketSpec, "bbb"));
    writer.write(
        positionDelete(dataFile3.path(), 1L, null),
        identitySpec,
        partitionKey(identitySpec, "ccc"));
    writer.write(
        positionDelete(dataFile3.path(), 2L, null),
        identitySpec,
        partitionKey(identitySpec, "ccc"));
    writer.write(positionDelete(dataFile1.path(), 0L, null), unpartitionedSpec, null);
    writer.write(
        positionDelete(dataFile3.path(), 0L, null),
        identitySpec,
        partitionKey(identitySpec, "ccc"));
    writer.write(positionDelete(dataFile1.path(), 2L, null), unpartitionedSpec, null);

    writer.close();

    // one delete file per spec/partition combination
    DeleteWriteResult result = writer.result();
    Assert.assertEquals("Must be 3 delete files", 3, result.deleteFiles().size());
    Assert.assertEquals(
        "Must reference 3 data files", 3, writer.result().referencedDataFiles().size());
    Assert.assertTrue("Must reference data files", writer.result().referencesDataFiles());

    RowDelta rowDelta = table.newRowDelta();
    result.deleteFiles().forEach(rowDelta::addDeletes);
    rowDelta.commit();

    // positions 0-2 were deleted in every file, which covers all rows except (12, "bbb"),
    // the row at position 2 of dataFile2
    List<T> expectedRows = ImmutableList.of(toRow(12, "bbb"));
    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
  }
}
diff --git
a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index 13ff034e4b..094fd58443 100644
---
a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++
b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -22,9 +22,11 @@ import static
org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import java.io.IOException;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
@@ -37,8 +39,8 @@ import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.ClusteredDataWriter;
import org.apache.iceberg.io.ClusteredEqualityDeleteWriter;
import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
-import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.FanoutDataWriter;
+import org.apache.iceberg.io.FanoutPositionOnlyDeleteWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
@@ -52,7 +54,9 @@ import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown;
@@ -62,6 +66,9 @@ import org.openjdk.jmh.infra.Blackhole;
public abstract class WritersBenchmark extends IcebergSourceBenchmark {
private static final int NUM_ROWS = 2500000;
+ private static final int NUM_DATA_FILES_PER_POSITION_DELETE_FILE = 100;
+ private static final int NUM_DELETED_POSITIONS_PER_DATA_FILE = 50_000;
+ private static final int DELETE_POSITION_STEP = 10;
private static final long TARGET_FILE_SIZE_IN_BYTES = 50L * 1024 * 1024;
private static final Schema SCHEMA =
@@ -76,6 +83,7 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
private Iterable<InternalRow> rows;
private Iterable<InternalRow> positionDeleteRows;
+ private Iterable<InternalRow> shuffledPositionDeleteRows;
private PartitionSpec unpartitionedSpec;
private PartitionSpec partitionedSpec;
@@ -93,14 +101,32 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
row ->
transform.bind(Types.IntegerType.get()).apply(row.getInt(1))));
this.rows = data;
- this.positionDeleteRows =
- RandomData.generateSpark(DeleteSchemaUtil.pathPosSchema(), NUM_ROWS,
0L);
+ this.positionDeleteRows = generatePositionDeletes(false /* no shuffle */);
+ this.shuffledPositionDeleteRows = generatePositionDeletes(true /* shuffle
*/);
this.unpartitionedSpec = table().specs().get(0);
Preconditions.checkArgument(unpartitionedSpec.isUnpartitioned());
this.partitionedSpec = table().specs().get(1);
}
+ private Iterable<InternalRow> generatePositionDeletes(boolean shuffle) {
+ int numDeletes = NUM_DATA_FILES_PER_POSITION_DELETE_FILE *
NUM_DELETED_POSITIONS_PER_DATA_FILE;
+ List<InternalRow> deletes = Lists.newArrayListWithExpectedSize(numDeletes);
+
+ for (int pathIndex = 0; pathIndex <
NUM_DATA_FILES_PER_POSITION_DELETE_FILE; pathIndex++) {
+ UTF8String path = UTF8String.fromString("path/to/position/delete/file/"
+ UUID.randomUUID());
+ for (long pos = 0; pos < NUM_DELETED_POSITIONS_PER_DATA_FILE; pos++) {
+ deletes.add(new GenericInternalRow(new Object[] {path, pos *
DELETE_POSITION_STEP}));
+ }
+ }
+
+ if (shuffle) {
+ Collections.shuffle(deletes);
+ }
+
+ return deletes;
+ }
+
@TearDown
public void tearDownBenchmark() throws IOException {
tearDownSpark();
@@ -363,6 +389,60 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
blackhole.consume(writer);
}
+ @Benchmark
+ @Threads(1)
+ public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole
blackhole) throws IOException {
+ FileIO io = table().io();
+
+ OutputFileFactory fileFactory = newFileFactory();
+ SparkFileWriterFactory writerFactory =
+
SparkFileWriterFactory.builderFor(table()).dataFileFormat(fileFormat()).build();
+
+ FanoutPositionOnlyDeleteWriter<InternalRow> writer =
+ new FanoutPositionOnlyDeleteWriter<>(
+ writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+
+ PositionDelete<InternalRow> positionDelete = PositionDelete.create();
+ try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer)
{
+ for (InternalRow row : positionDeleteRows) {
+ String path = row.getString(0);
+ long pos = row.getLong(1);
+ positionDelete.set(path, pos, null);
+ closeableWriter.write(positionDelete, unpartitionedSpec, null);
+ }
+ }
+
+ blackhole.consume(writer);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole
blackhole)
+ throws IOException {
+
+ FileIO io = table().io();
+
+ OutputFileFactory fileFactory = newFileFactory();
+ SparkFileWriterFactory writerFactory =
+
SparkFileWriterFactory.builderFor(table()).dataFileFormat(fileFormat()).build();
+
+ FanoutPositionOnlyDeleteWriter<InternalRow> writer =
+ new FanoutPositionOnlyDeleteWriter<>(
+ writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+
+ PositionDelete<InternalRow> positionDelete = PositionDelete.create();
+ try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer)
{
+ for (InternalRow row : shuffledPositionDeleteRows) {
+ String path = row.getString(0);
+ long pos = row.getLong(1);
+ positionDelete.set(path, pos, null);
+ closeableWriter.write(positionDelete, unpartitionedSpec, null);
+ }
+ }
+
+ blackhole.consume(writer);
+ }
+
  // creates an output file factory for partition id 1 / task id 1 using the benchmark file format
  private OutputFileFactory newFileFactory() {
    return OutputFileFactory.builderFor(table(), 1, 1).format(fileFormat()).build();
  }