This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new 454ca6899 refactor: Split read benchmarks and add addParquetScanCases helper (#3407)
454ca6899 is described below
commit 454ca6899c1dff784313cfb89630d57634a55ddc
Author: Andy Grove <[email protected]>
AuthorDate: Fri Feb 6 13:24:50 2026 -0700
refactor: Split read benchmarks and add addParquetScanCases helper (#3407)
---
.../spark/sql/benchmark/CometBenchmarkBase.scala | 27 ++
.../sql/benchmark/CometIcebergReadBenchmark.scala | 81 ++++++
.../benchmark/CometPartitionColumnBenchmark.scala | 82 ++++++
.../spark/sql/benchmark/CometReadBenchmark.scala | 305 ++-------------------
4 files changed, 214 insertions(+), 281 deletions(-)
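For context (illustrative only, not part of the patch): a minimal sketch of how a benchmark built on CometBenchmarkBase might use the new addParquetScanCases helper. The object name and query below are hypothetical; the trait members used (runCometBenchmark, runBenchmarkWithTable, withTempPath, withTempTable, prepareTable, output, tbl) are the ones shown in the diff. The helper registers one "SQL Parquet - Spark" case plus one Comet case per native scan implementation for the same query.

    // Hypothetical example; mirrors the call pattern used by the refactored benchmarks.
    package org.apache.spark.sql.benchmark

    import org.apache.spark.benchmark.Benchmark

    object ExampleScanBenchmark extends CometBenchmarkBase {
      override def runCometBenchmark(mainArgs: Array[String]): Unit = {
        runBenchmarkWithTable("Example Scan", 1024 * 1024) { values =>
          val benchmark = new Benchmark("Example Scan", values, output = output)
          withTempPath { dir =>
            withTempTable("parquetV1Table") {
              // prepareTable writes $tbl as Parquet and registers the parquetV1Table view
              prepareTable(dir, spark.sql(s"SELECT value AS id FROM $tbl"))
              // Adds the Spark case and one Comet case per native scan impl
              addParquetScanCases(benchmark, "select sum(id) from parquetV1Table")
              benchmark.run()
            }
          }
        }
      }
    }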
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
index 5d1d0c571..2a81316c9 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DecimalType
import org.apache.comet.CometConf
+import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
import org.apache.comet.CometSparkSessionExtensions
trait CometBenchmarkBase
@@ -164,6 +165,32 @@ trait CometBenchmarkBase
benchmark.run()
}
+ protected def addParquetScanCases(
+ benchmark: Benchmark,
+ query: String,
+ caseSuffix: String = "",
+ extraConf: Map[String, String] = Map.empty): Unit = {
+ val suffix = if (caseSuffix.nonEmpty) s" ($caseSuffix)" else ""
+
+ benchmark.addCase(s"SQL Parquet - Spark$suffix") { _ =>
+ withSQLConf(extraConf.toSeq: _*) {
+ spark.sql(query).noop()
+ }
+ }
+
+ for (scanImpl <- Seq(SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT)) {
+ benchmark.addCase(s"SQL Parquet - Comet ($scanImpl)$suffix") { _ =>
+ withSQLConf(
+ (extraConf ++ Map(
+ CometConf.COMET_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl)).toSeq: _*) {
+ spark.sql(query).noop()
+ }
+ }
+ }
+ }
+
  protected def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
val testDf = if (partition.isDefined) {
df.write.partitionBy(partition.get)
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergReadBenchmark.scala
new file mode 100644
index 000000000..b90b89371
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergReadBenchmark.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.types._
+
+import org.apache.comet.CometConf
+
+/**
+ * Benchmark to measure Comet Iceberg read performance. To run this benchmark:
+ * `SPARK_GENERATE_BENCHMARK_FILES=1 make
+ * benchmark-org.apache.spark.sql.benchmark.CometIcebergReadBenchmark` Results will be written to
+ * "spark/benchmarks/CometIcebergReadBenchmark-**results.txt".
+ */
+object CometIcebergReadBenchmark extends CometBenchmarkBase {
+
+ def icebergScanBenchmark(values: Int, dataType: DataType): Unit = {
+ val sqlBenchmark =
+      new Benchmark(s"SQL Single ${dataType.sql} Iceberg Column Scan", values, output = output)
+
+ withTempPath { dir =>
+ withTempTable("icebergTable") {
+ prepareIcebergTable(
+ dir,
+ spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM $tbl"),
+ "icebergTable")
+
+ val query = dataType match {
+ case BooleanType => "sum(cast(id as bigint))"
+ case _ => "sum(id)"
+ }
+
+ sqlBenchmark.addCase("SQL Iceberg - Spark") { _ =>
+ withSQLConf(
+ "spark.memory.offHeap.enabled" -> "true",
+ "spark.memory.offHeap.size" -> "10g") {
+ spark.sql(s"select $query from icebergTable").noop()
+ }
+ }
+
+ sqlBenchmark.addCase("SQL Iceberg - Comet Iceberg-Rust") { _ =>
+ withSQLConf(
+ CometConf.COMET_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ "spark.memory.offHeap.enabled" -> "true",
+ "spark.memory.offHeap.size" -> "10g",
+ CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+ spark.sql(s"select $query from icebergTable").noop()
+ }
+ }
+
+ sqlBenchmark.run()
+ }
+ }
+ }
+
+ override def runCometBenchmark(mainArgs: Array[String]): Unit = {
+    runBenchmarkWithTable("SQL Single Numeric Iceberg Column Scan", 1024 * 1024 * 128) { v =>
+      Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType)
+ .foreach(icebergScanBenchmark(v, _))
+ }
+ }
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala
new file mode 100644
index 000000000..a7d170057
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+
+/**
+ * Benchmark to measure partition column scan performance. This exercises the CometConstantVector
+ * path where constant columns are exported as 1-element Arrow arrays and expanded on the native
+ * side.
+ *
+ * To run this benchmark:
+ * {{{
+ * SPARK_GENERATE_BENCHMARK_FILES=1 make \
+ * benchmark-org.apache.spark.sql.benchmark.CometPartitionColumnBenchmark
+ * }}}
+ *
+ * Results will be written to "spark/benchmarks/CometPartitionColumnBenchmark-**results.txt".
+ */
+object CometPartitionColumnBenchmark extends CometBenchmarkBase {
+
+  def partitionColumnScanBenchmark(values: Int, numPartitionCols: Int): Unit = {
+ val sqlBenchmark = new Benchmark(
+ s"Partitioned Scan with $numPartitionCols partition column(s)",
+ values,
+ output = output)
+
+ withTempPath { dir =>
+ withTempTable("parquetV1Table") {
+ val partCols =
+ (1 to numPartitionCols).map(i => s"'part$i' as p$i").mkString(", ")
+ val partNames = (1 to numPartitionCols).map(i => s"p$i")
+ val df = spark.sql(s"SELECT value as id, $partCols FROM $tbl")
+ val parquetDir = dir.getCanonicalPath + "/parquetV1"
+ df.write
+ .partitionBy(partNames: _*)
+ .mode("overwrite")
+ .option("compression", "snappy")
+ .parquet(parquetDir)
+        spark.read.parquet(parquetDir).createOrReplaceTempView("parquetV1Table")
+
+ addParquetScanCases(sqlBenchmark, "select sum(id) from parquetV1Table")
+
+ // Also benchmark reading partition columns themselves
+ val partSumExpr =
+ (1 to numPartitionCols).map(i => s"sum(length(p$i))").mkString(", ")
+
+ addParquetScanCases(
+ sqlBenchmark,
+ s"select $partSumExpr from parquetV1Table",
+ caseSuffix = "partition cols")
+
+ sqlBenchmark.run()
+ }
+ }
+ }
+
+ override def runCometBenchmark(mainArgs: Array[String]): Unit = {
+ runBenchmarkWithTable("Partitioned Column Scan", 1024 * 1024 * 15) { v =>
+ for (numPartCols <- List(1, 5)) {
+ partitionColumnScanBenchmark(v, numPartCols)
+ }
+ }
+ }
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
index 3bfbdee91..a2f196a4f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnVector
import org.apache.comet.{CometConf, WithHdfsCluster}
-import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
import org.apache.comet.parquet.BatchReader
/**
@@ -50,7 +49,6 @@ import org.apache.comet.parquet.BatchReader
class CometReadBaseBenchmark extends CometBenchmarkBase {
def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
- // Benchmarks running through spark sql.
val sqlBenchmark =
new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values, output
= output)
@@ -63,76 +61,13 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
case _ => "sum(id)"
}
- sqlBenchmark.addCase("SQL Parquet - Spark") { _ =>
- spark.sql(s"select $query from parquetV1Table").noop()
- }
-
- sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
- spark.sql(s"select $query from parquetV1Table").noop()
- }
- }
-
-      sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
-          CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
- spark.sql(s"select $query from parquetV1Table").noop()
- }
- }
-
- sqlBenchmark.run()
- }
- }
- }
-
- def icebergScanBenchmark(values: Int, dataType: DataType): Unit = {
- // Benchmarks running through spark sql.
- val sqlBenchmark =
-      new Benchmark(s"SQL Single ${dataType.sql} Iceberg Column Scan", values, output = output)
-
- withTempPath { dir =>
- withTempTable("icebergTable") {
- prepareIcebergTable(
- dir,
- spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM $tbl"),
- "icebergTable")
-
- val query = dataType match {
- case BooleanType => "sum(cast(id as bigint))"
- case _ => "sum(id)"
- }
-
- sqlBenchmark.addCase("SQL Iceberg - Spark") { _ =>
- withSQLConf(
- "spark.memory.offHeap.enabled" -> "true",
- "spark.memory.offHeap.size" -> "10g") {
- spark.sql(s"select $query from icebergTable").noop()
- }
- }
-
- sqlBenchmark.addCase("SQL Iceberg - Comet Iceberg-Rust") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- "spark.memory.offHeap.enabled" -> "true",
- "spark.memory.offHeap.size" -> "10g",
- CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
- spark.sql(s"select $query from icebergTable").noop()
- }
- }
-
+ addParquetScanCases(sqlBenchmark, s"select $query from parquetV1Table")
sqlBenchmark.run()
}
}
}
def encryptedScanBenchmark(values: Int, dataType: DataType): Unit = {
- // Benchmarks running through spark sql.
val sqlBenchmark =
new Benchmark(s"SQL Single ${dataType.sql} Encrypted Column Scan",
values, output = output)
@@ -143,6 +78,15 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
val cryptoFactoryClass =
"org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory"
+ val cryptoConf = Map(
+ "spark.memory.offHeap.enabled" -> "true",
+ "spark.memory.offHeap.size" -> "10g",
+      DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
+ KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
+ "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
+ InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
+ s"footerKey: ${footerKey}, key1: ${key1}")
+
withTempPath { dir =>
withTempTable("parquetV1Table") {
prepareEncryptedTable(
@@ -154,51 +98,10 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
case _ => "sum(id)"
}
- sqlBenchmark.addCase("SQL Parquet - Spark") { _ =>
- withSQLConf(
- "spark.memory.offHeap.enabled" -> "true",
- "spark.memory.offHeap.size" -> "10g",
-          DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
- KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
- "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
- InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
- s"footerKey: ${footerKey}, key1: ${key1}") {
- spark.sql(s"select $query from parquetV1Table").noop()
- }
- }
-
- sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
- withSQLConf(
- "spark.memory.offHeap.enabled" -> "true",
- "spark.memory.offHeap.size" -> "10g",
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION,
-          DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
- KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
- "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
- InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
- s"footerKey: ${footerKey}, key1: ${key1}") {
- spark.sql(s"select $query from parquetV1Table").noop()
- }
- }
-
-      sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
- withSQLConf(
- "spark.memory.offHeap.enabled" -> "true",
- "spark.memory.offHeap.size" -> "10g",
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT,
-          DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
- KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
- "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
- InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
- s"footerKey: ${footerKey}, key1: ${key1}") {
- spark.sql(s"select $query from parquetV1Table").noop()
- }
- }
-
+ addParquetScanCases(
+ sqlBenchmark,
+ s"select $query from parquetV1Table",
+ extraConf = cryptoConf)
sqlBenchmark.run()
}
}
@@ -218,28 +121,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
s"SELECT CAST(value / 10000000.0 as DECIMAL($precision, $scale)) "
+
s"id FROM $tbl"))
- sqlBenchmark.addCase("SQL Parquet - Spark") { _ =>
- spark.sql("select sum(id) from parquetV1Table").noop()
- }
-
- sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
- spark.sql("select sum(id) from parquetV1Table").noop()
- }
- }
-
-      sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
-          CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
- spark.sql("select sum(id) from parquetV1Table").noop()
- }
- }
-
+ addParquetScanCases(sqlBenchmark, "select sum(id) from parquetV1Table")
sqlBenchmark.run()
}
}
@@ -338,28 +220,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
s"SELECT IF(RAND(1) < $fractionOfZeros, -1, value) AS c1, value AS
c2 FROM " +
s"$tbl"))
- benchmark.addCase("SQL Parquet - Spark") { _ =>
-        spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
- }
-
- benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
-          spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
- }
- }
-
- benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
-          CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
-          spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
- }
- }
-
+      addParquetScanCases(benchmark, "select sum(c2) from parquetV1Table where c1 + 1 > 0")
benchmark.run()
}
}
@@ -388,28 +249,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
|FROM tmp
|""".stripMargin))
- sqlBenchmark.addCase("SQL Parquet - Spark") { _ =>
- spark.sql("select sum(length(id)) from parquetV1Table").noop()
- }
-
- sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
- spark.sql("select sum(length(id)) from parquetV1Table").noop()
- }
- }
-
-      sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
-          CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
- spark.sql("select sum(length(id)) from parquetV1Table").noop()
- }
- }
-
+      addParquetScanCases(sqlBenchmark, "select sum(length(id)) from parquetV1Table")
sqlBenchmark.run()
}
}
@@ -428,37 +268,10 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
s"SELECT IF(RAND(1) < $fractionOfNulls, NULL, CAST(value as
STRING)) AS c1, " +
s"IF(RAND(2) < $fractionOfNulls, NULL, CAST(value as STRING)) AS
c2 FROM $tbl"))
- benchmark.addCase("SQL Parquet - Spark") { _ =>
- spark
- .sql("select sum(length(c2)) from parquetV1Table where c1 is " +
- "not NULL and c2 is not NULL")
- .noop()
- }
-
- benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
- spark
- .sql("select sum(length(c2)) from parquetV1Table where c1 is " +
- "not NULL and c2 is not NULL")
- .noop()
- }
- }
-
- benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
-          CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
- spark
- .sql("select sum(length(c2)) from parquetV1Table where c1 is " +
- "not NULL and c2 is not NULL")
- .noop()
- }
- }
-
+ addParquetScanCases(
+ benchmark,
+ "select sum(length(c2)) from parquetV1Table where c1 is " +
+ "not NULL and c2 is not NULL")
benchmark.run()
}
}
@@ -476,28 +289,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
prepareTable(dir, spark.sql("SELECT * FROM t1"))
- benchmark.addCase("SQL Parquet - Spark") { _ =>
- spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
- }
-
- benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
- spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
- }
- }
-
- benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
-          CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
- spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
- }
- }
-
+      addParquetScanCases(benchmark, s"SELECT sum(c$middle) FROM parquetV1Table")
benchmark.run()
}
}
@@ -519,28 +311,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
s"SELECT IF(RAND(1) < $fractionOfZeros, -1, value) AS c1, " +
s"REPEAT(CAST(value AS STRING), 100) AS c2 FROM $tbl"))
- benchmark.addCase("SQL Parquet - Spark") { _ =>
- spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
- }
-
- benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
- spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
- }
- }
-
- benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
-          CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
- spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
- }
- }
-
+      addParquetScanCases(benchmark, "SELECT * FROM parquetV1Table WHERE c1 + 1 > 0")
benchmark.run()
}
}
@@ -562,28 +333,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
s"SELECT IF(RAND(1) < $fractionOfZeros, -1, value) AS c1, " +
s"REPEAT(CAST(value AS STRING), 100) AS c2 FROM $tbl ORDER BY
c1, c2"))
- benchmark.addCase("SQL Parquet - Spark") { _ =>
- spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
- }
-
- benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
- spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
- }
- }
-
- benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
-          CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
- spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
- }
- }
-
+      addParquetScanCases(benchmark, "SELECT * FROM parquetV1Table WHERE c1 + 1 > 0")
benchmark.run()
}
}
@@ -611,13 +361,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
}
}
-    runBenchmarkWithTable("SQL Single Numeric Iceberg Column Scan", 1024 * 1024 * 128) { v =>
-      Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType)
- .foreach { dataType =>
- icebergScanBenchmark(v, dataType)
- }
- }
-
runBenchmarkWithTable("SQL Single Numeric Encrypted Column Scan", 1024 *
1024 * 128) { v =>
Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType,
DoubleType)
.foreach { dataType =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]