This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 74a7d959a5 Core: Don't persist useless file and position bounds for deletes (#8360)
74a7d959a5 is described below
commit 74a7d959a504ce5b7ae20baf95d99dfd3b9223ce
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Aug 21 17:51:25 2023 -0700
Core: Don't persist useless file and position bounds for deletes (#8360)
---
.../main/java/org/apache/iceberg/MetricsUtil.java | 31 ++++++++++++
.../iceberg/deletes/PositionDeleteWriter.java | 21 +++++++-
.../apache/iceberg/io/TestFileWriterFactory.java | 58 ++++++++++++++++++++++
.../org/apache/iceberg/io/TestWriterMetrics.java | 38 ++++++++++++++
4 files changed, 147 insertions(+), 1 deletion(-)
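For readers skimming the patch, here is a minimal standalone sketch of the pruning idea (plain Java, no Iceberg dependencies; the class name, field IDs, and bound values are illustrative placeholders, not the real Iceberg metadata column IDs): once a position delete file references more than one data file, a single lower/upper bound on the file path and row position columns no longer identifies the deleted rows, so those bounds are dropped instead of being persisted.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Standalone sketch (not part of the patch): shows how pruning selected field
// bounds can collapse a bounds map to null when nothing useful remains.
class BoundsPruningSketch {

  // Mirrors the idea of the patch's private copyWithoutKeys helper: copy the
  // map, drop the given keys, and return null when the result is empty.
  static <K, V> Map<K, V> copyWithoutKeys(Map<K, V> map, Set<K> keys) {
    if (map == null) {
      return null;
    }
    Map<K, V> filtered = new HashMap<>(map);
    filtered.keySet().removeAll(keys);
    return filtered.isEmpty() ? null : filtered;
  }

  public static void main(String[] args) {
    Set<Integer> pathAndPosFieldIds = Set.of(100, 101); // placeholder field IDs
    Map<Integer, String> lowerBounds = new HashMap<>();
    lowerBounds.put(100, "s3://bucket/data/file-a.parquet"); // path lower bound
    lowerBounds.put(101, "0");                               // position lower bound

    // With only path/pos bounds present, pruning them leaves nothing to persist.
    System.out.println(copyWithoutKeys(lowerBounds, pathAndPosFieldIds)); // prints: null
  }
}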
diff --git a/core/src/main/java/org/apache/iceberg/MetricsUtil.java b/core/src/main/java/org/apache/iceberg/MetricsUtil.java
index b631af0fc5..2cd001b5c4 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsUtil.java
@@ -40,6 +40,37 @@ public class MetricsUtil {
private MetricsUtil() {}
+ /**
+ * Copies a metrics object without lower and upper bounds for given fields.
+ *
+ * @param excludedFieldIds field IDs for which the lower and upper bounds must be dropped
+ * @return a new metrics object without lower and upper bounds for given fields
+ */
+ public static Metrics copyWithoutFieldBounds(Metrics metrics, Set<Integer> excludedFieldIds) {
+ return new Metrics(
+ metrics.recordCount(),
+ metrics.columnSizes(),
+ metrics.valueCounts(),
+ metrics.nullValueCounts(),
+ metrics.nanValueCounts(),
+ copyWithoutKeys(metrics.lowerBounds(), excludedFieldIds),
+ copyWithoutKeys(metrics.upperBounds(), excludedFieldIds));
+ }
+
+ private static <K, V> Map<K, V> copyWithoutKeys(Map<K, V> map, Set<K> keys) {
+ if (map == null) {
+ return null;
+ }
+
+ Map<K, V> filteredMap = Maps.newHashMap(map);
+
+ for (K key : keys) {
+ filteredMap.remove(key);
+ }
+
+ return filteredMap.isEmpty() ? null : filteredMap;
+ }
+
/**
* Construct mapping relationship between column id to NaN value counts from input metrics and
* metrics config.
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
index d3e01bcd04..4f799b4349 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
@@ -18,11 +18,17 @@
*/
package org.apache.iceberg.deletes;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
+
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Set;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
@@ -30,6 +36,7 @@ import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.util.CharSequenceSet;
/**
@@ -40,6 +47,9 @@ import org.apache.iceberg.util.CharSequenceSet;
* records, consider using {@link SortingPositionOnlyDeleteWriter} instead.
*/
public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
+ private static final Set<Integer> SINGLE_REFERENCED_FILE_BOUNDS_ONLY =
+ ImmutableSet.of(DELETE_FILE_PATH.fieldId(), DELETE_FILE_POS.fieldId());
+
private final FileAppender<StructLike> appender;
private final FileFormat format;
private final String location;
@@ -89,7 +99,7 @@ public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, De
.withEncryptionKeyMetadata(keyMetadata)
.withSplitOffsets(appender.splitOffsets())
.withFileSizeInBytes(appender.length())
- .withMetrics(appender.metrics())
+ .withMetrics(metrics())
.build();
}
}
@@ -107,4 +117,13 @@ public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, De
public DeleteWriteResult result() {
return new DeleteWriteResult(toDeleteFile(), referencedDataFiles());
}
+
+ private Metrics metrics() {
+ Metrics metrics = appender.metrics();
+ if (referencedDataFiles.size() > 1) {
+ return MetricsUtil.copyWithoutFieldBounds(metrics, SINGLE_REFERENCED_FILE_BOUNDS_ONLY);
+ } else {
+ return metrics;
+ }
+ }
}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
index eff918b145..7910c666b4 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
@@ -229,6 +229,17 @@ public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
DeleteFile deleteFile = result.first();
CharSequenceSet referencedDataFiles = result.second();
+ if (fileFormat == FileFormat.AVRO) {
+ Assert.assertNull(deleteFile.lowerBounds());
+ Assert.assertNull(deleteFile.upperBounds());
+ } else {
+ Assert.assertEquals(1, referencedDataFiles.size());
+ Assert.assertEquals(2, deleteFile.lowerBounds().size());
+ Assert.assertTrue(deleteFile.lowerBounds().containsKey(DELETE_FILE_PATH.fieldId()));
+ Assert.assertEquals(2, deleteFile.upperBounds().size());
+ Assert.assertTrue(deleteFile.upperBounds().containsKey(DELETE_FILE_PATH.fieldId()));
+ }
+
// verify the written delete file
GenericRecord deleteRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema());
List<Record> expectedDeletes =
@@ -302,6 +313,53 @@ public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
Assert.assertEquals("Records should match", toSet(expectedRows),
actualRowSet("*"));
}
+ @Test
+ public void testPositionDeleteWriterMultipleDataFiles() throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ // write two data files
+ DataFile dataFile1 = writeData(writerFactory, dataRows, table.spec(), partition);
+ DataFile dataFile2 = writeData(writerFactory, dataRows, table.spec(), partition);
+
+ // write a position delete file referencing both
+ List<PositionDelete<T>> deletes =
+ ImmutableList.of(
+ positionDelete(dataFile1.path(), 0L, null),
+ positionDelete(dataFile1.path(), 2L, null),
+ positionDelete(dataFile2.path(), 4L, null));
+ Pair<DeleteFile, CharSequenceSet> result =
+ writePositionDeletes(writerFactory, deletes, table.spec(), partition);
+ DeleteFile deleteFile = result.first();
+ CharSequenceSet referencedDataFiles = result.second();
+
+ // verify the written delete file has NO lower and upper bounds
+ Assert.assertEquals(2, referencedDataFiles.size());
+ Assert.assertNull(deleteFile.lowerBounds());
+ Assert.assertNull(deleteFile.upperBounds());
+
+ // commit the data and delete files
+ table
+ .newRowDelta()
+ .addRows(dataFile1)
+ .addRows(dataFile2)
+ .addDeletes(deleteFile)
+ .validateDataFilesExist(referencedDataFiles)
+ .validateDeletedFiles()
+ .commit();
+
+ // verify the delete file is applied correctly
+ List<T> expectedRows =
+ ImmutableList.of(
+ toRow(2, "aaa"),
+ toRow(4, "aaa"),
+ toRow(5, "aaa"),
+ toRow(1, "aaa"),
+ toRow(2, "aaa"),
+ toRow(3, "aaa"),
+ toRow(4, "aaa"));
+ Assert.assertEquals("Records should match", toSet(expectedRows),
actualRowSet("*"));
+ }
+
private DataFile writeData(
FileWriterFactory<T> writerFactory, List<T> rows, PartitionSpec spec, StructLike partitionKey)
throws IOException {
diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
index 21698cf376..d1a7820570 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
@@ -203,6 +203,44 @@ public abstract class TestWriterMetrics<T> {
3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
}
+ @Test
+ public void testPositionDeleteMetricsCoveringMultipleDataFiles() throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table);
+ EncryptedOutputFile outputFile = fileFactory.newOutputFile();
+ PositionDeleteWriter<T> deleteWriter =
+ writerFactory.newPositionDeleteWriter(outputFile, table.spec(), null);
+
+ try {
+ PositionDelete<T> positionDelete = PositionDelete.create();
+
+ positionDelete.set("File A", 1, toRow(3, "3", true, 3L));
+ deleteWriter.write(positionDelete);
+
+ positionDelete.set("File B", 1, toRow(3, "3", true, 3L));
+ deleteWriter.write(positionDelete);
+
+ } finally {
+ deleteWriter.close();
+ }
+
+ DeleteFile deleteFile = deleteWriter.toDeleteFile();
+
+ // should have NO bounds for path and position as the file covers multiple data paths
+ Map<Integer, ByteBuffer> lowerBounds = deleteFile.lowerBounds();
+ Assert.assertEquals(2, lowerBounds.size());
+ Assert.assertEquals(
+ 3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), lowerBounds.get(1)));
+ Assert.assertEquals(
+ 3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), lowerBounds.get(5)));
+
+ Map<Integer, ByteBuffer> upperBounds = deleteFile.upperBounds();
+ Assert.assertEquals(2, upperBounds.size());
+ Assert.assertEquals(
+ 3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), upperBounds.get(1)));
+ Assert.assertEquals(
+ 3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
+ }
+
@Test
public void testMaxColumns() throws IOException {
File tableDir = temp.newFolder();