This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 74a7d959a5 Core: Don't persist useless file and position bounds for deletes (#8360)
74a7d959a5 is described below

commit 74a7d959a504ce5b7ae20baf95d99dfd3b9223ce
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Aug 21 17:51:25 2023 -0700

    Core: Don't persist useless file and position bounds for deletes (#8360)
---
 .../main/java/org/apache/iceberg/MetricsUtil.java  | 31 ++++++++++++
 .../iceberg/deletes/PositionDeleteWriter.java      | 21 +++++++-
 .../apache/iceberg/io/TestFileWriterFactory.java   | 58 ++++++++++++++++++++++
 .../org/apache/iceberg/io/TestWriterMetrics.java   | 38 ++++++++++++++
 4 files changed, 147 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/iceberg/MetricsUtil.java b/core/src/main/java/org/apache/iceberg/MetricsUtil.java
index b631af0fc5..2cd001b5c4 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsUtil.java
@@ -40,6 +40,37 @@ public class MetricsUtil {
 
   private MetricsUtil() {}
 
+  /**
+   * Copies a metrics object without lower and upper bounds for given fields.
+   *
+   * @param excludedFieldIds field IDs for which the lower and upper bounds must be dropped
+   * @return a new metrics object without lower and upper bounds for given fields
+   */
+  public static Metrics copyWithoutFieldBounds(Metrics metrics, Set<Integer> excludedFieldIds) {
+    return new Metrics(
+        metrics.recordCount(),
+        metrics.columnSizes(),
+        metrics.valueCounts(),
+        metrics.nullValueCounts(),
+        metrics.nanValueCounts(),
+        copyWithoutKeys(metrics.lowerBounds(), excludedFieldIds),
+        copyWithoutKeys(metrics.upperBounds(), excludedFieldIds));
+  }
+
+  private static <K, V> Map<K, V> copyWithoutKeys(Map<K, V> map, Set<K> keys) {
+    if (map == null) {
+      return null;
+    }
+
+    Map<K, V> filteredMap = Maps.newHashMap(map);
+
+    for (K key : keys) {
+      filteredMap.remove(key);
+    }
+
+    return filteredMap.isEmpty() ? null : filteredMap;
+  }
+
   /**
   * Construct mapping relationship between column id to NaN value counts from input metrics and
    * metrics config.
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
index d3e01bcd04..4f799b4349 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
@@ -18,11 +18,17 @@
  */
 package org.apache.iceberg.deletes;
 
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Set;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsUtil;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.encryption.EncryptionKeyMetadata;
@@ -30,6 +36,7 @@ import org.apache.iceberg.io.DeleteWriteResult;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.util.CharSequenceSet;
 
 /**
@@ -40,6 +47,9 @@ import org.apache.iceberg.util.CharSequenceSet;
  * records, consider using {@link SortingPositionOnlyDeleteWriter} instead.
  */
 public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
+  private static final Set<Integer> SINGLE_REFERENCED_FILE_BOUNDS_ONLY =
+      ImmutableSet.of(DELETE_FILE_PATH.fieldId(), DELETE_FILE_POS.fieldId());
+
   private final FileAppender<StructLike> appender;
   private final FileFormat format;
   private final String location;
@@ -89,7 +99,7 @@ public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, De
               .withEncryptionKeyMetadata(keyMetadata)
               .withSplitOffsets(appender.splitOffsets())
               .withFileSizeInBytes(appender.length())
-              .withMetrics(appender.metrics())
+              .withMetrics(metrics())
               .build();
     }
   }
@@ -107,4 +117,13 @@ public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, De
   public DeleteWriteResult result() {
     return new DeleteWriteResult(toDeleteFile(), referencedDataFiles());
   }
+
+  private Metrics metrics() {
+    Metrics metrics = appender.metrics();
+    if (referencedDataFiles.size() > 1) {
+      return MetricsUtil.copyWithoutFieldBounds(metrics, SINGLE_REFERENCED_FILE_BOUNDS_ONLY);
+    } else {
+      return metrics;
+    }
+  }
 }
diff --git a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
index eff918b145..7910c666b4 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
@@ -229,6 +229,17 @@ public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
     DeleteFile deleteFile = result.first();
     CharSequenceSet referencedDataFiles = result.second();
 
