This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d61159e322 Core: Optimize lookup in DeleteFileIndex without useful bounds (#8278)
d61159e322 is described below
commit d61159e32213ffb767aa43e9e9da955d2facded7
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed Aug 23 23:27:56 2023 -0700
Core: Optimize lookup in DeleteFileIndex without useful bounds (#8278)
---
.../java/org/apache/iceberg/DeleteFileIndex.java | 68 ++++-
.../apache/iceberg/DeleteFileIndexBenchmark.java | 304 +++++++++++++++++++++
2 files changed, 367 insertions(+), 5 deletions(-)
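For context, the change lets the delete file index skip per-file column-stats checks whenever none of the indexed delete files carry both lower and upper bounds, so a lookup reduces to a binary search over the sequence-number-sorted delete files. Below is a minimal, self-contained sketch of that idea; SequenceSortedDeletes and the String placeholders are illustrative stand-ins only, not the committed DeleteFileIndex code.

import java.util.Arrays;

// Minimal sketch; stand-in types, not the real DeleteFileIndex/DeleteFileGroup classes.
class SequenceSortedDeletes {
  private final long[] seqs;      // delete file sequence numbers, sorted ascending
  private final String[] files;   // stand-in for the delete files, aligned with seqs
  private final boolean useColumnStatsFiltering; // false when no delete file has useful bounds

  SequenceSortedDeletes(long[] seqs, String[] files, boolean useColumnStatsFiltering) {
    this.seqs = seqs;
    this.files = files;
    this.useColumnStatsFiltering = useColumnStatsFiltering;
  }

  // Delete files that may apply to a data file written at dataSeq
  // (the real index also distinguishes position vs. equality delete semantics).
  String[] forDataFile(long dataSeq) {
    int start = findStartIndex(dataSeq);
    String[] candidates = Arrays.copyOfRange(files, start, files.length);
    if (!useColumnStatsFiltering) {
      return candidates; // fast path added by this change: no per-file stats comparison
    }
    return candidates;   // the stats-aware path would additionally check bounds overlap here
  }

  // First position whose sequence number is >= dataSeq.
  private int findStartIndex(long dataSeq) {
    int pos = Arrays.binarySearch(seqs, dataSeq);
    if (pos < 0) {
      return -(pos + 1);
    }
    while (pos > 0 && seqs[pos - 1] >= dataSeq) {
      pos--; // step back to the first entry with an equal sequence number
    }
    return pos;
  }
}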
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index bf7bd89179..6f16794d5b 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -50,6 +50,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
@@ -75,6 +76,7 @@ class DeleteFileIndex {
private final DeleteFileGroup globalDeletes;
private final Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> deletesByPartition;
private final boolean isEmpty;
+ private final boolean useColumnStatsFiltering;
/** @deprecated since 1.4.0, will be removed in 1.5.0. */
@Deprecated
@@ -83,13 +85,14 @@ class DeleteFileIndex {
long[] globalSeqs,
DeleteFile[] globalDeletes,
Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> deletesByPartition) {
- this(specs, index(specs, globalSeqs, globalDeletes), index(specs, deletesByPartition));
+ this(specs, index(specs, globalSeqs, globalDeletes), index(specs, deletesByPartition), true);
}
private DeleteFileIndex(
Map<Integer, PartitionSpec> specs,
DeleteFileGroup globalDeletes,
- Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> deletesByPartition) {
+ Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> deletesByPartition,
+ boolean useColumnStatsFiltering) {
ImmutableMap.Builder<Integer, Types.StructType> builder = ImmutableMap.builder();
specs.forEach((specId, spec) -> builder.put(specId, spec.partitionType()));
this.partitionTypeById = builder.build();
@@ -97,6 +100,7 @@ class DeleteFileIndex {
this.globalDeletes = globalDeletes;
this.deletesByPartition = deletesByPartition;
this.isEmpty = globalDeletes == null && deletesByPartition.isEmpty();
+ this.useColumnStatsFiltering = useColumnStatsFiltering;
}
public boolean isEmpty() {
@@ -148,7 +152,16 @@ class DeleteFileIndex {
if (globalDeletes == null && partitionDeletes == null) {
return NO_DELETES;
+ } else if (useColumnStatsFiltering) {
+ return limitWithColumnStatsFiltering(sequenceNumber, file, partitionDeletes);
+ } else {
+ return limitWithoutColumnStatsFiltering(sequenceNumber, partitionDeletes);
}
+ }
+
+ // limits deletes using sequence numbers and checks whether column stats overlap
+ private DeleteFile[] limitWithColumnStatsFiltering(
+ long sequenceNumber, DataFile file, DeleteFileGroup partitionDeletes) {
Stream<IndexedDeleteFile> matchingDeletes;
if (partitionDeletes == null) {
@@ -167,6 +180,21 @@ class DeleteFileIndex {
.toArray(DeleteFile[]::new);
}
+ // limits deletes using sequence numbers but skips expensive column stats filtering
+ private DeleteFile[] limitWithoutColumnStatsFiltering(
+ long sequenceNumber, DeleteFileGroup partitionDeletes) {
+
+ if (partitionDeletes == null) {
+ return globalDeletes.filter(sequenceNumber);
+ } else if (globalDeletes == null) {
+ return partitionDeletes.filter(sequenceNumber);
+ } else {
+ DeleteFile[] matchingGlobalDeletes = globalDeletes.filter(sequenceNumber);
+ DeleteFile[] matchingPartitionDeletes = partitionDeletes.filter(sequenceNumber);
+ return ObjectArrays.concat(matchingGlobalDeletes, matchingPartitionDeletes, DeleteFile.class);
+ }
+ }
+
private static boolean canContainDeletesForFile(DataFile dataFile, IndexedDeleteFile deleteFile) {
switch (deleteFile.content()) {
case POSITION_DELETES:
@@ -483,6 +511,8 @@ class DeleteFileIndex {
DeleteFileIndex build() {
Iterable<DeleteFile> files = deleteFiles != null ? filterDeleteFiles() : loadDeleteFiles();
+ boolean useColumnStatsFiltering = false;
+
// build a map from (specId, partition) to delete file entries
Map<Integer, StructLikeWrapper> wrappersBySpecId = Maps.newHashMap();
ListMultimap<Pair<Integer, StructLikeWrapper>, IndexedDeleteFile> deleteFilesByPartition =
@@ -494,7 +524,13 @@ class DeleteFileIndex {
wrappersBySpecId
.computeIfAbsent(specId, id -> StructLikeWrapper.forType(spec.partitionType()))
.copyFor(file.partition());
- deleteFilesByPartition.put(Pair.of(specId, wrapper), new IndexedDeleteFile(spec, file));
+ IndexedDeleteFile indexedFile = new IndexedDeleteFile(spec, file);
+ deleteFilesByPartition.put(Pair.of(specId, wrapper), indexedFile);
+
+ if (!useColumnStatsFiltering) {
+ useColumnStatsFiltering = indexedFile.hasLowerAndUpperBounds();
+ }
+
ScanMetricsUtil.indexedDeleteFile(scanMetrics, file);
}
@@ -535,7 +571,8 @@ class DeleteFileIndex {
}
}
- return new DeleteFileIndex(specsById, globalDeletes, sortedDeletesByPartition);
+ return new DeleteFileIndex(
+ specsById, globalDeletes, sortedDeletesByPartition, useColumnStatsFiltering);
}
private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestReaders() {
@@ -597,7 +634,28 @@ class DeleteFileIndex {
this.files = files;
}
+ public DeleteFile[] filter(long seq) {
+ int start = findStartIndex(seq);
+
+ if (start >= files.length) {
+ return NO_DELETES;
+ }
+
+ DeleteFile[] matchingFiles = new DeleteFile[files.length - start];
+
+ for (int index = start; index < files.length; index++) {
+ matchingFiles[index - start] = files[index].wrapped();
+ }
+
+ return matchingFiles;
+ }
+
public Stream<IndexedDeleteFile> limit(long seq) {
+ int start = findStartIndex(seq);
+ return Arrays.stream(files, start, files.length);
+ }
+
+ private int findStartIndex(long seq) {
int pos = Arrays.binarySearch(seqs, seq);
int start;
if (pos < 0) {
@@ -612,7 +670,7 @@ class DeleteFileIndex {
}
}
- return Arrays.stream(files, start, files.length);
+ return start;
}
public Iterable<DeleteFile> referencedDeleteFiles() {
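Before the new benchmark file below, a short illustrative sketch of how the two lookup paths differ once the matching sequence range is known; LookupPaths and the String placeholders are hypothetical stand-ins for DeleteFileGroup/DeleteFile, not the committed implementation.

import java.util.Arrays;
import java.util.stream.Stream;

// Illustrative sketch only, not the committed code.
final class LookupPaths {
  // Fast path: sequence filtering only, then concatenate the global and partition matches
  // (conceptually what limitWithoutColumnStatsFiltering does with ObjectArrays.concat).
  static String[] withoutStatsFiltering(String[] globalMatches, String[] partitionMatches) {
    String[] merged = Arrays.copyOf(globalMatches, globalMatches.length + partitionMatches.length);
    System.arraycopy(partitionMatches, 0, merged, globalMatches.length, partitionMatches.length);
    return merged;
  }

  // Stats-aware path: keep a stream so each candidate can still be tested against the
  // data file's column bounds before it is materialized.
  static Stream<String> withStatsFiltering(Stream<String> candidates, String dataFile) {
    return candidates.filter(deleteFile -> mayContainDeletesFor(dataFile, deleteFile));
  }

  private static boolean mayContainDeletesFor(String dataFile, String deleteFile) {
    return true; // placeholder for the lower/upper bound overlap check
  }
}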
diff --git a/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java b/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
new file mode 100644
index 0000000000..f73794acaf
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.spark.sql.functions.lit;
+
+import com.google.errorprone.annotations.FormatMethod;
+import com.google.errorprone.annotations.FormatString;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * A benchmark that evaluates the delete file index build and lookup performance.
+ *
+ * <p>To run this benchmark for spark-3.4: <code>
+ * ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ * -PjmhIncludeRegex=DeleteFileIndexBenchmark
+ * -PjmhOutputPath=benchmark/iceberg-delete-file-index-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 10)
+@Timeout(time = 20, timeUnit = TimeUnit.MINUTES)
+@BenchmarkMode(Mode.SingleShotTime)
+public class DeleteFileIndexBenchmark {
+
+ private static final String TABLE_NAME = "test_table";
+ private static final String PARTITION_COLUMN = "ss_ticket_number";
+
+ private static final int NUM_PARTITIONS = 50;
+ private static final int NUM_REAL_DATA_FILES_PER_PARTITION = 25;
+ private static final int NUM_REPLICA_DATA_FILES_PER_PARTITION = 50_000;
+ private static final int NUM_DELETE_FILES_PER_PARTITION = 100;
+ private static final int NUM_ROWS_PER_DATA_FILE = 500;
+
+ private final Configuration hadoopConf = new Configuration();
+ private SparkSession spark;
+ private Table table;
+
+ private List<DataFile> dataFiles;
+
+ @Setup
+ public void setupBenchmark() throws NoSuchTableException, ParseException {
+ setupSpark();
+ initTable();
+ initDataAndDeletes();
+ loadDataFiles();
+ }
+
+ @TearDown
+ public void tearDownBenchmark() {
+ dropTable();
+ tearDownSpark();
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void buildIndexAndLookup(Blackhole blackhole) {
+ DeleteFileIndex deletes = buildDeletes();
+ for (DataFile dataFile : dataFiles) {
+ DeleteFile[] deleteFiles = deletes.forDataFile(dataFile.dataSequenceNumber(), dataFile);
+ blackhole.consume(deleteFiles);
+ }
+ }
+
+ private void loadDataFiles() {
+ table.refresh();
+
+ Snapshot snapshot = table.currentSnapshot();
+
+ ManifestGroup manifestGroup =
+ new ManifestGroup(table.io(), snapshot.dataManifests(table.io()), ImmutableList.of());
+
+ try (CloseableIterable<ManifestEntry<DataFile>> entries = manifestGroup.entries()) {
+ List<DataFile> files = Lists.newArrayList();
+ for (ManifestEntry<DataFile> entry : entries) {
+ files.add(entry.file().copyWithoutStats());
+ }
+ this.dataFiles = files;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private DeleteFileIndex buildDeletes() {
+ table.refresh();
+
+ List<ManifestFile> deleteManifests = table.currentSnapshot().deleteManifests(table.io());
+
+ return DeleteFileIndex.builderFor(table.io(), deleteManifests)
+ .specsById(table.specs())
+ .planWith(ThreadPools.getWorkerPool())
+ .build();
+ }
+
+ private DataFile loadAddedDataFile() {
+ table.refresh();
+
+ Iterable<DataFile> addedDataFiles = table.currentSnapshot().addedDataFiles(table.io());
+ return Iterables.getOnlyElement(addedDataFiles);
+ }
+
+ private DeleteFile loadAddedDeleteFile() {
+ table.refresh();
+
+ Iterable<DeleteFile> addedDeleteFiles = table.currentSnapshot().addedDeleteFiles(table.io());
+ return Iterables.getOnlyElement(addedDeleteFiles);
+ }
+
+ private void initDataAndDeletes() throws NoSuchTableException {
+ Schema schema = table.schema();
+ PartitionSpec spec = table.spec();
+ LocationProvider locations = table.locationProvider();
+
+ for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
+ Dataset<Row> inputDF =
+ randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
+ .drop(PARTITION_COLUMN)
+ .withColumn(PARTITION_COLUMN, lit(partitionOrdinal));
+
+ for (int fileOrdinal = 0; fileOrdinal < NUM_REAL_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+ appendAsFile(inputDF);
+ }
+
+ DataFile dataFile = loadAddedDataFile();
+
+ sql(
+ "DELETE FROM %s WHERE ss_item_sk IS NULL AND %s = %d",
+ TABLE_NAME, PARTITION_COLUMN, partitionOrdinal);
+
+ DeleteFile deleteFile = loadAddedDeleteFile();
+
+ AppendFiles append = table.newFastAppend();
+
+ for (int fileOrdinal = 0; fileOrdinal < NUM_REPLICA_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+ String replicaFileName = UUID.randomUUID() + "-replica.parquet";
+ DataFile replicaDataFile =
+ DataFiles.builder(spec)
+ .copy(dataFile)
+ .withPath(locations.newDataLocation(spec, dataFile.partition(), replicaFileName))
+ .build();
+ append.appendFile(replicaDataFile);
+ }
+
+ append.commit();
+
+ RowDelta rowDelta = table.newRowDelta();
+
+ for (int fileOrdinal = 0; fileOrdinal < NUM_DELETE_FILES_PER_PARTITION; fileOrdinal++) {
+ String replicaFileName = UUID.randomUUID() + "-replica.parquet";
+ DeleteFile replicaDeleteFile =
+ FileMetadata.deleteFileBuilder(spec)
+ .copy(deleteFile)
+ .withPath(locations.newDataLocation(spec, deleteFile.partition(), replicaFileName))
+ .build();
+ rowDelta.addDeletes(replicaDeleteFile);
+ }
+
+ rowDelta.commit();
+ }
+ }
+
+ private void appendAsFile(Dataset<Row> df) throws NoSuchTableException {
+ df.coalesce(1).writeTo(TABLE_NAME).append();
+ }
+
+ private Dataset<Row> randomDataDF(Schema schema, int numRows) {
+ Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
+ JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ JavaRDD<InternalRow> rowRDD = context.parallelize(Lists.newArrayList(rows));
+ StructType rowSparkType = SparkSchemaUtil.convert(schema);
+ return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
+ }
+
+ private void setupSpark() {
+ this.spark =
+ SparkSession.builder()
+ .config("spark.ui.enabled", false)
+ .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config("spark.sql.extensions",
IcebergSparkSessionExtensions.class.getName())
+ .config("spark.sql.catalog.spark_catalog",
SparkSessionCatalog.class.getName())
+ .config("spark.sql.catalog.spark_catalog.type", "hadoop")
+ .config("spark.sql.catalog.spark_catalog.warehouse",
newWarehouseDir())
+ .master("local[*]")
+ .getOrCreate();
+ }
+
+ private void tearDownSpark() {
+ spark.stop();
+ }
+
+ private void initTable() throws NoSuchTableException, ParseException {
+ sql(
+ "CREATE TABLE %s ( "
+ + " `ss_sold_date_sk` INT, "
+ + " `ss_sold_time_sk` INT, "
+ + " `ss_item_sk` INT, "
+ + " `ss_customer_sk` STRING, "
+ + " `ss_cdemo_sk` STRING, "
+ + " `ss_hdemo_sk` STRING, "
+ + " `ss_addr_sk` STRING, "
+ + " `ss_store_sk` STRING, "
+ + " `ss_promo_sk` STRING, "
+ + " `ss_ticket_number` INT, "
+ + " `ss_quantity` STRING, "
+ + " `ss_wholesale_cost` STRING, "
+ + " `ss_list_price` STRING, "
+ + " `ss_sales_price` STRING, "
+ + " `ss_ext_discount_amt` STRING, "
+ + " `ss_ext_sales_price` STRING, "
+ + " `ss_ext_wholesale_cost` STRING, "
+ + " `ss_ext_list_price` STRING, "
+ + " `ss_ext_tax` STRING, "
+ + " `ss_coupon_amt` STRING, "
+ + " `ss_net_paid` STRING, "
+ + " `ss_net_paid_inc_tax` STRING, "
+ + " `ss_net_profit` STRING "
+ + ")"
+ + "USING iceberg "
+ + "PARTITIONED BY (%s) "
+ + "TBLPROPERTIES ("
+ + " '%s' '%b',"
+ + " '%s' '%s',"
+ + " '%s' '%d')",
+ TABLE_NAME,
+ PARTITION_COLUMN,
+ TableProperties.MANIFEST_MERGE_ENABLED,
+ false,
+ TableProperties.DELETE_MODE,
+ RowLevelOperationMode.MERGE_ON_READ.modeName(),
+ TableProperties.FORMAT_VERSION,
+ 2);
+
+ this.table = Spark3Util.loadIcebergTable(spark, TABLE_NAME);
+ }
+
+ private void dropTable() {
+ sql("DROP TABLE IF EXISTS %s PURGE", TABLE_NAME);
+ }
+
+ private String newWarehouseDir() {
+ return hadoopConf.get("hadoop.tmp.dir") + UUID.randomUUID();
+ }
+
+ @FormatMethod
+ private void sql(@FormatString String query, Object... args) {
+ spark.sql(String.format(query, args));
+ }
+}