This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d61159e322 Core: Optimize lookup in DeleteFileIndex without useful 
bounds (#8278)
d61159e322 is described below

commit d61159e32213ffb767aa43e9e9da955d2facded7
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed Aug 23 23:27:56 2023 -0700

    Core: Optimize lookup in DeleteFileIndex without useful bounds (#8278)
---
 .../java/org/apache/iceberg/DeleteFileIndex.java   |  68 ++++-
 .../apache/iceberg/DeleteFileIndexBenchmark.java   | 304 +++++++++++++++++++++
 2 files changed, 367 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java 
b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index bf7bd89179..6f16794d5b 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -50,6 +50,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Conversions;
@@ -75,6 +76,7 @@ class DeleteFileIndex {
   private final DeleteFileGroup globalDeletes;
   private final Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> 
deletesByPartition;
   private final boolean isEmpty;
+  private final boolean useColumnStatsFiltering;
 
   /** @deprecated since 1.4.0, will be removed in 1.5.0. */
   @Deprecated
@@ -83,13 +85,14 @@ class DeleteFileIndex {
       long[] globalSeqs,
       DeleteFile[] globalDeletes,
       Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> 
deletesByPartition) {
-    this(specs, index(specs, globalSeqs, globalDeletes), index(specs, 
deletesByPartition));
+    this(specs, index(specs, globalSeqs, globalDeletes), index(specs, 
deletesByPartition), true);
   }
 
   private DeleteFileIndex(
       Map<Integer, PartitionSpec> specs,
       DeleteFileGroup globalDeletes,
-      Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> 
deletesByPartition) {
+      Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> 
deletesByPartition,
+      boolean useColumnStatsFiltering) {
     ImmutableMap.Builder<Integer, Types.StructType> builder = 
ImmutableMap.builder();
     specs.forEach((specId, spec) -> builder.put(specId, spec.partitionType()));
     this.partitionTypeById = builder.build();
@@ -97,6 +100,7 @@ class DeleteFileIndex {
     this.globalDeletes = globalDeletes;
     this.deletesByPartition = deletesByPartition;
     this.isEmpty = globalDeletes == null && deletesByPartition.isEmpty();
+    this.useColumnStatsFiltering = useColumnStatsFiltering;
   }
 
   public boolean isEmpty() {
@@ -148,7 +152,16 @@ class DeleteFileIndex {
 
     if (globalDeletes == null && partitionDeletes == null) {
       return NO_DELETES;
+    } else if (useColumnStatsFiltering) {
+      return limitWithColumnStatsFiltering(sequenceNumber, file, 
partitionDeletes);
+    } else {
+      return limitWithoutColumnStatsFiltering(sequenceNumber, 
partitionDeletes);
     }
+  }
+
+  // limits deletes using sequence numbers and checks whether columns stats 
overlap
+  private DeleteFile[] limitWithColumnStatsFiltering(
+      long sequenceNumber, DataFile file, DeleteFileGroup partitionDeletes) {
 
     Stream<IndexedDeleteFile> matchingDeletes;
     if (partitionDeletes == null) {
@@ -167,6 +180,21 @@ class DeleteFileIndex {
         .toArray(DeleteFile[]::new);
   }
 
+  // limits deletes using sequence numbers but skips expensive column stats 
filtering
+  private DeleteFile[] limitWithoutColumnStatsFiltering(
+      long sequenceNumber, DeleteFileGroup partitionDeletes) {
+
+    if (partitionDeletes == null) {
+      return globalDeletes.filter(sequenceNumber);
+    } else if (globalDeletes == null) {
+      return partitionDeletes.filter(sequenceNumber);
+    } else {
+      DeleteFile[] matchingGlobalDeletes = 
globalDeletes.filter(sequenceNumber);
+      DeleteFile[] matchingPartitionDeletes = 
partitionDeletes.filter(sequenceNumber);
+      return ObjectArrays.concat(matchingGlobalDeletes, 
matchingPartitionDeletes, DeleteFile.class);
+    }
+  }
+
   private static boolean canContainDeletesForFile(DataFile dataFile, 
IndexedDeleteFile deleteFile) {
     switch (deleteFile.content()) {
       case POSITION_DELETES:
@@ -483,6 +511,8 @@ class DeleteFileIndex {
     DeleteFileIndex build() {
       Iterable<DeleteFile> files = deleteFiles != null ? filterDeleteFiles() : 
loadDeleteFiles();
 
+      boolean useColumnStatsFiltering = false;
+
       // build a map from (specId, partition) to delete file entries
       Map<Integer, StructLikeWrapper> wrappersBySpecId = Maps.newHashMap();
       ListMultimap<Pair<Integer, StructLikeWrapper>, IndexedDeleteFile> 
deleteFilesByPartition =
@@ -494,7 +524,13 @@ class DeleteFileIndex {
             wrappersBySpecId
                 .computeIfAbsent(specId, id -> 
StructLikeWrapper.forType(spec.partitionType()))
                 .copyFor(file.partition());
-        deleteFilesByPartition.put(Pair.of(specId, wrapper), new 
IndexedDeleteFile(spec, file));
+        IndexedDeleteFile indexedFile = new IndexedDeleteFile(spec, file);
+        deleteFilesByPartition.put(Pair.of(specId, wrapper), indexedFile);
+
+        if (!useColumnStatsFiltering) {
+          useColumnStatsFiltering = indexedFile.hasLowerAndUpperBounds();
+        }
+
         ScanMetricsUtil.indexedDeleteFile(scanMetrics, file);
       }
 
@@ -535,7 +571,8 @@ class DeleteFileIndex {
         }
       }
 
-      return new DeleteFileIndex(specsById, globalDeletes, 
sortedDeletesByPartition);
+      return new DeleteFileIndex(
+          specsById, globalDeletes, sortedDeletesByPartition, 
useColumnStatsFiltering);
     }
 
     private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> 
deleteManifestReaders() {
@@ -597,7 +634,28 @@ class DeleteFileIndex {
       this.files = files;
     }
 
+    public DeleteFile[] filter(long seq) {
+      int start = findStartIndex(seq);
+
+      if (start >= files.length) {
+        return NO_DELETES;
+      }
+
+      DeleteFile[] matchingFiles = new DeleteFile[files.length - start];
+
+      for (int index = start; index < files.length; index++) {
+        matchingFiles[index - start] = files[index].wrapped();
+      }
+
+      return matchingFiles;
+    }
+
     public Stream<IndexedDeleteFile> limit(long seq) {
+      int start = findStartIndex(seq);
+      return Arrays.stream(files, start, files.length);
+    }
+
+    private int findStartIndex(long seq) {
       int pos = Arrays.binarySearch(seqs, seq);
       int start;
       if (pos < 0) {
@@ -612,7 +670,7 @@ class DeleteFileIndex {
         }
       }
 
-      return Arrays.stream(files, start, files.length);
+      return start;
     }
 
     public Iterable<DeleteFile> referencedDeleteFiles() {
diff --git 
a/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
 
b/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
new file mode 100644
index 0000000000..f73794acaf
--- /dev/null
+++ 
b/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.spark.sql.functions.lit;
+
+import com.google.errorprone.annotations.FormatMethod;
+import com.google.errorprone.annotations.FormatString;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * A benchmark that evaluates the delete file index build and lookup 
performance.
+ *
+ * <p>To run this benchmark for spark-3.4: <code>
+ *   ./gradlew -DsparkVersions=3.4 
:iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ *       -PjmhIncludeRegex=DeleteFileIndexBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-delete-file-index-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 10)
+@Timeout(time = 20, timeUnit = TimeUnit.MINUTES)
+@BenchmarkMode(Mode.SingleShotTime)
+public class DeleteFileIndexBenchmark {
+
+  private static final String TABLE_NAME = "test_table";
+  private static final String PARTITION_COLUMN = "ss_ticket_number";
+
+  private static final int NUM_PARTITIONS = 50;
+  private static final int NUM_REAL_DATA_FILES_PER_PARTITION = 25;
+  private static final int NUM_REPLICA_DATA_FILES_PER_PARTITION = 50_000;
+  private static final int NUM_DELETE_FILES_PER_PARTITION = 100;
+  private static final int NUM_ROWS_PER_DATA_FILE = 500;
+
+  private final Configuration hadoopConf = new Configuration();
+  private SparkSession spark;
+  private Table table;
+
+  private List<DataFile> dataFiles;
+
+  @Setup
+  public void setupBenchmark() throws NoSuchTableException, ParseException {
+    setupSpark();
+    initTable();
+    initDataAndDeletes();
+    loadDataFiles();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() {
+    dropTable();
+    tearDownSpark();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void buildIndexAndLookup(Blackhole blackhole) {
+    DeleteFileIndex deletes = buildDeletes();
+    for (DataFile dataFile : dataFiles) {
+      DeleteFile[] deleteFiles = 
deletes.forDataFile(dataFile.dataSequenceNumber(), dataFile);
+      blackhole.consume(deleteFiles);
+    }
+  }
+
+  private void loadDataFiles() {
+    table.refresh();
+
+    Snapshot snapshot = table.currentSnapshot();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table.io(), snapshot.dataManifests(table.io()), 
ImmutableList.of());
+
+    try (CloseableIterable<ManifestEntry<DataFile>> entries = 
manifestGroup.entries()) {
+      List<DataFile> files = Lists.newArrayList();
+      for (ManifestEntry<DataFile> entry : entries) {
+        files.add(entry.file().copyWithoutStats());
+      }
+      this.dataFiles = files;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private DeleteFileIndex buildDeletes() {
+    table.refresh();
+
+    List<ManifestFile> deleteManifests = 
table.currentSnapshot().deleteManifests(table.io());
+
+    return DeleteFileIndex.builderFor(table.io(), deleteManifests)
+        .specsById(table.specs())
+        .planWith(ThreadPools.getWorkerPool())
+        .build();
+  }
+
+  private DataFile loadAddedDataFile() {
+    table.refresh();
+
+    Iterable<DataFile> addedDataFiles = 
table.currentSnapshot().addedDataFiles(table.io());
+    return Iterables.getOnlyElement(addedDataFiles);
+  }
+
+  private DeleteFile loadAddedDeleteFile() {
+    table.refresh();
+
+    Iterable<DeleteFile> addedDeleteFiles = 
table.currentSnapshot().addedDeleteFiles(table.io());
+    return Iterables.getOnlyElement(addedDeleteFiles);
+  }
+
+  private void initDataAndDeletes() throws NoSuchTableException {
+    Schema schema = table.schema();
+    PartitionSpec spec = table.spec();
+    LocationProvider locations = table.locationProvider();
+
+    for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; 
partitionOrdinal++) {
+      Dataset<Row> inputDF =
+          randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
+              .drop(PARTITION_COLUMN)
+              .withColumn(PARTITION_COLUMN, lit(partitionOrdinal));
+
+      for (int fileOrdinal = 0; fileOrdinal < 
NUM_REAL_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+        appendAsFile(inputDF);
+      }
+
+      DataFile dataFile = loadAddedDataFile();
+
+      sql(
+          "DELETE FROM %s WHERE ss_item_sk IS NULL AND %s = %d",
+          TABLE_NAME, PARTITION_COLUMN, partitionOrdinal);
+
+      DeleteFile deleteFile = loadAddedDeleteFile();
+
+      AppendFiles append = table.newFastAppend();
+
+      for (int fileOrdinal = 0; fileOrdinal < 
NUM_REPLICA_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+        String replicaFileName = UUID.randomUUID() + "-replica.parquet";
+        DataFile replicaDataFile =
+            DataFiles.builder(spec)
+                .copy(dataFile)
+                .withPath(locations.newDataLocation(spec, 
dataFile.partition(), replicaFileName))
+                .build();
+        append.appendFile(replicaDataFile);
+      }
+
+      append.commit();
+
+      RowDelta rowDelta = table.newRowDelta();
+
+      for (int fileOrdinal = 0; fileOrdinal < NUM_DELETE_FILES_PER_PARTITION; 
fileOrdinal++) {
+        String replicaFileName = UUID.randomUUID() + "-replica.parquet";
+        DeleteFile replicaDeleteFile =
+            FileMetadata.deleteFileBuilder(spec)
+                .copy(deleteFile)
+                .withPath(locations.newDataLocation(spec, 
deleteFile.partition(), replicaFileName))
+                .build();
+        rowDelta.addDeletes(replicaDeleteFile);
+      }
+
+      rowDelta.commit();
+    }
+  }
+
+  private void appendAsFile(Dataset<Row> df) throws NoSuchTableException {
+    df.coalesce(1).writeTo(TABLE_NAME).append();
+  }
+
+  private Dataset<Row> randomDataDF(Schema schema, int numRows) {
+    Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
+    JavaSparkContext context = 
JavaSparkContext.fromSparkContext(spark.sparkContext());
+    JavaRDD<InternalRow> rowRDD = 
context.parallelize(Lists.newArrayList(rows));
+    StructType rowSparkType = SparkSchemaUtil.convert(schema);
+    return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, 
false);
+  }
+
+  private void setupSpark() {
+    this.spark =
+        SparkSession.builder()
+            .config("spark.ui.enabled", false)
+            .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+            .config("spark.sql.extensions", 
IcebergSparkSessionExtensions.class.getName())
+            .config("spark.sql.catalog.spark_catalog", 
SparkSessionCatalog.class.getName())
+            .config("spark.sql.catalog.spark_catalog.type", "hadoop")
+            .config("spark.sql.catalog.spark_catalog.warehouse", 
newWarehouseDir())
+            .master("local[*]")
+            .getOrCreate();
+  }
+
+  private void tearDownSpark() {
+    spark.stop();
+  }
+
+  private void initTable() throws NoSuchTableException, ParseException {
+    sql(
+        "CREATE TABLE %s ( "
+            + " `ss_sold_date_sk` INT, "
+            + " `ss_sold_time_sk` INT, "
+            + " `ss_item_sk` INT, "
+            + " `ss_customer_sk` STRING, "
+            + " `ss_cdemo_sk` STRING, "
+            + " `ss_hdemo_sk` STRING, "
+            + " `ss_addr_sk` STRING, "
+            + " `ss_store_sk` STRING, "
+            + " `ss_promo_sk` STRING, "
+            + " `ss_ticket_number` INT, "
+            + " `ss_quantity` STRING, "
+            + " `ss_wholesale_cost` STRING, "
+            + " `ss_list_price` STRING, "
+            + " `ss_sales_price` STRING, "
+            + " `ss_ext_discount_amt` STRING, "
+            + " `ss_ext_sales_price` STRING, "
+            + " `ss_ext_wholesale_cost` STRING, "
+            + " `ss_ext_list_price` STRING, "
+            + " `ss_ext_tax` STRING, "
+            + " `ss_coupon_amt` STRING, "
+            + " `ss_net_paid` STRING, "
+            + " `ss_net_paid_inc_tax` STRING, "
+            + " `ss_net_profit` STRING "
+            + ")"
+            + "USING iceberg "
+            + "PARTITIONED BY (%s) "
+            + "TBLPROPERTIES ("
+            + " '%s' '%b',"
+            + " '%s' '%s',"
+            + " '%s' '%d')",
+        TABLE_NAME,
+        PARTITION_COLUMN,
+        TableProperties.MANIFEST_MERGE_ENABLED,
+        false,
+        TableProperties.DELETE_MODE,
+        RowLevelOperationMode.MERGE_ON_READ.modeName(),
+        TableProperties.FORMAT_VERSION,
+        2);
+
+    this.table = Spark3Util.loadIcebergTable(spark, TABLE_NAME);
+  }
+
+  private void dropTable() {
+    sql("DROP TABLE IF EXISTS %s PURGE", TABLE_NAME);
+  }
+
+  private String newWarehouseDir() {
+    return hadoopConf.get("hadoop.tmp.dir") + UUID.randomUUID();
+  }
+
+  @FormatMethod
+  private void sql(@FormatString String query, Object... args) {
+    spark.sql(String.format(query, args));
+  }
+}

Reply via email to