This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new e8c197d360 Spark 3.4: Rework DeleteFileIndexBenchmark (#9600)
e8c197d360 is described below
commit e8c197d360ca75d3be8f9c886615a196c63d36c5
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Feb 1 09:34:25 2024 -0800
Spark 3.4: Rework DeleteFileIndexBenchmark (#9600)
---
spark/v3.4/build.gradle | 1 +
.../apache/iceberg/DeleteFileIndexBenchmark.java | 91 +++-------------------
2 files changed, 10 insertions(+), 82 deletions(-)
diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle
index cf41896e21..0b1c4df75a 100644
--- a/spark/v3.4/build.gradle
+++ b/spark/v3.4/build.gradle
@@ -158,6 +158,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
testImplementation project(path: ':iceberg-parquet')
testImplementation project(path: ':iceberg-hive-metastore')
testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
+ testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
testImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
diff --git a/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java b/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
index f73794acaf..f48e39e500 100644
--- a/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
+++ b/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
@@ -18,8 +18,6 @@
*/
package org.apache.iceberg;
-import static org.apache.spark.sql.functions.lit;
-
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import java.io.IOException;
@@ -29,25 +27,15 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
-import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkSessionCatalog;
-import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
import org.apache.iceberg.util.ThreadPools;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.parser.ParseException;
-import org.apache.spark.sql.types.StructType;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -83,10 +71,8 @@ public class DeleteFileIndexBenchmark {
private static final String PARTITION_COLUMN = "ss_ticket_number";
private static final int NUM_PARTITIONS = 50;
- private static final int NUM_REAL_DATA_FILES_PER_PARTITION = 25;
- private static final int NUM_REPLICA_DATA_FILES_PER_PARTITION = 50_000;
+ private static final int NUM_DATA_FILES_PER_PARTITION = 50_000;
private static final int NUM_DELETE_FILES_PER_PARTITION = 100;
- private static final int NUM_ROWS_PER_DATA_FILE = 500;
private final Configuration hadoopConf = new Configuration();
private SparkSession spark;
@@ -148,85 +134,26 @@ public class DeleteFileIndexBenchmark {
.build();
}
- private DataFile loadAddedDataFile() {
- table.refresh();
-
- Iterable<DataFile> addedDataFiles = table.currentSnapshot().addedDataFiles(table.io());
- return Iterables.getOnlyElement(addedDataFiles);
- }
-
- private DeleteFile loadAddedDeleteFile() {
- table.refresh();
-
- Iterable<DeleteFile> addedDeleteFiles = table.currentSnapshot().addedDeleteFiles(table.io());
- return Iterables.getOnlyElement(addedDeleteFiles);
- }
-
- private void initDataAndDeletes() throws NoSuchTableException {
- Schema schema = table.schema();
- PartitionSpec spec = table.spec();
- LocationProvider locations = table.locationProvider();
-
+ private void initDataAndDeletes() {
for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
- Dataset<Row> inputDF =
- randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
- .drop(PARTITION_COLUMN)
- .withColumn(PARTITION_COLUMN, lit(partitionOrdinal));
-
- for (int fileOrdinal = 0; fileOrdinal < NUM_REAL_DATA_FILES_PER_PARTITION; fileOrdinal++) {
- appendAsFile(inputDF);
- }
+ StructLike partition = TestHelpers.Row.of(partitionOrdinal);
- DataFile dataFile = loadAddedDataFile();
-
- sql(
- "DELETE FROM %s WHERE ss_item_sk IS NULL AND %s = %d",
- TABLE_NAME, PARTITION_COLUMN, partitionOrdinal);
-
- DeleteFile deleteFile = loadAddedDeleteFile();
-
- AppendFiles append = table.newFastAppend();
+ RowDelta rowDelta = table.newRowDelta();
- for (int fileOrdinal = 0; fileOrdinal < NUM_REPLICA_DATA_FILES_PER_PARTITION; fileOrdinal++) {
- String replicaFileName = UUID.randomUUID() + "-replica.parquet";
- DataFile replicaDataFile =
- DataFiles.builder(spec)
- .copy(dataFile)
- .withPath(locations.newDataLocation(spec, dataFile.partition(), replicaFileName))
- .build();
- append.appendFile(replicaDataFile);
+ for (int fileOrdinal = 0; fileOrdinal < NUM_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+ DataFile dataFile = FileGenerationUtil.generateDataFile(table, partition);
+ rowDelta.addRows(dataFile);
}
- append.commit();
-
- RowDelta rowDelta = table.newRowDelta();
-
for (int fileOrdinal = 0; fileOrdinal < NUM_DELETE_FILES_PER_PARTITION; fileOrdinal++) {
- String replicaFileName = UUID.randomUUID() + "-replica.parquet";
- DeleteFile replicaDeleteFile =
- FileMetadata.deleteFileBuilder(spec)
- .copy(deleteFile)
- .withPath(locations.newDataLocation(spec, deleteFile.partition(), replicaFileName))
- .build();
- rowDelta.addDeletes(replicaDeleteFile);
+ DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, partition);
+ rowDelta.addDeletes(deleteFile);
}
rowDelta.commit();
}
}
- private void appendAsFile(Dataset<Row> df) throws NoSuchTableException {
- df.coalesce(1).writeTo(TABLE_NAME).append();
- }
-
- private Dataset<Row> randomDataDF(Schema schema, int numRows) {
- Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
- JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
- JavaRDD<InternalRow> rowRDD = context.parallelize(Lists.newArrayList(rows));
- StructType rowSparkType = SparkSchemaUtil.convert(schema);
- return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
- }
-
private void setupSpark() {
this.spark =
SparkSession.builder()