This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e21b031545 NTH_VALUE reverse support (#8327)
e21b031545 is described below

commit e21b03154511cd61e03e299a595db6be6b1852c1
Author: Mustafa Akur <[email protected]>
AuthorDate: Wed Nov 29 08:48:26 2023 +0300

    NTH_VALUE reverse support (#8327)
    
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 .../src/physical_optimizer/enforce_distribution.rs |   6 +-
 .../core/src/physical_optimizer/enforce_sorting.rs |   3 +-
 .../src/physical_optimizer/projection_pushdown.rs  |   3 +-
 .../replace_with_order_preserving_variants.rs      |   9 +-
 datafusion/core/src/physical_optimizer/utils.rs    |   9 +-
 datafusion/physical-expr/src/window/nth_value.rs   |  89 ++++++++++----
 datafusion/physical-expr/src/window/window_expr.rs |  16 +--
 datafusion/physical-plan/src/lib.rs                |   7 ++
 .../src/windows/bounded_window_agg_exec.rs         | 128 +++++++++++++++++++++
 datafusion/proto/src/physical_plan/to_proto.rs     |   8 +-
 datafusion/sqllogictest/test_files/window.slt      |  50 ++++++++
 11 files changed, 269 insertions(+), 59 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index a34958a6c9..4befea741c 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -28,8 +28,8 @@ use std::sync::Arc;
 use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::utils::{
-    add_sort_above, get_children_exectrees, get_plan_string, 
is_coalesce_partitions,
-    is_repartition, is_sort_preserving_merge, ExecTree,
+    add_sort_above, get_children_exectrees, is_coalesce_partitions, 
is_repartition,
+    is_sort_preserving_merge, ExecTree,
 };
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, 
PhysicalGroupBy};
@@ -54,8 +54,8 @@ use 
datafusion_physical_expr::utils::map_columns_before_projection;
 use datafusion_physical_expr::{
     physical_exprs_equal, EquivalenceProperties, PhysicalExpr,
 };
-use datafusion_physical_plan::unbounded_output;
 use datafusion_physical_plan::windows::{get_best_fitting_window, 
BoundedWindowAggExec};
+use datafusion_physical_plan::{get_plan_string, unbounded_output};
 
 use itertools::izip;
 
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs 
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 6fec74f608..ff052b5f04 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -765,9 +765,8 @@ mod tests {
         repartition_exec, sort_exec, sort_expr, sort_expr_options, 
sort_merge_join_exec,
         sort_preserving_merge_exec, spr_repartition_exec, union_exec,
     };
-    use crate::physical_optimizer::utils::get_plan_string;
     use crate::physical_plan::repartition::RepartitionExec;
-    use crate::physical_plan::{displayable, Partitioning};
+    use crate::physical_plan::{displayable, get_plan_string, Partitioning};
     use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::csv_exec_sorted;
 
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs 
b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index c0e512ffe5..7ebb64ab85 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1130,7 +1130,6 @@ mod tests {
     use crate::physical_optimizer::projection_pushdown::{
         join_table_borders, update_expr, ProjectionPushdown,
     };
-    use crate::physical_optimizer::utils::get_plan_string;
     use crate::physical_optimizer::PhysicalOptimizerRule;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::filter::FilterExec;
@@ -1141,7 +1140,7 @@ mod tests {
     use crate::physical_plan::repartition::RepartitionExec;
     use crate::physical_plan::sorts::sort::SortExec;
     use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
-    use crate::physical_plan::ExecutionPlan;
+    use crate::physical_plan::{get_plan_string, ExecutionPlan};
 
     use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
     use datafusion_common::config::ConfigOptions;
diff --git 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 5f130848de..09274938cb 100644
--- 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -286,7 +286,7 @@ mod tests {
     use crate::physical_plan::repartition::RepartitionExec;
     use crate::physical_plan::sorts::sort::SortExec;
     use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
-    use crate::physical_plan::{displayable, Partitioning};
+    use crate::physical_plan::{displayable, get_plan_string, Partitioning};
     use crate::prelude::SessionConfig;
 
     use arrow::compute::SortOptions;
@@ -958,11 +958,4 @@ mod tests {
             FileCompressionType::UNCOMPRESSED,
         ))
     }
-
-    // Util function to get string representation of a physical plan
-    fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
-        let formatted = displayable(plan.as_ref()).indent(true).to_string();
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        actual.iter().map(|elem| elem.to_string()).collect()
-    }
 }
diff --git a/datafusion/core/src/physical_optimizer/utils.rs 
b/datafusion/core/src/physical_optimizer/utils.rs
index 530df374ca..fccc1db0d3 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -28,7 +28,7 @@ use crate::physical_plan::sorts::sort::SortExec;
 use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use crate::physical_plan::union::UnionExec;
 use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