+    if (fileFormat == FileFormat.AVRO) {
+      Assert.assertNull(deleteFile.lowerBounds());
+      Assert.assertNull(deleteFile.upperBounds());
+    } else {
+      Assert.assertEquals(1, referencedDataFiles.size());
+      Assert.assertEquals(2, deleteFile.lowerBounds().size());
+      Assert.assertTrue(deleteFile.lowerBounds().containsKey(DELETE_FILE_PATH.fieldId()));
+      Assert.assertEquals(2, deleteFile.upperBounds().size());
+      Assert.assertTrue(deleteFile.upperBounds().containsKey(DELETE_FILE_PATH.fieldId()));
+    }
+
     // verify the written delete file
     GenericRecord deleteRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema());
     List<Record> expectedDeletes =
@@ -302,6 +313,53 @@ public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
     Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
   }
 
+  @Test
+  public void testPositionDeleteWriterMultipleDataFiles() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    // write two data files
+    DataFile dataFile1 = writeData(writerFactory, dataRows, table.spec(), partition);
+    DataFile dataFile2 = writeData(writerFactory, dataRows, table.spec(), partition);
+
+    // write a position delete file referencing both
+    List<PositionDelete<T>> deletes =
+        ImmutableList.of(
+            positionDelete(dataFile1.path(), 0L, null),
+            positionDelete(dataFile1.path(), 2L, null),
+            positionDelete(dataFile2.path(), 4L, null));
+    Pair<DeleteFile, CharSequenceSet> result =
+        writePositionDeletes(writerFactory, deletes, table.spec(), partition);
+    DeleteFile deleteFile = result.first();
+    CharSequenceSet referencedDataFiles = result.second();
+
+    // verify the written delete file has NO lower and upper bounds
+    Assert.assertEquals(2, referencedDataFiles.size());
+    Assert.assertNull(deleteFile.lowerBounds());
+    Assert.assertNull(deleteFile.upperBounds());
+
+    // commit the data and delete files
+    table
+        .newRowDelta()
+        .addRows(dataFile1)
+        .addRows(dataFile2)
+        .addDeletes(deleteFile)
+        .validateDataFilesExist(referencedDataFiles)
+        .validateDeletedFiles()
+        .commit();
+
+    // verify the delete file is applied correctly
+    List<T> expectedRows =
+        ImmutableList.of(
+            toRow(2, "aaa"),
+            toRow(4, "aaa"),
+            toRow(5, "aaa"),
+            toRow(1, "aaa"),
+            toRow(2, "aaa"),
+            toRow(3, "aaa"),
+            toRow(4, "aaa"));
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+
   private DataFile writeData(
       FileWriterFactory<T> writerFactory, List<T> rows, PartitionSpec spec, StructLike partitionKey)
       throws IOException {
diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
index 21698cf376..d1a7820570 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
@@ -203,6 +203,44 @@ public abstract class TestWriterMetrics<T> {
         3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
   }
 
+  @Test
+  public void testPositionDeleteMetricsCoveringMultipleDataFiles() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table);
+    EncryptedOutputFile outputFile = fileFactory.newOutputFile();
+    PositionDeleteWriter<T> deleteWriter =
+        writerFactory.newPositionDeleteWriter(outputFile, table.spec(), null);
+
+    try {
+      PositionDelete<T> positionDelete = PositionDelete.create();
+
+      positionDelete.set("File A", 1, toRow(3, "3", true, 3L));
+      deleteWriter.write(positionDelete);
+
+      positionDelete.set("File B", 1, toRow(3, "3", true, 3L));
+      deleteWriter.write(positionDelete);
+
+    } finally {
+      deleteWriter.close();
+    }
+
+    DeleteFile deleteFile = deleteWriter.toDeleteFile();
+
+    // should have NO bounds for path and position as the file covers multiple data paths
+    Map<Integer, ByteBuffer> lowerBounds = deleteFile.lowerBounds();
+    Assert.assertEquals(2, lowerBounds.size());
+    Assert.assertEquals(
+        3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), lowerBounds.get(1)));
+    Assert.assertEquals(
+        3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), lowerBounds.get(5)));
+
+    Map<Integer, ByteBuffer> upperBounds = deleteFile.upperBounds();
+    Assert.assertEquals(2, upperBounds.size());
+    Assert.assertEquals(
+        3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), upperBounds.get(1)));
+    Assert.assertEquals(
+        3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
+  }
+
   @Test
   public void testMaxColumns() throws IOException {
     File tableDir = temp.newFolder();

Reply via email to