Samyak2 opened a new issue, #20491:
URL: https://github.com/apache/datafusion/issues/20491

   ### Describe the bug
   
   Aggregations over `Utf8View`/`StringViewArray` consume significantly more 
memory, as tracked by the memory pool, than `Utf8`/`StringArray`. The actual 
memory usage of the two cases is within 10% of each other, but the reported 
memory usage can be more than 2x higher.
   
   ### To Reproduce
   
   Minimal reproducer:
   ```rust
   use std::sync::Arc;
   
   use arrow::array::{ArrayRef, RecordBatch, StringArray, StringViewArray};
   use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
   use datafusion::execution::disk_manager::{DiskManagerBuilder, 
DiskManagerMode};
   use datafusion::execution::memory_pool::MemoryPool;
   use datafusion::execution::runtime_env::RuntimeEnvBuilder;
   use datafusion::functions_aggregate::count::count_udaf;
   use datafusion::physical_expr::aggregate::AggregateExprBuilder;
   use datafusion::physical_expr::expressions::{Column, Literal};
   use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, 
PhysicalGroupBy};
   use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
   use datafusion::physical_plan::displayable;
   use datafusion::physical_plan::repartition::RepartitionExec;
   use datafusion::physical_plan::test::TestMemoryExec;
   use datafusion::physical_plan::{ExecutionPlan, Partitioning, collect};
   use datafusion::prelude::{SessionConfig, SessionContext};
   
   // --- Knobs ---
   const USE_UTF8VIEW: bool = true;
   const NUM_ROWS: usize = 2_000_000;
   const DATA_PARTITIONS: usize = 100;
   const HASH_PARTITIONS: usize = 100;
   const POOL_SIZE: usize = 1024 * 1024 * 1024; // 1GB
   
   #[tokio::main]
   async fn main() -> Result<(), Box<dyn std::error::Error>> {
       env_logger::init();
   
       let (schema, batch) = make_data();
       println!(
           "input batch: {} rows, {} bytes",
           batch.num_rows(),
           batch.get_array_memory_size()
       );
   
       let plan = build_plan(schema, batch)?;
       println!("plan:\n{}", displayable(plan.as_ref()).indent(true));
   
       let (ctx, pool) = make_session_ctx()?;
   
       println!("pool reserved before collect: {} bytes", pool.reserved());
       let results = collect(plan, ctx.task_ctx()).await?;
       println!("pool reserved after collect:  {} bytes", pool.reserved());
   
       let total_rows: usize = results.iter().map(|b| b.num_rows()).sum();
       let total_bytes: usize = results.iter().map(|b| 
b.get_array_memory_size()).sum();
       println!(
           "{} batches, {total_rows} rows, {total_bytes} bytes in output",
           results.len()
       );
   
       Ok(())
   }
   
   fn make_data() -> (SchemaRef, RecordBatch) {
       let values: Vec<String> = (0..NUM_ROWS).map(|i| 
format!("value_{i}")).collect();
       let string_refs: Vec<&str> = values.iter().map(|s| s.as_str()).collect();
   
       let (dt, col): (DataType, ArrayRef) = if USE_UTF8VIEW {
           (
               DataType::Utf8View,
               Arc::new(StringViewArray::from(string_refs)),
           )
       } else {
           (DataType::Utf8, Arc::new(StringArray::from(string_refs)))
       };
   
       let schema = Arc::new(Schema::new(vec![Field::new("a", dt, false)]));
       let batch = RecordBatch::try_new(Arc::clone(&schema), 
vec![col]).unwrap();
       (schema, batch)
   }
   
   fn build_plan(
       schema: SchemaRef,
       batch: RecordBatch,
   ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
       let rows_per_partition = NUM_ROWS / DATA_PARTITIONS;
       let partitions: Vec<Vec<RecordBatch>> = (0..DATA_PARTITIONS)
           .map(|i| vec![batch.slice(i * rows_per_partition, 
rows_per_partition)])
           .collect();
       let plan = TestMemoryExec::try_new_exec(&partitions, 
Arc::clone(&schema), None)?;
   
       // GROUP BY "a", COUNT(*)
       let group_by =
           PhysicalGroupBy::new_single(vec![(Arc::new(Column::new("a", 0)), 
"a".to_string())]);
       let count_expr = Arc::new(
           AggregateExprBuilder::new(count_udaf(), 
vec![Arc::new(Literal::new(1i8.into()))])
               .schema(Arc::clone(&schema))
               .alias("count")
               .build()?,
       );
   
       let plan = Arc::new(AggregateExec::try_new(
           AggregateMode::Partial,
           group_by.clone(),
           vec![count_expr.clone()],
           vec![None],
           plan,
           schema,
       )?);
       let partial_schema = plan.schema();
   
       let plan = Arc::new(RepartitionExec::try_new(
           plan,
           Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 
HASH_PARTITIONS),
       )?);
   
       let plan = Arc::new(AggregateExec::try_new(
           AggregateMode::FinalPartitioned,
           group_by,
           vec![count_expr],
           vec![None],
           plan,
           partial_schema,
       )?);
   
       let plan: Arc<dyn ExecutionPlan> = 
Arc::new(CoalescePartitionsExec::new(plan));
       Ok(plan)
   }
   
   fn make_session_ctx() -> Result<(SessionContext, Arc<dyn MemoryPool>), 
Box<dyn std::error::Error>> {
       let runtime = RuntimeEnvBuilder::new()
           .with_memory_limit(POOL_SIZE, 1.0)
           .with_disk_manager_builder(
               
DiskManagerBuilder::default().with_mode(DiskManagerMode::Disabled),
           )
           .build_arc()?;
       let pool: Arc<dyn MemoryPool> = Arc::clone(&runtime.memory_pool);
   
       let config = SessionConfig::new()
           .set_str(
               
"datafusion.execution.skip_partial_aggregation_probe_ratio_threshold",
               "1.0",
           )
           .set_str(
               
"datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
               "1000000000",
           );
       let ctx = SessionContext::new_with_config_rt(config, runtime);
       Ok((ctx, pool))
   }
   ```
   
   Here's the `Cargo.toml` for reference:
   ```
   [package]
   name = "datafusion-scratch"
   version = "0.1.0"
   edition = "2024"
   
   [dependencies]
   tokio = { version = "1.0", features = ["rt-multi-thread"] }
   datafusion = { path = "../datafusion/datafusion/core" }
   arrow = "57.3.0"
   rand = "0.9.2"
   env_logger = "0.11.9"
   ```
   (the `datafusion` path points at the latest `main` commit, 
`9660c9874315354ff22245699785f5f77841be80`, but any v52 release should show the 
same behavior).
   
   This fails with:
   ```
   input batch: 2000000 rows, 46663800 bytes
   plan:
   CoalescePartitionsExec
     AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count]
       RepartitionExec: partitioning=Hash([a@0], 100), input_partitions=100
         AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
           DataSourceExec: partitions=100, partition_sizes=[1, 1, 1, 1, 1, 1, 
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
   
   pool reserved before collect: 0 bytes
   Error: Shared(ResourcesExhausted("Memory Exhausted while SpillPool 
(DiskManager is disabled)"))
   ```
   
   On macos, I can get the actual memory usage using `/usr/bin/time -l`, which 
gives me:
   ```
   277151744  maximum resident set size
   ```
   (~264MB of actual usage for ~44MB of input data, yet the memory tracker 
believes there are more than 1GB of allocations and fails the program).
   
   The program runs after increasing the pool size to `POOL_SIZE: usize = 2048 
* 1024 * 1024` and gives a `316456960  maximum resident set size` (~301MB). 
This is lower than the `Utf8`/`StringArray` case (below), but datafusion's 
tracker thinks it's much higher!
   
   ### Expected behavior
   
   At the top of the script, set `USE_UTF8VIEW: bool = false;` to use 
`Utf8`/`StringArray` instead. The script runs just fine with this:
   ```
   input batch: 2000000 rows, 41554616 bytes
   plan:
   CoalescePartitionsExec
     AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count]
       RepartitionExec: partitioning=Hash([a@0], 100), input_partitions=100
         AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
           DataSourceExec: partitions=100, partition_sizes=[1, 1, 1, 1, 1, 1, 
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
   
   pool reserved before collect: 0 bytes
   pool reserved after collect:  0 bytes
   ```
   
   `/usr/bin/time -l` gives me `375013376  maximum resident set size` (~357MB)
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to