brancz opened a new issue, #16011:
URL: https://github.com/apache/datafusion/issues/16011
### Is your feature request related to a problem or challenge?
It's currently not possible to aggregate by `RunArrays`.
<details>
<summary>Example code grouping by a `RunArray`</summary>
```rust
use arrow::array::{Array, Int32Array, RunArray, StringViewArray};
use arrow::datatypes::{DataType, Field, Schema, Int32Type};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), datafusion::error::DataFusionError> {
// Create a new DataFusion context
let ctx = SessionContext::new();
// First, let's create our data
// We'll have temperature readings where multiple consecutive readings
come from the same sensor
// Temperature values (not run-length encoded)
// This represents all temperature readings in sequence
let temperatures = Int32Array::from(vec![
22, 23, 24, 25, 22, 21, 20, 21, 22, 23, 24, 25, 26, 27, 28
]);
// Create the string values for sensor IDs
let sensor_id_values = StringViewArray::from(vec!["sensor_A",
"sensor_B", "sensor_C", "sensor_D"]);
// Create the run ends array (positions where each run ends)
let sensor_id_run_ends = Int32Array::from(vec![4, 7, 12, 15]);
// Create RunArray for sensor IDs with Int32Type as run end type
let sensor_id_ree = RunArray::<Int32Type>::try_new(&sensor_id_run_ends,
&sensor_id_values)
.expect("Failed to create sensor ID RunArray");
// Get the exact data type of the RunArray for the schema
let sensor_id_type = sensor_id_ree.data_type().clone();
// Create schema
let schema = Arc::new(Schema::new(vec![
Field::new("sensor_id", sensor_id_type, false),
Field::new("temperature", DataType::Int32, false),
]));
// Create record batch
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(sensor_id_ree), Arc::new(temperatures)],
)?;
// Register as a table
let provider = MemTable::try_new(schema, vec![vec![batch]])?;
ctx.register_table("sensor_readings", Arc::new(provider))?;
// Run aggregation query
// Group by sensor ID and calculate statistics
let sql = "
SELECT
sensor_id,
AVG(temperature) AS avg_temp,
MIN(temperature) AS min_temp,
MAX(temperature) AS max_temp,
COUNT(temperature) AS reading_count
FROM sensor_readings
GROUP BY sensor_id
ORDER BY sensor_id
";
let results = ctx.sql(sql).await?.collect().await?;
for batch in results {
println!("{:?}", batch);
}
Ok(())
}
```
</details>
### Describe the solution you'd like
To make it happen there are a variety of things that need to happen:
- [ ] Support for `RunArray`s in `arrow-select`'s `concat`.
- [ ] Support for `RunArray`s in `arrow-row`
- [ ] Support for `RunArray`s in `arrow-data`'s `build_extend_nulls` and
`build_extend`
- [ ] Support for grouping by `RunArray`s in DataFusion (mainly in
`datafusion/common/src/cast.rs` and
`datafusion/physical-plan/src/aggregates/group_values/row.rs` to turn groups
into `RunArray`s after aggregating and `datafusion/common/src/hash_utils.rs` to
implement the actual hashing handling)
### Describe alternatives you've considered
We're currently expanding REE arrays before pushing them through DataFusion
query plans, but being able to do it with zero-copy would be much better for
performance.
### Additional context
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]