This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new e7999a194d Core, Data, Spark 3.5: Support file and partition delete granularity (#9384)
e7999a194d is described below
commit e7999a194dc0f32fba8fb515c9108764ed6ba6b5
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Jan 2 22:26:59 2024 +0100
Core, Data, Spark 3.5: Support file and partition delete granularity (#9384)
---
.../org/apache/iceberg/util/CharSequenceUtil.java | 45 +++++
.../java/org/apache/iceberg/TableProperties.java | 4 +
.../apache/iceberg/deletes/DeleteGranularity.java | 71 ++++++++
.../deletes/FileScopedPositionDeleteWriter.java | 113 +++++++++++++
.../deletes/SortingPositionOnlyDeleteWriter.java | 67 ++++++--
.../iceberg/io/ClusteredPositionDeleteWriter.java | 25 +++
.../iceberg/io/FanoutPositionOnlyDeleteWriter.java | 21 ++-
.../apache/iceberg/io/TestPartitioningWriters.java | 188 +++++++++++++++++++--
.../spark/extensions/TestMergeOnReadDelete.java | 45 +++++
.../spark/extensions/TestMergeOnReadMerge.java | 57 +++++++
.../spark/extensions/TestMergeOnReadUpdate.java | 55 ++++++
.../iceberg/spark/source/WritersBenchmark.java | 52 +++++-
.../org/apache/iceberg/spark/SparkWriteConf.java | 12 ++
.../apache/iceberg/spark/SparkWriteOptions.java | 3 +
.../spark/source/SparkPositionDeletesRewrite.java | 15 +-
.../spark/source/SparkPositionDeltaWrite.java | 14 +-
.../apache/iceberg/spark/TestSparkWriteConf.java | 57 +++++++
.../TestRewritePositionDeleteFilesAction.java | 37 ++++
18 files changed, 846 insertions(+), 35 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/util/CharSequenceUtil.java b/api/src/main/java/org/apache/iceberg/util/CharSequenceUtil.java
new file mode 100644
index 0000000000..b28c90df17
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/util/CharSequenceUtil.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+public class CharSequenceUtil {
+
+ private CharSequenceUtil() {}
+
+ public static boolean unequalPaths(CharSequence s1, CharSequence s2) {
+ if (s1 == s2) {
+ return false;
+ }
+
+ int s1Length = s1.length();
+ int s2Length = s2.length();
+
+ if (s1Length != s2Length) {
+ return true;
+ }
+
+ for (int index = s1Length - 1; index >= 0; index--) {
+ if (s1.charAt(index) != s2.charAt(index)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 84188c0f78..2267ba03fd 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
import java.util.Set;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
public class TableProperties {
@@ -334,6 +335,9 @@ public class TableProperties {
public static final String MAX_REF_AGE_MS = "history.expire.max-ref-age-ms";
public static final long MAX_REF_AGE_MS_DEFAULT = Long.MAX_VALUE;
+ public static final String DELETE_GRANULARITY = "write.delete.granularity";
+ public static final String DELETE_GRANULARITY_DEFAULT =
DeleteGranularity.PARTITION.toString();
+
public static final String DELETE_ISOLATION_LEVEL =
"write.delete.isolation-level";
public static final String DELETE_ISOLATION_LEVEL_DEFAULT = "serializable";
diff --git a/core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java b/core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java
new file mode 100644
index 0000000000..c225192fa1
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * An enum that represents the granularity of deletes.
+ *
+ * <p>Under partition granularity, delete writers are directed to group deletes for different data
+ * files into one delete file. This strategy tends to reduce the total number of delete files in the
+ * table. However, a scan for a single data file will require reading delete information for
+ * multiple data files even if those other files are not required for the scan. All irrelevant
+ * deletes will be discarded by readers but reading this extra information will cause overhead. The
+ * overhead can potentially be mitigated via delete file caching.
+ *
+ * <p>Under file granularity, delete writers always organize deletes by their target data file,
+ * creating separate delete files for each referenced data file. This strategy ensures the job
+ * planning does not assign irrelevant deletes to data files and readers only load necessary delete
+ * information. However, it also increases the total number of delete files in the table and may
+ * require a more aggressive approach for delete file compaction.
+ *
+ * <p>Currently, this configuration is only applicable to position deletes.
+ *
+ * <p>Each granularity has its own benefits and drawbacks and should be picked based on a use case.
+ * Regular delete compaction is still required regardless of which granularity is chosen. It is also
+ * possible to use one granularity for ingestion and another one for table maintenance.
+ */
+public enum DeleteGranularity {
+ FILE,
+ PARTITION;
+
+ @Override
+ public String toString() {
+ switch (this) {
+ case FILE:
+ return "file";
+ case PARTITION:
+ return "partition";
+ default:
+ throw new IllegalArgumentException("Unknown delete granularity: " +
this);
+ }
+ }
+
+ public static DeleteGranularity fromString(String valueAsString) {
+ Preconditions.checkArgument(valueAsString != null, "Value is null");
+ if (FILE.toString().equalsIgnoreCase(valueAsString)) {
+ return FILE;
+ } else if (PARTITION.toString().equalsIgnoreCase(valueAsString)) {
+ return PARTITION;
+ } else {
+ throw new IllegalArgumentException("Unknown delete granularity: " +
valueAsString);
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/deletes/FileScopedPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/FileScopedPositionDeleteWriter.java
new file mode 100644
index 0000000000..d85a5645fb
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/deletes/FileScopedPositionDeleteWriter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.FileWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.CharSequenceUtil;
+
+/**
+ * A position delete writer that produces a separate delete file for each referenced data file.
+ *
+ * <p>This writer does not keep track of seen deletes and assumes all incoming records are ordered
+ * by file and position as required by the spec. If there is no external process to order the
+ * records, consider using {@link SortingPositionOnlyDeleteWriter} instead.
+ */
+public class FileScopedPositionDeleteWriter<T>
+ implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
+
+ private final Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>>
writers;
+ private final List<DeleteFile> deleteFiles;
+ private final CharSequenceSet referencedDataFiles;
+
+ private FileWriter<PositionDelete<T>, DeleteWriteResult> currentWriter =
null;
+ private CharSequence currentPath = null;
+ private boolean closed = false;
+
+ public FileScopedPositionDeleteWriter(
+ Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers) {
+ this.writers = writers;
+ this.deleteFiles = Lists.newArrayList();
+ this.referencedDataFiles = CharSequenceSet.empty();
+ }
+
+ @Override
+ public void write(PositionDelete<T> positionDelete) {
+ writer(positionDelete.path()).write(positionDelete);
+ }
+
+ private FileWriter<PositionDelete<T>, DeleteWriteResult> writer(CharSequence
path) {
+ if (currentWriter == null) {
+ openCurrentWriter(path);
+ } else if (CharSequenceUtil.unequalPaths(currentPath, path)) {
+ closeCurrentWriter();
+ openCurrentWriter(path);
+ }
+
+ return currentWriter;
+ }
+
+ @Override
+ public long length() {
+ throw new UnsupportedOperationException(getClass().getName() + " does not
implement length");
+ }
+
+ @Override
+ public DeleteWriteResult result() {
+ Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+ return new DeleteWriteResult(deleteFiles, referencedDataFiles);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closeCurrentWriter();
+ this.closed = true;
+ }
+ }
+
+ private void openCurrentWriter(CharSequence path) {
+ Preconditions.checkState(!closed, "Writer has already been closed");
+ this.currentWriter = writers.get();
+ this.currentPath = path;
+ }
+
+ private void closeCurrentWriter() {
+ if (currentWriter != null) {
+ try {
+ currentWriter.close();
+ DeleteWriteResult result = currentWriter.result();
+ deleteFiles.addAll(result.deleteFiles());
+ referencedDataFiles.addAll(result.referencedDataFiles());
+ this.currentWriter = null;
+ this.currentPath = null;
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close current writer", e);
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
index 8819640e9f..3fc6c5eec9 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
@@ -19,12 +19,17 @@
package org.apache.iceberg.deletes;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.CharSequenceMap;
+import org.apache.iceberg.util.CharSequenceSet;
import org.roaringbitmap.longlong.PeekableLongIterator;
import org.roaringbitmap.longlong.Roaring64Bitmap;
@@ -41,12 +46,20 @@ import org.roaringbitmap.longlong.Roaring64Bitmap;
public class SortingPositionOnlyDeleteWriter<T>
implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
- private final FileWriter<PositionDelete<T>, DeleteWriteResult> writer;
+ private final Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>>
writers;
+ private final DeleteGranularity granularity;
private final CharSequenceMap<Roaring64Bitmap> positionsByPath;
private DeleteWriteResult result = null;
public SortingPositionOnlyDeleteWriter(FileWriter<PositionDelete<T>,
DeleteWriteResult> writer) {
- this.writer = writer;
+ this(() -> writer, DeleteGranularity.PARTITION);
+ }
+
+ public SortingPositionOnlyDeleteWriter(
+ Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers,
+ DeleteGranularity granularity) {
+ this.writers = writers;
+ this.granularity = granularity;
this.positionsByPath = CharSequenceMap.create();
}
@@ -60,7 +73,7 @@ public class SortingPositionOnlyDeleteWriter<T>
@Override
public long length() {
- return writer.length();
+ throw new UnsupportedOperationException(getClass().getName() + " does not
implement length");
}
@Override
@@ -71,14 +84,44 @@ public class SortingPositionOnlyDeleteWriter<T>
@Override
public void close() throws IOException {
if (result == null) {
- this.result = writeDeletes();
+ switch (granularity) {
+ case FILE:
+ this.result = writeFileDeletes();
+ return;
+ case PARTITION:
+ this.result = writePartitionDeletes();
+ return;
+ default:
+ throw new UnsupportedOperationException("Unsupported delete
granularity: " + granularity);
+ }
+ }
+ }
+
+ // write deletes for all data files together
+ private DeleteWriteResult writePartitionDeletes() throws IOException {
+ return writeDeletes(positionsByPath.keySet());
+ }
+
+ // write deletes for different data files into distinct delete files
+ private DeleteWriteResult writeFileDeletes() throws IOException {
+ List<DeleteFile> deleteFiles = Lists.newArrayList();
+ CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
+
+ for (CharSequence path : positionsByPath.keySet()) {
+ DeleteWriteResult writeResult = writeDeletes(ImmutableList.of(path));
+ deleteFiles.addAll(writeResult.deleteFiles());
+ referencedDataFiles.addAll(writeResult.referencedDataFiles());
}
+
+ return new DeleteWriteResult(deleteFiles, referencedDataFiles);
}
- private DeleteWriteResult writeDeletes() throws IOException {
+ private DeleteWriteResult writeDeletes(Collection<CharSequence> paths)
throws IOException {
+ FileWriter<PositionDelete<T>, DeleteWriteResult> writer = writers.get();
+
try {
PositionDelete<T> positionDelete = PositionDelete.create();
- for (CharSequence path : sortedPaths()) {
+ for (CharSequence path : sort(paths)) {
// the iterator provides values in ascending sorted order
PeekableLongIterator positions =
positionsByPath.get(path).getLongIterator();
while (positions.hasNext()) {
@@ -93,9 +136,13 @@ public class SortingPositionOnlyDeleteWriter<T>
return writer.result();
}
- private List<CharSequence> sortedPaths() {
- List<CharSequence> paths = Lists.newArrayList(positionsByPath.keySet());
- paths.sort(Comparators.charSequences());
- return paths;
+ private Collection<CharSequence> sort(Collection<CharSequence> paths) {
+ if (paths.size() <= 1) {
+ return paths;
+ } else {
+ List<CharSequence> sortedPaths = Lists.newArrayList(paths);
+ sortedPaths.sort(Comparators.charSequences());
+ return sortedPaths;
+ }
}
}
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
index c0c26c2b90..c9d911894c 100644
--- a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
@@ -22,6 +22,8 @@ import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.DeleteGranularity;
+import org.apache.iceberg.deletes.FileScopedPositionDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
@@ -38,6 +40,7 @@ public class ClusteredPositionDeleteWriter<T>
private final OutputFileFactory fileFactory;
private final FileIO io;
private final long targetFileSizeInBytes;
+ private final DeleteGranularity granularity;
private final List<DeleteFile> deleteFiles;
private final CharSequenceSet referencedDataFiles;
@@ -46,10 +49,20 @@ public class ClusteredPositionDeleteWriter<T>
OutputFileFactory fileFactory,
FileIO io,
long targetFileSizeInBytes) {
+ this(writerFactory, fileFactory, io, targetFileSizeInBytes,
DeleteGranularity.PARTITION);
+ }
+
+ public ClusteredPositionDeleteWriter(
+ FileWriterFactory<T> writerFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSizeInBytes,
+ DeleteGranularity granularity) {
this.writerFactory = writerFactory;
this.fileFactory = fileFactory;
this.io = io;
this.targetFileSizeInBytes = targetFileSizeInBytes;
+ this.granularity = granularity;
this.deleteFiles = Lists.newArrayList();
this.referencedDataFiles = CharSequenceSet.empty();
}
@@ -57,6 +70,18 @@ public class ClusteredPositionDeleteWriter<T>
@Override
protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(
PartitionSpec spec, StructLike partition) {
+ switch (granularity) {
+ case FILE:
+ return new FileScopedPositionDeleteWriter<>(() ->
newRollingWriter(spec, partition));
+ case PARTITION:
+ return newRollingWriter(spec, partition);
+ default:
+ throw new UnsupportedOperationException("Unsupported delete
granularity: " + granularity);
+ }
+ }
+
+ private RollingPositionDeleteWriter<T> newRollingWriter(
+ PartitionSpec spec, StructLike partition) {
return new RollingPositionDeleteWriter<>(
writerFactory, fileFactory, io, targetFileSizeInBytes, spec,
partition);
}
diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
index 15c1cc6bb4..c6a55064b7 100644
--- a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.SortingPositionOnlyDeleteWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -41,6 +42,7 @@ public class FanoutPositionOnlyDeleteWriter<T>
private final OutputFileFactory fileFactory;
private final FileIO io;
private final long targetFileSizeInBytes;
+ private final DeleteGranularity granularity;
private final List<DeleteFile> deleteFiles;
private final CharSequenceSet referencedDataFiles;
@@ -49,10 +51,20 @@ public class FanoutPositionOnlyDeleteWriter<T>
OutputFileFactory fileFactory,
FileIO io,
long targetFileSizeInBytes) {
+ this(writerFactory, fileFactory, io, targetFileSizeInBytes,
DeleteGranularity.PARTITION);
+ }
+
+ public FanoutPositionOnlyDeleteWriter(
+ FileWriterFactory<T> writerFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSizeInBytes,
+ DeleteGranularity granularity) {
this.writerFactory = writerFactory;
this.fileFactory = fileFactory;
this.io = io;
this.targetFileSizeInBytes = targetFileSizeInBytes;
+ this.granularity = granularity;
this.deleteFiles = Lists.newArrayList();
this.referencedDataFiles = CharSequenceSet.empty();
}
@@ -60,10 +72,11 @@ public class FanoutPositionOnlyDeleteWriter<T>
@Override
protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(
PartitionSpec spec, StructLike partition) {
- FileWriter<PositionDelete<T>, DeleteWriteResult> delegate =
- new RollingPositionDeleteWriter<>(
- writerFactory, fileFactory, io, targetFileSizeInBytes, spec,
partition);
- return new SortingPositionOnlyDeleteWriter<>(delegate);
+ return new SortingPositionOnlyDeleteWriter<>(
+ () ->
+ new RollingPositionDeleteWriter<>(
+ writerFactory, fileFactory, io, targetFileSizeInBytes, spec,
partition),
+ granularity);
}
@Override
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
index 32a8c8d2cc..d767743262 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.io;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.io.File;
import java.io.IOException;
import java.util.List;
@@ -26,6 +28,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -278,11 +281,21 @@ public abstract class TestPartitioningWriters<T> extends
WriterTestBase<T> {
}
@Test
- public void testClusteredPositionDeleteWriterNoRecords() throws IOException {
+ public void testClusteredPositionDeleteWriterNoRecordsPartitionGranularity()
throws IOException {
+ checkClusteredPositionDeleteWriterNoRecords(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void testClusteredPositionDeleteWriterNoRecordsFileGranularity()
throws IOException {
+ checkClusteredPositionDeleteWriterNoRecords(DeleteGranularity.FILE);
+ }
+
+ private void checkClusteredPositionDeleteWriterNoRecords(DeleteGranularity
deleteGranularity)
+ throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
ClusteredPositionDeleteWriter<T> writer =
new ClusteredPositionDeleteWriter<>(
- writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE,
deleteGranularity);
writer.close();
Assert.assertEquals(0, writer.result().deleteFiles().size());
@@ -296,7 +309,18 @@ public abstract class TestPartitioningWriters<T> extends
WriterTestBase<T> {
}
@Test
- public void testClusteredPositionDeleteWriterMultipleSpecs() throws
IOException {
+ public void
testClusteredPositionDeleteWriterMultipleSpecsPartitionGranularity()
+ throws IOException {
+
checkClusteredPositionDeleteWriterMultipleSpecs(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void testClusteredPositionDeleteWriterMultipleSpecsFileGranularity()
throws IOException {
+ checkClusteredPositionDeleteWriterMultipleSpecs(DeleteGranularity.FILE);
+ }
+
+ private void
checkClusteredPositionDeleteWriterMultipleSpecs(DeleteGranularity
deleteGranularity)
+ throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
// add an unpartitioned data file
@@ -330,7 +354,7 @@ public abstract class TestPartitioningWriters<T> extends
WriterTestBase<T> {
ClusteredPositionDeleteWriter<T> writer =
new ClusteredPositionDeleteWriter<>(
- writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE,
deleteGranularity);
PartitionSpec unpartitionedSpec = table.specs().get(0);
PartitionSpec bucketSpec = table.specs().get(1);
@@ -364,7 +388,19 @@ public abstract class TestPartitioningWriters<T> extends
WriterTestBase<T> {
}
@Test
- public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions()
throws IOException {
+ public void
testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitionsPartitionGranularity()
+ throws IOException {
+
checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void
testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitionsFileGranularity()
+ throws IOException {
+
checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions(DeleteGranularity.FILE);
+ }
+
+ private void checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions(
+ DeleteGranularity deleteGranularity) throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
@@ -377,7 +413,7 @@ public abstract class TestPartitioningWriters<T> extends
WriterTestBase<T> {
ClusteredPositionDeleteWriter<T> writer =
new ClusteredPositionDeleteWriter<>(
- writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE,
deleteGranularity);
PartitionSpec unpartitionedSpec = table.specs().get(0);
PartitionSpec bucketSpec = table.specs().get(1);
@@ -419,6 +455,61 @@ public abstract class TestPartitioningWriters<T> extends
WriterTestBase<T> {
writer.close();
}
+ @Test
+ public void testClusteredPositionDeleteWriterPartitionGranularity() throws
IOException {
+ checkClusteredPositionDeleteWriterGranularity(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void testClusteredPositionDeleteWriterFileGranularity() throws
IOException {
+ checkClusteredPositionDeleteWriterGranularity(DeleteGranularity.FILE);
+ }
+
+ private void checkClusteredPositionDeleteWriterGranularity(DeleteGranularity
deleteGranularity)
+ throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ // add the first data file
+ List<T> rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"),
toRow(11, "aaa"));
+ DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1,
table.spec(), null);
+ table.newFastAppend().appendFile(dataFile1).commit();
+
+ // add the second data file
+ List<T> rows2 = ImmutableList.of(toRow(3, "aaa"), toRow(4, "aaa"),
toRow(12, "aaa"));
+ DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2,
table.spec(), null);
+ table.newFastAppend().appendFile(dataFile2).commit();
+
+ // init the delete writer
+ ClusteredPositionDeleteWriter<T> writer =
+ new ClusteredPositionDeleteWriter<>(
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE,
deleteGranularity);
+
+ PartitionSpec spec = table.spec();
+
+ // write deletes for both data files
+ writer.write(positionDelete(dataFile1.path(), 0L, null), spec, null);
+ writer.write(positionDelete(dataFile1.path(), 1L, null), spec, null);
+ writer.write(positionDelete(dataFile2.path(), 0L, null), spec, null);
+ writer.write(positionDelete(dataFile2.path(), 1L, null), spec, null);
+ writer.close();
+
+ // verify the writer result
+ DeleteWriteResult result = writer.result();
+ int expectedNumDeleteFiles = deleteGranularity == DeleteGranularity.FILE ?
2 : 1;
+ assertThat(result.deleteFiles()).hasSize(expectedNumDeleteFiles);
+ assertThat(result.referencedDataFiles()).hasSize(2);
+ assertThat(result.referencesDataFiles()).isTrue();
+
+ // commit the deletes
+ RowDelta rowDelta = table.newRowDelta();
+ result.deleteFiles().forEach(rowDelta::addDeletes);
+ rowDelta.commit();
+
+ // verify correctness
+ List<T> expectedRows = ImmutableList.of(toRow(11, "aaa"), toRow(12,
"aaa"));
+ assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows));
+ }
+
@Test
public void testFanoutDataWriterNoRecords() throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
@@ -464,11 +555,21 @@ public abstract class TestPartitioningWriters<T> extends
WriterTestBase<T> {
}
@Test
- public void testFanoutPositionOnlyDeleteWriterNoRecords() throws IOException
{
+ public void
testFanoutPositionOnlyDeleteWriterNoRecordsPartitionGranularity() throws
IOException {
+ checkFanoutPositionOnlyDeleteWriterNoRecords(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void testFanoutPositionOnlyDeleteWriterNoRecordsFileGranularity()
throws IOException {
+ checkFanoutPositionOnlyDeleteWriterNoRecords(DeleteGranularity.FILE);
+ }
+
+ private void checkFanoutPositionOnlyDeleteWriterNoRecords(DeleteGranularity
deleteGranularity)
+ throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
FanoutPositionOnlyDeleteWriter<T> writer =
new FanoutPositionOnlyDeleteWriter<>(
- writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE,
deleteGranularity);
writer.close();
Assert.assertEquals(0, writer.result().deleteFiles().size());
@@ -482,7 +583,19 @@ public abstract class TestPartitioningWriters<T> extends
WriterTestBase<T> {
}
@Test
- public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecords() throws
IOException {
+ public void
testFanoutPositionOnlyDeleteWriterOutOfOrderRecordsPartitionGranularity()
+ throws IOException {
+
checkFanoutPositionOnlyDeleteWriterOutOfOrderRecords(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void
testFanoutPositionOnlyDeleteWriterOutOfOrderRecordsFileGranularity()
+ throws IOException {
+
checkFanoutPositionOnlyDeleteWriterOutOfOrderRecords(DeleteGranularity.FILE);
+ }
+
+ private void checkFanoutPositionOnlyDeleteWriterOutOfOrderRecords(
+ DeleteGranularity deleteGranularity) throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
// add an unpartitioned data file
@@ -516,7 +629,7 @@ public abstract class TestPartitioningWriters<T> extends
WriterTestBase<T> {
FanoutPositionOnlyDeleteWriter<T> writer =
new FanoutPositionOnlyDeleteWriter<>(
- writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE,
deleteGranularity);
PartitionSpec unpartitionedSpec = table.specs().get(0);
PartitionSpec bucketSpec = table.specs().get(1);
@@ -557,4 +670,59 @@ public abstract class TestPartitioningWriters<T> extends
WriterTestBase<T> {
List<T> expectedRows = ImmutableList.of(toRow(12, "bbb"));
Assert.assertEquals("Records should match", toSet(expectedRows),
actualRowSet("*"));
}
+
+ @Test
+ public void testFanoutPositionOnlyDeleteWriterPartitionGranularity() throws
IOException {
+
checkFanoutPositionOnlyDeleteWriterGranularity(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void testFanoutPositionOnlyDeleteWriterFileGranularity() throws
IOException {
+ checkFanoutPositionOnlyDeleteWriterGranularity(DeleteGranularity.FILE);
+ }
+
+ private void
checkFanoutPositionOnlyDeleteWriterGranularity(DeleteGranularity
deleteGranularity)
+ throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ // add the first data file
+ List<T> rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"),
toRow(11, "aaa"));
+ DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1,
table.spec(), null);
+ table.newFastAppend().appendFile(dataFile1).commit();
+
+ // add the second data file
+ List<T> rows2 = ImmutableList.of(toRow(3, "aaa"), toRow(4, "aaa"),
toRow(12, "aaa"));
+ DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2,
table.spec(), null);
+ table.newFastAppend().appendFile(dataFile2).commit();
+
+ // init the delete writer
+ FanoutPositionOnlyDeleteWriter<T> writer =
+ new FanoutPositionOnlyDeleteWriter<>(
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE,
deleteGranularity);
+
+ PartitionSpec spec = table.spec();
+
+ // write deletes for both data files (the order of records is mixed)
+ writer.write(positionDelete(dataFile1.path(), 1L, null), spec, null);
+ writer.write(positionDelete(dataFile2.path(), 0L, null), spec, null);
+ writer.write(positionDelete(dataFile1.path(), 0L, null), spec, null);
+ writer.write(positionDelete(dataFile2.path(), 1L, null), spec, null);
+ writer.close();
+
+ // verify the writer result
+ DeleteWriteResult result = writer.result();
+ int expectedNumDeleteFiles = deleteGranularity == DeleteGranularity.FILE ?
2 : 1;
+ assertThat(result.deleteFiles()).hasSize(expectedNumDeleteFiles);
+ assertThat(result.referencedDataFiles()).hasSize(2);
+ assertThat(result.referencesDataFiles()).isTrue();
+
+ // commit the deletes
+ RowDelta rowDelta = table.newRowDelta();
+ result.deleteFiles().forEach(rowDelta::addDeletes);
+ rowDelta.commit();
+
+ // verify correctness
+ List<T> expectedRows = ImmutableList.of(toRow(11, "aaa"), toRow(12,
"aaa"));
+ assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows));
+ }
}
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
index 9c0e8235f8..01f24c4dfe 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -28,14 +29,18 @@ import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.TestSparkCatalog;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
@@ -80,6 +85,46 @@ public class TestMergeOnReadDelete extends TestDelete {
TestSparkCatalog.clearTables();
}
+ @Test
+ public void testDeleteFileGranularity() throws NoSuchTableException {
+ checkDeleteFileGranularity(DeleteGranularity.FILE);
+ }
+
+ @Test
+ public void testDeletePartitionGranularity() throws NoSuchTableException {
+ checkDeleteFileGranularity(DeleteGranularity.PARTITION);
+ }
+
+ private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
+ throws NoSuchTableException {
+ createAndInitPartitionedTable();
+
+ sql(
+ "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+ tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+
+ append(tableName, new Employee(1, "hr"), new Employee(2, "hr"));
+ append(tableName, new Employee(3, "hr"), new Employee(4, "hr"));
+ append(tableName, new Employee(1, "hardware"), new Employee(2,
"hardware"));
+ append(tableName, new Employee(3, "hardware"), new Employee(4,
"hardware"));
+
+ createBranchIfNeeded();
+
+ sql("DELETE FROM %s WHERE id = 1 OR id = 3", commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(table.snapshots()).hasSize(5);
+
+ Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+ String expectedDeleteFilesCount = deleteGranularity ==
DeleteGranularity.FILE ? "4" : "2";
+ validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, null);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(2, "hardware"), row(2, "hr"), row(4, "hardware"),
row(4, "hr")),
+ sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget()));
+ }
+
@Test
public void testCommitUnknownException() {
createAndInitTable("id INT, dep STRING, category STRING");
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
index e743b32b45..f9c13d828c 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
@@ -18,11 +18,20 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Encoders;
+import org.junit.Test;
public class TestMergeOnReadMerge extends TestMerge {
@@ -56,4 +65,52 @@ public class TestMergeOnReadMerge extends TestMerge {
TableProperties.MERGE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
}
+
+ @Test
+ public void testMergeDeleteFileGranularity() {
+ checkMergeDeleteGranularity(DeleteGranularity.FILE);
+ }
+
+ @Test
+ public void testMergeDeletePartitionGranularity() {
+ checkMergeDeleteGranularity(DeleteGranularity.PARTITION);
+ }
+
+ private void checkMergeDeleteGranularity(DeleteGranularity
deleteGranularity) {
+ createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /*
empty */);
+
+ sql(
+ "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+ tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+
+ append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2,
\"dep\": \"hr\" }");
+ append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4,
\"dep\": \"hr\" }");
+ append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2,
\"dep\": \"it\" }");
+ append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4,
\"dep\": \"it\" }");
+
+ createBranchIfNeeded();
+
+ createOrReplaceView("source", ImmutableList.of(1, 3, 5), Encoders.INT());
+
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED THEN "
+ + " DELETE "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, dep) VALUES (-1, 'other')",
+ commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(table.snapshots()).hasSize(5);
+
+ Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+ String expectedDeleteFilesCount = deleteGranularity ==
DeleteGranularity.FILE ? "4" : "2";
+ validateMergeOnRead(currentSnapshot, "3", expectedDeleteFilesCount, "1");
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(-1, "other"), row(2, "hr"), row(2, "it"), row(4,
"hr"), row(4, "it")),
+ sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget()));
+ }
}
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
index 0207d4ce4d..45ef343b2d 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
@@ -18,11 +18,19 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.junit.Test;
public class TestMergeOnReadUpdate extends TestUpdate {
@@ -56,4 +64,51 @@ public class TestMergeOnReadUpdate extends TestUpdate {
TableProperties.UPDATE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
}
+
+ @Test
+ public void testUpdateFileGranularity() {
+ checkUpdateFileGranularity(DeleteGranularity.FILE);
+ }
+
+ @Test
+ public void testUpdatePartitionGranularity() {
+ checkUpdateFileGranularity(DeleteGranularity.PARTITION);
+ }
+
+ private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity)
{
+ createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /*
empty */);
+
+ sql(
+ "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+ tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+
+ append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2,
\"dep\": \"hr\" }");
+ append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4,
\"dep\": \"hr\" }");
+ append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2,
\"dep\": \"it\" }");
+ append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4,
\"dep\": \"it\" }");
+
+ createBranchIfNeeded();
+
+ sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(table.snapshots()).hasSize(5);
+
+ Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+ String expectedDeleteFilesCount = deleteGranularity ==
DeleteGranularity.FILE ? "4" : "2";
+ validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, "2");
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(
+ row(0, "hr"),
+ row(2, "hr"),
+ row(2, "hr"),
+ row(4, "hr"),
+ row(0, "it"),
+ row(2, "it"),
+ row(2, "it"),
+ row(4, "it")),
+ sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
+ }
}
diff --git
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index 094fd58443..71813c5a63 100644
---
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
@@ -364,8 +365,20 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
@Benchmark
@Threads(1)
- public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole
blackhole)
+ public void
writeUnpartitionedClusteredPositionDeleteWriterPartitionGranularity(
+ Blackhole blackhole) throws IOException {
+ writeUnpartitionedClusteredPositionDeleteWriter(blackhole,
DeleteGranularity.PARTITION);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void
writeUnpartitionedClusteredPositionDeleteWriterFileGranularity(Blackhole
blackhole)
throws IOException {
+ writeUnpartitionedClusteredPositionDeleteWriter(blackhole,
DeleteGranularity.FILE);
+ }
+
+ private void writeUnpartitionedClusteredPositionDeleteWriter(
+ Blackhole blackhole, DeleteGranularity deleteGranularity) throws
IOException {
FileIO io = table().io();
OutputFileFactory fileFactory = newFileFactory();
@@ -374,7 +387,7 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
ClusteredPositionDeleteWriter<InternalRow> writer =
new ClusteredPositionDeleteWriter<>(
- writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+ writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES,
deleteGranularity);
PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (ClusteredPositionDeleteWriter<InternalRow> closeableWriter = writer) {
@@ -391,7 +404,20 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
@Benchmark
@Threads(1)
- public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole
blackhole) throws IOException {
+ public void
writeUnpartitionedFanoutPositionDeleteWriterPartitionGranularity(Blackhole
blackhole)
+ throws IOException {
+ writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole,
DeleteGranularity.PARTITION);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void
writeUnpartitionedFanoutPositionDeleteWriterFileGranularity(Blackhole blackhole)
+ throws IOException {
+ writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole,
DeleteGranularity.FILE);
+ }
+
+ private void writeUnpartitionedFanoutPositionDeleteWriterPartition(
+ Blackhole blackhole, DeleteGranularity deleteGranularity) throws
IOException {
FileIO io = table().io();
OutputFileFactory fileFactory = newFileFactory();
@@ -400,7 +426,7 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
FanoutPositionOnlyDeleteWriter<InternalRow> writer =
new FanoutPositionOnlyDeleteWriter<>(
- writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+ writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES,
deleteGranularity);
PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer)
{
@@ -417,8 +443,20 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
@Benchmark
@Threads(1)
- public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole
blackhole)
- throws IOException {
+ public void
writeUnpartitionedFanoutPositionDeleteWriterShuffledPartitionGranularity(
+ Blackhole blackhole) throws IOException {
+ writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole,
DeleteGranularity.PARTITION);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void
writeUnpartitionedFanoutPositionDeleteWriterShuffledFileGranularity(
+ Blackhole blackhole) throws IOException {
+ writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole,
DeleteGranularity.FILE);
+ }
+
+ private void writeUnpartitionedFanoutPositionDeleteWriterShuffled(
+ Blackhole blackhole, DeleteGranularity deleteGranularity) throws
IOException {
FileIO io = table().io();
@@ -428,7 +466,7 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
FanoutPositionOnlyDeleteWriter<InternalRow> writer =
new FanoutPositionOnlyDeleteWriter<>(
- writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+ writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES,
deleteGranularity);
PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer)
{
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 1454fc534e..07393a67fe 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -708,4 +709,15 @@ public class SparkWriteConf {
private double shuffleCompressionRatio(FileFormat outputFileFormat, String
outputCodec) {
return SparkCompressionUtil.shuffleCompressionRatio(spark,
outputFileFormat, outputCodec);
}
+
+ public DeleteGranularity deleteGranularity() {
+ String valueAsString =
+ confParser
+ .stringConf()
+ .option(SparkWriteOptions.DELETE_GRANULARITY)
+ .tableProperty(TableProperties.DELETE_GRANULARITY)
+ .defaultValue(TableProperties.DELETE_GRANULARITY_DEFAULT)
+ .parse();
+ return DeleteGranularity.fromString(valueAsString);
+ }
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index 48dfa44c91..d9c4f66b19 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -83,4 +83,7 @@ public class SparkWriteOptions {
// Overrides the advisory partition size
public static final String ADVISORY_PARTITION_SIZE =
"advisory-partition-size";
+
+ // Overrides the delete granularity
+ public static final String DELETE_GRANULARITY = "delete-granularity";
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
index a397a069ee..d917794758 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.PositionDeletesTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
import org.apache.iceberg.io.FileIO;
@@ -70,6 +71,7 @@ public class SparkPositionDeletesRewrite implements Write {
private final String queryId;
private final FileFormat format;
private final long targetFileSize;
+ private final DeleteGranularity deleteGranularity;
private final Schema writeSchema;
private final StructType dsSchema;
private final String fileSetId;
@@ -103,6 +105,7 @@ public class SparkPositionDeletesRewrite implements Write {
this.queryId = writeInfo.queryId();
this.format = writeConf.deleteFileFormat();
this.targetFileSize = writeConf.targetDeleteFileSize();
+ this.deleteGranularity = writeConf.deleteGranularity();
this.writeSchema = writeSchema;
this.dsSchema = dsSchema;
this.fileSetId = writeConf.rewrittenFileSetId();
@@ -129,6 +132,7 @@ public class SparkPositionDeletesRewrite implements Write {
queryId,
format,
targetFileSize,
+ deleteGranularity,
writeSchema,
dsSchema,
specId,
@@ -179,6 +183,7 @@ public class SparkPositionDeletesRewrite implements Write {
private final String queryId;
private final FileFormat format;
private final Long targetFileSize;
+ private final DeleteGranularity deleteGranularity;
private final Schema writeSchema;
private final StructType dsSchema;
private final int specId;
@@ -190,6 +195,7 @@ public class SparkPositionDeletesRewrite implements Write {
String queryId,
FileFormat format,
long targetFileSize,
+ DeleteGranularity deleteGranularity,
Schema writeSchema,
StructType dsSchema,
int specId,
@@ -199,6 +205,7 @@ public class SparkPositionDeletesRewrite implements Write {
this.queryId = queryId;
this.format = format;
this.targetFileSize = targetFileSize;
+ this.deleteGranularity = deleteGranularity;
this.writeSchema = writeSchema;
this.dsSchema = dsSchema;
this.specId = specId;
@@ -241,6 +248,7 @@ public class SparkPositionDeletesRewrite implements Write {
writerFactoryWithoutRow,
deleteFileFactory,
targetFileSize,
+ deleteGranularity,
dsSchema,
specId,
partition);
@@ -289,6 +297,7 @@ public class SparkPositionDeletesRewrite implements Write {
private final SparkFileWriterFactory writerFactoryWithoutRow;
private final OutputFileFactory deleteFileFactory;
private final long targetFileSize;
+ private final DeleteGranularity deleteGranularity;
private final PositionDelete<InternalRow> positionDelete;
private final FileIO io;
private final PartitionSpec spec;
@@ -322,11 +331,13 @@ public class SparkPositionDeletesRewrite implements Write
{
SparkFileWriterFactory writerFactoryWithoutRow,
OutputFileFactory deleteFileFactory,
long targetFileSize,
+ DeleteGranularity deleteGranularity,
StructType dsSchema,
int specId,
StructLike partition) {
this.deleteFileFactory = deleteFileFactory;
this.targetFileSize = targetFileSize;
+ this.deleteGranularity = deleteGranularity;
this.writerFactoryWithRow = writerFactoryWithRow;
this.writerFactoryWithoutRow = writerFactoryWithoutRow;
this.positionDelete = PositionDelete.create();
@@ -387,7 +398,7 @@ public class SparkPositionDeletesRewrite implements Write {
if (writerWithRow == null) {
this.writerWithRow =
new ClusteredPositionDeleteWriter<>(
- writerFactoryWithRow, deleteFileFactory, io, targetFileSize);
+ writerFactoryWithRow, deleteFileFactory, io, targetFileSize,
deleteGranularity);
}
return writerWithRow;
}
@@ -396,7 +407,7 @@ public class SparkPositionDeletesRewrite implements Write {
if (writerWithoutRow == null) {
this.writerWithoutRow =
new ClusteredPositionDeleteWriter<>(
- writerFactoryWithoutRow, deleteFileFactory, io,
targetFileSize);
+ writerFactoryWithoutRow, deleteFileFactory, io,
targetFileSize, deleteGranularity);
}
return writerWithoutRow;
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index dcc949b290..022283631f 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
@@ -441,11 +442,14 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
FileIO io = table.io();
boolean inputOrdered = context.inputOrdered();
long targetFileSize = context.targetDeleteFileSize();
+ DeleteGranularity deleteGranularity = context.deleteGranularity();
if (inputOrdered) {
- return new ClusteredPositionDeleteWriter<>(writers, files, io,
targetFileSize);
+ return new ClusteredPositionDeleteWriter<>(
+ writers, files, io, targetFileSize, deleteGranularity);
} else {
- return new FanoutPositionOnlyDeleteWriter<>(writers, files, io,
targetFileSize);
+ return new FanoutPositionOnlyDeleteWriter<>(
+ writers, files, io, targetFileSize, deleteGranularity);
}
}
}
@@ -684,6 +688,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
private final StructType metadataSparkType;
private final FileFormat deleteFileFormat;
private final long targetDeleteFileSize;
+ private final DeleteGranularity deleteGranularity;
private final String queryId;
private final boolean useFanoutWriter;
private final boolean inputOrdered;
@@ -700,6 +705,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
this.deleteSparkType = info.rowIdSchema().get();
this.deleteFileFormat = writeConf.deleteFileFormat();
this.targetDeleteFileSize = writeConf.targetDeleteFileSize();
+ this.deleteGranularity = writeConf.deleteGranularity();
this.metadataSparkType = info.metadataSchema().get();
this.queryId = info.queryId();
this.useFanoutWriter = writeConf.useFanoutWriter(writeRequirements);
@@ -734,6 +740,10 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
return targetDeleteFileSize;
}
+ DeleteGranularity deleteGranularity() {
+ return deleteGranularity;
+ }
+
String queryId() {
return queryId;
}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index abf40ebd95..7c3ecac676 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -45,6 +45,7 @@ import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.DEL
import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE;
import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.List;
import java.util.Map;
@@ -52,6 +53,7 @@ import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.internal.SQLConf;
@@ -76,6 +78,61 @@ public class TestSparkWriteConf extends
SparkTestBaseWithCatalog {
sql("DROP TABLE IF EXISTS %s", tableName);
}
+ @Test
+ public void testDeleteGranularityDefault() {
+ Table table = validationCatalog.loadTable(tableIdent);
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table,
ImmutableMap.of());
+
+ DeleteGranularity value = writeConf.deleteGranularity();
+ assertThat(value).isEqualTo(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void testDeleteGranularityTableProperty() {
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table
+ .updateProperties()
+ .set(TableProperties.DELETE_GRANULARITY,
DeleteGranularity.FILE.toString())
+ .commit();
+
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table,
ImmutableMap.of());
+
+ DeleteGranularity value = writeConf.deleteGranularity();
+ assertThat(value).isEqualTo(DeleteGranularity.FILE);
+ }
+
+ @Test
+ public void testDeleteGranularityWriteOption() {
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table
+ .updateProperties()
+ .set(TableProperties.DELETE_GRANULARITY,
DeleteGranularity.PARTITION.toString())
+ .commit();
+
+ Map<String, String> options =
+ ImmutableMap.of(SparkWriteOptions.DELETE_GRANULARITY,
DeleteGranularity.FILE.toString());
+
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table, options);
+
+ DeleteGranularity value = writeConf.deleteGranularity();
+ assertThat(value).isEqualTo(DeleteGranularity.FILE);
+ }
+
+ @Test
+ public void testDeleteGranularityInvalidValue() {
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.updateProperties().set(TableProperties.DELETE_GRANULARITY,
"invalid").commit();
+
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table,
ImmutableMap.of());
+
+ assertThatThrownBy(writeConf::deleteGranularity)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unknown delete granularity");
+ }
+
@Test
public void testAdvisoryPartitionSize() {
Table table = validationCatalog.loadTable(tableIdent);
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index d1e33950eb..7c55ff82df 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -57,6 +57,7 @@ import
org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
import org.apache.iceberg.actions.SizeBasedFileRewriter;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -131,6 +132,42 @@ public class TestRewritePositionDeleteFilesAction extends
CatalogTestBase {
assertThat(result.addedDeleteFilesCount()).as("No added delete
files").isZero();
}
+ @TestTemplate
+ public void testFileGranularity() throws Exception {
+ checkDeleteGranularity(DeleteGranularity.FILE);
+ }
+
+ @TestTemplate
+ public void testPartitionGranularity() throws Exception {
+ checkDeleteGranularity(DeleteGranularity.PARTITION);
+ }
+
+ private void checkDeleteGranularity(DeleteGranularity deleteGranularity)
throws Exception {
+ Table table = createTableUnpartitioned(2, SCALE);
+
+ table
+ .updateProperties()
+ .set(TableProperties.DELETE_GRANULARITY, deleteGranularity.toString())
+ .commit();
+
+ List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+ assertThat(dataFiles).hasSize(2);
+
+ writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
+
+ List<DeleteFile> deleteFiles = deleteFiles(table);
+ assertThat(deleteFiles).hasSize(2);
+
+ Result result =
+ SparkActions.get(spark)
+ .rewritePositionDeletes(table)
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .execute();
+
+ int expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE
? 2 : 1;
+
assertThat(result.addedDeleteFilesCount()).isEqualTo(expectedDeleteFilesCount);
+ }
+
@TestTemplate
public void testUnpartitioned() throws Exception {
Table table = createTableUnpartitioned(2, SCALE);