This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 454ca6899 refactor: Split read benchmarks and add addParquetScanCases helper (#3407)
454ca6899 is described below

commit 454ca6899c1dff784313cfb89630d57634a55ddc
Author: Andy Grove <[email protected]>
AuthorDate: Fri Feb 6 13:24:50 2026 -0700

    refactor: Split read benchmarks and add addParquetScanCases helper (#3407)
---
 .../spark/sql/benchmark/CometBenchmarkBase.scala   |  27 ++
 .../sql/benchmark/CometIcebergReadBenchmark.scala  |  81 ++++++
 .../benchmark/CometPartitionColumnBenchmark.scala  |  82 ++++++
 .../spark/sql/benchmark/CometReadBenchmark.scala   | 305 ++-------------------
 4 files changed, 214 insertions(+), 281 deletions(-)

diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
index 5d1d0c571..2a81316c9 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DecimalType
 
 import org.apache.comet.CometConf
+import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
 import org.apache.comet.CometSparkSessionExtensions
 
 trait CometBenchmarkBase
@@ -164,6 +165,32 @@ trait CometBenchmarkBase
     benchmark.run()
   }
 
+  protected def addParquetScanCases(
+      benchmark: Benchmark,
+      query: String,
+      caseSuffix: String = "",
+      extraConf: Map[String, String] = Map.empty): Unit = {
+    val suffix = if (caseSuffix.nonEmpty) s" ($caseSuffix)" else ""
+
+    benchmark.addCase(s"SQL Parquet - Spark$suffix") { _ =>
+      withSQLConf(extraConf.toSeq: _*) {
+        spark.sql(query).noop()
+      }
+    }
+
+    for (scanImpl <- Seq(SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT)) {
+      benchmark.addCase(s"SQL Parquet - Comet ($scanImpl)$suffix") { _ =>
+        withSQLConf(
+          (extraConf ++ Map(
+            CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
+            CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl)).toSeq: _*) {
+          spark.sql(query).noop()
+        }
+      }
+    }
+  }
+
   protected def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
     val testDf = if (partition.isDefined) {
       df.write.partitionBy(partition.get)
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergReadBenchmark.scala
new file mode 100644
index 000000000..b90b89371
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergReadBenchmark.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.types._
+
+import org.apache.comet.CometConf
+
+/**
+ * Benchmark to measure Comet Iceberg read performance. To run this benchmark:
+ * `SPARK_GENERATE_BENCHMARK_FILES=1 make
+ * benchmark-org.apache.spark.sql.benchmark.CometIcebergReadBenchmark` Results will be written to
+ * "spark/benchmarks/CometIcebergReadBenchmark-**results.txt".
+ */
+object CometIcebergReadBenchmark extends CometBenchmarkBase {
+
+  def icebergScanBenchmark(values: Int, dataType: DataType): Unit = {
+    val sqlBenchmark =
+      new Benchmark(s"SQL Single ${dataType.sql} Iceberg Column Scan", values, output = output)
+
+    withTempPath { dir =>
+      withTempTable("icebergTable") {
+        prepareIcebergTable(
+          dir,
+          spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM $tbl"),
+          "icebergTable")
+
+        val query = dataType match {
+          case BooleanType => "sum(cast(id as bigint))"
+          case _ => "sum(id)"
+        }
+
+        sqlBenchmark.addCase("SQL Iceberg - Spark") { _ =>
+          withSQLConf(
+            "spark.memory.offHeap.enabled" -> "true",
+            "spark.memory.offHeap.size" -> "10g") {
+            spark.sql(s"select $query from icebergTable").noop()
+          }
+        }
+
+        sqlBenchmark.addCase("SQL Iceberg - Comet Iceberg-Rust") { _ =>
+          withSQLConf(
+            CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
+            "spark.memory.offHeap.enabled" -> "true",
+            "spark.memory.offHeap.size" -> "10g",
+            CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+            spark.sql(s"select $query from icebergTable").noop()
+          }
+        }
+
+        sqlBenchmark.run()
+      }
+    }
+  }
+
+  override def runCometBenchmark(mainArgs: Array[String]): Unit = {
+    runBenchmarkWithTable("SQL Single Numeric Iceberg Column Scan", 1024 * 1024 * 128) { v =>
+      Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType)
+        .foreach(icebergScanBenchmark(v, _))
+    }
+  }
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala
new file mode 100644
index 000000000..a7d170057
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+
+/**
+ * Benchmark to measure partition column scan performance. This exercises the CometConstantVector
+ * path where constant columns are exported as 1-element Arrow arrays and expanded on the native
+ * side.
+ *
+ * To run this benchmark:
+ * {{{
+ * SPARK_GENERATE_BENCHMARK_FILES=1 make \
+ *   benchmark-org.apache.spark.sql.benchmark.CometPartitionColumnBenchmark
+ * }}}
+ *
+ * Results will be written to "spark/benchmarks/CometPartitionColumnBenchmark-**results.txt".
+ */
+object CometPartitionColumnBenchmark extends CometBenchmarkBase {
+
+  def partitionColumnScanBenchmark(values: Int, numPartitionCols: Int): Unit = {
+    val sqlBenchmark = new Benchmark(
+      s"Partitioned Scan with $numPartitionCols partition column(s)",
+      values,
+      output = output)
+
+    withTempPath { dir =>
+      withTempTable("parquetV1Table") {
+        val partCols =
+          (1 to numPartitionCols).map(i => s"'part$i' as p$i").mkString(", ")
+        val partNames = (1 to numPartitionCols).map(i => s"p$i")
+        val df = spark.sql(s"SELECT value as id, $partCols FROM $tbl")
+        val parquetDir = dir.getCanonicalPath + "/parquetV1"
+        df.write
+          .partitionBy(partNames: _*)
+          .mode("overwrite")
+          .option("compression", "snappy")
+          .parquet(parquetDir)
+        spark.read.parquet(parquetDir).createOrReplaceTempView("parquetV1Table")
+
+        addParquetScanCases(sqlBenchmark, "select sum(id) from parquetV1Table")
+
+        // Also benchmark reading partition columns themselves
+        val partSumExpr =
+          (1 to numPartitionCols).map(i => s"sum(length(p$i))").mkString(", ")
+
+        addParquetScanCases(
+          sqlBenchmark,
+          s"select $partSumExpr from parquetV1Table",
+          caseSuffix = "partition cols")
+
+        sqlBenchmark.run()
+      }
+    }
+  }
+
+  override def runCometBenchmark(mainArgs: Array[String]): Unit = {
+    runBenchmarkWithTable("Partitioned Column Scan", 1024 * 1024 * 15) { v =>
+      for (numPartCols <- List(1, 5)) {
+        partitionColumnScanBenchmark(v, numPartCols)
+      }
+    }
+  }
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
index 3bfbdee91..a2f196a4f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnVector
 
 import org.apache.comet.{CometConf, WithHdfsCluster}
-import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
 import org.apache.comet.parquet.BatchReader
 
 /**
@@ -50,7 +49,6 @@ import org.apache.comet.parquet.BatchReader
 class CometReadBaseBenchmark extends CometBenchmarkBase {
 
   def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
-    // Benchmarks running through spark sql.
     val sqlBenchmark =
       new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values, output = output)
 
@@ -63,76 +61,13 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
           case _ => "sum(id)"
         }
 
-        sqlBenchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(s"select $query from parquetV1Table").noop()
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark.sql(s"select $query from parquetV1Table").noop()
-          }
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark.sql(s"select $query from parquetV1Table").noop()
-          }
-        }
-
-        sqlBenchmark.run()
-      }
-    }
-  }
-
-  def icebergScanBenchmark(values: Int, dataType: DataType): Unit = {
-    // Benchmarks running through spark sql.
-    val sqlBenchmark =
-      new Benchmark(s"SQL Single ${dataType.sql} Iceberg Column Scan", values, output = output)
-
-    withTempPath { dir =>
-      withTempTable("icebergTable") {
-        prepareIcebergTable(
-          dir,
-          spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM $tbl"),
-          "icebergTable")
-
-        val query = dataType match {
-          case BooleanType => "sum(cast(id as bigint))"
-          case _ => "sum(id)"
-        }
-
-        sqlBenchmark.addCase("SQL Iceberg - Spark") { _ =>
-          withSQLConf(
-            "spark.memory.offHeap.enabled" -> "true",
-            "spark.memory.offHeap.size" -> "10g") {
-            spark.sql(s"select $query from icebergTable").noop()
-          }
-        }
-
-        sqlBenchmark.addCase("SQL Iceberg - Comet Iceberg-Rust") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            "spark.memory.offHeap.enabled" -> "true",
-            "spark.memory.offHeap.size" -> "10g",
-            CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
-            spark.sql(s"select $query from icebergTable").noop()
-          }
-        }
-
+        addParquetScanCases(sqlBenchmark, s"select $query from parquetV1Table")
         sqlBenchmark.run()
       }
     }
   }
 
   def encryptedScanBenchmark(values: Int, dataType: DataType): Unit = {
-    // Benchmarks running through spark sql.
     val sqlBenchmark =
       new Benchmark(s"SQL Single ${dataType.sql} Encrypted Column Scan", values, output = output)
 
@@ -143,6 +78,15 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
     val cryptoFactoryClass =
       "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory"
 
+    val cryptoConf = Map(
+      "spark.memory.offHeap.enabled" -> "true",
+      "spark.memory.offHeap.size" -> "10g",
+      DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
+      KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
+        "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
+      InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
+        s"footerKey: ${footerKey}, key1: ${key1}")
+
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareEncryptedTable(
@@ -154,51 +98,10 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
           case _ => "sum(id)"
         }
 
-        sqlBenchmark.addCase("SQL Parquet - Spark") { _ =>
-          withSQLConf(
-            "spark.memory.offHeap.enabled" -> "true",
-            "spark.memory.offHeap.size" -> "10g",
-            DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
-            KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
-              "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
-            InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
-              s"footerKey: ${footerKey}, key1: ${key1}") {
-            spark.sql(s"select $query from parquetV1Table").noop()
-          }
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            "spark.memory.offHeap.enabled" -> "true",
-            "spark.memory.offHeap.size" -> "10g",
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION,
-            DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
-            KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
-              "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
-            InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
-              s"footerKey: ${footerKey}, key1: ${key1}") {
-            spark.sql(s"select $query from parquetV1Table").noop()
-          }
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            "spark.memory.offHeap.enabled" -> "true",
-            "spark.memory.offHeap.size" -> "10g",
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT,
-            DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
-            KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
-              "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
-            InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
-              s"footerKey: ${footerKey}, key1: ${key1}") {
-            spark.sql(s"select $query from parquetV1Table").noop()
-          }
-        }
-
+        addParquetScanCases(
+          sqlBenchmark,
+          s"select $query from parquetV1Table",
+          extraConf = cryptoConf)
         sqlBenchmark.run()
       }
     }
@@ -218,28 +121,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
            s"SELECT CAST(value / 10000000.0 as DECIMAL($precision, $scale)) " +
               s"id FROM $tbl"))
 
-        sqlBenchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql("select sum(id) from parquetV1Table").noop()
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark.sql("select sum(id) from parquetV1Table").noop()
-          }
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark.sql("select sum(id) from parquetV1Table").noop()
-          }
-        }
-
+        addParquetScanCases(sqlBenchmark, "select sum(id) from parquetV1Table")
         sqlBenchmark.run()
       }
     }
@@ -338,28 +220,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
            s"SELECT IF(RAND(1) < $fractionOfZeros, -1, value) AS c1, value AS c2 FROM " +
               s"$tbl"))
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
-          }
-        }
-
+        addParquetScanCases(benchmark, "select sum(c2) from parquetV1Table where c1 + 1 > 0")
         benchmark.run()
       }
     }
@@ -388,28 +249,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
                        |FROM tmp
                        |""".stripMargin))
 
-        sqlBenchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql("select sum(length(id)) from parquetV1Table").noop()
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark.sql("select sum(length(id)) from parquetV1Table").noop()
-          }
-        }
-
-        sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark.sql("select sum(length(id)) from parquetV1Table").noop()
-          }
-        }
-
+        addParquetScanCases(sqlBenchmark, "select sum(length(id)) from parquetV1Table")
         sqlBenchmark.run()
       }
     }
@@ -428,37 +268,10 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
            s"SELECT IF(RAND(1) < $fractionOfNulls, NULL, CAST(value as STRING)) AS c1, " +
              s"IF(RAND(2) < $fractionOfNulls, NULL, CAST(value as STRING)) AS c2 FROM $tbl"))
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark
-            .sql("select sum(length(c2)) from parquetV1Table where c1 is " +
-              "not NULL and c2 is not NULL")
-            .noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark
-              .sql("select sum(length(c2)) from parquetV1Table where c1 is " +
-                "not NULL and c2 is not NULL")
-              .noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark
-              .sql("select sum(length(c2)) from parquetV1Table where c1 is " +
-                "not NULL and c2 is not NULL")
-              .noop()
-          }
-        }
-
+        addParquetScanCases(
+          benchmark,
+          "select sum(length(c2)) from parquetV1Table where c1 is " +
+            "not NULL and c2 is not NULL")
         benchmark.run()
       }
     }
@@ -476,28 +289,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
 
         prepareTable(dir, spark.sql("SELECT * FROM t1"))
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
-          }
-        }
-
+        addParquetScanCases(benchmark, s"SELECT sum(c$middle) FROM parquetV1Table")
         benchmark.run()
       }
     }
@@ -519,28 +311,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
             s"SELECT IF(RAND(1) < $fractionOfZeros, -1, value) AS c1, " +
               s"REPEAT(CAST(value AS STRING), 100) AS c2 FROM $tbl"))
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
-          }
-        }
-
+        addParquetScanCases(benchmark, "SELECT * FROM parquetV1Table WHERE c1 + 1 > 0")
         benchmark.run()
       }
     }
@@ -562,28 +333,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
             s"SELECT IF(RAND(1) < $fractionOfZeros, -1, value) AS c1, " +
              s"REPEAT(CAST(value AS STRING), 100) AS c2 FROM $tbl ORDER BY c1, c2"))
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-            spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-            spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
-          }
-        }
-
+        addParquetScanCases(benchmark, "SELECT * FROM parquetV1Table WHERE c1 + 1 > 0")
         benchmark.run()
       }
     }
@@ -611,13 +361,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
         }
     }
 
-    runBenchmarkWithTable("SQL Single Numeric Iceberg Column Scan", 1024 * 1024 * 128) { v =>
-      Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType)
-        .foreach { dataType =>
-          icebergScanBenchmark(v, dataType)
-        }
-    }
-
     runBenchmarkWithTable("SQL Single Numeric Encrypted Column Scan", 1024 * 1024 * 128) { v =>
       Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType)
         .foreach { dataType =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
