This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new af27b3732 chore: Refactor aggregate serde to use map (#2055)
af27b3732 is described below

commit af27b37329c36c6c13626546166d2e8bd22b2de7
Author: Andy Grove <agr...@apache.org>
AuthorDate: Fri Aug 1 08:44:22 2025 -0600

    chore: Refactor aggregate serde to use map (#2055)
---
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 53 ++++++++++++----------
 1 file changed, 30 insertions(+), 23 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 0374f87ac..45ca67ed7 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -131,6 +131,29 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[SparkPartitionID] -> CometSparkPartitionId,
     classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId)
 
+  /**
+   * Mapping of Spark aggregate expression class to Comet expression handler.
+   */
+  private val aggrSerdeMap: Map[Class[_], CometAggregateExpressionSerde] = Map(
+    classOf[Sum] -> CometSum,
+    classOf[Average] -> CometAverage,
+    classOf[Count] -> CometCount,
+    classOf[Min] -> CometMin,
+    classOf[Max] -> CometMax,
+    classOf[First] -> CometFirst,
+    classOf[Last] -> CometLast,
+    classOf[BitAndAgg] -> CometBitAndAgg,
+    classOf[BitOrAgg] -> CometBitOrAgg,
+    classOf[BitXorAgg] -> CometBitXOrAgg,
+    classOf[CovSample] -> CometCovSample,
+    classOf[CovPopulation] -> CometCovPopulation,
+    classOf[VarianceSamp] -> CometVarianceSamp,
+    classOf[VariancePop] -> CometVariancePop,
+    classOf[StddevSamp] -> CometStddevSamp,
+    classOf[StddevPop] -> CometStddevPop,
+    classOf[Corr] -> CometCorr,
+    classOf[BloomFilterAggregate] -> CometBloomFilterAggregate)
+
   def emitWarning(reason: String): Unit = {
     logWarning(s"Comet native execution is disabled due to: $reason")
   }
@@ -436,33 +459,17 @@ object QueryPlanSerde extends Logging with CometExprShim {
       return None
     }
 
-    val cometExpr: CometAggregateExpressionSerde = aggExpr.aggregateFunction match {
-      case _: Sum => CometSum
-      case _: Average => CometAverage
-      case _: Count => CometCount
-      case _: Min => CometMin
-      case _: Max => CometMax
-      case _: First => CometFirst
-      case _: Last => CometLast
-      case _: BitAndAgg => CometBitAndAgg
-      case _: BitOrAgg => CometBitOrAgg
-      case _: BitXorAgg => CometBitXOrAgg
-      case _: CovSample => CometCovSample
-      case _: CovPopulation => CometCovPopulation
-      case _: VarianceSamp => CometVarianceSamp
-      case _: VariancePop => CometVariancePop
-      case _: StddevSamp => CometStddevSamp
-      case _: StddevPop => CometStddevPop
-      case _: Corr => CometCorr
-      case _: BloomFilterAggregate => CometBloomFilterAggregate
-      case fn =>
+    val fn = aggExpr.aggregateFunction
+    val cometExpr = aggrSerdeMap.get(fn.getClass)
+    cometExpr match {
+      case Some(handler) =>
+        handler.convert(aggExpr, fn, inputs, binding, conf)
+      case _ =>
         val msg = s"unsupported Spark aggregate function: ${fn.prettyName}"
         emitWarning(msg)
         withInfo(aggExpr, msg, fn.children: _*)
-        return None
-
+        None
     }
-    cometExpr.convert(aggExpr, aggExpr.aggregateFunction, inputs, binding, conf)
   }
 
   def evalModeToProto(evalMode: CometEvalMode.Value): ExprOuterClass.EvalMode = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to