viirya opened a new issue, #262: URL: https://github.com/apache/arrow-datafusion-comet/issues/262
### Describe the bug
Related to #250.
With columnar shuffle enabled, a partial Spark aggregation can end up
upstream of a Comet final aggregation, because a columnar shuffle can serve as
the scan source of a Comet native operator.
In that configuration, the `SPARK-3176 Added Parser of SQL LAST()` test in
`SQLQuerySuite` fails:
```
[info] == Physical Plan ==
[info] AdaptiveSparkPlan isFinalPlan=true
[info] +- == Final Plan ==
[info] *(2) ColumnarToRow
[info] +- CometHashAggregate [last#4396, valueSet#4397], Final,
[last(n#93, false)]
[info] +- ShuffleQueryStage 0
[info] +- CometColumnarExchange SinglePartition,
ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10390]
[info] +- RowToColumnar
[info] +- *(1) HashAggregate(keys=[],
functions=[partial_last(n#93, false)], output=[last#4396, valueSet#4397])
[info] +- *(1) SerializeFromObject
[knownnotnull(assertnotnull(input[0,
org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93]
[info] +- Scan[obj#92]
[info] +- == Initial Plan ==
[info] CometHashAggregate [last#4396, valueSet#4397], Final,
[last(n#93, false)]
[info] +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS,
CometColumnarShuffle, [plan_id=10279]
[info] +- HashAggregate(keys=[], functions=[partial_last(n#93,
false)], output=[last#4396, valueSet#4397])
[info] +- SerializeFromObject
[knownnotnull(assertnotnull(input[0,
org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93]
[info] +- Scan[obj#92]
[info]
[info] == Results ==
[info]
[info] == Results ==
[info] !== Correct Answer - 1 == == Spark Answer - 1 ==
[info] !struct<> struct<last(n):int>
[info] ![4] [2] (QueryTest.scala:243)
```
This happens because the aggregation buffer attributes (i.e., the `state`) of
Spark's `Last` aggregation expression differ from those of DataFusion's `Last`
aggregation expression.
Spark `Last` has:
```scala
override lazy val aggBufferAttributes: Seq[AttributeReference] =
  last :: valueSet :: Nil
```
DataFusion's `Last`:
```rust
fn state_fields(&self) -> Result<Vec<Field>> {
    let mut fields = vec![Field::new(
        format_state_name(&self.name, "last_value"),
        self.input_data_type.clone(),
        true,
    )];
    fields.extend(ordering_fields(
        &self.ordering_req,
        &self.order_by_data_types,
    ));
    fields.push(Field::new(
        format_state_name(&self.name, "is_set"),
        DataType::Boolean,
        true,
    ));
    Ok(fields)
}
```
I expect more issues of this kind to appear. Re-implementing such
aggregation expressions in Comet seems too costly to develop and maintain.
These cases only happen when the partial Spark aggregation cannot be
converted to Comet. In the failing query, that is because its upstream is not
a Comet plan. I don't think these are the cases we care about most.
I think we should only use a Comet final aggregation when the partial
aggregation is also Comet. That will simplify things.
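
To make the proposed rule concrete, here is a minimal sketch of the check over a toy plan tree. Everything in it (`Engine`, `AggMode`, `Plan`, `nearest_aggregate`, `final_agg_may_use_comet`, `plan_below_final`) is a hypothetical name made up for illustration; it is not Comet's or Spark's actual planner API, and the real rule would live in Comet's Scala planner code.

```rust
// Hypothetical, simplified plan model for illustration only.
// None of these types exist in Comet, Spark, or DataFusion.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Engine {
    Spark,
    Comet,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum AggMode {
    Partial,
    Final,
}

#[derive(Debug)]
enum Plan {
    Aggregate { engine: Engine, mode: AggMode, child: Box<Plan> },
    // A shuffle exchange sitting between the partial and final aggregates.
    Exchange { child: Box<Plan> },
    // Any other single-child node we simply look through (e.g. a row/columnar transition).
    Passthrough { child: Box<Plan> },
    Scan,
}

/// Walk down the tree and return the engine and mode of the first aggregate found.
fn nearest_aggregate(plan: &Plan) -> Option<(Engine, AggMode)> {
    match plan {
        Plan::Aggregate { engine, mode, .. } => Some((*engine, *mode)),
        Plan::Exchange { child } | Plan::Passthrough { child } => nearest_aggregate(child),
        Plan::Scan => None,
    }
}

/// Proposed rule: a final aggregate may run in Comet only if the nearest
/// aggregate below it is a partial aggregate that also runs in Comet.
fn final_agg_may_use_comet(child_of_final: &Plan) -> bool {
    matches!(
        nearest_aggregate(child_of_final),
        Some((Engine::Comet, AggMode::Partial))
    )
}

/// Build the subtree below a final aggregate, shaped like the failing query:
/// exchange -> transition -> partial aggregate -> scan.
fn plan_below_final(partial_engine: Engine) -> Plan {
    Plan::Exchange {
        child: Box::new(Plan::Passthrough {
            child: Box::new(Plan::Aggregate {
                engine: partial_engine,
                mode: AggMode::Partial,
                child: Box::new(Plan::Scan),
            }),
        }),
    }
}
```

Under this sketch, `final_agg_may_use_comet(&plan_below_final(Engine::Spark))` is `false`, so the final `Last` aggregation in the failing query would stay in Spark and never read Spark's buffer layout as DataFusion state.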
### Steps to reproduce
_No response_
### Expected behavior
_No response_
### Additional context
_No response_
