EmilyMatt commented on code in PR #1390:
URL: https://github.com/apache/datafusion-comet/pull/1390#discussion_r1963121789
##########
common/src/main/scala/org/apache/comet/CometConf.scala:
##########
@@ -216,6 +216,17 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_INITCAP_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("initCap", defaultValue = false)
+ val COMET_EXEC_AGGREGATE_ENFORCE_RESULTS: ConfigEntry[Boolean] =
+ conf("spark.comet.exec.aggregate.enforceResults")
+ .doc("Whether to enforce converting results in the Final stage of a HashAggregate, " +
+ "When enabled, Final-mode hashAggregates will not be converted to Comet, this can cause " +
+ "issues when native shuffle is enabled. " +
+ "If this is disabled, unsupported result expressions will be " +
+ "separated into a ProjectExec to allow HashAggregate to complete natively. " +
+ "This is disabled by default.")
Review Comment:
@kazuyukitanimura
Apologies, in the doc string above I meant to say "when native shuffle is
disabled" (or, more precisely, when columnar shuffle is used).
Essentially, whenever the schema comes from Spark rather than from the
data/batch itself, we will start having problems.
Looking at the TPC-DS tests, we can see an example:
`HashAggregate [cp_catalog_page_id,sum,sum,isEmpty,sum,isEmpty] [sum(UnscaledValue(cs_ext_sales_price)),sum(coalesce(cast(cr_return_amount as decimal(12,2)), 0.00)),sum((cs_net_profit - coalesce(cast(cr_net_loss as decimal(12,2)), 0.00))),sales,returns,profit,channel,id,sum,sum,isEmpty,sum,isEmpty]
  CometColumnarToRow
    InputAdapter
      CometExchange [cp_catalog_page_id] #10
        CometHashAggregate`
Note how the Partial HashAggregate is a Comet operator, while the Final one is
a Spark HashAggregate fed through a CometColumnarToRow conversion.
This is the current implementation, and it works with both shuffles because
Comet still produces the data Spark expects.
However, the moment we output something Spark does not expect (such as the
partial results of a CollectSet), the columnar shuffle will crash due to the
mismatch in data types.
The issue with the shuffle can probably be circumvented by shuffling the
aggregate buffer as a binary column regardless of its data type, then
reconstructing it in the Final aggregate; that way both shuffles will function.
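
A minimal sketch of that binary-buffer idea, using only the Scala/Java standard
library. `CollectSetBuffer` and `BinaryAggBuffer` are hypothetical names made up
for illustration; they are not Comet or Spark types, and the byte layout is an
assumption, not what Comet actually does:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stand-in for the partial state of a collect_set over longs.
final case class CollectSetBuffer(values: Set[Long])

object BinaryAggBuffer {
  // Partial side: encode the buffer as opaque bytes, so the exchange only
  // ever sees a single binary column whatever the buffer's real type is.
  def encode(buf: CollectSetBuffer): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(buf.values.size)              // element count (4 bytes)
    buf.values.foreach(v => out.writeLong(v))  // each element (8 bytes)
    out.flush()
    bytes.toByteArray
  }

  // Final side: rebuild the typed buffer from the shuffled bytes.
  def decode(raw: Array[Byte]): CollectSetBuffer = {
    val in = new DataInputStream(new ByteArrayInputStream(raw))
    val n = in.readInt()
    CollectSetBuffer((0 until n).map(_ => in.readLong()).toSet)
  }

  // Final-mode merge of two partial states: set union.
  def merge(a: CollectSetBuffer, b: CollectSetBuffer): CollectSetBuffer =
    CollectSetBuffer(a.values ++ b.values)
}
```

The point is only that the shuffle never needs to know the buffer's data type;
whichever operator runs the Final stage still has to understand the encoding.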
However, as discussed in the issue, this only delays the inevitable in cases
such as an unsupported ResultExpression: a CometColumnarToRow will not recreate
the data type that a regular HashAggregate expects, so there is no path forward
other than letting the Comet aggregate expressions run their course.
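
For reference, a sketch of how the flag from this diff could be set on a
session. The `spark.comet.exec.aggregate.enforceResults` key is the one proposed
in this PR and may change before merge; the other keys are the usual Comet
settings as I understand them, and the snippet assumes the Comet jar is already
on the classpath:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("comet-enforce-results-sketch")
  // Assumed standard Comet setup; adjust to your deployment.
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.enabled", "true")
  .config("spark.comet.exec.enabled", "true")
  // Key proposed in this diff; disabled by default per its doc string.
  // "true" keeps Final-mode HashAggregates in Spark.
  .config("spark.comet.exec.aggregate.enforceResults", "true")
  .getOrCreate()
```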