Samyak2 opened a new issue, #20491:
URL: https://github.com/apache/datafusion/issues/20491
### Describe the bug
Aggregations over `Utf8View`/`StringViewArray` consume significantly more
memory, as tracked by the memory pool, than aggregations over
`Utf8`/`StringArray`. The actual memory usage of the two is within 10% of each
other, but the reported memory usage can be more than 2x higher.
### To Reproduce
Minimal reproducer:
```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, RecordBatch, StringArray, StringViewArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
use datafusion::execution::memory_pool::MemoryPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::physical_expr::aggregate::AggregateExprBuilder;
use datafusion::physical_expr::expressions::{Column, Literal};
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::displayable;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::test::TestMemoryExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning, collect};
use datafusion::prelude::{SessionConfig, SessionContext};

// --- Knobs ---
const USE_UTF8VIEW: bool = true;
const NUM_ROWS: usize = 2_000_000;
const DATA_PARTITIONS: usize = 100;
const HASH_PARTITIONS: usize = 100;
const POOL_SIZE: usize = 1024 * 1024 * 1024; // 1GB

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    env_logger::init();

    let (schema, batch) = make_data();
    println!(
        "input batch: {} rows, {} bytes",
        batch.num_rows(),
        batch.get_array_memory_size()
    );

    let plan = build_plan(schema, batch)?;
    println!("plan:\n{}", displayable(plan.as_ref()).indent(true));

    let (ctx, pool) = make_session_ctx()?;
    println!("pool reserved before collect: {} bytes", pool.reserved());
    let results = collect(plan, ctx.task_ctx()).await?;
    println!("pool reserved after collect: {} bytes", pool.reserved());

    let total_rows: usize = results.iter().map(|b| b.num_rows()).sum();
    let total_bytes: usize = results.iter().map(|b| b.get_array_memory_size()).sum();
    println!(
        "{} batches, {total_rows} rows, {total_bytes} bytes in output",
        results.len()
    );
    Ok(())
}

fn make_data() -> (SchemaRef, RecordBatch) {
    let values: Vec<String> = (0..NUM_ROWS).map(|i| format!("value_{i}")).collect();
    let string_refs: Vec<&str> = values.iter().map(|s| s.as_str()).collect();
    let (dt, col): (DataType, ArrayRef) = if USE_UTF8VIEW {
        (
            DataType::Utf8View,
            Arc::new(StringViewArray::from(string_refs)),
        )
    } else {
        (DataType::Utf8, Arc::new(StringArray::from(string_refs)))
    };
    let schema = Arc::new(Schema::new(vec![Field::new("a", dt, false)]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![col]).unwrap();
    (schema, batch)
}

fn build_plan(
    schema: SchemaRef,
    batch: RecordBatch,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
    let rows_per_partition = NUM_ROWS / DATA_PARTITIONS;
    let partitions: Vec<Vec<RecordBatch>> = (0..DATA_PARTITIONS)
        .map(|i| vec![batch.slice(i * rows_per_partition, rows_per_partition)])
        .collect();
    let plan = TestMemoryExec::try_new_exec(&partitions, Arc::clone(&schema), None)?;

    // GROUP BY "a", COUNT(*)
    let group_by =
        PhysicalGroupBy::new_single(vec![(Arc::new(Column::new("a", 0)), "a".to_string())]);
    let count_expr = Arc::new(
        AggregateExprBuilder::new(count_udaf(), vec![Arc::new(Literal::new(1i8.into()))])
            .schema(Arc::clone(&schema))
            .alias("count")
            .build()?,
    );
    let plan = Arc::new(AggregateExec::try_new(
        AggregateMode::Partial,
        group_by.clone(),
        vec![count_expr.clone()],
        vec![None],
        plan,
        schema,
    )?);
    let partial_schema = plan.schema();
    let plan = Arc::new(RepartitionExec::try_new(
        plan,
        Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], HASH_PARTITIONS),
    )?);
    let plan = Arc::new(AggregateExec::try_new(
        AggregateMode::FinalPartitioned,
        group_by,
        vec![count_expr],
        vec![None],
        plan,
        partial_schema,
    )?);
    let plan: Arc<dyn ExecutionPlan> = Arc::new(CoalescePartitionsExec::new(plan));
    Ok(plan)
}

fn make_session_ctx() -> Result<(SessionContext, Arc<dyn MemoryPool>), Box<dyn std::error::Error>> {
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(POOL_SIZE, 1.0)
        .with_disk_manager_builder(
            DiskManagerBuilder::default().with_mode(DiskManagerMode::Disabled),
        )
        .build_arc()?;
    let pool: Arc<dyn MemoryPool> = Arc::clone(&runtime.memory_pool);
    let config = SessionConfig::new()
        .set_str(
            "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold",
            "1.0",
        )
        .set_str(
            "datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
            "1000000000",
        );
    let ctx = SessionContext::new_with_config_rt(config, runtime);
    Ok((ctx, pool))
}
```
Here's the `Cargo.toml` for reference:
```
[package]
name = "datafusion-scratch"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.0", features = ["rt-multi-thread"] }
datafusion = { path = "../datafusion/datafusion/core" }
arrow = "57.3.0"
rand = "0.9.2"
env_logger = "0.11.9"
```
(The `datafusion` path points at the latest main, commit
`9660c9874315354ff22245699785f5f77841be80`, but any v52 release should show the
same behavior.)
This fails with:
```
input batch: 2000000 rows, 46663800 bytes
plan:
CoalescePartitionsExec
  AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count]
    RepartitionExec: partitioning=Hash([a@0], 100), input_partitions=100
      AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
        DataSourceExec: partitions=100, partition_sizes=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
pool reserved before collect: 0 bytes
Error: Shared(ResourcesExhausted("Memory Exhausted while SpillPool (DiskManager is disabled)"))
```
On macOS, I can get the actual memory usage with `/usr/bin/time -l`, which
gives:
```
277151744 maximum resident set size
```
(~264MB for ~44MB of input data, yet the memory tracker believes there are
more than 1GB of allocations and fails the program).
The program runs after increasing `POOL_SIZE` to `2048 * 1024 * 1024` and
gives `316456960 maximum resident set size` (~301MB). This is lower than the
`Utf8`/`StringArray` case (below), but DataFusion's tracker thinks it's much
higher!
### Expected behavior
Set `USE_UTF8VIEW: bool = false;` at the top of the script to use
`Utf8`/`StringArray` instead. The script runs just fine with this:
```
input batch: 2000000 rows, 41554616 bytes
plan:
CoalescePartitionsExec
  AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count]
    RepartitionExec: partitioning=Hash([a@0], 100), input_partitions=100
      AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
        DataSourceExec: partitions=100, partition_sizes=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
pool reserved before collect: 0 bytes
pool reserved after collect: 0 bytes
```
`/usr/bin/time -l` gives me `375013376 maximum resident set size` (~357MB)
### Additional context
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]