-use crate::physical_plan::{displayable, ExecutionPlan};
+use crate::physical_plan::{get_plan_string, ExecutionPlan};
 
 use datafusion_physical_expr::{LexRequirementRef, PhysicalSortRequirement};
 
@@ -154,10 +154,3 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
 pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
     plan.as_any().is::<RepartitionExec>()
 }
-
-/// Utility function yielding a string representation of the given 
[`ExecutionPlan`].
-pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
-    let formatted = displayable(plan.as_ref()).indent(true).to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    actual.iter().map(|elem| elem.to_string()).collect()
-}
diff --git a/datafusion/physical-expr/src/window/nth_value.rs 
b/datafusion/physical-expr/src/window/nth_value.rs
index 262a50969b..b3c89122eb 100644
--- a/datafusion/physical-expr/src/window/nth_value.rs
+++ b/datafusion/physical-expr/src/window/nth_value.rs
@@ -15,21 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines physical expressions for `first_value`, `last_value`, and 
`nth_value`
-//! that can evaluated at runtime during query execution
+//! Defines physical expressions for `FIRST_VALUE`, `LAST_VALUE`, and 
`NTH_VALUE`
+//! functions that can be evaluated at run time during query execution.
+
+use std::any::Any;
+use std::cmp::Ordering;
+use std::ops::Range;
+use std::sync::Arc;
 
 use crate::window::window_expr::{NthValueKind, NthValueState};
 use crate::window::BuiltInWindowFunctionExpr;
 use crate::PhysicalExpr;
+
 use arrow::array::{Array, ArrayRef};
 use arrow::datatypes::{DataType, Field};
 use datafusion_common::{exec_err, ScalarValue};
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::window_state::WindowAggState;
 use datafusion_expr::PartitionEvaluator;
-use std::any::Any;
-use std::ops::Range;
-use std::sync::Arc;
 
 /// nth_value expression
 #[derive(Debug)]
@@ -77,17 +80,17 @@ impl NthValue {
         n: u32,
     ) -> Result<Self> {
         match n {
-            0 => exec_err!("nth_value expect n to be > 0"),
+            0 => exec_err!("NTH_VALUE expects n to be non-zero"),
             _ => Ok(Self {
                 name: name.into(),
                 expr,
                 data_type,
-                kind: NthValueKind::Nth(n),
+                kind: NthValueKind::Nth(n as i64),
             }),
         }
     }
 
-    /// Get nth_value kind
+    /// Get the NTH_VALUE kind
     pub fn get_kind(&self) -> NthValueKind {
         self.kind
     }
