This is an automated email from the ASF dual-hosted git repository.
parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new bef775999 feat: expose comet metrics through Sparks external monitoring system (#3708)
bef775999 is described below
commit bef775999f078e967844f5a56d250d0e11c8bf52
Author: Bhargava Vadlamani <[email protected]>
AuthorDate: Sun Mar 29 21:16:52 2026 -0700
feat: expose comet metrics through Sparks external monitoring system (#3708)
---
.../main/scala/org/apache/comet/CometConf.scala | 14 +++++
dev/ensure-jars-have-correct-contents.sh | 1 +
.../org/apache/comet/CometMetricsListener.scala | 38 +++++++++++++
.../org/apache/comet/ExtendedExplainInfo.scala | 19 +++++++
.../main/scala/org/apache/spark/CometSource.scala | 62 ++++++++++++++++++++++
.../src/main/scala/org/apache/spark/Plugins.scala | 31 ++++++++++-
.../scala/org/apache/spark/CometPluginsSuite.scala | 42 ++++++++++++++-
7 files changed, 205 insertions(+), 2 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index ab9d87e77..046ccf0b1 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -798,6 +798,20 @@ object CometConf extends ShimCometConf {
.longConf
.createWithDefault(3000L)
+ val COMET_METRICS_ENABLED: ConfigEntry[Boolean] =
+ conf("spark.comet.metrics.enabled")
+ .category(CATEGORY_EXEC)
+ .doc(
+ "Whether to enable Comet metrics reporting through Spark's external monitoring system. " +
+ "When enabled, Comet exposes metrics such as native operators, Spark operators, " +
+ "queries planned, transitions, and acceleration ratio. These metrics can be " +
+ "visualized through tools like Grafana when a metrics sink (e.g., Prometheus) is " +
+ "configured. Disabled by default because Spark plan traversal adds overhead and " +
+ "metrics require a sink to be useful. " +
+ "This config must be set before the SparkSession is created to take effect.")
+ .booleanConf
+ .createWithDefault(false)
+
val COMET_LIBHDFS_SCHEMES_KEY = "fs.comet.libhdfs.schemes"
val COMET_LIBHDFS_SCHEMES: OptionalConfigEntry[String] =
diff --git a/dev/ensure-jars-have-correct-contents.sh b/dev/ensure-jars-have-correct-contents.sh
index 570aeabb2..084936475 100755
--- a/dev/ensure-jars-have-correct-contents.sh
+++ b/dev/ensure-jars-have-correct-contents.sh
@@ -93,6 +93,7 @@ allowed_expr+="|^org/apache/spark/sql/$"
allowed_expr+="|^org/apache/spark/sql/ExtendedExplainGenerator.*$"
allowed_expr+="|^org/apache/spark/CometPlugin.class$"
allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$"
+allowed_expr+="|^org/apache/spark/CometSource.*$"
allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.class$"
allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.*$"
allowed_expr+="|^scala-collection-compat.properties$"
diff --git a/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala b/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala
new file mode 100644
index 000000000..96c03a99d
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import org.apache.spark.CometSource
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+class CometMetricsListener extends QueryExecutionListener {
+
+ override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+ val stats = CometCoverageStats.forPlan(qe.executedPlan)
+ CometSource.recordStats(stats)
+ }
+
+ override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+ // Record stats even on failure since the query was still planned
+ val stats = CometCoverageStats.forPlan(qe.executedPlan)
+ CometSource.recordStats(stats)
+ }
+}
diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index f47428e80..d30a1fe78 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -192,6 +192,25 @@ class CometCoverageStats {
}
}
+object CometCoverageStats {
+
+ /**
+ * Compute coverage stats for a plan without generating explain string.
+ */
+ def forPlan(plan: SparkPlan): CometCoverageStats = {
+ val stats = new CometCoverageStats()
+ val explainInfo = new ExtendedExplainInfo()
+ explainInfo.generateTreeString(
+ CometExplainInfo.getActualPlan(plan),
+ 0,
+ Seq(),
+ 0,
+ new StringBuilder(),
+ stats)
+ stats
+ }
+}
+
object CometExplainInfo {
val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")
diff --git a/spark/src/main/scala/org/apache/spark/CometSource.scala b/spark/src/main/scala/org/apache/spark/CometSource.scala
new file mode 100644
index 000000000..95d752361
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/CometSource.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.metrics.source.Source
+
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+
+import org.apache.comet.CometCoverageStats
+
+/**
+ * Exposes following metrics (hooked from CometCoverageStats)
+ * - operators.native: Total operators executed natively
+ * - operators.spark: Total operators that fell back to Spark
+ * - queries.planned: Total queries processed
+ * - transitions: Total Spark-to-Comet transitions
+ * - acceleration.ratio: native / (native + spark)
+ */
+object CometSource extends Source {
+ override val sourceName = "comet"
+ override val metricRegistry = new MetricRegistry()
+
+ val NATIVE_OPERATORS: Counter =
+ metricRegistry.counter(MetricRegistry.name("operators", "native"))
+ val SPARK_OPERATORS: Counter = metricRegistry.counter(MetricRegistry.name("operators", "spark"))
+ val QUERIES_PLANNED: Counter = metricRegistry.counter(MetricRegistry.name("queries", "planned"))
+ val TRANSITIONS: Counter = metricRegistry.counter(MetricRegistry.name("transitions"))
+
+ metricRegistry.register(
+ MetricRegistry.name("acceleration", "ratio"),
+ new Gauge[Double] {
+ override def getValue: Double = {
+ val native = NATIVE_OPERATORS.getCount
+ val total = native + SPARK_OPERATORS.getCount
+ if (total > 0) native.toDouble / total else 0.0
+ }
+ })
+
+ def recordStats(stats: CometCoverageStats): Unit = {
+ NATIVE_OPERATORS.inc(stats.cometOperators)
+ SPARK_OPERATORS.inc(stats.sparkOperators)
+ TRANSITIONS.inc(stats.transitions)
+ QUERIES_PLANNED.inc()
+ }
+}
diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala b/spark/src/main/scala/org/apache/spark/Plugins.scala
index 2529f08cf..7290ab436 100644
--- a/spark/src/main/scala/org/apache/spark/Plugins.scala
+++ b/spark/src/main/scala/org/apache/spark/Plugins.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR}
import org.apache.spark.sql.internal.StaticSQLConf
-import org.apache.comet.CometConf.COMET_ONHEAP_ENABLED
+import org.apache.comet.CometConf.{COMET_METRICS_ENABLED, COMET_ONHEAP_ENABLED}
import org.apache.comet.CometSparkSessionExtensions
/**
@@ -57,6 +57,9 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
// register CometSparkSessionExtensions if it isn't already registered
CometDriverPlugin.registerCometSessionExtension(sc.conf)
+ // Register Comet metrics
+ CometDriverPlugin.registerCometMetrics(sc)
+
if (CometSparkSessionExtensions.shouldOverrideMemoryConf(sc.getConf)) {
val execMemOverhead = if (sc.getConf.contains(EXECUTOR_MEMORY_OVERHEAD.key)) {
sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)
@@ -101,6 +104,32 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
}
object CometDriverPlugin extends Logging {
+ def registerCometMetrics(sc: SparkContext): Unit = {
+ if (sc.getConf.getBoolean(
+ COMET_METRICS_ENABLED.key,
+ COMET_METRICS_ENABLED.defaultValue.get)) {
+ sc.env.metricsSystem.registerSource(CometSource)
+
+ val listenerKey = "spark.sql.queryExecutionListeners"
+ val listenerClass = "org.apache.comet.CometMetricsListener"
+ val listeners = sc.conf.get(listenerKey, "")
+ if (listeners.isEmpty) {
+ logInfo(s"Setting $listenerKey=$listenerClass")
+ sc.conf.set(listenerKey, listenerClass)
+ } else {
+ val currentListeners = listeners.split(",").map(_.trim)
+ if (!currentListeners.contains(listenerClass)) {
+ val newValue = s"$listeners,$listenerClass"
+ logInfo(s"Setting $listenerKey=$newValue")
+ sc.conf.set(listenerKey, newValue)
+ }
+ }
+ } else {
+ logInfo(
+ "Comet metrics reporting is disabled. Set spark.comet.metrics.enabled=true to enable.")
+ }
+ }
+
def registerCometSessionExtension(conf: SparkConf): Unit = {
val extensionKey = StaticSQLConf.SPARK_SESSION_EXTENSIONS.key
val extensionClass = classOf[CometSparkSessionExtensions].getName
diff --git a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
index c493b22f7..675ff0171 100644
--- a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
@@ -19,7 +19,9 @@
package org.apache.spark
-import org.apache.spark.sql.CometTestBase
+import java.io.File
+
+import org.apache.spark.sql.{CometTestBase, SaveMode}
import org.apache.spark.sql.internal.StaticSQLConf
class CometPluginsSuite extends CometTestBase {
@@ -32,6 +34,7 @@ class CometPluginsSuite extends CometTestBase {
conf.set("spark.comet.enabled", "true")
conf.set("spark.comet.exec.enabled", "true")
conf.set("spark.comet.exec.onHeap.enabled", "true")
+ conf.set("spark.comet.metrics.enabled", "true")
conf
}
@@ -77,6 +80,43 @@ class CometPluginsSuite extends CometTestBase {
}
}
+ test("CometSource metrics are recorded") {
+ val nativeBefore = CometSource.NATIVE_OPERATORS.getCount
+ val queriesBefore = CometSource.QUERIES_PLANNED.getCount
+
+ withTempPath { dir =>
+ val path = new File(dir, "test.parquet").toString
+ spark.range(1000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path)
+ spark.read.parquet(path).filter("id > 500").collect()
+ }
+ spark.sparkContext.listenerBus.waitUntilEmpty()
+ assert(
+ CometSource.QUERIES_PLANNED.getCount > queriesBefore,
+ "queries.planned should increment after query")
+ assert(
+ CometSource.NATIVE_OPERATORS.getCount > nativeBefore,
+ "operators.native should increment for native execution")
+ }
+
+ test("metrics not double counted with AQE") {
+ withSQLConf("spark.sql.adaptive.enabled" -> "true") {
+ withTempPath { dir =>
+ val path = new File(dir, "test.parquet").toString
+ spark.range(10000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path)
+
+ spark.sparkContext.listenerBus.waitUntilEmpty()
+ val queriesBefore = CometSource.QUERIES_PLANNED.getCount
+ spark.read.parquet(path).filter("id > 100").collect()
+ spark.read.parquet(path).filter("id > 200").collect()
+ spark.sparkContext.listenerBus.waitUntilEmpty()
+ val queriesAfter = CometSource.QUERIES_PLANNED.getCount
+ assert(
+ queriesAfter == queriesBefore + 2,
+ s"Expected 2 queries, got ${queriesAfter - queriesBefore}")
+ }
+ }
+ }
+
test("Default Comet memory overhead") {
val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead")
val execMemOverhead2 = spark.sessionState.conf.getConfString("spark.executor.memoryOverhead")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]