This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 07e63edfa0 Fix TopK aggregation for UTF-8/Utf8View group keys and add
safe fallback for unsupported string aggregates (#19285)
07e63edfa0 is described below
commit 07e63edfa01079acf1c7571b7be914edfe324b02
Author: kosiew <[email protected]>
AuthorDate: Fri Jan 9 21:27:31 2026 +0800
Fix TopK aggregation for UTF-8/Utf8View group keys and add safe fallback
for unsupported string aggregates (#19285)
## Which issue does this PR close?
* Closes #19219.
## Rationale for this change
A `GROUP BY ... ORDER BY <aggregate> ... LIMIT` query can trigger
DataFusion’s TopK aggregation optimization. In affected releases,
queries grouping by text columns—especially `Utf8View` produced via SQL
`varchar` mappings / `arrow_cast`—could fail at execution time with an
error such as `Can't group type: Utf8View`.
This happens because the optimizer may select the TopK aggregation path
even when the underlying TopK data structures (heap/hash table) do not
fully support the specific key/value Arrow types involved. Disabling
`datafusion.optimizer.enable_topk_aggregation` is a workaround, but it
forces users to trade correctness for performance.
This PR makes TopK type support explicit and consistent across the
optimizer and execution, adds support for UTF-8 string value heaps, and
ensures unsupported key/value combinations fall back to the standard
aggregation implementation rather than panicking.
## What changes are included in this PR?
* **Centralized TopK type validation**
* Introduced `topk_types_supported(key_type, value_type)` (in
`physical-plan/src/aggregates/mod.rs`) to validate both grouping key and
min/max value types.
* Optimizer now uses this shared check rather than duplicating partial
type logic.
* **Safer AggregateExec cloning for limit pushdown**
* Added `AggregateExec::with_new_limit` to clone an aggregate exec while
overriding only the TopK `limit` hint, avoiding manual reconstruction
and ensuring plan properties/fields remain consistent.
* **TopK hash table improvements + helper functions**
* Added `is_supported_hash_key_type` helper for grouping key
compatibility checks.
* Refactored string key extraction to a single helper function.
* Added `find_or_insert` entry API to avoid double lookups and unify
insertion behavior.
* **TopK heap support for string aggregate values**
* Added `StringHeap` implementation supporting `Utf8`, `LargeUtf8`, and
`Utf8View` aggregate values using lexicographic ordering.
* Added `is_supported_heap_type` helper for aggregate value
compatibility.
* Updated `new_heap` to create `StringHeap` for supported string types
and return a clearer error message for unsupported types.
* **Debug contract in TopK stream**
* Added a debug assertion in `GroupedTopKAggregateStream` documenting
that type validation should have already happened (optimizer +
can_use_topk), without affecting release builds.
## Are these changes tested?
Yes.
* Added a new physical optimizer test covering UTF-8 grouping with:
1. **Supported** numeric `max/min` value (TopK should be used and
results correct)
2. **Unsupported** string `max/min` value (must fall back to standard
aggregation and not use `GroupedTopKAggregateStream`)
* Added unit tests in `PriorityMap` to validate lexicographic `min/max`
tracking for:
* `Utf8`
* `LargeUtf8`
* `Utf8View`
* Added SQLLogicTest coverage (`aggregates_topk.slt`) for:
* `varchar` tables
* `Utf8View` via `arrow_cast`
* `EXPLAIN` verification that TopK limit propagation is applied and
plans remain stable
* Regression case for `max(trace_id)` with `ORDER BY ... LIMIT`
## Are there any user-facing changes?
Yes (bug fix).
* Queries that group by text columns (including `Utf8View`) and use
`ORDER BY <aggregate> ... LIMIT` should no longer error.
* TopK aggregation now supports UTF-8 string aggregate values for
min/max (lexicographic ordering) where applicable.
* For unsupported type combinations, DataFusion will fall back
gracefully to the standard aggregation path instead of panicking.
No breaking public API changes are intended. The only new public helper
APIs are internal to the physical plan modules.
## LLM-generated code disclosure
This PR includes LLM-generated code and comments. All LLM-generated
content has been manually reviewed and tested.
---
datafusion/core/benches/topk_aggregate.rs | 87 +++++++++-
.../physical_optimizer/aggregate_statistics.rs | 86 +++++++++
.../physical-optimizer/src/topk_aggregation.rs | 21 +--
datafusion/physical-plan/src/aggregates/mod.rs | 35 +++-
.../src/aggregates/topk/hash_table.rs | 178 ++++++++++---------
.../physical-plan/src/aggregates/topk/heap.rs | 192 ++++++++++++++++++++-
.../src/aggregates/topk/priority_map.rs | 103 +++++++++++
.../physical-plan/src/aggregates/topk_stream.rs | 14 ++
.../sqllogictest/test_files/aggregates_topk.slt | 88 ++++++++++
9 files changed, 698 insertions(+), 106 deletions(-)
diff --git a/datafusion/core/benches/topk_aggregate.rs
b/datafusion/core/benches/topk_aggregate.rs
index be193f8737..7979efdec6 100644
--- a/datafusion/core/benches/topk_aggregate.rs
+++ b/datafusion/core/benches/topk_aggregate.rs
@@ -28,6 +28,8 @@ use std::hint::black_box;
use std::sync::Arc;
use tokio::runtime::Runtime;
+const LIMIT: usize = 10;
+
async fn create_context(
partition_cnt: i32,
sample_cnt: i32,
@@ -52,6 +54,11 @@ fn run(rt: &Runtime, ctx: SessionContext, limit: usize,
use_topk: bool, asc: boo
black_box(rt.block_on(async { aggregate(ctx, limit, use_topk, asc).await
})).unwrap();
}
+fn run_string(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool)
{
+ black_box(rt.block_on(async { aggregate_string(ctx, limit, use_topk).await
}))
+ .unwrap();
+}
+
async fn aggregate(
ctx: SessionContext,
limit: usize,
@@ -72,7 +79,7 @@ async fn aggregate(
let batches = collect(plan, ctx.task_ctx()).await?;
assert_eq!(batches.len(), 1);
let batch = batches.first().unwrap();
- assert_eq!(batch.num_rows(), 10);
+ assert_eq!(batch.num_rows(), LIMIT);
let actual = format!("{}",
pretty_format_batches(&batches)?).to_lowercase();
let expected_asc = r#"
@@ -99,9 +106,36 @@ async fn aggregate(
Ok(())
}
+/// Benchmark for string aggregate functions with topk optimization.
+/// This tests grouping by a numeric column (timestamp_ms) and aggregating
+/// a string column (trace_id) with Utf8 or Utf8View data types.
+async fn aggregate_string(
+ ctx: SessionContext,
+ limit: usize,
+ use_topk: bool,
+) -> Result<()> {
+ let sql = format!(
+ "select max(trace_id) from traces group by timestamp_ms order by
max(trace_id) desc limit {limit};"
+ );
+ let df = ctx.sql(sql.as_str()).await?;
+ let plan = df.create_physical_plan().await?;
+ let actual_phys_plan = displayable(plan.as_ref()).indent(true).to_string();
+ assert_eq!(
+ actual_phys_plan.contains(&format!("lim=[{limit}]")),
+ use_topk
+ );
+
+ let batches = collect(plan, ctx.task_ctx()).await?;
+ assert_eq!(batches.len(), 1);
+ let batch = batches.first().unwrap();
+ assert_eq!(batch.num_rows(), LIMIT);
+
+ Ok(())
+}
+
fn criterion_benchmark(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
- let limit = 10;
+ let limit = LIMIT;
let partitions = 10;
let samples = 1_000_000;
@@ -170,6 +204,55 @@ fn criterion_benchmark(c: &mut Criterion) {
.as_str(),
|b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
);
+
+ // String aggregate benchmarks - grouping by timestamp, aggregating string
column
+ let ctx = rt
+ .block_on(create_context(partitions, samples, false, true, false))
+ .unwrap();
+ c.bench_function(
+ format!(
+ "top k={limit} string aggregate {} time-series rows [Utf8]",
+ partitions * samples
+ )
+ .as_str(),
+ |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
+ );
+
+ let ctx = rt
+ .block_on(create_context(partitions, samples, true, true, false))
+ .unwrap();
+ c.bench_function(
+ format!(
+ "top k={limit} string aggregate {} worst-case rows [Utf8]",
+ partitions * samples
+ )
+ .as_str(),
+ |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
+ );
+
+ let ctx = rt
+ .block_on(create_context(partitions, samples, false, true, true))
+ .unwrap();
+ c.bench_function(
+ format!(
+ "top k={limit} string aggregate {} time-series rows [Utf8View]",
+ partitions * samples
+ )
+ .as_str(),
+ |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
+ );
+
+ let ctx = rt
+ .block_on(create_context(partitions, samples, true, true, true))
+ .unwrap();
+ c.bench_function(
+ format!(
+ "top k={limit} string aggregate {} worst-case rows [Utf8View]",
+ partitions * samples
+ )
+ .as_str(),
+ |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
+ );
}
criterion_group!(benches, criterion_benchmark);
diff --git a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
index 1fdc0ae6c7..4218f76fa1 100644
--- a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
@@ -20,11 +20,15 @@ use std::sync::Arc;
use crate::physical_optimizer::test_utils::TestAggregate;
use arrow::array::Int32Array;
+use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
+use datafusion::datasource::memory::MemTable;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
+use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::Result;
+use datafusion_common::assert_batches_eq;
use datafusion_common::cast::as_int64_array;
use datafusion_common::config::ConfigOptions;
use datafusion_execution::TaskContext;
@@ -38,6 +42,7 @@ use datafusion_physical_plan::aggregates::AggregateMode;
use datafusion_physical_plan::aggregates::PhysicalGroupBy;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::common;
+use datafusion_physical_plan::displayable;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::projection::ProjectionExec;
@@ -316,3 +321,84 @@ async fn test_count_with_nulls_inexact_stat() ->
Result<()> {
Ok(())
}
+
+/// Tests that TopK aggregation correctly handles UTF-8 (string) types in both
grouping keys and aggregate values.
+///
+/// The TopK optimization is designed to efficiently handle `GROUP BY ...
ORDER BY aggregate LIMIT n` queries
+/// by maintaining only the top K groups during aggregation. However, not all
type combinations are supported.
+///
+/// This test verifies two scenarios:
+/// 1. **Supported case**: UTF-8 grouping key with numeric aggregate (max/min)
- should use TopK optimization
+/// 2. **Unsupported case**: UTF-8 grouping key with UTF-8 aggregate value -
must gracefully fall back to
+/// standard aggregation without panicking
+///
+/// The fallback behavior is critical because attempting to use TopK with
unsupported types could cause
+/// runtime panics. This test ensures the optimizer correctly detects
incompatible types and chooses
+/// the appropriate execution path.
+#[tokio::test]
+async fn utf8_grouping_min_max_limit_fallbacks() -> Result<()> {
+ let mut config = SessionConfig::new();
+ config.options_mut().optimizer.enable_topk_aggregation = true;
+ let ctx = SessionContext::new_with_config(config);
+
+ let batch = RecordBatch::try_new(
+ Arc::new(Schema::new(vec![
+ Field::new("g", DataType::Utf8, false),
+ Field::new("val_str", DataType::Utf8, false),
+ Field::new("val_num", DataType::Int64, false),
+ ])),
+ vec![
+ Arc::new(StringArray::from(vec!["a", "b", "a"])),
+ Arc::new(StringArray::from(vec!["alpha", "bravo", "charlie"])),
+ Arc::new(Int64Array::from(vec![1, 2, 3])),
+ ],
+ )?;
+ let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
+ ctx.register_table("t", Arc::new(table))?;
+
+ // Supported path: numeric min/max with UTF-8 grouping should still use
TopK aggregation
+ // and return correct results.
+ let supported_df = ctx
+ .sql("SELECT g, max(val_num) AS m FROM t GROUP BY g ORDER BY m DESC
LIMIT 1")
+ .await?;
+ let supported_batches = supported_df.collect().await?;
+ assert_batches_eq!(
+ &[
+ "+---+---+",
+ "| g | m |",
+ "+---+---+",
+ "| a | 3 |",
+ "+---+---+"
+ ],
+ &supported_batches
+ );
+
+ // Unsupported TopK value type: string min/max should fall back without
panicking.
+ let unsupported_df = ctx
+ .sql("SELECT g, max(val_str) AS s FROM t GROUP BY g ORDER BY s DESC
LIMIT 1")
+ .await?;
+ let unsupported_plan =
unsupported_df.clone().create_physical_plan().await?;
+ let unsupported_batches = unsupported_df.collect().await?;
+
+ // Ensure the plan avoided the TopK-specific stream implementation.
+ let plan_display = displayable(unsupported_plan.as_ref())
+ .indent(true)
+ .to_string();
+ assert!(
+ !plan_display.contains("GroupedTopKAggregateStream"),
+ "Unsupported UTF-8 aggregate value should not use TopK: {plan_display}"
+ );
+
+ assert_batches_eq!(
+ &[
+ "+---+---------+",
+ "| g | s |",
+ "+---+---------+",
+ "| a | charlie |",
+ "+---+---------+"
+ ],
+ &unsupported_batches
+ );
+
+ Ok(())
+}
diff --git a/datafusion/physical-optimizer/src/topk_aggregation.rs
b/datafusion/physical-optimizer/src/topk_aggregation.rs
index 7eb9e6a762..7b2983ee71 100644
--- a/datafusion/physical-optimizer/src/topk_aggregation.rs
+++ b/datafusion/physical-optimizer/src/topk_aggregation.rs
@@ -20,13 +20,12 @@
use std::sync::Arc;
use crate::PhysicalOptimizerRule;
-use arrow::datatypes::DataType;
use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::ExecutionPlan;
-use datafusion_physical_plan::aggregates::AggregateExec;
+use datafusion_physical_plan::aggregates::{AggregateExec,
topk_types_supported};
use datafusion_physical_plan::execution_plan::CardinalityEffect;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
@@ -55,11 +54,8 @@ impl TopKAggregation {
}
let group_key = aggr.group_expr().expr().iter().exactly_one().ok()?;
let kt = group_key.0.data_type(&aggr.input().schema()).ok()?;
- if !kt.is_primitive()
- && kt != DataType::Utf8
- && kt != DataType::Utf8View
- && kt != DataType::LargeUtf8
- {
+ let vt = field.data_type();
+ if !topk_types_supported(&kt, vt) {
return None;
}
if aggr.filter_expr().iter().any(|e| e.is_some()) {
@@ -72,16 +68,7 @@ impl TopKAggregation {
}
// We found what we want: clone, copy the limit down, and return
modified node
- let new_aggr = AggregateExec::try_new(
- *aggr.mode(),
- aggr.group_expr().clone(),
- aggr.aggr_expr().to_vec(),
- aggr.filter_expr().to_vec(),
- Arc::clone(aggr.input()),
- aggr.input_schema(),
- )
- .expect("Unable to copy Aggregate!")
- .with_limit(Some(limit));
+ let new_aggr = aggr.with_new_limit(Some(limit));
Some(Arc::new(new_aggr))
}
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs
b/datafusion/physical-plan/src/aggregates/mod.rs
index b0828ff232..4dd9482ac4 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -41,7 +41,7 @@ use parking_lot::Mutex;
use std::collections::HashSet;
use arrow::array::{ArrayRef, UInt8Array, UInt16Array, UInt32Array,
UInt64Array};
-use arrow::datatypes::{Field, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_schema::FieldRef;
use datafusion_common::stats::Precision;
@@ -64,6 +64,8 @@ use datafusion_physical_expr_common::sort_expr::{
use datafusion_expr::utils::AggregateOrderSensitivity;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use itertools::Itertools;
+use topk::hash_table::is_supported_hash_key_type;
+use topk::heap::is_supported_heap_type;
pub mod group_values;
mod no_grouping;
@@ -72,6 +74,17 @@ mod row_hash;
mod topk;
mod topk_stream;
+/// Returns true if TopK aggregation data structures support the provided key
and value types.
+///
+/// This function checks whether both the key type (used for grouping) and
value type
+/// (used in min/max aggregation) can be handled by the TopK aggregation heap
and hash table.
+/// Supported types include Arrow primitives (integers, floats, decimals,
intervals) and
+/// UTF-8 strings (`Utf8`, `LargeUtf8`, `Utf8View`).
+/// ```text
+pub fn topk_types_supported(key_type: &DataType, value_type: &DataType) ->
bool {
+ is_supported_hash_key_type(key_type) && is_supported_heap_type(value_type)
+}
+
/// Hard-coded seed for aggregations to ensure hash values differ from
`RepartitionExec`, avoiding collisions.
const AGGREGATION_HASH_SEED: ahash::RandomState =
ahash::RandomState::with_seeds('A' as u64, 'G' as u64, 'G' as u64, 'R' as
u64);
@@ -553,6 +566,26 @@ impl AggregateExec {
}
}
+ /// Clone this exec, overriding only the limit hint.
+ pub fn with_new_limit(&self, limit: Option<usize>) -> Self {
+ Self {
+ limit,
+ // clone the rest of the fields
+ required_input_ordering: self.required_input_ordering.clone(),
+ metrics: ExecutionPlanMetricsSet::new(),
+ input_order_mode: self.input_order_mode.clone(),
+ cache: self.cache.clone(),
+ mode: self.mode,
+ group_by: self.group_by.clone(),
+ aggr_expr: self.aggr_expr.clone(),
+ filter_expr: self.filter_expr.clone(),
+ input: Arc::clone(&self.input),
+ schema: Arc::clone(&self.schema),
+ input_schema: Arc::clone(&self.input_schema),
+ dynamic_filter: self.dynamic_filter.clone(),
+ }
+ }
+
pub fn cache(&self) -> &PlanProperties {
&self.cache
}
diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs
b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs
index 4a3f3ac258..418ec49ddd 100644
--- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs
+++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs
@@ -72,6 +72,19 @@ pub trait ArrowHashTable {
fn find_or_insert(&mut self, row_idx: usize, replace_idx: usize) ->
(usize, bool);
}
+/// Returns true if the given data type can be used as a top-K aggregation
hash key.
+///
+/// Supported types include Arrow primitives (integers, floats, decimals,
intervals)
+/// and UTF-8 strings (`Utf8`, `LargeUtf8`, `Utf8View`). This is used
internally by
+/// `PriorityMap::supports()` to validate grouping key type compatibility.
+pub fn is_supported_hash_key_type(kt: &DataType) -> bool {
+ kt.is_primitive()
+ || matches!(
+ kt,
+ DataType::Utf8 | DataType::Utf8View | DataType::LargeUtf8
+ )
+}
+
// An implementation of ArrowHashTable for String keys
pub struct StringHashTable {
owned: ArrayRef,
@@ -108,6 +121,34 @@ impl StringHashTable {
data_type,
}
}
+
+ /// Extracts the string value at the given row index, handling nulls and
different string types.
+ ///
+ /// Returns `None` if the value is null, otherwise
`Some(value.to_string())`.
+ fn extract_string_value(&self, row_idx: usize) -> Option<String> {
+ let is_null_and_value = match self.data_type {
+ DataType::Utf8 => {
+ let arr = self.owned.as_string::<i32>();
+ (arr.is_null(row_idx), arr.value(row_idx))
+ }
+ DataType::LargeUtf8 => {
+ let arr = self.owned.as_string::<i64>();
+ (arr.is_null(row_idx), arr.value(row_idx))
+ }
+ DataType::Utf8View => {
+ let arr = self.owned.as_string_view();
+ (arr.is_null(row_idx), arr.value(row_idx))
+ }
+ _ => panic!("Unsupported data type"),
+ };
+
+ let (is_null, value) = is_null_and_value;
+ if is_null {
+ None
+ } else {
+ Some(value.to_string())
+ }
+ }
}
impl ArrowHashTable for StringHashTable {
@@ -138,63 +179,15 @@ impl ArrowHashTable for StringHashTable {
}
fn find_or_insert(&mut self, row_idx: usize, replace_idx: usize) ->
(usize, bool) {
- let id = match self.data_type {
- DataType::Utf8 => {
- let ids = self
- .owned
- .as_any()
- .downcast_ref::<StringArray>()
- .expect("Expected StringArray for DataType::Utf8");
- if ids.is_null(row_idx) {
- None
- } else {
- Some(ids.value(row_idx))
- }
- }
- DataType::LargeUtf8 => {
- let ids = self
- .owned
- .as_any()
- .downcast_ref::<LargeStringArray>()
- .expect("Expected LargeStringArray for
DataType::LargeUtf8");
- if ids.is_null(row_idx) {
- None
- } else {
- Some(ids.value(row_idx))
- }
- }
- DataType::Utf8View => {
- let ids = self
- .owned
- .as_any()
- .downcast_ref::<StringViewArray>()
- .expect("Expected StringViewArray for DataType::Utf8View");
- if ids.is_null(row_idx) {
- None
- } else {
- Some(ids.value(row_idx))
- }
- }
- _ => panic!("Unsupported data type"),
- };
-
- // TODO: avoid double lookup by using entry API
-
- let hash = self.rnd.hash_one(id);
- if let Some(map_idx) = self
- .map
- .find(hash, |mi| id == mi.as_ref().map(|id| id.as_str()))
- {
- return (map_idx, false);
- }
+ let id = self.extract_string_value(row_idx);
- // we're full and this is a better value, so remove the worst
- let heap_idx = self.map.remove_if_full(replace_idx);
+ // Compute hash and create equality closure for hash table lookup.
+ let hash = self.rnd.hash_one(id.as_deref());
+ let id_for_eq = id.clone();
+ let eq = move |mi: &Option<String>| id_for_eq.as_deref() ==
mi.as_deref();
- // add the new group
- let id = id.map(|id| id.to_string());
- let map_idx = self.map.insert(hash, &id, heap_idx);
- (map_idx, true)
+ // Use entry API to avoid double lookup
+ self.map.find_or_insert(hash, id, replace_idx, eq)
}
}
@@ -260,19 +253,12 @@ where
} else {
Some(ids.value(row_idx))
};
-
+ // Compute hash and create equality closure for hash table lookup.
let hash: u64 = id.hash(&self.rnd);
- // TODO: avoid double lookup by using entry API
- if let Some(map_idx) = self.map.find(hash, |mi| id == *mi) {
- return (map_idx, false);
- }
-
- // we're full and this is a better value, so remove the worst
- let heap_idx = self.map.remove_if_full(replace_idx);
+ let eq = |mi: &Option<VAL::Native>| id == *mi;
- // add the new group
- let map_idx = self.map.insert(hash, &id, heap_idx);
- (map_idx, true)
+ // Use entry API to avoid double lookup
+ self.map.find_or_insert(hash, id, replace_idx, eq)
}
}
@@ -287,11 +273,6 @@ impl<ID: KeyType + PartialEq> TopKHashTable<ID> {
}
}
- pub fn find(&self, hash: u64, mut eq: impl FnMut(&ID) -> bool) ->
Option<usize> {
- let eq = |&idx: &usize| eq(&self.store[idx].as_ref().unwrap().id);
- self.map.find(hash, eq).copied()
- }
-
pub fn heap_idx_at(&self, map_idx: usize) -> usize {
self.store[map_idx].as_ref().unwrap().heap_idx
}
@@ -324,8 +305,27 @@ impl<ID: KeyType + PartialEq> TopKHashTable<ID> {
}
}
- pub fn insert(&mut self, hash: u64, id: &ID, heap_idx: usize) -> usize {
- let mi = HashTableItem::new(hash, id.clone(), heap_idx);
+ /// Find an existing entry or insert a new one, avoiding double hash table
lookup.
+ /// Returns (map_idx, is_new) where is_new indicates if this was a new
insertion.
+ /// If inserting a new entry and the table is full, replaces the entry at
replace_idx.
+ pub fn find_or_insert(
+ &mut self,
+ hash: u64,
+ id: ID,
+ replace_idx: usize,
+ mut eq: impl FnMut(&ID) -> bool,
+ ) -> (usize, bool) {
+ // Check if entry exists - this is the only hash table lookup
+ {
+ let eq_fn = |idx: &usize|
eq(&self.store[*idx].as_ref().unwrap().id);
+ if let Some(&map_idx) = self.map.find(hash, eq_fn) {
+ return (map_idx, false);
+ }
+ }
+
+ // Entry doesn't exist - compute heap_idx and prepare item
+ let heap_idx = self.remove_if_full(replace_idx);
+ let mi = HashTableItem::new(hash, id, heap_idx);
let store_idx = if let Some(idx) = self.free_index.take() {
self.store[idx] = Some(mi);
idx
@@ -334,19 +334,15 @@ impl<ID: KeyType + PartialEq> TopKHashTable<ID> {
self.store.len() - 1
};
+ // Reserve space if needed
let hasher = |idx: &usize| self.store[*idx].as_ref().unwrap().hash;
if self.map.len() == self.map.capacity() {
self.map.reserve(self.limit, hasher);
}
- let eq_fn = |idx: &usize| self.store[*idx].as_ref().unwrap().id == *id;
- match self.map.entry(hash, eq_fn, hasher) {
- Entry::Occupied(_) => unreachable!("Item should not exist"),
- Entry::Vacant(vacant) => {
- vacant.insert(store_idx);
- }
- }
- store_idx
+ // Insert without checking again since we already confirmed it doesn't
exist
+ self.map.insert_unique(hash, store_idx, hasher);
+ (store_idx, true)
}
pub fn len(&self) -> usize {
@@ -449,15 +445,29 @@ mod tests {
#[test]
fn should_resize_properly() -> Result<()> {
let mut heap_to_map = BTreeMap::<usize, usize>::new();
+ // Create TopKHashTable with limit=5 and capacity=3 to force resizing
let mut map = TopKHashTable::<Option<String>>::new(5, 3);
- for (heap_idx, id) in vec!["1", "2", "3", "4",
"5"].into_iter().enumerate() {
+
+ // Insert 5 entries, tracking the heap-to-map index mapping
+ for (heap_idx, id) in ["1", "2", "3", "4", "5"].iter().enumerate() {
+ let value = Some(id.to_string());
let hash = heap_idx as u64;
- let map_idx = map.insert(hash, &Some(id.to_string()), heap_idx);
- let _ = heap_to_map.insert(heap_idx, map_idx);
+ let (map_idx, is_new) =
+ map.find_or_insert(hash, value.clone(), heap_idx, |v| *v ==
value);
+ assert!(is_new, "Entry should be new");
+ heap_to_map.insert(heap_idx, map_idx);
}
+ // Verify all 5 entries are present
+ assert_eq!(map.len(), 5);
+
+ // Verify that the hash table resized properly (capacity should have
grown beyond 3)
+ // This is implicit - if it didn't resize, insertions would have
failed or been slow
+
+ // Drain all values in heap order
let (_heap_idxs, map_idxs): (Vec<_>, Vec<_>) =
heap_to_map.into_iter().unzip();
let ids = map.take_all(map_idxs);
+
assert_eq!(
format!("{ids:?}"),
r#"[Some("1"), Some("2"), Some("3"), Some("4"), Some("5")]"#
diff --git a/datafusion/physical-plan/src/aggregates/topk/heap.rs
b/datafusion/physical-plan/src/aggregates/topk/heap.rs
index b4569c3d08..9f0b697cca 100644
--- a/datafusion/physical-plan/src/aggregates/topk/heap.rs
+++ b/datafusion/physical-plan/src/aggregates/topk/heap.rs
@@ -15,10 +15,18 @@
// specific language governing permissions and limitations
// under the License.
-//! A custom binary heap implementation for performant top K aggregation
+//! A custom binary heap implementation for performant top K aggregation.
+//!
+//! the `new_heap` //! factory function selects an appropriate heap
implementation
+//! based on the Arrow data type.
+//!
+//! Supported value types include Arrow primitives (integers, floats,
decimals, intervals)
+//! and UTF-8 strings (`Utf8`, `LargeUtf8`, `Utf8View`) using lexicographic
ordering.
use arrow::array::{ArrayRef, ArrowPrimitiveType, PrimitiveArray,
downcast_primitive};
+use arrow::array::{LargeStringBuilder, StringBuilder, StringViewBuilder};
use arrow::array::{
+ StringArray,
cast::AsArray,
types::{IntervalDayTime, IntervalMonthDayNano},
};
@@ -156,6 +164,164 @@ where
}
}
+/// An implementation of `ArrowHeap` that deals with string values.
+///
+/// Supports all three UTF-8 string types: `Utf8`, `LargeUtf8`, and `Utf8View`.
+/// String values are compared lexicographically using the compare-first
pattern:
+/// borrowed strings are compared before allocation, and only allocated when
the
+/// heap confirms they improve the top-K set.
+///
+pub struct StringHeap {
+ batch: ArrayRef,
+ heap: TopKHeap<Option<String>>,
+ desc: bool,
+ data_type: DataType,
+}
+
+impl StringHeap {
+ pub fn new(limit: usize, desc: bool, data_type: DataType) -> Self {
+ let batch: ArrayRef = Arc::new(StringArray::from(Vec::<&str>::new()));
+ Self {
+ batch,
+ heap: TopKHeap::new(limit, desc),
+ desc,
+ data_type,
+ }
+ }
+
+ /// Extracts a string value from the current batch at the given row index.
+ ///
+ /// Panics if the row index is out of bounds or if the data type is not
one of
+ /// the supported UTF-8 string types.
+ ///
+ /// Note: Null values should not appear in the input; the aggregation layer
+ /// ensures nulls are filtered before reaching this code.
+ fn value(&self, row_idx: usize) -> &str {
+ extract_string_value(&self.batch, &self.data_type, row_idx)
+ }
+}
+
+/// Helper to extract a string value from an ArrayRef at a given index.
+///
+/// Supports `Utf8`, `LargeUtf8`, and `Utf8View` data types.
+///
+/// # Panics
+/// Panics if the index is out of bounds or if the data type is unsupported.
+fn extract_string_value<'a>(
+ batch: &'a ArrayRef,
+ data_type: &DataType,
+ idx: usize,
+) -> &'a str {
+ match data_type {
+ DataType::Utf8 => batch.as_string::<i32>().value(idx),
+ DataType::LargeUtf8 => batch.as_string::<i64>().value(idx),
+ DataType::Utf8View => batch.as_string_view().value(idx),
+ _ => unreachable!("Unsupported string type: {:?}", data_type),
+ }
+}
+
+impl ArrowHeap for StringHeap {
+ fn set_batch(&mut self, vals: ArrayRef) {
+ self.batch = vals;
+ }
+
+ fn is_worse(&self, row_idx: usize) -> bool {
+ if !self.heap.is_full() {
+ return false;
+ }
+ // Compare borrowed `&str` against the worst heap value first to avoid
+ // allocating a `String` unless this row would actually replace an
+ // existing heap entry.
+ let new_val = self.value(row_idx);
+ let worst_val = self.heap.worst_val().expect("Missing root");
+ match worst_val {
+ None => false,
+ Some(worst_str) => {
+ (!self.desc && new_val > worst_str.as_str())
+ || (self.desc && new_val < worst_str.as_str())
+ }
+ }
+ }
+
+ fn worst_map_idx(&self) -> usize {
+ self.heap.worst_map_idx()
+ }
+
+ fn insert(&mut self, row_idx: usize, map_idx: usize, map: &mut Vec<(usize,
usize)>) {
+ // When appending (heap not full) we must allocate to own the string
+ // because it will be stored in the heap. For replacements we avoid
+ // allocation until `replace_if_better` confirms a replacement is
+ // necessary.
+ let new_str = self.value(row_idx).to_string();
+ let new_val = Some(new_str);
+ self.heap.append_or_replace(new_val, map_idx, map);
+ }
+
+ fn replace_if_better(
+ &mut self,
+ heap_idx: usize,
+ row_idx: usize,
+ map: &mut Vec<(usize, usize)>,
+ ) {
+ let new_str = self.value(row_idx);
+ let existing = self.heap.heap[heap_idx]
+ .as_ref()
+ .expect("Missing heap item");
+
+ // Compare borrowed reference first—no allocation yet.
+ // We compare the borrowed `&str` with the stored `Option<String>` and
+ // only allocate (`to_string()`) when a replacement is required.
+ match &existing.val {
+ None => {
+ // Existing is null; new value always wins
+ let new_val = Some(new_str.to_string());
+ self.heap.replace_if_better(heap_idx, new_val, map);
+ }
+ Some(existing_str) => {
+ // Compare borrowed strings first
+ if (!self.desc && new_str < existing_str.as_str())
+ || (self.desc && new_str > existing_str.as_str())
+ {
+ let new_val = Some(new_str.to_string());
+ self.heap.replace_if_better(heap_idx, new_val, map);
+ }
+ // Else: no improvement, no allocation
+ }
+ }
+ }
+
+ fn drain(&mut self) -> (ArrayRef, Vec<usize>) {
+ let (vals, map_idxs) = self.heap.drain();
+ // Use Arrow builders to safely construct arrays from the owned
+ // `Option<String>` values. Builders avoid needing to maintain
+ // references to temporary storage.
+
+ // Macro to eliminate duplication across string builder types.
+ // All three builders share the same interface for append_value,
+ // append_null, and finish, differing only in their concrete types.
+ macro_rules! build_string_array {
+ ($builder_type:ty) => {{
+ let mut builder = <$builder_type>::new();
+ for val in vals {
+ match val {
+ Some(s) => builder.append_value(&s),
+ None => builder.append_null(),
+ }
+ }
+ Arc::new(builder.finish())
+ }};
+ }
+
+ let arr: ArrayRef = match self.data_type {
+ DataType::Utf8 => build_string_array!(StringBuilder),
+ DataType::LargeUtf8 => build_string_array!(LargeStringBuilder),
+ DataType::Utf8View => build_string_array!(StringViewBuilder),
+ _ => unreachable!("Unsupported string type: {:?}", self.data_type),
+ };
+ (arr, map_idxs)
+ }
+}
+
impl<VAL: ValueType> TopKHeap<VAL> {
pub fn new(limit: usize, desc: bool) -> Self {
Self {
@@ -438,11 +604,31 @@ compare_integer!(u8, u16, u32, u64);
compare_integer!(IntervalDayTime, IntervalMonthDayNano);
compare_float!(f16, f32, f64);
+/// Returns true if the given data type can be stored in a top-K aggregation
heap.
+///
+/// Supported types include Arrow primitives (integers, floats, decimals,
intervals)
+/// and UTF-8 strings (`Utf8`, `LargeUtf8`, `Utf8View`). This is used
internally by
+/// `PriorityMap::supports()` to validate aggregate value type compatibility.
+pub fn is_supported_heap_type(vt: &DataType) -> bool {
+ vt.is_primitive()
+ || matches!(
+ vt,
+ DataType::Utf8 | DataType::Utf8View | DataType::LargeUtf8
+ )
+}
+
pub fn new_heap(
limit: usize,
desc: bool,
vt: DataType,
) -> Result<Box<dyn ArrowHeap + Send>> {
+ if matches!(
+ vt,
+ DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
+ ) {
+ return Ok(Box::new(StringHeap::new(limit, desc, vt)));
+ }
+
macro_rules! downcast_helper {
($vt:ty, $d:ident) => {
return Ok(Box::new(PrimitiveHeap::<$vt>::new(limit, desc, vt)))
@@ -454,7 +640,9 @@ pub fn new_heap(
_ => {}
}
- Err(exec_datafusion_err!("Can't group type: {vt:?}"))
+ Err(exec_datafusion_err!(
+ "Unsupported TopK aggregate value type: {vt:?}"
+ ))
}
#[cfg(test)]
diff --git a/datafusion/physical-plan/src/aggregates/topk/priority_map.rs
b/datafusion/physical-plan/src/aggregates/topk/priority_map.rs
index 8e093d213e..c74b648d37 100644
--- a/datafusion/physical-plan/src/aggregates/topk/priority_map.rs
+++ b/datafusion/physical-plan/src/aggregates/topk/priority_map.rs
@@ -373,6 +373,102 @@ mod tests {
Ok(())
}
+ #[test]
+ fn should_track_lexicographic_min_utf8_value() -> Result<()> {
+ let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 1]));
+ let vals: ArrayRef = Arc::new(StringArray::from(vec!["zulu",
"alpha"]));
+ let mut agg = PriorityMap::new(DataType::Int64, DataType::Utf8, 1,
false)?;
+ agg.set_batch(ids, vals);
+ agg.insert(0)?;
+ agg.insert(1)?;
+
+ let cols = agg.emit()?;
+ let batch = RecordBatch::try_new(test_schema_value(DataType::Utf8),
cols)?;
+ let actual = format!("{}", pretty_format_batches(&[batch])?);
+
+ assert_snapshot!(actual, @r#"
++----------+--------------+
+| trace_id | timestamp_ms |
++----------+--------------+
+| 1 | alpha |
++----------+--------------+
+ "#);
+
+ Ok(())
+ }
+
+ #[test]
+ fn should_track_lexicographic_max_utf8_value_desc() -> Result<()> {
+ let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 1]));
+ let vals: ArrayRef = Arc::new(StringArray::from(vec!["alpha",
"zulu"]));
+ let mut agg = PriorityMap::new(DataType::Int64, DataType::Utf8, 1,
true)?;
+ agg.set_batch(ids, vals);
+ agg.insert(0)?;
+ agg.insert(1)?;
+
+ let cols = agg.emit()?;
+ let batch = RecordBatch::try_new(test_schema_value(DataType::Utf8),
cols)?;
+ let actual = format!("{}", pretty_format_batches(&[batch])?);
+
+ assert_snapshot!(actual, @r#"
++----------+--------------+
+| trace_id | timestamp_ms |
++----------+--------------+
+| 1 | zulu |
++----------+--------------+
+ "#);
+
+ Ok(())
+ }
+
+ #[test]
+ fn should_track_large_utf8_values() -> Result<()> {
+ let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 1]));
+ let vals: ArrayRef = Arc::new(LargeStringArray::from(vec!["zulu",
"alpha"]));
+ let mut agg = PriorityMap::new(DataType::Int64, DataType::LargeUtf8,
1, false)?;
+ agg.set_batch(ids, vals);
+ agg.insert(0)?;
+ agg.insert(1)?;
+
+ let cols = agg.emit()?;
+ let batch =
RecordBatch::try_new(test_schema_value(DataType::LargeUtf8), cols)?;
+ let actual = format!("{}", pretty_format_batches(&[batch])?);
+
+ assert_snapshot!(actual, @r#"
++----------+--------------+
+| trace_id | timestamp_ms |
++----------+--------------+
+| 1 | alpha |
++----------+--------------+
+ "#);
+
+ Ok(())
+ }
+
+ #[test]
+ fn should_track_utf8_view_values() -> Result<()> {
+ let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 1]));
+ let vals: ArrayRef = Arc::new(StringViewArray::from(vec!["alpha",
"zulu"]));
+ let mut agg = PriorityMap::new(DataType::Int64, DataType::Utf8View, 1,
true)?;
+ agg.set_batch(ids, vals);
+ agg.insert(0)?;
+ agg.insert(1)?;
+
+ let cols = agg.emit()?;
+ let batch =
RecordBatch::try_new(test_schema_value(DataType::Utf8View), cols)?;
+ let actual = format!("{}", pretty_format_batches(&[batch])?);
+
+ assert_snapshot!(actual, @r#"
++----------+--------------+
+| trace_id | timestamp_ms |
++----------+--------------+
+| 1 | zulu |
++----------+--------------+
+ "#);
+
+ Ok(())
+ }
+
#[test]
fn should_handle_null_ids() -> Result<()> {
let ids: ArrayRef = Arc::new(StringArray::from(vec![Some("1"), None,
None]));
@@ -419,4 +515,11 @@ mod tests {
Field::new("timestamp_ms", DataType::Int64, true),
]))
}
+
+ fn test_schema_value(value_type: DataType) -> SchemaRef {
+ Arc::new(Schema::new(vec![
+ Field::new("trace_id", DataType::Int64, true),
+ Field::new("timestamp_ms", value_type, true),
+ ]))
+ }
}
diff --git a/datafusion/physical-plan/src/aggregates/topk_stream.rs
b/datafusion/physical-plan/src/aggregates/topk_stream.rs
index 1096eb64d3..a43b5cff12 100644
--- a/datafusion/physical-plan/src/aggregates/topk_stream.rs
+++ b/datafusion/physical-plan/src/aggregates/topk_stream.rs
@@ -19,6 +19,7 @@
use crate::aggregates::group_values::GroupByMetrics;
use crate::aggregates::topk::priority_map::PriorityMap;
+use crate::aggregates::topk_types_supported;
use crate::aggregates::{
AggregateExec, PhysicalGroupBy, aggregate_expressions, evaluate_group_by,
evaluate_many,
@@ -73,6 +74,19 @@ impl GroupedTopKAggregateStream {
let kt = expr.data_type(&aggr.input().schema())?;
let vt = val_field.data_type().clone();
+ // Type validation is performed by the optimizer and can_use_topk()
check.
+ // This debug assertion documents the contract without runtime
overhead in release builds.
+ #[cfg(debug_assertions)]
+ {
+ debug_assert!(
+ topk_types_supported(&kt, &vt),
+ "TopK type validation should have been performed by optimizer
and can_use_topk(). \
+ Found unsupported types: key={kt:?}, value={vt:?}"
+ );
+ }
+
+ // Note: Null values in aggregate columns are filtered by the
aggregation layer
+ // before reaching the heap, so the heap implementations don't need
explicit null handling.
let priority_map = PriorityMap::new(kt, vt, limit, desc)?;
Ok(GroupedTopKAggregateStream {
diff --git a/datafusion/sqllogictest/test_files/aggregates_topk.slt
b/datafusion/sqllogictest/test_files/aggregates_topk.slt
index 58abecfacf..05f3e02bbc 100644
--- a/datafusion/sqllogictest/test_files/aggregates_topk.slt
+++ b/datafusion/sqllogictest/test_files/aggregates_topk.slt
@@ -195,6 +195,70 @@ a -1 -1
NULL 0 0
a 1 1
+statement ok
+CREATE TABLE string_topk(category varchar, val varchar) AS VALUES
+('x', 'apple'),
+('x', 'zebra'),
+('y', 'banana'),
+('y', 'apricot'),
+('z', 'mango');
+
+statement ok
+CREATE VIEW string_topk_view AS
+SELECT
+ arrow_cast(category, 'Utf8View') AS category,
+ arrow_cast(val, 'Utf8View') AS val
+FROM
+ string_topk;
+
+query TT
+select category, max(val) from string_topk group by category order by max(val)
desc limit 2;
+----
+x zebra
+z mango
+
+query TT
+explain select category, max(val) max_val from string_topk group by category
order by max_val desc limit 2;
+----
+logical_plan
+01)Sort: max_val DESC NULLS FIRST, fetch=2
+02)--Projection: string_topk.category, max(string_topk.val) AS max_val
+03)----Aggregate: groupBy=[[string_topk.category]],
aggr=[[max(string_topk.val)]]
+04)------TableScan: string_topk projection=[category, val]
+physical_plan
+01)SortPreservingMergeExec: [max_val@1 DESC], fetch=2
+02)--SortExec: TopK(fetch=2), expr=[max_val@1 DESC],
preserve_partitioning=[true]
+03)----ProjectionExec: expr=[category@0 as category, max(string_topk.val)@1 as
max_val]
+04)------AggregateExec: mode=FinalPartitioned, gby=[category@0 as category],
aggr=[max(string_topk.val)], lim=[2]
+05)--------RepartitionExec: partitioning=Hash([category@0], 4),
input_partitions=1
+06)----------AggregateExec: mode=Partial, gby=[category@0 as category],
aggr=[max(string_topk.val)], lim=[2]
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+
+query TT
+select category, max(val) from string_topk_view group by category order by
max(val) desc limit 2;
+----
+x zebra
+z mango
+
+query TT
+explain select category, max(val) max_val from string_topk_view group by
category order by max_val desc limit 2;
+----
+logical_plan
+01)Sort: max_val DESC NULLS FIRST, fetch=2
+02)--Projection: string_topk_view.category, max(string_topk_view.val) AS
max_val
+03)----Aggregate: groupBy=[[string_topk_view.category]],
aggr=[[max(string_topk_view.val)]]
+04)------SubqueryAlias: string_topk_view
+05)--------Projection: string_topk.category AS category, string_topk.val AS val
+06)----------TableScan: string_topk projection=[category, val]
+physical_plan
+01)SortPreservingMergeExec: [max_val@1 DESC], fetch=2
+02)--SortExec: TopK(fetch=2), expr=[max_val@1 DESC],
preserve_partitioning=[true]
+03)----ProjectionExec: expr=[category@0 as category,
max(string_topk_view.val)@1 as max_val]
+04)------AggregateExec: mode=FinalPartitioned, gby=[category@0 as category],
aggr=[max(string_topk_view.val)], lim=[2]
+05)--------RepartitionExec: partitioning=Hash([category@0], 4),
input_partitions=1
+06)----------AggregateExec: mode=Partial, gby=[category@0 as category],
aggr=[max(string_topk_view.val)], lim=[2]
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+
query TII
select trace_id, min(other), MIN(timestamp) from traces group by trace_id
order by MIN(timestamp), MIN(other) limit 4;
----
@@ -203,6 +267,30 @@ a -1 -1
NULL 0 0
c 1 2
+# Regression tests for string max with ORDER BY ... LIMIT to ensure schema
stability
+query TT
+select trace_id, max(trace_id) as max_trace from traces group by trace_id
order by max_trace desc limit 2;
+----
+c c
+b b
+
+query TT
+explain select trace_id, max(trace_id) as max_trace from traces group by
trace_id order by max_trace desc limit 2;
+----
+logical_plan
+01)Sort: max_trace DESC NULLS FIRST, fetch=2
+02)--Projection: traces.trace_id, max(traces.trace_id) AS max_trace
+03)----Aggregate: groupBy=[[traces.trace_id]], aggr=[[max(traces.trace_id)]]
+04)------TableScan: traces projection=[trace_id]
+physical_plan
+01)SortPreservingMergeExec: [max_trace@1 DESC], fetch=2
+02)--SortExec: TopK(fetch=2), expr=[max_trace@1 DESC],
preserve_partitioning=[true]
+03)----ProjectionExec: expr=[trace_id@0 as trace_id, max(traces.trace_id)@1 as
max_trace]
+04)------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id],
aggr=[max(traces.trace_id)], lim=[2]
+05)--------RepartitionExec: partitioning=Hash([trace_id@0], 4),
input_partitions=1
+06)----------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id],
aggr=[max(traces.trace_id)], lim=[2]
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+
# Setting to map varchar to utf8view, to test PR
https://github.com/apache/datafusion/pull/15152
# Before the PR, the test case would not work because the Utf8View will not be
supported by the TopK aggregation
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]