This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 92a471e55f Fix non-deterministic behaviour of schema nullability of lag
window query (#9508)
92a471e55f is described below
commit 92a471e55faf9077e1bbb1b8cfad52b9ff89faba
Author: Mustafa Akur <[email protected]>
AuthorDate: Sat Mar 9 00:32:34 2024 +0300
Fix non-deterministic behaviour of schema nullability of lag window query
(#9508)
* Initial commit
* Update comment, remove leftovers
* Add test
* Update datafusion/sqllogictest/test_files/window.slt
Co-authored-by: comphead <[email protected]>
---------
Co-authored-by: comphead <[email protected]>
---
.../src/windows/bounded_window_agg_exec.rs | 31 +++++++++++++++++-----
datafusion/sqllogictest/test_files/window.slt | 10 +++++++
2 files changed, 35 insertions(+), 6 deletions(-)
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index c99ec59959..4cba571054 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -163,6 +163,7 @@ impl BoundedWindowAggExec {
fn get_search_algo(&self) -> Result<Box<dyn PartitionSearcher>> {
let partition_by_sort_keys = self.partition_by_sort_keys()?;
let ordered_partition_by_indices =
self.ordered_partition_by_indices.clone();
+ let input_schema = self.input().schema();
Ok(match &self.input_order_mode {
InputOrderMode::Sorted => {
// In Sorted mode, all partition by columns should be ordered.
@@ -174,11 +175,12 @@ impl BoundedWindowAggExec {
Box::new(SortedSearch {
partition_by_sort_keys,
ordered_partition_by_indices,
+ input_schema,
})
}
- InputOrderMode::Linear | InputOrderMode::PartiallySorted(_) => {
- Box::new(LinearSearch::new(ordered_partition_by_indices))
- }
+ InputOrderMode::Linear | InputOrderMode::PartiallySorted(_) =>
Box::new(
+ LinearSearch::new(ordered_partition_by_indices, input_schema),
+ ),
})
}
@@ -378,12 +380,16 @@ trait PartitionSearcher: Send {
let partition_batch_state = partition_buffers
.entry(partition_row)
.or_insert_with(|| PartitionBatchState {
- record_batch:
RecordBatch::new_empty(partition_batch.schema()),
+ // Use input_schema, for the buffer schema.
+ // record_batch.schema may not have necessary schema,
in terms of
+ // nullability constraints of the output.
+ // See issue:
https://github.com/apache/arrow-datafusion/issues/9320
+ record_batch:
RecordBatch::new_empty(self.input_schema().clone()),
is_end: false,
n_out_row: 0,
});
partition_batch_state.record_batch = concat_batches(
- &partition_batch.schema(),
+ self.input_schema(),
[&partition_batch_state.record_batch, &partition_batch],
)?;
}
@@ -398,6 +404,8 @@ trait PartitionSearcher: Send {
Ok(())
}
+
+ fn input_schema(&self) -> &SchemaRef;
}
/// This object encapsulates the algorithm state for a simple linear scan
@@ -423,6 +431,7 @@ pub struct LinearSearch {
/// The third entry stores how many new outputs are calculated for the
/// corresponding partition.
row_map_out: RawTable<(u64, usize, usize)>,
+ input_schema: SchemaRef,
}
impl PartitionSearcher for LinearSearch {
@@ -561,17 +570,22 @@ impl PartitionSearcher for LinearSearch {
}
}
}
+
+ fn input_schema(&self) -> &SchemaRef {
+ &self.input_schema
+ }
}
impl LinearSearch {
/// Initialize a new [`LinearSearch`] partition searcher.
- fn new(ordered_partition_by_indices: Vec<usize>) -> Self {
+ fn new(ordered_partition_by_indices: Vec<usize>, input_schema: SchemaRef)
-> Self {
LinearSearch {
input_buffer_hashes: VecDeque::new(),
random_state: Default::default(),
ordered_partition_by_indices,
row_map_batch: RawTable::with_capacity(256),
row_map_out: RawTable::with_capacity(256),
+ input_schema,
}
}
@@ -693,6 +707,7 @@ pub struct SortedSearch {
/// is ordered by a, b and the window expression contains a PARTITION BY
b, a
/// clause, this attribute stores [1, 0].
ordered_partition_by_indices: Vec<usize>,
+ input_schema: SchemaRef,
}
impl PartitionSearcher for SortedSearch {
@@ -758,6 +773,10 @@ impl PartitionSearcher for SortedSearch {
partition_batch_state.is_end |= idx < n_partitions - 1;
}
}
+
+ fn input_schema(&self) -> &SchemaRef {
+ &self.input_schema
+ }
}
impl SortedSearch {
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index 92d2208029..c7241cae30 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -4269,3 +4269,13 @@ LIMIT 5;
3 53
24 31
14 94
+
+# Tests schema and data are in sync for mixed nulls and not nulls values for
builtin window function
+query T rowsort
+select lag(a) over () as x1
+ from
+ (select 2 id, 'b' a union all select 1 id, null a union all select 3
id, null);
+----
+NULL
+NULL
+b