This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new e8c197d360 Spark 3.4: Rework DeleteFileIndexBenchmark (#9600)
e8c197d360 is described below

commit e8c197d360ca75d3be8f9c886615a196c63d36c5
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Feb 1 09:34:25 2024 -0800

    Spark 3.4: Rework DeleteFileIndexBenchmark (#9600)
---
 spark/v3.4/build.gradle                            |  1 +
 .../apache/iceberg/DeleteFileIndexBenchmark.java   | 91 +++-------------------
 2 files changed, 10 insertions(+), 82 deletions(-)

diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle
index cf41896e21..0b1c4df75a 100644
--- a/spark/v3.4/build.gradle
+++ b/spark/v3.4/build.gradle
@@ -158,6 +158,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
     testImplementation project(path: ':iceberg-parquet')
     testImplementation project(path: ':iceberg-hive-metastore')
     testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
+    testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
     testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
     testImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
 
diff --git a/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java b/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
index f73794acaf..f48e39e500 100644
--- a/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
+++ b/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg;
 
-import static org.apache.spark.sql.functions.lit;
-
 import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
 import java.io.IOException;
@@ -29,25 +27,15 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
-import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkSessionCatalog;
-import org.apache.iceberg.spark.data.RandomData;
 import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
 import org.apache.iceberg.util.ThreadPools;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.parser.ParseException;
-import org.apache.spark.sql.types.StructType;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -83,10 +71,8 @@ public class DeleteFileIndexBenchmark {
   private static final String PARTITION_COLUMN = "ss_ticket_number";
 
   private static final int NUM_PARTITIONS = 50;
-  private static final int NUM_REAL_DATA_FILES_PER_PARTITION = 25;
-  private static final int NUM_REPLICA_DATA_FILES_PER_PARTITION = 50_000;
+  private static final int NUM_DATA_FILES_PER_PARTITION = 50_000;
   private static final int NUM_DELETE_FILES_PER_PARTITION = 100;
-  private static final int NUM_ROWS_PER_DATA_FILE = 500;
 
   private final Configuration hadoopConf = new Configuration();
   private SparkSession spark;
@@ -148,85 +134,26 @@ public class DeleteFileIndexBenchmark {
         .build();
   }
 
-  private DataFile loadAddedDataFile() {
-    table.refresh();
-
-    Iterable<DataFile> addedDataFiles = table.currentSnapshot().addedDataFiles(table.io());
-    return Iterables.getOnlyElement(addedDataFiles);
-  }
-
-  private DeleteFile loadAddedDeleteFile() {
-    table.refresh();
-
-    Iterable<DeleteFile> addedDeleteFiles = table.currentSnapshot().addedDeleteFiles(table.io());
-    return Iterables.getOnlyElement(addedDeleteFiles);
-  }
-
-  private void initDataAndDeletes() throws NoSuchTableException {
-    Schema schema = table.schema();
-    PartitionSpec spec = table.spec();
-    LocationProvider locations = table.locationProvider();
-
+  private void initDataAndDeletes() {
     for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
-      Dataset<Row> inputDF =
-          randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
-              .drop(PARTITION_COLUMN)
-              .withColumn(PARTITION_COLUMN, lit(partitionOrdinal));
-
-      for (int fileOrdinal = 0; fileOrdinal < NUM_REAL_DATA_FILES_PER_PARTITION; fileOrdinal++) {
-        appendAsFile(inputDF);
-      }
+      StructLike partition = TestHelpers.Row.of(partitionOrdinal);
 
-      DataFile dataFile = loadAddedDataFile();
-
-      sql(
-          "DELETE FROM %s WHERE ss_item_sk IS NULL AND %s = %d",
-          TABLE_NAME, PARTITION_COLUMN, partitionOrdinal);
-
-      DeleteFile deleteFile = loadAddedDeleteFile();
-
-      AppendFiles append = table.newFastAppend();
+      RowDelta rowDelta = table.newRowDelta();
 
-      for (int fileOrdinal = 0; fileOrdinal < NUM_REPLICA_DATA_FILES_PER_PARTITION; fileOrdinal++) {
-        String replicaFileName = UUID.randomUUID() + "-replica.parquet";
-        DataFile replicaDataFile =
-            DataFiles.builder(spec)
-                .copy(dataFile)
-                .withPath(locations.newDataLocation(spec, dataFile.partition(), replicaFileName))
-                .build();
-        append.appendFile(replicaDataFile);
+      for (int fileOrdinal = 0; fileOrdinal < NUM_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+        DataFile dataFile = FileGenerationUtil.generateDataFile(table, partition);
+        rowDelta.addRows(dataFile);
       }
 
-      append.commit();
-
-      RowDelta rowDelta = table.newRowDelta();
-
       for (int fileOrdinal = 0; fileOrdinal < NUM_DELETE_FILES_PER_PARTITION; fileOrdinal++) {
-        String replicaFileName = UUID.randomUUID() + "-replica.parquet";
-        DeleteFile replicaDeleteFile =
-            FileMetadata.deleteFileBuilder(spec)
-                .copy(deleteFile)
-                .withPath(locations.newDataLocation(spec, deleteFile.partition(), replicaFileName))
-                .build();
-        rowDelta.addDeletes(replicaDeleteFile);
+        DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, partition);
+        rowDelta.addDeletes(deleteFile);
       }
 
       rowDelta.commit();
     }
   }
 
-  private void appendAsFile(Dataset<Row> df) throws NoSuchTableException {
-    df.coalesce(1).writeTo(TABLE_NAME).append();
-  }
-
-  private Dataset<Row> randomDataDF(Schema schema, int numRows) {
-    Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
-    JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
-    JavaRDD<InternalRow> rowRDD = context.parallelize(Lists.newArrayList(rows));
-    StructType rowSparkType = SparkSchemaUtil.convert(schema);
-    return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
-  }
-
   private void setupSpark() {
     this.spark =
         SparkSession.builder()

Reply via email to