@@ -125,7 +128,7 @@ impl BuiltInWindowFunctionExpr for NthValue {
         let reversed_kind = match self.kind {
             NthValueKind::First => NthValueKind::Last,
             NthValueKind::Last => NthValueKind::First,
-            NthValueKind::Nth(_) => return None,
+            NthValueKind::Nth(idx) => NthValueKind::Nth(-idx),
         };
         Some(Arc::new(Self {
             name: self.name.clone(),
@@ -143,16 +146,17 @@ pub(crate) struct NthValueEvaluator {
 }
 
 impl PartitionEvaluator for NthValueEvaluator {
-    /// When the window frame has a fixed beginning (e.g UNBOUNDED
-    /// PRECEDING), for some functions such as FIRST_VALUE, LAST_VALUE and
-    /// NTH_VALUE we can memoize result.  Once result is calculated it
-    /// will always stay same. Hence, we do not need to keep past data
-    /// as we process the entire dataset. This feature enables us to
-    /// prune rows from table. The default implementation does nothing
+    /// When the window frame has a fixed beginning (e.g. UNBOUNDED PRECEDING),
+    /// for some functions such as FIRST_VALUE, LAST_VALUE and NTH_VALUE, we
+    /// can memoize the result. Once the result is calculated, it will always
+    /// stay the same. Hence, we do not need to keep past data as we process
+    /// the entire dataset.
     fn memoize(&mut self, state: &mut WindowAggState) -> Result<()> {
         let out = &state.out_col;
         let size = out.len();
-        let (is_prunable, is_last) = match self.state.kind {
+        let mut buffer_size = 1;
+        // Decide if we arrived at a final result yet:
+        let (is_prunable, is_reverse_direction) = match self.state.kind {
             NthValueKind::First => {
                 let n_range =
                     state.window_frame_range.end - 
state.window_frame_range.start;
@@ -162,16 +166,30 @@ impl PartitionEvaluator for NthValueEvaluator {
             NthValueKind::Nth(n) => {
                 let n_range =
                     state.window_frame_range.end - 
state.window_frame_range.start;
-                (n_range >= (n as usize) && size >= (n as usize), false)
+                match n.cmp(&0) {
+                    Ordering::Greater => {
+                        (n_range >= (n as usize) && size > (n as usize), false)
+                    }
+                    Ordering::Less => {
+                        let reverse_index = (-n) as usize;
+                        buffer_size = reverse_index;
+                        // Negative index represents reverse direction.
+                        (n_range >= reverse_index, true)
+                    }
+                    Ordering::Equal => {
+                        // The case n = 0 is not valid for the NTH_VALUE 
function.
+                        unreachable!();
+                    }
+                }
             }
         };
         if is_prunable {
-            if self.state.finalized_result.is_none() && !is_last {
+            if self.state.finalized_result.is_none() && !is_reverse_direction {
                 let result = ScalarValue::try_from_array(out, size - 1)?;
                 self.state.finalized_result = Some(result);
             }
             state.window_frame_range.start =
-                state.window_frame_range.end.saturating_sub(1);
+                state.window_frame_range.end.saturating_sub(buffer_size);
         }
         Ok(())
     }
@@ -195,12 +213,33 @@ impl PartitionEvaluator for NthValueEvaluator {
                 NthValueKind::First => ScalarValue::try_from_array(arr, 
range.start),
                 NthValueKind::Last => ScalarValue::try_from_array(arr, 
range.end - 1),
                 NthValueKind::Nth(n) => {
-                    // We are certain that n > 0.
-                    let index = (n as usize) - 1;
-                    if index >= n_range {
-                        ScalarValue::try_from(arr.data_type())
-                    } else {
-                        ScalarValue::try_from_array(arr, range.start + index)
+                    match n.cmp(&0) {
+                        Ordering::Greater => {
+                            // SQL indices are not 0-based.
+                            let index = (n as usize) - 1;
+                            if index >= n_range {
+                                // Outside the range, return NULL:
+                                ScalarValue::try_from(arr.data_type())
+                            } else {
+                                ScalarValue::try_from_array(arr, range.start + 
index)
+                            }
+                        }
+                        Ordering::Less => {
+                            let reverse_index = (-n) as usize;
+                            if n_range >= reverse_index {
+                                ScalarValue::try_from_array(
+                                    arr,
+                                    range.start + n_range - reverse_index,
+                                )
+                            } else {
+                                // Outside the range, return NULL:
+                                ScalarValue::try_from(arr.data_type())
+                            }
+                        }
+                        Ordering::Equal => {
+                            // The case n = 0 is not valid for the NTH_VALUE 
function.
+                            unreachable!();
+                        }
                     }
                 }
             }
diff --git a/datafusion/physical-expr/src/window/window_expr.rs 
b/datafusion/physical-expr/src/window/window_expr.rs
index b282e35797..4211a616e1 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -15,7 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::any::Any;
+use std::fmt::Debug;
+use std::ops::Range;
+use std::sync::Arc;
+
 use crate::{PhysicalExpr, PhysicalSortExpr};
+
 use arrow::array::{new_empty_array, Array, ArrayRef};
 use arrow::compute::kernels::sort::SortColumn;
 use arrow::compute::SortOptions;
@@ -25,13 +31,9 @@ use datafusion_common::{internal_err, DataFusionError, 
Result, ScalarValue};
 use datafusion_expr::window_state::{
     PartitionBatchState, WindowAggState, WindowFrameContext,
 };
-use datafusion_expr::PartitionEvaluator;
-use datafusion_expr::{Accumulator, WindowFrame};
+use datafusion_expr::{Accumulator, PartitionEvaluator, WindowFrame};
+
 use indexmap::IndexMap;
-use std::any::Any;
-use std::fmt::Debug;
-use std::ops::Range;
-use std::sync::Arc;
 
 /// Common trait for [window function] implementations
 ///
@@ -292,7 +294,7 @@ pub struct NumRowsState {
 pub enum NthValueKind {
     First,
     Last,
-    Nth(u32),
+    Nth(i64),
 }
 
 #[derive(Debug, Clone)]
diff --git a/datafusion/physical-plan/src/lib.rs 
b/datafusion/physical-plan/src/lib.rs
index e5cd5e674c..b2c69b467e 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -570,5 +570,12 @@ pub fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) -> 
bool {
         .unwrap_or(true)
 }
 
+/// Utility function yielding a string representation of the given 
[`ExecutionPlan`].
+pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
+    let formatted = displayable(plan.as_ref()).indent(true).to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    actual.iter().map(|elem| elem.to_string()).collect()
+}
+
 #[cfg(test)]
 pub mod test;
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs 
b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index fb679b0138..8156ab1fa3 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -1109,3 +1109,131 @@ fn get_aggregate_result_out_column(
     result
         .ok_or_else(|| DataFusionError::Execution("Should contain 
something".to_string()))
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::common::collect;
+    use crate::memory::MemoryExec;
+    use crate::windows::{BoundedWindowAggExec, PartitionSearchMode};
+    use crate::{get_plan_string, ExecutionPlan};
+    use arrow_array::RecordBatch;
+    use arrow_schema::{DataType, Field, Schema};
+    use datafusion_common::{assert_batches_eq, Result, ScalarValue};
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::TaskContext;
+    use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+    use datafusion_physical_expr::expressions::col;
+    use datafusion_physical_expr::expressions::NthValue;
+    use datafusion_physical_expr::window::BuiltInWindowExpr;
+    use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
+    use std::sync::Arc;
+
+    // Tests NTH_VALUE(negative index) with the memoize feature.
+    // To be able to trigger the memoize feature for NTH_VALUE, we need to:
+    // - feed BoundedWindowAggExec with a batch stream of data, and
+    // - use a window frame that contains UNBOUNDED PRECEDING.
+    // It is hard to ensure these conditions are met from the SQL query.
+    #[tokio::test]
+    async fn test_window_nth_value_bounded_memoize() -> Result<()> {
+        let config = SessionConfig::new().with_target_partitions(1);
+        let task_ctx = 
Arc::new(TaskContext::default().with_session_config(config));
+
+        let schema = Arc::new(Schema::new(vec![Field::new("a", 
DataType::Int32, false)]));
+        // Create a new batch of data to insert into the table
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))],
+        )?;
+
+        let memory_exec = MemoryExec::try_new(
+            &[vec![batch.clone(), batch.clone(), batch.clone()]],
+            schema.clone(),
+            None,
+        )
+        .map(|e| Arc::new(e) as Arc<dyn ExecutionPlan>)?;
+        let col_a = col("a", &schema)?;
+        let nth_value_func1 =
+            NthValue::nth("nth_value(-1)", col_a.clone(), DataType::Int32, 1)?
+                .reverse_expr()
+                .unwrap();
+        let nth_value_func2 =
+            NthValue::nth("nth_value(-2)", col_a.clone(), DataType::Int32, 2)?
+                .reverse_expr()
+                .unwrap();
+        let last_value_func =
+            Arc::new(NthValue::last("last", col_a.clone(), DataType::Int32)) 
as _;
+        let window_exprs = vec![
+            // LAST_VALUE(a)
+            Arc::new(BuiltInWindowExpr::new(
+                last_value_func,
+                &[],
+                &[],
+                Arc::new(WindowFrame {
+                    units: WindowFrameUnits::Rows,
+                    start_bound: 
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
+                    end_bound: WindowFrameBound::CurrentRow,
+                }),
+            )) as _,
+            // NTH_VALUE(a, -1)
+            Arc::new(BuiltInWindowExpr::new(
+                nth_value_func1,
+                &[],
+                &[],
+                Arc::new(WindowFrame {
+                    units: WindowFrameUnits::Rows,
+                    start_bound: 
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
+                    end_bound: WindowFrameBound::CurrentRow,
+                }),
+            )) as _,
+            // NTH_VALUE(a, -2)
+            Arc::new(BuiltInWindowExpr::new(
+                nth_value_func2,
+                &[],
+                &[],
+                Arc::new(WindowFrame {
+                    units: WindowFrameUnits::Rows,
+                    start_bound: 
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
+                    end_bound: WindowFrameBound::CurrentRow,
+                }),
+            )) as _,
+        ];
+        let physical_plan = BoundedWindowAggExec::try_new(
+            window_exprs,
+            memory_exec,
+            vec![],
+            PartitionSearchMode::Sorted,
+        )
+        .map(|e| Arc::new(e) as Arc<dyn ExecutionPlan>)?;
+
+        let batches = collect(physical_plan.execute(0, task_ctx)?).await?;
+
+        let expected = vec![
+            "BoundedWindowAggExec: wdw=[last: Ok(Field { name: \"last\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), 
end_bound: CurrentRow }, nth_value(-1): Ok(Field { name: \"nth_value(-1)\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), 
end_bound: CurrentRow },  [...]
+            "  MemoryExec: partitions=1, partition_sizes=[3]",
+        ];
+        // Get string representation of the plan
+        let actual = get_plan_string(&physical_plan);
+        assert_eq!(
+            expected, actual,
+            "\n**Optimized Plan 
Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+        );
+
+        let expected = [
+            "+---+------+---------------+---------------+",
+            "| a | last | nth_value(-1) | nth_value(-2) |",
+            "+---+------+---------------+---------------+",
+            "| 1 | 1    | 1             |               |",
+            "| 2 | 2    | 2             | 1             |",
+            "| 3 | 3    | 3             | 2             |",
+            "| 1 | 1    | 1             | 3             |",
+            "| 2 | 2    | 2             | 1             |",
+            "| 3 | 3    | 3             | 2             |",
+            "| 1 | 1    | 1             | 3             |",
+            "| 2 | 2    | 2             | 1             |",
+            "| 3 | 3    | 3             | 2             |",
+            "+---+------+---------------+---------------+",
+        ];
+        assert_batches_eq!(expected, &batches);
+        Ok(())
+    }
+}
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index 44864be947..ea00b726b9 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -27,11 +27,11 @@ use crate::protobuf::{
     physical_aggregate_expr_node, PhysicalSortExprNode, 
PhysicalSortExprNodeCollection,
     ScalarValue,
 };
+
 use datafusion::datasource::{
-    file_format::json::JsonSink, physical_plan::FileScanConfig,
-};
-use datafusion::datasource::{
+    file_format::json::JsonSink,
     listing::{FileRange, PartitionedFile},
+    physical_plan::FileScanConfig,
     physical_plan::FileSinkConfig,
 };
 use datafusion::logical_expr::BuiltinScalarFunction;
@@ -180,7 +180,7 @@ impl TryFrom<Arc<dyn WindowExpr>> for 
protobuf::PhysicalWindowExprNode {
                         args.insert(
                             1,
                             Arc::new(Literal::new(
-                                datafusion_common::ScalarValue::Int64(Some(n 
as i64)),
+                                datafusion_common::ScalarValue::Int64(Some(n)),
                             )),
                         );
                         protobuf::BuiltInWindowFunction::NthValue
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 4edac211b3..55b8843a0b 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3493,6 +3493,56 @@ select sum(1) over() x, sum(1) over () y
 ----
 1 1
 
+# The NTH_VALUE requirement is c DESC; however, the existing ordering is c ASC,
+# so we reverse the window expression "NTH_VALUE(c, 2) OVER(order by c DESC) as nv1"
+# as "NTH_VALUE(c, -2) OVER(order by c ASC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as nv1".
+# Please note that "NTH_VALUE(c, 2) OVER(order by c DESC) as nv1" is the same as
+# "NTH_VALUE(c, 2) OVER(order by c DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as nv1";
+# this way, we can produce the same result without re-sorting the table.
+# Unfortunately, since window expression names are strings, this change is not visible
+# in the plan (we do not do string manipulation).
+# TODO: Reflect window expression reversal in the plans.
+query TT
+EXPLAIN SELECT c, NTH_VALUE(c, 2) OVER(order by c DESC) as nv1
+  FROM multiple_ordered_table
+  ORDER BY c ASC
+  LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: multiple_ordered_table.c ASC NULLS LAST, fetch=5
+----Projection: multiple_ordered_table.c, 
NTH_VALUE(multiple_ordered_table.c,Int64(2)) ORDER BY [multiple_ordered_table.c 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS nv1
+------WindowAggr: windowExpr=[[NTH_VALUE(multiple_ordered_table.c, Int64(2)) 
ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
+--------TableScan: multiple_ordered_table projection=[c]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--ProjectionExec: expr=[c@0 as c, NTH_VALUE(multiple_ordered_table.c,Int64(2)) 
ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW@1 as nv1]
+----WindowAggExec: wdw=[NTH_VALUE(multiple_ordered_table.c,Int64(2)) ORDER BY 
[multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "NTH_VALUE(multiple_ordered_table.c,Int64(2)) 
ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: Int32, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: CurrentRow, end_ [...]
+------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], 
output_ordering=[c@0 ASC NULLS LAST], has_header=true
+
+query II
+SELECT c, NTH_VALUE(c, 2) OVER(order by c DESC) as nv1
+  FROM multiple_ordered_table
+  ORDER BY c ASC
+  LIMIT 5
+----
+0 98
+1 98
+2 98
+3 98
+4 98
+
+query II
+SELECT c, NTH_VALUE(c, 2) OVER(order by c DESC) as nv1
+  FROM multiple_ordered_table
+  ORDER BY c DESC
+  LIMIT 5
+----
+99 NULL
+98 98
+97 98
+96 98
+95 98
+
 statement ok
 set datafusion.execution.target_partitions = 2;
 

Reply via email to