This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new bef775999 feat: expose comet metrics through Sparks external monitoring system (#3708)
bef775999 is described below

commit bef775999f078e967844f5a56d250d0e11c8bf52
Author: Bhargava Vadlamani <[email protected]>
AuthorDate: Sun Mar 29 21:16:52 2026 -0700

    feat: expose comet metrics through Sparks external monitoring system (#3708)
---
 .../main/scala/org/apache/comet/CometConf.scala    | 14 +++++
 dev/ensure-jars-have-correct-contents.sh           |  1 +
 .../org/apache/comet/CometMetricsListener.scala    | 38 +++++++++++++
 .../org/apache/comet/ExtendedExplainInfo.scala     | 19 +++++++
 .../main/scala/org/apache/spark/CometSource.scala  | 62 ++++++++++++++++++++++
 .../src/main/scala/org/apache/spark/Plugins.scala  | 31 ++++++++++-
 .../scala/org/apache/spark/CometPluginsSuite.scala | 42 ++++++++++++++-
 7 files changed, 205 insertions(+), 2 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index ab9d87e77..046ccf0b1 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -798,6 +798,20 @@ object CometConf extends ShimCometConf {
       .longConf
       .createWithDefault(3000L)
 
+  val COMET_METRICS_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.metrics.enabled")
+      .category(CATEGORY_EXEC)
+      .doc(
+        "Whether to enable Comet metrics reporting through Spark's external monitoring system. " +
+          "When enabled, Comet exposes metrics such as native operators, Spark operators, " +
+          "queries planned, transitions, and acceleration ratio. These metrics can be " +
+          "visualized through tools like Grafana when a metrics sink (e.g., Prometheus) is " +
+          "configured. Disabled by default because Spark plan traversal adds overhead and " +
+          "metrics require a sink to be useful. " +
+          "This config must be set before the SparkSession is created to take effect.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_LIBHDFS_SCHEMES_KEY = "fs.comet.libhdfs.schemes"
 
   val COMET_LIBHDFS_SCHEMES: OptionalConfigEntry[String] =
diff --git a/dev/ensure-jars-have-correct-contents.sh b/dev/ensure-jars-have-correct-contents.sh
index 570aeabb2..084936475 100755
--- a/dev/ensure-jars-have-correct-contents.sh
+++ b/dev/ensure-jars-have-correct-contents.sh
@@ -93,6 +93,7 @@ allowed_expr+="|^org/apache/spark/sql/$"
 allowed_expr+="|^org/apache/spark/sql/ExtendedExplainGenerator.*$"
 allowed_expr+="|^org/apache/spark/CometPlugin.class$"
 allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$"
+allowed_expr+="|^org/apache/spark/CometSource.*$"
 allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.class$"
 allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.*$"
 allowed_expr+="|^scala-collection-compat.properties$"
diff --git a/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala b/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala
new file mode 100644
index 000000000..96c03a99d
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import org.apache.spark.CometSource
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+class CometMetricsListener extends QueryExecutionListener {
+
+  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+    val stats = CometCoverageStats.forPlan(qe.executedPlan)
+    CometSource.recordStats(stats)
+  }
+
+  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+    // Record stats even on failure since the query was still planned
+    val stats = CometCoverageStats.forPlan(qe.executedPlan)
+    CometSource.recordStats(stats)
+  }
+}
diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index f47428e80..d30a1fe78 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -192,6 +192,25 @@ class CometCoverageStats {
   }
 }
 
+object CometCoverageStats {
+
+  /**
+   * Compute coverage stats for a plan without generating explain string.
+   */
+  def forPlan(plan: SparkPlan): CometCoverageStats = {
+    val stats = new CometCoverageStats()
+    val explainInfo = new ExtendedExplainInfo()
+    explainInfo.generateTreeString(
+      CometExplainInfo.getActualPlan(plan),
+      0,
+      Seq(),
+      0,
+      new StringBuilder(),
+      stats)
+    stats
+  }
+}
+
 object CometExplainInfo {
   val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")
 
diff --git a/spark/src/main/scala/org/apache/spark/CometSource.scala b/spark/src/main/scala/org/apache/spark/CometSource.scala
new file mode 100644
index 000000000..95d752361
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/CometSource.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.metrics.source.Source
+
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+
+import org.apache.comet.CometCoverageStats
+
+/**
+ * Exposes following metrics (hooked from CometCoverageStats)
+ *   - operators.native: Total operators executed natively
+ *   - operators.spark: Total operators that fell back to Spark
+ *   - queries.planned: Total queries processed
+ *   - transitions: Total Spark-to-Comet transitions
+ *   - acceleration.ratio: native / (native + spark)
+ */
+object CometSource extends Source {
+  override val sourceName = "comet"
+  override val metricRegistry = new MetricRegistry()
+
+  val NATIVE_OPERATORS: Counter =
+    metricRegistry.counter(MetricRegistry.name("operators", "native"))
+  val SPARK_OPERATORS: Counter = metricRegistry.counter(MetricRegistry.name("operators", "spark"))
+  val QUERIES_PLANNED: Counter = metricRegistry.counter(MetricRegistry.name("queries", "planned"))
+  val TRANSITIONS: Counter = metricRegistry.counter(MetricRegistry.name("transitions"))
+
+  metricRegistry.register(
+    MetricRegistry.name("acceleration", "ratio"),
+    new Gauge[Double] {
+      override def getValue: Double = {
+        val native = NATIVE_OPERATORS.getCount
+        val total = native + SPARK_OPERATORS.getCount
+        if (total > 0) native.toDouble / total else 0.0
+      }
+    })
+
+  def recordStats(stats: CometCoverageStats): Unit = {
+    NATIVE_OPERATORS.inc(stats.cometOperators)
+    SPARK_OPERATORS.inc(stats.sparkOperators)
+    TRANSITIONS.inc(stats.transitions)
+    QUERIES_PLANNED.inc()
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala b/spark/src/main/scala/org/apache/spark/Plugins.scala
index 2529f08cf..7290ab436 100644
--- a/spark/src/main/scala/org/apache/spark/Plugins.scala
+++ b/spark/src/main/scala/org/apache/spark/Plugins.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR}
 import org.apache.spark.sql.internal.StaticSQLConf
 
-import org.apache.comet.CometConf.COMET_ONHEAP_ENABLED
+import org.apache.comet.CometConf.{COMET_METRICS_ENABLED, COMET_ONHEAP_ENABLED}
 import org.apache.comet.CometSparkSessionExtensions
 
 /**
@@ -57,6 +57,9 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
     // register CometSparkSessionExtensions if it isn't already registered
     CometDriverPlugin.registerCometSessionExtension(sc.conf)
 
+    // Register Comet metrics
+    CometDriverPlugin.registerCometMetrics(sc)
+
     if (CometSparkSessionExtensions.shouldOverrideMemoryConf(sc.getConf)) {
       val execMemOverhead = if (sc.getConf.contains(EXECUTOR_MEMORY_OVERHEAD.key)) {
         sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)
@@ -101,6 +104,32 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
 }
 
 object CometDriverPlugin extends Logging {
+  def registerCometMetrics(sc: SparkContext): Unit = {
+    if (sc.getConf.getBoolean(
+        COMET_METRICS_ENABLED.key,
+        COMET_METRICS_ENABLED.defaultValue.get)) {
+      sc.env.metricsSystem.registerSource(CometSource)
+
+      val listenerKey = "spark.sql.queryExecutionListeners"
+      val listenerClass = "org.apache.comet.CometMetricsListener"
+      val listeners = sc.conf.get(listenerKey, "")
+      if (listeners.isEmpty) {
+        logInfo(s"Setting $listenerKey=$listenerClass")
+        sc.conf.set(listenerKey, listenerClass)
+      } else {
+        val currentListeners = listeners.split(",").map(_.trim)
+        if (!currentListeners.contains(listenerClass)) {
+          val newValue = s"$listeners,$listenerClass"
+          logInfo(s"Setting $listenerKey=$newValue")
+          sc.conf.set(listenerKey, newValue)
+        }
+      }
+    } else {
+      logInfo(
+        "Comet metrics reporting is disabled. Set spark.comet.metrics.enabled=true to enable.")
+    }
+  }
+
   def registerCometSessionExtension(conf: SparkConf): Unit = {
     val extensionKey = StaticSQLConf.SPARK_SESSION_EXTENSIONS.key
     val extensionClass = classOf[CometSparkSessionExtensions].getName
diff --git a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
index c493b22f7..675ff0171 100644
--- a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
@@ -19,7 +19,9 @@
 
 package org.apache.spark
 
-import org.apache.spark.sql.CometTestBase
+import java.io.File
+
+import org.apache.spark.sql.{CometTestBase, SaveMode}
 import org.apache.spark.sql.internal.StaticSQLConf
 
 class CometPluginsSuite extends CometTestBase {
@@ -32,6 +34,7 @@ class CometPluginsSuite extends CometTestBase {
     conf.set("spark.comet.enabled", "true")
     conf.set("spark.comet.exec.enabled", "true")
     conf.set("spark.comet.exec.onHeap.enabled", "true")
+    conf.set("spark.comet.metrics.enabled", "true")
     conf
   }
 
@@ -77,6 +80,43 @@ class CometPluginsSuite extends CometTestBase {
     }
   }
 
+  test("CometSource metrics are recorded") {
+    val nativeBefore = CometSource.NATIVE_OPERATORS.getCount
+    val queriesBefore = CometSource.QUERIES_PLANNED.getCount
+
+    withTempPath { dir =>
+      val path = new File(dir, "test.parquet").toString
+      spark.range(1000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path)
+      spark.read.parquet(path).filter("id > 500").collect()
+    }
+    spark.sparkContext.listenerBus.waitUntilEmpty()
+    assert(
+      CometSource.QUERIES_PLANNED.getCount > queriesBefore,
+      "queries.planned should increment after query")
+    assert(
+      CometSource.NATIVE_OPERATORS.getCount > nativeBefore,
+      "operators.native should increment for native execution")
+  }
+
+  test("metrics not double counted with AQE") {
+    withSQLConf("spark.sql.adaptive.enabled" -> "true") {
+      withTempPath { dir =>
+        val path = new File(dir, "test.parquet").toString
+        spark.range(10000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path)
+
+        spark.sparkContext.listenerBus.waitUntilEmpty()
+        val queriesBefore = CometSource.QUERIES_PLANNED.getCount
+        spark.read.parquet(path).filter("id > 100").collect()
+        spark.read.parquet(path).filter("id > 200").collect()
+        spark.sparkContext.listenerBus.waitUntilEmpty()
+        val queriesAfter = CometSource.QUERIES_PLANNED.getCount
+        assert(
+          queriesAfter == queriesBefore + 2,
+          s"Expected 2 queries, got ${queriesAfter - queriesBefore}")
+      }
+    }
+  }
+
   test("Default Comet memory overhead") {
     val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead")
     val execMemOverhead2 = spark.sessionState.conf.getConfString("spark.executor.memoryOverhead")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to