This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e21b031545 NTH_VALUE reverse support (#8327)
e21b031545 is described below
commit e21b03154511cd61e03e299a595db6be6b1852c1
Author: Mustafa Akur <[email protected]>
AuthorDate: Wed Nov 29 08:48:26 2023 +0300
NTH_VALUE reverse support (#8327)
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
.../src/physical_optimizer/enforce_distribution.rs | 6 +-
.../core/src/physical_optimizer/enforce_sorting.rs | 3 +-
.../src/physical_optimizer/projection_pushdown.rs | 3 +-
.../replace_with_order_preserving_variants.rs | 9 +-
datafusion/core/src/physical_optimizer/utils.rs | 9 +-
datafusion/physical-expr/src/window/nth_value.rs | 89 ++++++++++----
datafusion/physical-expr/src/window/window_expr.rs | 16 +--
datafusion/physical-plan/src/lib.rs | 7 ++
.../src/windows/bounded_window_agg_exec.rs | 128 +++++++++++++++++++++
datafusion/proto/src/physical_plan/to_proto.rs | 8 +-
datafusion/sqllogictest/test_files/window.slt | 50 ++++++++
11 files changed, 269 insertions(+), 59 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index a34958a6c9..4befea741c 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -28,8 +28,8 @@ use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{
- add_sort_above, get_children_exectrees, get_plan_string,
is_coalesce_partitions,
- is_repartition, is_sort_preserving_merge, ExecTree,
+ add_sort_above, get_children_exectrees, is_coalesce_partitions,
is_repartition,
+ is_sort_preserving_merge, ExecTree,
};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode,
PhysicalGroupBy};
@@ -54,8 +54,8 @@ use
datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
physical_exprs_equal, EquivalenceProperties, PhysicalExpr,
};
-use datafusion_physical_plan::unbounded_output;
use datafusion_physical_plan::windows::{get_best_fitting_window,
BoundedWindowAggExec};
+use datafusion_physical_plan::{get_plan_string, unbounded_output};
use itertools::izip;
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 6fec74f608..ff052b5f04 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -765,9 +765,8 @@ mod tests {
repartition_exec, sort_exec, sort_expr, sort_expr_options,
sort_merge_join_exec,
sort_preserving_merge_exec, spr_repartition_exec, union_exec,
};
- use crate::physical_optimizer::utils::get_plan_string;
use crate::physical_plan::repartition::RepartitionExec;
- use crate::physical_plan::{displayable, Partitioning};
+ use crate::physical_plan::{displayable, get_plan_string, Partitioning};
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::csv_exec_sorted;
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index c0e512ffe5..7ebb64ab85 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1130,7 +1130,6 @@ mod tests {
use crate::physical_optimizer::projection_pushdown::{
join_table_borders, update_expr, ProjectionPushdown,
};
- use crate::physical_optimizer::utils::get_plan_string;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::filter::FilterExec;
@@ -1141,7 +1140,7 @@ mod tests {
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
- use crate::physical_plan::ExecutionPlan;
+ use crate::physical_plan::{get_plan_string, ExecutionPlan};
use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
use datafusion_common::config::ConfigOptions;
diff --git
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 5f130848de..09274938cb 100644
---
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -286,7 +286,7 @@ mod tests {
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
- use crate::physical_plan::{displayable, Partitioning};
+ use crate::physical_plan::{displayable, get_plan_string, Partitioning};
use crate::prelude::SessionConfig;
use arrow::compute::SortOptions;
@@ -958,11 +958,4 @@ mod tests {
FileCompressionType::UNCOMPRESSED,
))
}
-
- // Util function to get string representation of a physical plan
- fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
- let formatted = displayable(plan.as_ref()).indent(true).to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- actual.iter().map(|elem| elem.to_string()).collect()
- }
}
diff --git a/datafusion/core/src/physical_optimizer/utils.rs
b/datafusion/core/src/physical_optimizer/utils.rs
index 530df374ca..fccc1db0d3 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -28,7 +28,7 @@ use crate::physical_plan::sorts::sort::SortExec;
use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
-use crate::physical_plan::{displayable, ExecutionPlan};
+use crate::physical_plan::{get_plan_string, ExecutionPlan};
use datafusion_physical_expr::{LexRequirementRef, PhysicalSortRequirement};
@@ -154,10 +154,3 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<RepartitionExec>()
}
-
-/// Utility function yielding a string representation of the given
[`ExecutionPlan`].
-pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
- let formatted = displayable(plan.as_ref()).indent(true).to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- actual.iter().map(|elem| elem.to_string()).collect()
-}
diff --git a/datafusion/physical-expr/src/window/nth_value.rs
b/datafusion/physical-expr/src/window/nth_value.rs
index 262a50969b..b3c89122eb 100644
--- a/datafusion/physical-expr/src/window/nth_value.rs
+++ b/datafusion/physical-expr/src/window/nth_value.rs
@@ -15,21 +15,24 @@
// specific language governing permissions and limitations
// under the License.
-//! Defines physical expressions for `first_value`, `last_value`, and
`nth_value`
-//! that can evaluated at runtime during query execution
+//! Defines physical expressions for `FIRST_VALUE`, `LAST_VALUE`, and
`NTH_VALUE`
+//! functions that can be evaluated at run time during query execution.
+
+use std::any::Any;
+use std::cmp::Ordering;
+use std::ops::Range;
+use std::sync::Arc;
use crate::window::window_expr::{NthValueKind, NthValueState};
use crate::window::BuiltInWindowFunctionExpr;
use crate::PhysicalExpr;
+
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, Field};
use datafusion_common::{exec_err, ScalarValue};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::window_state::WindowAggState;
use datafusion_expr::PartitionEvaluator;
-use std::any::Any;
-use std::ops::Range;
-use std::sync::Arc;
/// nth_value expression
#[derive(Debug)]
@@ -77,17 +80,17 @@ impl NthValue {
n: u32,
) -> Result<Self> {
match n {
- 0 => exec_err!("nth_value expect n to be > 0"),
+ 0 => exec_err!("NTH_VALUE expects n to be non-zero"),
_ => Ok(Self {
name: name.into(),
expr,
data_type,
- kind: NthValueKind::Nth(n),
+ kind: NthValueKind::Nth(n as i64),
}),
}
}
- /// Get nth_value kind
+ /// Get the NTH_VALUE kind
pub fn get_kind(&self) -> NthValueKind {
self.kind
}
@@ -125,7 +128,7 @@ impl BuiltInWindowFunctionExpr for NthValue {
let reversed_kind = match self.kind {
NthValueKind::First => NthValueKind::Last,
NthValueKind::Last => NthValueKind::First,
- NthValueKind::Nth(_) => return None,
+ NthValueKind::Nth(idx) => NthValueKind::Nth(-idx),
};
Some(Arc::new(Self {
name: self.name.clone(),
@@ -143,16 +146,17 @@ pub(crate) struct NthValueEvaluator {
}
impl PartitionEvaluator for NthValueEvaluator {
- /// When the window frame has a fixed beginning (e.g UNBOUNDED
- /// PRECEDING), for some functions such as FIRST_VALUE, LAST_VALUE and
- /// NTH_VALUE we can memoize result. Once result is calculated it
- /// will always stay same. Hence, we do not need to keep past data
- /// as we process the entire dataset. This feature enables us to
- /// prune rows from table. The default implementation does nothing
+ /// When the window frame has a fixed beginning (e.g. UNBOUNDED PRECEDING),
+ /// for some functions such as FIRST_VALUE, LAST_VALUE and NTH_VALUE, we
+ /// can memoize the result. Once the result is calculated, it will always
+ /// stay the same. Hence, we do not need to keep past data as we process
+ /// the entire dataset.
fn memoize(&mut self, state: &mut WindowAggState) -> Result<()> {
let out = &state.out_col;
let size = out.len();
- let (is_prunable, is_last) = match self.state.kind {
+ let mut buffer_size = 1;
+ // Decide if we arrived at a final result yet:
+ let (is_prunable, is_reverse_direction) = match self.state.kind {
NthValueKind::First => {
let n_range =
state.window_frame_range.end -
state.window_frame_range.start;
@@ -162,16 +166,30 @@ impl PartitionEvaluator for NthValueEvaluator {
NthValueKind::Nth(n) => {
let n_range =
state.window_frame_range.end -
state.window_frame_range.start;
- (n_range >= (n as usize) && size >= (n as usize), false)
+ match n.cmp(&0) {
+ Ordering::Greater => {
+ (n_range >= (n as usize) && size > (n as usize), false)
+ }
+ Ordering::Less => {
+ let reverse_index = (-n) as usize;
+ buffer_size = reverse_index;
+ // Negative index represents reverse direction.
+ (n_range >= reverse_index, true)
+ }
+ Ordering::Equal => {
+ // The case n = 0 is not valid for the NTH_VALUE
function.
+ unreachable!();
+ }
+ }
}
};
if is_prunable {
- if self.state.finalized_result.is_none() && !is_last {
+ if self.state.finalized_result.is_none() && !is_reverse_direction {
let result = ScalarValue::try_from_array(out, size - 1)?;
self.state.finalized_result = Some(result);
}
state.window_frame_range.start =
- state.window_frame_range.end.saturating_sub(1);
+ state.window_frame_range.end.saturating_sub(buffer_size);
}
Ok(())
}
@@ -195,12 +213,33 @@ impl PartitionEvaluator for NthValueEvaluator {
NthValueKind::First => ScalarValue::try_from_array(arr,
range.start),
NthValueKind::Last => ScalarValue::try_from_array(arr,
range.end - 1),
NthValueKind::Nth(n) => {
- // We are certain that n > 0.
- let index = (n as usize) - 1;
- if index >= n_range {
- ScalarValue::try_from(arr.data_type())
- } else {
- ScalarValue::try_from_array(arr, range.start + index)
+ match n.cmp(&0) {
+ Ordering::Greater => {
+ // SQL indices are not 0-based.
+ let index = (n as usize) - 1;
+ if index >= n_range {
+ // Outside the range, return NULL:
+ ScalarValue::try_from(arr.data_type())
+ } else {
+ ScalarValue::try_from_array(arr, range.start +
index)
+ }
+ }
+ Ordering::Less => {
+ let reverse_index = (-n) as usize;
+ if n_range >= reverse_index {
+ ScalarValue::try_from_array(
+ arr,
+ range.start + n_range - reverse_index,
+ )
+ } else {
+ // Outside the range, return NULL:
+ ScalarValue::try_from(arr.data_type())
+ }
+ }
+ Ordering::Equal => {
+ // The case n = 0 is not valid for the NTH_VALUE
function.
+ unreachable!();
+ }
}
}
}
diff --git a/datafusion/physical-expr/src/window/window_expr.rs
b/datafusion/physical-expr/src/window/window_expr.rs
index b282e35797..4211a616e1 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -15,7 +15,13 @@
// specific language governing permissions and limitations
// under the License.
+use std::any::Any;
+use std::fmt::Debug;
+use std::ops::Range;
+use std::sync::Arc;
+
use crate::{PhysicalExpr, PhysicalSortExpr};
+
use arrow::array::{new_empty_array, Array, ArrayRef};
use arrow::compute::kernels::sort::SortColumn;
use arrow::compute::SortOptions;
@@ -25,13 +31,9 @@ use datafusion_common::{internal_err, DataFusionError,
Result, ScalarValue};
use datafusion_expr::window_state::{
PartitionBatchState, WindowAggState, WindowFrameContext,
};
-use datafusion_expr::PartitionEvaluator;
-use datafusion_expr::{Accumulator, WindowFrame};
+use datafusion_expr::{Accumulator, PartitionEvaluator, WindowFrame};
+
use indexmap::IndexMap;
-use std::any::Any;
-use std::fmt::Debug;
-use std::ops::Range;
-use std::sync::Arc;
/// Common trait for [window function] implementations
///
@@ -292,7 +294,7 @@ pub struct NumRowsState {
pub enum NthValueKind {
First,
Last,
- Nth(u32),
+ Nth(i64),
}
#[derive(Debug, Clone)]
diff --git a/datafusion/physical-plan/src/lib.rs
b/datafusion/physical-plan/src/lib.rs
index e5cd5e674c..b2c69b467e 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -570,5 +570,12 @@ pub fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) ->
bool {
.unwrap_or(true)
}
+/// Utility function yielding a string representation of the given
[`ExecutionPlan`].
+pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
+ let formatted = displayable(plan.as_ref()).indent(true).to_string();
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+ actual.iter().map(|elem| elem.to_string()).collect()
+}
+
#[cfg(test)]
pub mod test;
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index fb679b0138..8156ab1fa3 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -1109,3 +1109,131 @@ fn get_aggregate_result_out_column(
result
.ok_or_else(|| DataFusionError::Execution("Should contain
something".to_string()))
}
+
+#[cfg(test)]
+mod tests {
+ use crate::common::collect;
+ use crate::memory::MemoryExec;
+ use crate::windows::{BoundedWindowAggExec, PartitionSearchMode};
+ use crate::{get_plan_string, ExecutionPlan};
+ use arrow_array::RecordBatch;
+ use arrow_schema::{DataType, Field, Schema};
+ use datafusion_common::{assert_batches_eq, Result, ScalarValue};
+ use datafusion_execution::config::SessionConfig;
+ use datafusion_execution::TaskContext;
+ use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+ use datafusion_physical_expr::expressions::col;
+ use datafusion_physical_expr::expressions::NthValue;
+ use datafusion_physical_expr::window::BuiltInWindowExpr;
+ use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
+ use std::sync::Arc;
+
+ // Tests NTH_VALUE(negative index) with memoize feature.
+ // To be able to trigger memoize feature for NTH_VALUE we need to
+ // - feed BoundedWindowAggExec with batch stream data.
+ // - Window frame should contain UNBOUNDED PRECEDING.
+ // It is hard to ensure these conditions are met from the SQL query.
+ #[tokio::test]
+ async fn test_window_nth_value_bounded_memoize() -> Result<()> {
+ let config = SessionConfig::new().with_target_partitions(1);
+ let task_ctx =
Arc::new(TaskContext::default().with_session_config(config));
+
+ let schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Int32, false)]));
+ // Create a new batch of data to insert into the table
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))],
+ )?;
+
+ let memory_exec = MemoryExec::try_new(
+ &[vec![batch.clone(), batch.clone(), batch.clone()]],
+ schema.clone(),
+ None,
+ )
+ .map(|e| Arc::new(e) as Arc<dyn ExecutionPlan>)?;
+ let col_a = col("a", &schema)?;
+ let nth_value_func1 =
+ NthValue::nth("nth_value(-1)", col_a.clone(), DataType::Int32, 1)?
+ .reverse_expr()
+ .unwrap();
+ let nth_value_func2 =
+ NthValue::nth("nth_value(-2)", col_a.clone(), DataType::Int32, 2)?
+ .reverse_expr()
+ .unwrap();
+ let last_value_func =
+ Arc::new(NthValue::last("last", col_a.clone(), DataType::Int32))
as _;
+ let window_exprs = vec![
+ // LAST_VALUE(a)
+ Arc::new(BuiltInWindowExpr::new(
+ last_value_func,
+ &[],
+ &[],
+ Arc::new(WindowFrame {
+ units: WindowFrameUnits::Rows,
+ start_bound:
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
+ end_bound: WindowFrameBound::CurrentRow,
+ }),
+ )) as _,
+ // NTH_VALUE(a, -1)
+ Arc::new(BuiltInWindowExpr::new(
+ nth_value_func1,
+ &[],
+ &[],
+ Arc::new(WindowFrame {
+ units: WindowFrameUnits::Rows,
+ start_bound:
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
+ end_bound: WindowFrameBound::CurrentRow,
+ }),
+ )) as _,
+ // NTH_VALUE(a, -2)
+ Arc::new(BuiltInWindowExpr::new(
+ nth_value_func2,
+ &[],
+ &[],
+ Arc::new(WindowFrame {
+ units: WindowFrameUnits::Rows,
+ start_bound:
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
+ end_bound: WindowFrameBound::CurrentRow,
+ }),
+ )) as _,
+ ];
+ let physical_plan = BoundedWindowAggExec::try_new(
+ window_exprs,
+ memory_exec,
+ vec![],
+ PartitionSearchMode::Sorted,
+ )
+ .map(|e| Arc::new(e) as Arc<dyn ExecutionPlan>)?;
+
+ let batches = collect(physical_plan.execute(0, task_ctx)?).await?;
+
+ let expected = vec![
+ "BoundedWindowAggExec: wdw=[last: Ok(Field { name: \"last\",
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)),
end_bound: CurrentRow }, nth_value(-1): Ok(Field { name: \"nth_value(-1)\",
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)),
end_bound: CurrentRow }, [...]
+ " MemoryExec: partitions=1, partition_sizes=[3]",
+ ];
+ // Get string representation of the plan
+ let actual = get_plan_string(&physical_plan);
+ assert_eq!(
+ expected, actual,
+ "\n**Optimized Plan
Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+
+ let expected = [
+ "+---+------+---------------+---------------+",
+ "| a | last | nth_value(-1) | nth_value(-2) |",
+ "+---+------+---------------+---------------+",
+ "| 1 | 1 | 1 | |",
+ "| 2 | 2 | 2 | 1 |",
+ "| 3 | 3 | 3 | 2 |",
+ "| 1 | 1 | 1 | 3 |",
+ "| 2 | 2 | 2 | 1 |",
+ "| 3 | 3 | 3 | 2 |",
+ "| 1 | 1 | 1 | 3 |",
+ "| 2 | 2 | 2 | 1 |",
+ "| 3 | 3 | 3 | 2 |",
+ "+---+------+---------------+---------------+",
+ ];
+ assert_batches_eq!(expected, &batches);
+ Ok(())
+ }
+}
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index 44864be947..ea00b726b9 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -27,11 +27,11 @@ use crate::protobuf::{
physical_aggregate_expr_node, PhysicalSortExprNode,
PhysicalSortExprNodeCollection,
ScalarValue,
};
+
use datafusion::datasource::{
- file_format::json::JsonSink, physical_plan::FileScanConfig,
-};
-use datafusion::datasource::{
+ file_format::json::JsonSink,
listing::{FileRange, PartitionedFile},
+ physical_plan::FileScanConfig,
physical_plan::FileSinkConfig,
};
use datafusion::logical_expr::BuiltinScalarFunction;
@@ -180,7 +180,7 @@ impl TryFrom<Arc<dyn WindowExpr>> for
protobuf::PhysicalWindowExprNode {
args.insert(
1,
Arc::new(Literal::new(
- datafusion_common::ScalarValue::Int64(Some(n
as i64)),
+ datafusion_common::ScalarValue::Int64(Some(n)),
)),
);
protobuf::BuiltInWindowFunction::NthValue
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index 4edac211b3..55b8843a0b 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3493,6 +3493,56 @@ select sum(1) over() x, sum(1) over () y
----
1 1
+# The NTH_VALUE requirement is c DESC; however, the existing ordering is c ASC.
+# We can still satisfy this requirement if we reverse the window expression "NTH_VALUE(c, 2) OVER(order by c DESC) as nv1"
+# as "NTH_VALUE(c, -2) OVER(order by c ASC RANGE BETWEEN CURRENT ROW AND
UNBOUNDED FOLLOWING) as nv1".
+# Please note that "NTH_VALUE(c, 2) OVER(order by c DESC) as nv1" is the same as
+# "NTH_VALUE(c, 2) OVER(order by c DESC RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW) as nv1";
+# hence we can produce the same result without re-sorting the table.
+# Unfortunately, since window expression names are strings, this change is not
visible in the plan (we do not do string manipulation).
+# TODO: Reflect window expression reversal in the plans.
+query TT
+EXPLAIN SELECT c, NTH_VALUE(c, 2) OVER(order by c DESC) as nv1
+ FROM multiple_ordered_table
+ ORDER BY c ASC
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: multiple_ordered_table.c ASC NULLS LAST, fetch=5
+----Projection: multiple_ordered_table.c,
NTH_VALUE(multiple_ordered_table.c,Int64(2)) ORDER BY [multiple_ordered_table.c
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS nv1
+------WindowAggr: windowExpr=[[NTH_VALUE(multiple_ordered_table.c, Int64(2))
ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW]]
+--------TableScan: multiple_ordered_table projection=[c]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--ProjectionExec: expr=[c@0 as c, NTH_VALUE(multiple_ordered_table.c,Int64(2))
ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW@1 as nv1]
+----WindowAggExec: wdw=[NTH_VALUE(multiple_ordered_table.c,Int64(2)) ORDER BY
[multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "NTH_VALUE(multiple_ordered_table.c,Int64(2))
ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW", data_type: Int32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: CurrentRow, end_ [...]
+------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c],
output_ordering=[c@0 ASC NULLS LAST], has_header=true
+
+query II
+SELECT c, NTH_VALUE(c, 2) OVER(order by c DESC) as nv1
+ FROM multiple_ordered_table
+ ORDER BY c ASC
+ LIMIT 5
+----
+0 98
+1 98
+2 98
+3 98
+4 98
+
+query II
+SELECT c, NTH_VALUE(c, 2) OVER(order by c DESC) as nv1
+ FROM multiple_ordered_table
+ ORDER BY c DESC
+ LIMIT 5
+----
+99 NULL
+98 98
+97 98
+96 98
+95 98
+
statement ok
set datafusion.execution.target_partitions = 2;