This is an automated email from the ASF dual-hosted git repository. mbutrovich pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new af27b3732 chore: Refactor aggregate serde to use map (#2055) af27b3732 is described below commit af27b37329c36c6c13626546166d2e8bd22b2de7 Author: Andy Grove <agr...@apache.org> AuthorDate: Fri Aug 1 08:44:22 2025 -0600 chore: Refactor aggregate serde to use map (#2055) --- .../org/apache/comet/serde/QueryPlanSerde.scala | 53 ++++++++++++---------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 0374f87ac..45ca67ed7 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -131,6 +131,29 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[SparkPartitionID] -> CometSparkPartitionId, classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId) + /** + * Mapping of Spark aggregate expression class to Comet expression handler. + */ + private val aggrSerdeMap: Map[Class[_], CometAggregateExpressionSerde] = Map( + classOf[Sum] -> CometSum, + classOf[Average] -> CometAverage, + classOf[Count] -> CometCount, + classOf[Min] -> CometMin, + classOf[Max] -> CometMax, + classOf[First] -> CometFirst, + classOf[Last] -> CometLast, + classOf[BitAndAgg] -> CometBitAndAgg, + classOf[BitOrAgg] -> CometBitOrAgg, + classOf[BitXorAgg] -> CometBitXOrAgg, + classOf[CovSample] -> CometCovSample, + classOf[CovPopulation] -> CometCovPopulation, + classOf[VarianceSamp] -> CometVarianceSamp, + classOf[VariancePop] -> CometVariancePop, + classOf[StddevSamp] -> CometStddevSamp, + classOf[StddevPop] -> CometStddevPop, + classOf[Corr] -> CometCorr, + classOf[BloomFilterAggregate] -> CometBloomFilterAggregate) + def emitWarning(reason: String): Unit = { logWarning(s"Comet native execution is disabled due to: $reason") } @@ -436,33 +459,17 @@ object QueryPlanSerde extends Logging with CometExprShim { return None } - val cometExpr: CometAggregateExpressionSerde = aggExpr.aggregateFunction match { - case _: Sum => CometSum - case _: Average => CometAverage - case _: Count => CometCount - case _: Min => CometMin - case _: Max => CometMax - case _: First => CometFirst - case _: Last => CometLast - case _: BitAndAgg => CometBitAndAgg - case _: BitOrAgg => CometBitOrAgg - case _: BitXorAgg => CometBitXOrAgg - case _: CovSample => CometCovSample - case _: CovPopulation => CometCovPopulation - case _: VarianceSamp => CometVarianceSamp - case _: VariancePop => CometVariancePop - case _: StddevSamp => CometStddevSamp - case _: StddevPop => CometStddevPop - case _: Corr => CometCorr - case _: BloomFilterAggregate => CometBloomFilterAggregate - case fn => + val fn = aggExpr.aggregateFunction + val cometExpr = aggrSerdeMap.get(fn.getClass) + cometExpr match { + case Some(handler) => + handler.convert(aggExpr, fn, inputs, binding, conf) + case _ => val msg = s"unsupported Spark aggregate function: ${fn.prettyName}" emitWarning(msg) withInfo(aggExpr, msg, fn.children: _*) - return None - + None } - cometExpr.convert(aggExpr, aggExpr.aggregateFunction, inputs, binding, conf) } def evalModeToProto(evalMode: CometEvalMode.Value): ExprOuterClass.EvalMode = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org