This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d4d556851da [improve](analytic) support window function execute with incremental mode (#52138)
d4d556851da is described below

commit d4d556851dad1cfb57d77f7d57855340830d9a4d
Author: zhangstar333 <[email protected]>
AuthorDate: Mon Jul 7 10:03:25 2025 +0800

    [improve](analytic) support window function execute with incremental mode (#52138)
    
    ### What problem does this PR solve?
    Problem Summary:
    ```
        Some aggregate functions used as window functions can support an incremental
        mode, e.g. sum(col) over (rows between 3 preceding and 3 following).
        Previously, computing sum[i] required a loop over [current - 3, current + 3)
        that summed all the data in the frame; now the previous result can be reused:
        sum[i] = sum[i-1] - col[x] + col[y], so only one subtraction and one addition
        on the sum[i-1] result are needed per row. Performance test results will be
        added soon.
    ```
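
    A minimal standalone sketch of the incremental sliding-window idea described above,
    assuming a plain in-memory column and a fixed ROWS frame of p preceding and f following
    rows. The function name sliding_sum, its parameters, and the boundary clamping are
    illustrative assumptions, not the Doris BE API.

    ```
    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Sum over the frame [i - p, i + f], clamped to the partition [0, n).
    // Instead of re-summing the whole frame for every row, reuse the previous
    // frame's sum: subtract the row that leaves the frame and add the row that
    // enters it (sum[i] = sum[i-1] - col[i - p - 1] + col[i + f]).
    std::vector<int64_t> sliding_sum(const std::vector<int64_t>& col, int64_t p, int64_t f) {
        const auto n = static_cast<int64_t>(col.size());
        std::vector<int64_t> result(n, 0);
        int64_t sum = 0;
        for (int64_t i = 0; i < n; ++i) {
            if (i == 0) {
                // First row: one full pass over the initial frame.
                for (int64_t j = std::max<int64_t>(0, -p); j <= std::min<int64_t>(n - 1, f); ++j) {
                    sum += col[j];
                }
            } else {
                const int64_t leaving = i - p - 1; // row that just left the frame
                const int64_t entering = i + f;    // row that just entered the frame
                if (leaving >= 0 && leaving < n) sum -= col[leaving];
                if (entering >= 0 && entering < n) sum += col[entering];
            }
            result[i] = sum;
        }
        return result;
    }

    int main() {
        // sum(col) over (rows between 3 preceding and 3 following)
        const std::vector<int64_t> col {1, 2, 3, 4, 5, 6, 7, 8};
        for (int64_t v : sliding_sum(col, /*p=*/3, /*f=*/3)) {
            std::cout << v << ' ';
        }
        std::cout << '\n'; // prints: 10 15 21 28 35 33 30 26
    }
    ```

    The actual operator below additionally handles NULL inputs and empty frames through the
    use_null_result / could_use_previous_result flags, which this sketch omits.
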
---
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  50 ++--
 be/src/pipeline/exec/analytic_sink_operator.h      |   5 +-
 .../vec/aggregate_functions/aggregate_function.h   |  52 +++-
 .../aggregate_functions/aggregate_function_avg.h   |  77 +++++-
 .../aggregate_functions/aggregate_function_count.h |  46 ++++
 .../aggregate_function_java_udaf.h                 |  11 +-
 .../aggregate_function_min_max.h                   |  98 ++++++++
 .../aggregate_functions/aggregate_function_null.h  | 106 +++++++-
 .../aggregate_function_reader_first_last.h         |   2 +-
 .../aggregate_functions/aggregate_function_sum.h   |  58 +++++
 .../aggregate_function_window.h                    |  14 +-
 .../window_functions/test_window_fn.out            | Bin 7822 -> 16313 bytes
 .../window_functions/test_window_fn.groovy         | 273 ++++++++++++++++++++-
 13 files changed, 752 insertions(+), 40 deletions(-)

diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 455f41fcd51..bf9bb08aca0 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -70,7 +70,7 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
             _executor.get_next_impl = 
&AnalyticSinkLocalState::_get_next_for_sliding_rows;
         }
         _streaming_mode = true;
-
+        _support_incremental_calculate = (p._has_window_start && 
p._has_window_end);
         if (p._has_window_start) { //calculate start boundary
             TAnalyticWindowBoundary b = p._window.window_start;
             if (b.__isset.rows_offset_value) { //[offset     ,   ]
@@ -114,6 +114,8 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) {
     _offsets_of_aggregate_states.resize(_agg_functions_size);
     _result_column_nullable_flags.resize(_agg_functions_size);
     _result_column_could_resize.resize(_agg_functions_size);
+    _use_null_result.resize(_agg_functions_size, 0);
+    _could_use_previous_result.resize(_agg_functions_size, 0);
 
     for (int i = 0; i < _agg_functions_size; ++i) {
         _agg_functions[i] = p._agg_functions[i]->clone(state, 
state->obj_pool());
@@ -132,6 +134,8 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) {
         if 
(PARTITION_FUNCTION_SET.contains(_agg_functions[i]->function()->get_name())) {
             _streaming_mode = false;
         }
+        _support_incremental_calculate &=
+                _agg_functions[i]->function()->supported_incremental_mode();
     }
 
     _partition_exprs_size = p._partition_by_eq_expr_ctxs.size();
@@ -185,8 +189,6 @@ Status AnalyticSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
     return PipelineXSinkLocalState<AnalyticSharedState>::close(state, 
exec_status);
 }
 
-//TODO: eg sum/avg/count/min/max ROWS BETWEEN N PRECEDING AND M FOLLOWING
-//maybe could be optimized caculate at cumulative mode
 bool AnalyticSinkLocalState::_get_next_for_sliding_rows(int64_t 
current_block_rows,
                                                         int64_t 
current_block_base_pos) {
     const bool is_n_following_frame = _rows_end_offset > 0;
@@ -199,13 +201,18 @@ bool 
AnalyticSinkLocalState::_get_next_for_sliding_rows(int64_t current_block_ro
             _need_more_data = true;
             break;
         }
-        _reset_agg_status();
+        if (_support_incremental_calculate) {
+            _execute_for_function<true>(_partition_by_pose.start, 
_partition_by_pose.end,
+                                        current_row_start, current_row_end);
+        } else {
+            _reset_agg_status();
+            // Eg: rows between unbounded preceding and 10 preceding
+            // Make sure range_start <= range_end
+            current_row_start = std::min(current_row_start, current_row_end);
+            _execute_for_function(_partition_by_pose.start, 
_partition_by_pose.end,
+                                  current_row_start, current_row_end);
+        }
 
-        // Eg: rows between unbounded preceding and 10 preceding
-        // Make sure range_start <= range_end
-        current_row_start = std::min(current_row_start, current_row_end);
-        _execute_for_function(_partition_by_pose.start, 
_partition_by_pose.end, current_row_start,
-                              current_row_end);
         int64_t pos = current_pos_in_block();
         _insert_result_info(pos, pos + 1);
         _current_row_position++;
@@ -366,21 +373,27 @@ Status AnalyticSinkLocalState::_execute_impl() {
     return Status::OK();
 }
 
+template <bool incremental>
 void AnalyticSinkLocalState::_execute_for_function(int64_t partition_start, 
int64_t partition_end,
                                                    int64_t frame_start, 
int64_t frame_end) {
     // here is the core function, should not add timer
     for (size_t i = 0; i < _agg_functions_size; ++i) {
-        if (_result_column_nullable_flags[i] && _current_window_empty) {
-            continue;
-        }
         std::vector<const vectorized::IColumn*> agg_columns;
         for (int j = 0; j < _agg_input_columns[i].size(); ++j) {
             agg_columns.push_back(_agg_input_columns[i][j].get());
         }
-        _agg_functions[i]->function()->add_range_single_place(
-                partition_start, partition_end, frame_start, frame_end,
-                _fn_place_ptr + _offsets_of_aggregate_states[i], 
agg_columns.data(),
-                _agg_arena_pool.get());
+        if constexpr (incremental) {
+            _agg_functions[i]->function()->execute_function_with_incremental(
+                    partition_start, partition_end, frame_start, frame_end,
+                    _fn_place_ptr + _offsets_of_aggregate_states[i], 
agg_columns.data(),
+                    _agg_arena_pool.get(), false, false, false, 
&_use_null_result[i],
+                    &_could_use_previous_result[i]);
+        } else {
+            _agg_functions[i]->function()->add_range_single_place(
+                    partition_start, partition_end, frame_start, frame_end,
+                    _fn_place_ptr + _offsets_of_aggregate_states[i], 
agg_columns.data(),
+                    _agg_arena_pool.get(), &(_use_null_result[i]), 
&_could_use_previous_result[i]);
+        }
     }
 }
 
@@ -388,8 +401,7 @@ void AnalyticSinkLocalState::_insert_result_info(int64_t 
start, int64_t end) {
     // here is the core function, should not add timer
     for (size_t i = 0; i < _agg_functions_size; ++i) {
         if (_result_column_nullable_flags[i]) {
-            if (_current_window_empty) {
-                //TODO need check this logical???
+            if (_use_null_result[i]) {
                 _result_window_columns[i]->insert_many_defaults(end - start);
             } else {
                 auto* dst =
@@ -898,6 +910,8 @@ Status 
AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
 }
 
 void AnalyticSinkLocalState::_reset_agg_status() {
+    _use_null_result.assign(_agg_functions_size, 0);
+    _could_use_previous_result.assign(_agg_functions_size, 0);
     for (size_t i = 0; i < _agg_functions_size; ++i) {
         _agg_functions[i]->reset(_fn_place_ptr + 
_offsets_of_aggregate_states[i]);
     }
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h 
b/be/src/pipeline/exec/analytic_sink_operator.h
index 45b870eacfb..73fb1ef386d 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -88,6 +88,7 @@ private:
     bool _get_next_for_sliding_rows(int64_t current_block_rows, int64_t 
current_block_base_pos);
 
     void _init_result_columns();
+    template <bool incremental = false>
     void _execute_for_function(int64_t partition_start, int64_t partition_end, 
int64_t frame_start,
                                int64_t frame_end);
     void _insert_result_info(int64_t start, int64_t end);
@@ -144,8 +145,10 @@ private:
     };
     executor _executor;
 
-    bool _current_window_empty = false;
+    std::vector<uint8_t> _use_null_result;
+    std::vector<uint8_t> _could_use_previous_result;
     bool _streaming_mode = false;
+    bool _support_incremental_calculate = true;
     bool _need_more_data = false;
     int64_t _current_row_position = 0;
     int64_t _output_block_index = 0;
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h 
b/be/src/vec/aggregate_functions/aggregate_function.h
index f4b98052e29..b6304212c49 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -211,7 +211,8 @@ public:
     virtual void add_range_single_place(int64_t partition_start, int64_t 
partition_end,
                                         int64_t frame_start, int64_t frame_end,
                                         AggregateDataPtr place, const 
IColumn** columns,
-                                        Arena*) const = 0;
+                                        Arena* arena, UInt8* use_null_result,
+                                        UInt8* could_use_previous_result) 
const = 0;
 
     virtual void streaming_agg_serialize(const IColumn** columns, 
BufferWritable& buf,
                                          const size_t num_rows, Arena*) const 
= 0;
@@ -246,6 +247,41 @@ public:
         }
     }
 
+    /// some agg functions like sum/count/avg/min/max could support incremental mode,
+    /// e.g. sum(col) over (rows between 3 preceding and 3 following) could reuse the previous result
+    /// sum[i] = sum[i-1] - col[x] + col[y]
+    virtual bool supported_incremental_mode() const { return false; }
+
+    /**
+    * Executes the aggregate function in incremental mode.
+    * This is a virtual function that should be overridden by aggregate functions supporting incremental calculation.
+    *
+    * @param partition_start Start position of the current partition (inclusive)
+    * @param partition_end End position of the current partition (exclusive)
+    * @param frame_start Start position of the current window frame (inclusive)
+    * @param frame_end End position of the current window frame (exclusive)
+    * @param place Memory location to store aggregation results
+    * @param columns Input columns for aggregation
+    * @param arena Memory pool for allocations
+    * @param previous_is_nul Whether the previous value is NULL; if true, there is no need to subtract the previous value
+    * @param end_is_nul Whether the end boundary is NULL; if true, there is no need to add the end value
+    * @param has_null Whether the current column contains NULL values
+    * @param use_null_result Output: whether to use NULL as the result when the frame is empty
+    * @param could_use_previous_result Output: whether the previous result can be reused
+    * @throws doris::Exception when called on a function that doesn't support incremental mode
+    */
+    virtual void execute_function_with_incremental(int64_t partition_start, int64_t partition_end,
+                                                   int64_t frame_start, int64_t frame_end,
+                                                   AggregateDataPtr place, const IColumn** columns,
+                                                   Arena* arena, bool previous_is_nul,
+                                                   bool end_is_nul, bool has_null,
+                                                   UInt8* use_null_result,
+                                                   UInt8* could_use_previous_result) const {
+        throw doris::Exception(Status::FatalError(
+                "Aggregate function " + get_name() +
+                " does not support incremental mode, but it is called in incremental mode"));
+    }
+
 protected:
     DataTypes argument_types;
     int version {};
@@ -318,17 +354,25 @@ public:
             derived->add(place, columns, i, arena);
         }
     }
-    //now this is use for sum/count/avg/min/max win function, other win 
function should override this function in class
-    
//stddev_pop/stddev_samp/variance_pop/variance_samp/hll_union_agg/group_concat
+
     void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
                                 int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
-                                Arena* arena) const override {
+                                Arena* arena, UInt8* use_null_result,
+                                UInt8* could_use_previous_result) const 
override {
         const Derived* derived = assert_cast<const Derived*>(this);
         frame_start = std::max<int64_t>(frame_start, partition_start);
         frame_end = std::min<int64_t>(frame_end, partition_end);
         for (int64_t i = frame_start; i < frame_end; ++i) {
             derived->add(place, columns, i, arena);
         }
+        if (frame_start >= frame_end) {
+            if (!*could_use_previous_result) {
+                *use_null_result = true;
+            }
+        } else {
+            *use_null_result = false;
+            *could_use_previous_result = true;
+        }
     }
 
     void add_batch_range(size_t batch_begin, size_t batch_end, 
AggregateDataPtr place,
diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h 
b/be/src/vec/aggregate_functions/aggregate_function_avg.h
index facef6ac874..14b7e2bea27 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_avg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h
@@ -141,19 +141,34 @@ public:
         }
     }
 
-    void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
-             Arena*) const override {
+    template <bool is_add>
+    void update_value(AggregateDataPtr __restrict place, const IColumn** 
columns,
+                      ssize_t row_num) const {
 #ifdef __clang__
 #pragma clang fp reassociate(on)
 #endif
         const auto& column =
                 assert_cast<const ColVecType&, 
TypeCheckOnRelease::DISABLE>(*columns[0]);
-        if constexpr (is_decimal(T)) {
-            this->data(place).sum += 
(DataType)column.get_data()[row_num].value;
+        if constexpr (is_add) {
+            if constexpr (is_decimal(T)) {
+                this->data(place).sum += 
(DataType)column.get_data()[row_num].value;
+            } else {
+                this->data(place).sum += (DataType)column.get_data()[row_num];
+            }
+            ++this->data(place).count;
         } else {
-            this->data(place).sum += (DataType)column.get_data()[row_num];
+            if constexpr (is_decimal(T)) {
+                this->data(place).sum -= 
(DataType)column.get_data()[row_num].value;
+            } else {
+                this->data(place).sum -= (DataType)column.get_data()[row_num];
+            }
+            --this->data(place).count;
         }
-        ++this->data(place).count;
+    }
+
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
+             Arena*) const override {
+        update_value<true>(place, columns, row_num);
     }
 
     void reset(AggregateDataPtr place) const override {
@@ -278,6 +293,56 @@ public:
         return std::make_shared<DataTypeFixedLengthObject>();
     }
 
+    bool supported_incremental_mode() const override { return true; }
+
+    void execute_function_with_incremental(int64_t partition_start, int64_t 
partition_end,
+                                           int64_t frame_start, int64_t 
frame_end,
+                                           AggregateDataPtr place, const 
IColumn** columns,
+                                           Arena* arena, bool previous_is_nul, 
bool end_is_nul,
+                                           bool has_null, UInt8* 
use_null_result,
+                                           UInt8* could_use_previous_result) 
const override {
+        int64_t current_frame_start = std::max<int64_t>(frame_start, 
partition_start);
+        int64_t current_frame_end = std::min<int64_t>(frame_end, 
partition_end);
+        if (current_frame_start >= current_frame_end) {
+            *use_null_result = true;
+            return;
+        }
+        if (*could_use_previous_result) {
+            auto outcoming_pos = frame_start - 1;
+            auto incoming_pos = frame_end - 1;
+            if (!previous_is_nul && outcoming_pos >= partition_start &&
+                outcoming_pos < partition_end) {
+                update_value<false>(place, columns, outcoming_pos);
+            }
+            if (!end_is_nul && incoming_pos >= partition_start && incoming_pos 
< partition_end) {
+                update_value<true>(place, columns, incoming_pos);
+            }
+        } else {
+            this->add_range_single_place(partition_start, partition_end, 
frame_start, frame_end,
+                                         place, columns, arena, 
use_null_result,
+                                         could_use_previous_result);
+        }
+    }
+
+    void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
+                                int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
+                                Arena* arena, UInt8* use_null_result,
+                                UInt8* could_use_previous_result) const 
override {
+        auto current_frame_start = std::max<int64_t>(frame_start, 
partition_start);
+        auto current_frame_end = std::min<int64_t>(frame_end, partition_end);
+        if (current_frame_start >= current_frame_end) {
+            if (!*could_use_previous_result) {
+                *use_null_result = true;
+            }
+        } else {
+            for (size_t row_num = current_frame_start; row_num < 
current_frame_end; ++row_num) {
+                update_value<true>(place, columns, row_num);
+            }
+            *use_null_result = false;
+            *could_use_previous_result = true;
+        }
+    }
+
 private:
     UInt32 scale;
 };
diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.h 
b/be/src/vec/aggregate_functions/aggregate_function_count.h
index 8915b88ea3b..99e7422b108 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_count.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_count.h
@@ -174,6 +174,23 @@ public:
     DataTypePtr get_serialized_type() const override {
         return std::make_shared<DataTypeFixedLengthObject>();
     }
+
+    void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
+                                int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
+                                Arena* arena, UInt8* use_null_result,
+                                UInt8* could_use_previous_result) const 
override {
+        frame_start = std::max<int64_t>(frame_start, partition_start);
+        frame_end = std::min<int64_t>(frame_end, partition_end);
+        if (frame_start >= frame_end) {
+            if (!*could_use_previous_result) {
+                *use_null_result = true;
+            }
+        } else {
+            AggregateFunctionCount::data(place).count += frame_end - 
frame_start;
+            *use_null_result = false;
+            *could_use_previous_result = true;
+        }
+    }
 };
 
 // TODO: Maybe AggregateFunctionCountNotNullUnary should be a subclass of 
AggregateFunctionCount
@@ -313,6 +330,35 @@ public:
     DataTypePtr get_serialized_type() const override {
         return std::make_shared<DataTypeFixedLengthObject>();
     }
+
+    void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
+                                int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
+                                Arena* arena, UInt8* use_null_result,
+                                UInt8* could_use_previous_result) const 
override {
+        frame_start = std::max<int64_t>(frame_start, partition_start);
+        frame_end = std::min<int64_t>(frame_end, partition_end);
+        if (frame_start >= frame_end) {
+            if (!*could_use_previous_result) {
+                *use_null_result = true;
+            }
+        } else {
+            const auto& nullable_column =
+                    assert_cast<const ColumnNullable&, 
TypeCheckOnRelease::DISABLE>(*columns[0]);
+            size_t count = 0;
+            if (nullable_column.has_null()) {
+                for (int64_t i = frame_start; i < frame_end; ++i) {
+                    if (!nullable_column.is_null_at(i)) {
+                        ++count;
+                    }
+                }
+            } else {
+                count = frame_end - frame_start;
+            }
+            *use_null_result = false;
+            *could_use_previous_result = true;
+            AggregateFunctionCountNotNullUnary::data(place).count += count;
+        }
+    }
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h 
b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
index 63c0f03cdfc..f8815c30d70 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
@@ -354,13 +354,22 @@ public:
 
     void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
                                 int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
-                                Arena*) const override {
+                                Arena*, UInt8* current_window_empty,
+                                UInt8* current_window_has_inited) const 
override {
         frame_start = std::max<int64_t>(frame_start, partition_start);
         frame_end = std::min<int64_t>(frame_end, partition_end);
         int64_t places_address = reinterpret_cast<int64_t>(place);
         Status st = this->data(_exec_place)
                             .add(places_address, true, columns, frame_start, 
frame_end,
                                  argument_types, 0);
+        if (frame_start >= frame_end) {
+            if (!*current_window_has_inited) {
+                *current_window_empty = true;
+            }
+        } else {
+            *current_window_empty = false;
+            *current_window_has_inited = true;
+        }
         if (UNLIKELY(!st.ok())) {
             throw doris::Exception(ErrorCode::INTERNAL_ERROR, st.to_string());
         }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h 
b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
index a139e0e6a47..fafbc2b36c0 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
@@ -31,6 +31,7 @@
 
 #include "common/cast_set.h"
 #include "common/logging.h"
+#include "runtime/primitive_type.h"
 #include "vec/aggregate_functions/aggregate_function.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_fixed_length_object.h"
@@ -165,6 +166,15 @@ public:
         }
     }
 
+    bool check_if_equal(const IColumn& column, size_t row_num) const {
+        if (!has()) {
+            return false;
+        }
+        return assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
+                           TypeCheckOnRelease::DISABLE>(column)
+                       .get_data()[row_num] == value;
+    }
+
     bool change_if_greater(const Self& to, Arena*) {
         if (to.has() && (!has() || to.value > value)) {
             change(to, nullptr);
@@ -304,6 +314,15 @@ public:
         }
     }
 
+    bool check_if_equal(const IColumn& column, size_t row_num) const {
+        if (!has()) {
+            return false;
+        }
+        return assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
+                           TypeCheckOnRelease::DISABLE>(column)
+                       .get_data()[row_num] == value;
+    }
+
     void change_first_time(const IColumn& column, size_t row_num, Arena*) {
         if (UNLIKELY(!has())) {
             change(column, row_num, nullptr);
@@ -472,6 +491,14 @@ public:
         }
     }
 
+    bool check_if_equal(const IColumn& column, size_t row_num) const {
+        if (!has()) {
+            return false;
+        }
+        return assert_cast<const ColumnString&, 
TypeCheckOnRelease::DISABLE>(column).get_data_at(
+                       row_num) == get_string_ref();
+    }
+
     void change_first_time(const IColumn& column, size_t row_num, Arena*) {
         if (UNLIKELY(!has())) {
             change(column, row_num, nullptr);
@@ -633,6 +660,8 @@ struct SingleValueDataComplexType {
 
     void change_if_better(const Self& to, Arena* arena) { 
this->change_first_time(to, nullptr); }
 
+    bool check_if_equal(const IColumn& column, size_t row_num) const { return 
false; }
+
 private:
     bool has_value = false;
     MutableColumnPtr column_data;
@@ -825,6 +854,75 @@ public:
             return std::make_shared<DataTypeString>();
         }
     }
+
+    bool supported_incremental_mode() const override { return !(Data::IS_ANY); 
}
+
+    void execute_function_with_incremental(int64_t partition_start, int64_t 
partition_end,
+                                           int64_t frame_start, int64_t 
frame_end,
+                                           AggregateDataPtr place, const 
IColumn** columns,
+                                           Arena* arena, bool previous_is_nul, 
bool end_is_nul,
+                                           bool has_null, UInt8* 
use_null_result,
+                                           UInt8* could_use_previous_result) 
const override {
+        int64_t current_frame_start = std::max<int64_t>(frame_start, 
partition_start);
+        int64_t current_frame_end = std::min<int64_t>(frame_end, 
partition_end);
+        if (current_frame_start >= current_frame_end) {
+            *use_null_result = true;
+            return;
+        }
+        if (*could_use_previous_result) {
+            auto outcoming_pos = frame_start - 1;
+            auto incoming_pos = frame_end - 1;
+            if (!previous_is_nul && outcoming_pos >= partition_start &&
+                outcoming_pos < partition_end) {
+                if (this->data(place).check_if_equal(*columns[0], 
outcoming_pos)) {
+                    this->data(place).reset();
+                    if (has_null) {
+                        const auto& null_map_data =
+                                assert_cast<const 
ColumnUInt8*>(columns[1])->get_data();
+                        for (size_t i = current_frame_start; i < 
current_frame_end; ++i) {
+                            if (null_map_data[i] == 0) {
+                                
this->data(place).change_if_better(*columns[0], i, arena);
+                            }
+                        }
+                    } else {
+                        this->add_range_single_place(partition_start, 
partition_end,
+                                                     current_frame_start, 
current_frame_end, place,
+                                                     columns, arena, 
use_null_result,
+                                                     
could_use_previous_result);
+                    }
+                    return;
+                }
+            }
+            if (!end_is_nul && incoming_pos >= partition_start && incoming_pos 
< partition_end) {
+                this->data(place).change_if_better(*columns[0], incoming_pos, 
arena);
+            }
+
+        } else {
+            this->add_range_single_place(partition_start, partition_end, 
frame_start, frame_end,
+                                         place, columns, arena, 
use_null_result,
+                                         could_use_previous_result);
+        }
+    }
+
+    void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
+                                int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
+                                Arena* arena, UInt8* use_null_result,
+                                UInt8* could_use_previous_result) const 
override {
+        auto current_frame_start = std::max<int64_t>(frame_start, 
partition_start);
+        auto current_frame_end = std::min<int64_t>(frame_end, partition_end);
+
+        if (current_frame_start >= current_frame_end) {
+            if (!*could_use_previous_result) {
+                *use_null_result = true;
+            }
+        } else {
+            for (size_t row_num = current_frame_start; row_num < 
current_frame_end; ++row_num) {
+                this->data(place).change_if_better(*columns[0], row_num, 
nullptr);
+            }
+            *use_null_result = false;
+            *could_use_previous_result = true;
+        }
+    }
 };
 
 template <template <typename> class Data>
diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.h 
b/be/src/vec/aggregate_functions/aggregate_function_null.h
index b46bcbe3537..274a39d8409 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_null.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_null.h
@@ -28,6 +28,7 @@
 #include "vec/aggregate_functions/aggregate_function_distinct.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/common/assert_cast.h"
+#include "vec/core/types.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/io/io_helper.h"
 
@@ -39,6 +40,7 @@ class AggregateFunctionNullBaseInline : public 
IAggregateFunctionHelper<Derived>
 protected:
     std::unique_ptr<NestFunction> nested_function;
     size_t prefix_size;
+    mutable int64_t null_count = 0;
 
     /** In addition to data for nested aggregate function, we keep a flag
       *  indicating - was there at least one non-NULL value accumulated.
@@ -91,7 +93,7 @@ public:
 
     String get_name() const override {
         /// This is just a wrapper. The function for Nullable arguments is 
named the same as the nested function itself.
-        return nested_function->get_name();
+        return "Nullable(" + nested_function->get_name() + ")";
     }
 
     DataTypePtr get_return_type() const override {
@@ -110,6 +112,7 @@ public:
     void reset(AggregateDataPtr place) const override {
         init_flag(place);
         nested_function->reset(nested_place(place));
+        null_count = 0;
     }
 
     bool has_trivial_destructor() const override {
@@ -281,6 +284,107 @@ public:
                                                    false);
         }
     }
+
+    void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
+                                int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
+                                Arena* arena, UInt8* use_null_result,
+                                UInt8* could_use_previous_result) const 
override {
+        auto current_frame_start = std::max<int64_t>(frame_start, 
partition_start);
+        auto current_frame_end = std::min<int64_t>(frame_end, partition_end);
+        if (current_frame_start >= current_frame_end) {
+            if (!*could_use_previous_result) {
+                this->init_flag(place);
+                *use_null_result = true;
+                return;
+            }
+        } else {
+            *use_null_result = false;
+            *could_use_previous_result = true;
+        }
+        const auto* column = assert_cast<const ColumnNullable*>(columns[0]);
+        bool has_null = column->has_null();
+        if (has_null) {
+            for (size_t i = current_frame_start; i < current_frame_end; ++i) {
+                this->add(place, columns, i, arena);
+            }
+        } else {
+            const IColumn* nested_column = &(column->get_nested_column());
+            this->set_flag(place);
+            this->nested_function->add_range_single_place(
+                    partition_start, partition_end, frame_start, frame_end,
+                    this->nested_place(place), &nested_column, arena, 
use_null_result,
+                    could_use_previous_result);
+        }
+    }
+
+    bool supported_incremental_mode() const override {
+        return this->nested_function->supported_incremental_mode();
+    }
+
+    void execute_function_with_incremental(int64_t partition_start, int64_t 
partition_end,
+                                           int64_t frame_start, int64_t 
frame_end,
+                                           AggregateDataPtr place, const 
IColumn** columns,
+                                           Arena* arena, bool previous_is_nul, 
bool end_is_nul,
+                                           bool has_null, UInt8* 
use_null_result,
+                                           UInt8* could_use_previous_result) 
const override {
+        int64_t current_frame_start = std::max<int64_t>(frame_start, 
partition_start);
+        int64_t current_frame_end = std::min<int64_t>(frame_end, 
partition_end);
+        if (current_frame_start >= current_frame_end) {
+            *use_null_result = true;
+            this->init_flag(place);
+            return;
+        }
+
+        DCHECK(columns[0]->is_nullable()) << columns[0]->get_name();
+        const auto* column = assert_cast<const ColumnNullable*>(columns[0]);
+        const IColumn* nested_column = &column->get_nested_column();
+
+        if (!column->has_null()) {
+            if (*could_use_previous_result) {
+                this->nested_function->execute_function_with_incremental(
+                        partition_start, partition_end, frame_start, frame_end,
+                        this->nested_place(place), &nested_column, arena, 
previous_is_nul,
+                        end_is_nul, false, use_null_result, 
could_use_previous_result);
+            } else {
+                this->nested_function->add_range_single_place(
+                        partition_start, partition_end, frame_start, frame_end,
+                        this->nested_place(place), &nested_column, arena, 
use_null_result,
+                        could_use_previous_result);
+            }
+            this->set_flag(place);
+            return;
+        }
+
+        const auto* __restrict null_map_data = 
column->get_null_map_data().data();
+        if (*could_use_previous_result) {
+            auto outcoming_pos = frame_start - 1;
+            auto incoming_pos = frame_end - 1;
+            bool is_previous_frame_start_null = false;
+            if (outcoming_pos >= partition_start && outcoming_pos < 
partition_end &&
+                null_map_data[outcoming_pos] == 1) {
+                is_previous_frame_start_null = true;
+                this->null_count--;
+            }
+            bool is_current_frame_end_null = false;
+            if (incoming_pos >= partition_start && incoming_pos < 
partition_end &&
+                null_map_data[incoming_pos] == 1) {
+                is_current_frame_end_null = true;
+                this->null_count++;
+            }
+            const IColumn* columns_tmp[2] {nested_column, 
&(*column->get_null_map_column_ptr())};
+            this->nested_function->execute_function_with_incremental(
+                    partition_start, partition_end, frame_start, frame_end,
+                    this->nested_place(place), columns_tmp, arena, 
is_previous_frame_start_null,
+                    is_current_frame_end_null, true, use_null_result, 
could_use_previous_result);
+            if (current_frame_end - current_frame_start != this->null_count) {
+                this->set_flag(place);
+            }
+        } else {
+            this->add_range_single_place(partition_start, partition_end, 
frame_start, frame_end,
+                                         place, columns, arena, 
use_null_result,
+                                         could_use_previous_result);
+        }
+    }
 };
 
 template <typename NestFuction, bool result_is_nullable>
diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h 
b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
index 6daa0f99fef..e7b77494bd2 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
@@ -237,7 +237,7 @@ public:
 
     void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
                                 int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
-                                Arena*) const override {
+                                Arena* arena, UInt8*, UInt8*) const override {
         throw doris::Exception(
                 Status::FatalError("ReaderFunctionData do not support 
add_range_single_place"));
     }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sum.h 
b/be/src/vec/aggregate_functions/aggregate_function_sum.h
index 608d0f1dc1d..8e4d647b20c 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sum.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sum.h
@@ -212,6 +212,64 @@ public:
         return std::make_shared<DataTypeFixedLengthObject>();
     }
 
+    bool supported_incremental_mode() const override { return true; }
+
+    void execute_function_with_incremental(int64_t partition_start, int64_t 
partition_end,
+                                           int64_t frame_start, int64_t 
frame_end,
+                                           AggregateDataPtr place, const 
IColumn** columns,
+                                           Arena* arena, bool previous_is_nul, 
bool end_is_nul,
+                                           bool has_null, UInt8* 
use_null_result,
+                                           UInt8* could_use_previous_result) 
const override {
+        int64_t current_frame_start = std::max<int64_t>(frame_start, 
partition_start);
+        int64_t current_frame_end = std::min<int64_t>(frame_end, 
partition_end);
+
+        if (current_frame_start >= current_frame_end) {
+            *use_null_result = true;
+            return;
+        }
+        if (*could_use_previous_result) {
+            const auto& column =
+                    assert_cast<const ColVecType&, 
TypeCheckOnRelease::DISABLE>(*columns[0]);
+            const auto* data = column.get_data().data();
+            auto outcoming_pos = frame_start - 1;
+            auto incoming_pos = frame_end - 1;
+            if (!previous_is_nul && outcoming_pos >= partition_start &&
+                outcoming_pos < partition_end) {
+                this->data(place).sum -= data[outcoming_pos];
+            }
+            if (!end_is_nul && incoming_pos >= partition_start && incoming_pos 
< partition_end) {
+                this->data(place).sum += data[incoming_pos];
+            }
+        } else {
+            this->add_range_single_place(partition_start, partition_end, 
frame_start, frame_end,
+                                         place, columns, arena, 
use_null_result,
+                                         could_use_previous_result);
+        }
+    }
+
+    void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
+                                int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
+                                Arena* arena, UInt8* use_null_result,
+                                UInt8* could_use_previous_result) const 
override {
+        auto current_frame_start = std::max<int64_t>(frame_start, 
partition_start);
+        auto current_frame_end = std::min<int64_t>(frame_end, partition_end);
+
+        if (current_frame_start >= current_frame_end) {
+            if (!*could_use_previous_result) {
+                *use_null_result = true;
+            }
+        } else {
+            const auto& column =
+                    assert_cast<const ColVecType&, 
TypeCheckOnRelease::DISABLE>(*columns[0]);
+            for (size_t row_num = current_frame_start; row_num < 
current_frame_end; ++row_num) {
+                this->data(place).add(typename 
PrimitiveTypeTraits<TResult>::ColumnItemType(
+                        column.get_data()[row_num]));
+            }
+            *use_null_result = false;
+            *could_use_previous_result = true;
+        }
+    }
+
 private:
     UInt32 scale;
 };
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h 
b/be/src/vec/aggregate_functions/aggregate_function_window.h
index 811d5d2a175..d0288941554 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.h
@@ -63,7 +63,7 @@ public:
 
     void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
                                 int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
-                                Arena*) const override {
+                                Arena*, UInt8*, UInt8*) const override {
         ++data(place).count;
     }
 
@@ -112,7 +112,7 @@ public:
 
     void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
                                 int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
-                                Arena*) const override {
+                                Arena*, UInt8*, UInt8*) const override {
         int64_t peer_group_count = frame_end - frame_start;
         if (WindowFunctionRank::data(place).peer_group_start != frame_start) {
             WindowFunctionRank::data(place).peer_group_start = frame_start;
@@ -168,7 +168,7 @@ public:
 
     void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
                                 int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
-                                Arena*) const override {
+                                Arena*, UInt8*, UInt8*) const override {
         if (WindowFunctionDenseRank::data(place).peer_group_start != 
frame_start) {
             WindowFunctionDenseRank::data(place).peer_group_start = 
frame_start;
             WindowFunctionDenseRank::data(place).rank++;
@@ -226,7 +226,7 @@ public:
 
     void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
                                 int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
-                                Arena*) const override {
+                                Arena*, UInt8*, UInt8*) const override {
         int64_t peer_group_count = frame_end - frame_start;
         if (WindowFunctionPercentRank::data(place).peer_group_start != 
frame_start) {
             WindowFunctionPercentRank::data(place).peer_group_start = 
frame_start;
@@ -296,7 +296,7 @@ public:
 
     void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
                                 int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
-                                Arena*) const override {
+                                Arena*, UInt8*, UInt8*) const override {
         check_default(place, partition_start, partition_end);
         int64_t peer_group_count = frame_end - frame_start;
         if (WindowFunctionCumeDist::data(place).peer_group_start != 
frame_start) {
@@ -352,7 +352,7 @@ public:
 
     void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
                                 int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
-                                Arena*) const override {
+                                Arena*, UInt8*, UInt8*) const override {
         // some variables are partition related, but there is no chance to 
init them
         // when the new partition arrives, so we calculate them every time now.
         // Partition = big_bucket_num * big_bucket_size + small_bucket_num * 
small_bucket_size
@@ -655,7 +655,7 @@ public:
 
     void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
                                 int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
-                                Arena*) const override {
+                                Arena*, UInt8*, UInt8*) const override {
         this->data(place).add_range_single_place(partition_start, 
partition_end, frame_start,
                                                  frame_end, columns);
     }
diff --git 
a/regression-test/data/query_p0/sql_functions/window_functions/test_window_fn.out
 
b/regression-test/data/query_p0/sql_functions/window_functions/test_window_fn.out
index 016d3a299dc..c4ea6bc9c2c 100644
Binary files 
a/regression-test/data/query_p0/sql_functions/window_functions/test_window_fn.out
 and 
b/regression-test/data/query_p0/sql_functions/window_functions/test_window_fn.out
 differ
diff --git 
a/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
 
b/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
index dd875749c91..111a85a5701 100644
--- 
a/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
+++ 
b/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
@@ -399,7 +399,278 @@ suite("test_window_fn") {
     sql """set enable_nereids_planner=true;"""
     sql """SELECT SUM(MAX(c1) OVER (PARTITION BY c2, c3)) FROM  
test_window_in_agg;"""
 
-    sql "DROP TABLE IF EXISTS test_window_in_agg;"
+    sql "DROP TABLE IF EXISTS test2;"
+
+    sql """ DROP TABLE IF EXISTS test2; """
+
+    sql """ CREATE TABLE IF NOT EXISTS test2 (
+                    `pk` int NULL, 
+                    `col_datetime_3__undef_signed_not_null` datetime(3) not 
null
+                    )
+    DUPLICATE KEY(pk) 
+    DISTRIBUTED BY HASH(pk) BUCKETS 3 
+    PROPERTIES ( 
+        "replication_num" = "1"
+    ); 
+    """
+
+    sql """ INSERT into test2 (pk, col_datetime_3__undef_signed_not_null) 
values 
+                                                    ('0', '2005-01-11 
03:43:25.000'),
+                                                    ('1', '2000-05-27 
10:52:55.000'),
+                                                    ('2', '2003-07-22 
04:04:57.000'),
+                                                    ('3', '2024-07-01 
00:00:00.000'),
+                                                    ('4', '9999-12-31 
00:00:00.000'),
+                                                    ('5', '2022-03-13 
01:30:00'),
+                                                    ('6', '2022-03-13 
04:45:00'),
+                                                    ('7', '2022-03-13 
07:15:00'),
+                                                    ('8', '2022-03-13 
10:05:00'),
+                                                    ('9', '2022-03-13 
12:50:00'); 
+
+    """
+
+    qt_sql_window_null """
+        select col_datetime_3__undef_signed_not_null,pk, 
max(col_datetime_3__undef_signed_not_null) over (order by pk rows between 4 
preceding and 2 preceding) as res from test2 order by pk;
+    """
+
+
+    sql "DROP TABLE IF EXISTS test_baseall_null"
+    sql """
+         CREATE TABLE IF NOT EXISTS `test_baseall_null` (
+             `k0` boolean null comment "",
+             `k1` tinyint(4) null comment "",
+             `k2` smallint(6) null comment "",
+             `k3` int(11) null comment "",
+             `k4` bigint(20) null comment "",
+             `k5` decimal(10, 6) null comment "",
+             `k6` char(5) null comment "",
+             `k10` date null comment "",
+             `k11` datetime null comment "",
+             `k7` varchar(20) null comment "",
+             `k8` double max null comment "",
+             `k9` float sum null comment "",
+             `k12` string replace null comment "",
+             `k13` largeint(40) replace null comment ""
+         ) engine=olap
+         DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = 
"1")
+         """
+
+    streamLoad {
+         table "test_baseall_null"
+         db 'regression_test_query_p0_sql_functions_window_functions'
+         set 'column_separator', ','
+         file "../../baseall.txt"
+    }
+    sql "sync"
+
+    sql "DROP TABLE IF EXISTS test_baseall_not"
+    sql """
+         CREATE TABLE IF NOT EXISTS `test_baseall_not` (
+             `k0` boolean not null comment "",
+             `k1` tinyint(4) not null comment "",
+             `k2` smallint(6) not null comment "",
+             `k3` int(11) not null comment "",
+             `k4` bigint(20) not null comment "",
+             `k5` decimal(10, 6) not null comment "",
+             `k6` char(5) not null comment "",
+             `k10` date not null comment "",
+             `k11` datetime not null comment "",
+             `k7` varchar(20) not null comment "",
+             `k8` double max not null comment "",
+             `k9` float sum not null comment "",
+             `k12` string replace not null comment "",
+             `k13` largeint(40) replace not null comment ""
+         ) engine=olap
+         DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = 
"1")
+    """
+
+    qt_sql_table_count """
+        select count(1) from test_baseall_null;
+    """
+
+    sql """
+        insert into test_baseall_not select * from test_baseall_null where k1 
is not null; 
+    """
+
+    qt_sql_table_count2 """
+        select count(1) from test_baseall_not;
+    """
+
+    qt_sql_test_1 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows between 
unbounded preceding and 2 following) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_test_2 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 range between 
unbounded preceding and unbounded following) from test_baseall_not order by 
k6,k1;
+    """
+
+    qt_sql_test_3 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows between 2 
following and 3 following) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_test_4 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows between 4 
preceding and 2 preceding) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_test_5 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows between 
unbounded preceding and 2 following) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_6 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 range between 
unbounded preceding and unbounded following) from test_baseall_null order by 
k6,k1;
+    """
+
+    qt_sql_test_7 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows between 2 
following and 3 following) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_8 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows between 4 
preceding and 2 preceding) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_1 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows 2 
preceding) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_2 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows current 
row) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_3 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 
CURRENT ROW AND UNBOUNDED FOLLOWING) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_4 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows unbounded 
preceding) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_5 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 range unbounded 
preceding) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_6 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 range BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_7 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_8 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from test_baseall_not order by 
k6,k1;
+    """
+
+    qt_sql_9 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 range BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from test_baseall_not order by 
k6,k1;
+    """
+
+    qt_sql_10 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2 
PRECEDING AND 3 FOLLOWING) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_11 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 3 
PRECEDING AND 2 PRECEDING) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_12 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2 
FOLLOWING AND 3 FOLLOWING) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_13 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 
UNBOUNDED PRECEDING AND 3 FOLLOWING) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_14 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 
UNBOUNDED PRECEDING AND 2 PRECEDING) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_15 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2 
PRECEDING AND UNBOUNDED FOLLOWING) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_16 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2 
FOLLOWING AND UNBOUNDED FOLLOWING) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_17 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 1 
PRECEDING AND CURRENT ROW) from test_baseall_not order by k6,k1;
+    """
+
+    qt_sql_18 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 
CURRENT ROW AND 1 FOLLOWING) from test_baseall_not order by k6,k1;
+    """
+
+
+    qt_sql_test_1 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows 2 
preceding) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_2 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows current 
row) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_3 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 
CURRENT ROW AND UNBOUNDED FOLLOWING) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_4 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows unbounded 
preceding) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_5 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 range unbounded 
preceding) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_6 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 range BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_7 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_8 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from test_baseall_null order by 
k6,k1;
+    """
+
+    qt_sql_test_9 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 range BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from test_baseall_null order by 
k6,k1;
+    """
+
+    qt_sql_test_10 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2 
PRECEDING AND 3 FOLLOWING) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_11 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 3 
PRECEDING AND 2 PRECEDING) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_12 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2 
FOLLOWING AND 3 FOLLOWING) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_13 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 
UNBOUNDED PRECEDING AND 3 FOLLOWING) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_14 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 
UNBOUNDED PRECEDING AND 2 PRECEDING) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_15 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2 
PRECEDING AND UNBOUNDED FOLLOWING) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_16 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2 
FOLLOWING AND UNBOUNDED FOLLOWING) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_17 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 1 
PRECEDING AND CURRENT ROW) from test_baseall_null order by k6,k1;
+    """
+
+    qt_sql_test_18 """
+        select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 
CURRENT ROW AND 1 FOLLOWING) from test_baseall_null order by k6,k1;
+    """
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
