This is an automated email from the ASF dual-hosted git repository.

HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e75bfc1f4e [feature](be/fe) Add exponential_moving_average aggregate 
function (#63499)
1e75bfc1f4e is described below

commit 1e75bfc1f4e6b744ebcf16fb0639248b10aedbb3
Author: HappenLee <[email protected]>
AuthorDate: Fri May 22 16:10:56 2026 +0800

    [feature](be/fe) Add exponential_moving_average aggregate function (#63499)
    
    
    
        Problem Summary:
        Doris lacked an exponential moving average aggregate function. This
    implements `exponential_moving_average(half_decay, value, time)` ported
        from ClickHouse's ExponentialMovingAverage aggregate.
    
        Algorithm:
        - State stores (value, time, half_decay) as doubles.
        - On add(): scale existing value to the new timestamp via
    `2^(-dt/half_decay)`, then accumulate. Store half_decay in state so
          it is available during merge().
        - On merge(): advance both states to the later timestamp, then sum.
          This is commutative and associative, so row order doesn't matter.
        - Result: `value * (1 - 2^(-1/half_decay))` (normalised by sum of
          weights).
    
        Changes:
    - BE: `aggregate_function_ema.h/cpp` — state struct + function class,
          registered in `aggregate_function_simple_factory.cpp`.
    - FE: `ExponentialMovingAverage.java` — 3-arg DOUBLE signature, visitor
          method added to `AggregateFunctionVisitor`, registered in
          `BuiltinAggregateFunctions`.
        - Regression test: `query_p0/aggregate/exponential_moving_average/`.
    
        ### Release note
    
    New aggregate function `exponential_moving_average(half_decay, value,
    time)`
    computes the exponential moving average over a stream of (value, time)
        pairs with the given half-decay parameter.
    
        ### Check List (For Author)
    
    - Test: Regression test added
    (query_p0/aggregate/exponential_moving_average)
        - Behavior changed: No (new function)
    - Does this need documentation:
    [docs](https://github.com/apache/doris-website/pull/3726)
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 be/src/exprs/aggregate/aggregate_function_ema.cpp  |  31 ++++
 be/src/exprs/aggregate/aggregate_function_ema.h    | 173 +++++++++++++++++++++
 .../aggregate_function_simple_factory.cpp          |   2 +
 .../doris/catalog/BuiltinAggregateFunctions.java   |   2 +
 .../functions/agg/ExponentialMovingAverage.java    | 122 +++++++++++++++
 .../visitor/AggregateFunctionVisitor.java          |   5 +
 .../exponential_moving_average.out                 |  31 ++++
 .../exponential_moving_average.groovy              | 158 +++++++++++++++++++
 8 files changed, 524 insertions(+)

diff --git a/be/src/exprs/aggregate/aggregate_function_ema.cpp 
b/be/src/exprs/aggregate/aggregate_function_ema.cpp
new file mode 100644
index 00000000000..b3dde6c06bb
--- /dev/null
+++ b/be/src/exprs/aggregate/aggregate_function_ema.cpp
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/aggregate/aggregate_function_ema.h"
+
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+#include "exprs/aggregate/helpers.h"
+
+namespace doris {
+
+void register_aggregate_function_ema(AggregateFunctionSimpleFactory& factory) {
+    factory.register_function_both(
+            "exponential_moving_average",
+            
creator_without_type::creator<AggregateFunctionExponentialMovingAverage>);
+}
+
+} // namespace doris
diff --git a/be/src/exprs/aggregate/aggregate_function_ema.h 
b/be/src/exprs/aggregate/aggregate_function_ema.h
new file mode 100644
index 00000000000..edafb973bb0
--- /dev/null
+++ b/be/src/exprs/aggregate/aggregate_function_ema.h
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This file is adapted from
+// 
https://github.com/ClickHouse/ClickHouse/blob/master/src/AggregateFunctions/AggregateFunctionExponentialMovingAverage.cpp
+
+#pragma once
+
+#include <cmath>
+#include <memory>
+
+#include "core/assert_cast.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_number.h"
+#include "core/types.h"
+#include "exprs/aggregate/aggregate_function.h"
+
+namespace doris {
+class Arena;
+class BufferReadable;
+class BufferWritable;
+class IColumn;
+
+/**
+ * Exponentially smoothed moving average over time.
+ *
+ * Each value corresponds to a timeunit index. The half_decay parameter is the
+ * time lag at which exponential weights decay by one-half.
+ *
+ * State is a (value, time) pair representing the exponentially accumulated sum
+ * at a reference time. To get the average, divide by sumWeights(half_decay).
+ *
+ * Formula:
+ *   scale(dt, x) = 2^(-dt/x)
+ *   sumWeights(x) = 1 / (1 - 2^(-1/x))
+ *   add(v, t): merge current state with point (v, t)
+ *   merge(a, b): move both to the later time, then sum values
+ *   get():  value / sumWeights(half_decay)
+ *
+ * Usage: exponential_moving_average(half_decay, value, timeunit)
+ *   - half_decay: constant double, the half-life period in timeunit units
+ *   - value:      numeric column to average
+ *   - timeunit:   numeric time index (not raw timestamp; use intDiv if needed)
+ * Returns DOUBLE.
+ */
+struct ExponentialMovingAverageData {
+    double value = 0.0;
+    double time = 0.0;
+    double half_decay = 0.0;
+
+    static double scale(double time_passed, double hd) { return 
std::exp2(-time_passed / hd); }
+
+    static double sum_weights(double hd) { return 1.0 / (1.0 - std::exp2(-1.0 
/ hd)); }
+
+    void add(double new_value, double current_time, double hd) {
+        half_decay = hd;
+        ExponentialMovingAverageData other;
+        other.value = new_value;
+        other.time = current_time;
+        merge_point(other, hd);
+    }
+
+    void merge_point(const ExponentialMovingAverageData& other, double hd) {
+        if (time > other.time) {
+            value = value + other.value * scale(time - other.time, hd);
+        } else if (time < other.time) {
+            value = other.value + value * scale(other.time - time, hd);
+            time = other.time;
+        } else {
+            value = value + other.value;
+        }
+    }
+
+    void merge(const ExponentialMovingAverageData& rhs) {
+        double hd = half_decay != 0.0 ? half_decay : rhs.half_decay;
+        if (hd == 0.0) {
+            return;
+        }
+        half_decay = hd;
+        merge_point(rhs, hd);
+    }
+
+    double get() const {
+        if (half_decay == 0.0) {
+            return 0.0;
+        }
+        return value / sum_weights(half_decay);
+    }
+
+    void write(BufferWritable& buf) const {
+        buf.write_binary(value);
+        buf.write_binary(time);
+        buf.write_binary(half_decay);
+    }
+
+    void read(BufferReadable& buf) {
+        buf.read_binary(value);
+        buf.read_binary(time);
+        buf.read_binary(half_decay);
+    }
+
+    void reset() {
+        value = 0.0;
+        time = 0.0;
+        half_decay = 0.0;
+    }
+};
+
+class AggregateFunctionExponentialMovingAverage final
+        : public IAggregateFunctionDataHelper<ExponentialMovingAverageData,
+                                              
AggregateFunctionExponentialMovingAverage>,
+          MultiExpression,
+          NullableAggregateFunction {
+public:
+    AggregateFunctionExponentialMovingAverage(const DataTypes& argument_types_)
+            : IAggregateFunctionDataHelper<ExponentialMovingAverageData,
+                                           
AggregateFunctionExponentialMovingAverage>(
+                      argument_types_) {}
+
+    String get_name() const override { return "exponential_moving_average"; }
+
+    DataTypePtr get_return_type() const override { return 
std::make_shared<DataTypeFloat64>(); }
+
+    void reset(AggregateDataPtr __restrict place) const override { 
this->data(place).reset(); }
+
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
+             Arena&) const override {
+        const double half_decay =
+                assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[0])
+                        .get_data()[row_num];
+        const double new_value =
+                assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[1])
+                        .get_data()[row_num];
+        const double current_time =
+                assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[2])
+                        .get_data()[row_num];
+        this->data(place).add(new_value, current_time, half_decay);
+    }
+
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
+               Arena&) const override {
+        this->data(place).merge(this->data(rhs));
+    }
+
+    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
+        this->data(place).write(buf);
+    }
+
+    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
+                     Arena&) const override {
+        this->data(place).read(buf);
+    }
+
+    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
+        
assert_cast<ColumnFloat64&>(to).get_data().push_back(this->data(place).get());
+    }
+};
+
+} // namespace doris
diff --git a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp 
b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
index b9851bd4752..7083539de6b 100644
--- a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
+++ b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
@@ -79,6 +79,7 @@ void 
register_aggregate_function_percentile_reservoir(AggregateFunctionSimpleFac
 void register_aggregate_function_ai_agg(AggregateFunctionSimpleFactory& 
factory);
 void register_aggregate_function_bool_union(AggregateFunctionSimpleFactory& 
factory);
 void register_aggregate_function_sem(AggregateFunctionSimpleFactory& factory);
+void register_aggregate_function_ema(AggregateFunctionSimpleFactory& factory);
 
 AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() {
     static std::once_flag oc;
@@ -136,6 +137,7 @@ AggregateFunctionSimpleFactory& 
AggregateFunctionSimpleFactory::instance() {
         register_aggregate_function_ai_agg(instance);
         register_aggregate_function_bool_union(instance);
         register_aggregate_function_sem(instance);
+        register_aggregate_function_ema(instance);
         // Register foreach and foreachv2 functions
         register_aggregate_function_combinator_foreach(instance);
         register_aggregate_function_combinator_foreachv2(instance);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
index 3104a89eb8d..aa136f3656a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
@@ -38,6 +38,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.agg.Count;
 import org.apache.doris.nereids.trees.expressions.functions.agg.CountByEnum;
 import org.apache.doris.nereids.trees.expressions.functions.agg.Covar;
 import org.apache.doris.nereids.trees.expressions.functions.agg.CovarSamp;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.ExponentialMovingAverage;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayIntersect;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayUnion;
 import org.apache.doris.nereids.trees.expressions.functions.agg.GroupBitAnd;
@@ -139,6 +140,7 @@ public class BuiltinAggregateFunctions implements 
FunctionHelper {
                 agg(CollectSet.class, "collect_set", "group_uniq_array"),
                 agg(Corr.class, "corr"),
                 agg(CorrWelford.class, "corr_welford"),
+                agg(ExponentialMovingAverage.class, 
"exponential_moving_average"),
                 agg(Count.class, "count"),
                 agg(CountByEnum.class, "count_by_enum"),
                 agg(Covar.class, "covar", "covar_pop"),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ExponentialMovingAverage.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ExponentialMovingAverage.java
new file mode 100644
index 00000000000..caeb3b82dc7
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ExponentialMovingAverage.java
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.agg;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.DoubleType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/**
+ * Exponential Moving Average aggregate function.
+ *
+ * <p>Computes the exponentially smoothed moving average over time-indexed 
values.
+ * The half_decay parameter controls the half-life period: the time after 
which the
+ * exponential weight of a past value decays by a factor of 1/2.
+ *
+ * <p>Signature: {@code exponential_moving_average(half_decay DOUBLE, value 
DOUBLE,
+ * timeunit DOUBLE) -> DOUBLE}
+ *
+ * <p>The timeunit argument is a numeric time index, not a raw timestamp. For
+ * timestamp columns use {@code intDiv(toUnixTimestamp(ts), interval_seconds)}.
+ */
+public class ExponentialMovingAverage extends NullableAggregateFunction
+        implements ExplicitlyCastableSignature {
+
+    public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
+            FunctionSignature.ret(DoubleType.INSTANCE)
+                    .args(DoubleType.INSTANCE, DoubleType.INSTANCE, 
DoubleType.INSTANCE)
+    );
+
+    /**
+     * Constructor with 3 arguments: (half_decay, value, timeunit).
+     */
+    public ExponentialMovingAverage(Expression halfDecay, Expression value, 
Expression timeunit) {
+        this(false, halfDecay, value, timeunit);
+    }
+
+    /**
+     * Constructor with distinct flag and 3 arguments.
+     */
+    public ExponentialMovingAverage(boolean distinct, Expression halfDecay,
+            Expression value, Expression timeunit) {
+        this(distinct, false, halfDecay, value, timeunit);
+    }
+
+    /**
+     * Full constructor.
+     */
+    public ExponentialMovingAverage(boolean distinct, boolean alwaysNullable,
+            Expression halfDecay, Expression value, Expression timeunit) {
+        super("exponential_moving_average", distinct, alwaysNullable, 
halfDecay, value, timeunit);
+    }
+
+    /** Constructor for withChildren and reuse signature. */
+    private ExponentialMovingAverage(NullableAggregateFunctionParams 
functionParams) {
+        super(functionParams);
+    }
+
+    @Override
+    public void checkLegalityBeforeTypeCoercion() {
+        if (!getArgument(0).isConstant()) {
+            throw new AnalysisException("The half_decay argument of "
+                    + getName() + " must be a constant");
+        }
+        if (!getArgumentType(0).isNumericType()) {
+            throw new AnalysisException("The half_decay argument of "
+                    + getName() + " must be numeric");
+        }
+        if (!getArgumentType(1).isNumericType()) {
+            throw new AnalysisException("The value argument of "
+                    + getName() + " must be numeric");
+        }
+        if (!getArgumentType(2).isNumericType()) {
+            throw new AnalysisException("The timeunit argument of "
+                    + getName() + " must be numeric");
+        }
+    }
+
+    @Override
+    public ExponentialMovingAverage withDistinctAndChildren(boolean distinct,
+            List<Expression> children) {
+        Preconditions.checkArgument(children.size() == 3);
+        return new ExponentialMovingAverage(getFunctionParams(distinct, 
children));
+    }
+
+    @Override
+    public ExponentialMovingAverage withAlwaysNullable(boolean alwaysNullable) 
{
+        return new 
ExponentialMovingAverage(getAlwaysNullableFunctionParams(alwaysNullable));
+    }
+
+    @Override
+    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+        return visitor.visitExponentialMovingAverage(this, context);
+    }
+
+    @Override
+    public List<FunctionSignature> getSignatures() {
+        return SIGNATURES;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
index 37bc0697268..2a09e907fc3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
@@ -39,6 +39,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.agg.Count;
 import org.apache.doris.nereids.trees.expressions.functions.agg.CountByEnum;
 import org.apache.doris.nereids.trees.expressions.functions.agg.Covar;
 import org.apache.doris.nereids.trees.expressions.functions.agg.CovarSamp;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.ExponentialMovingAverage;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayIntersect;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayUnion;
 import org.apache.doris.nereids.trees.expressions.functions.agg.GroupBitAnd;
@@ -133,6 +134,10 @@ public interface AggregateFunctionVisitor<R, C> {
         return visitAggregateFunction(bitmapAgg, context);
     }
 
+    default R visitExponentialMovingAverage(ExponentialMovingAverage ema, C 
context) {
+        return visitNullableAggregateFunction(ema, context);
+    }
+
     default R visitBitmapIntersect(BitmapIntersect bitmapIntersect, C context) 
{
         return visitAggregateFunction(bitmapIntersect, context);
     }
diff --git 
a/regression-test/data/query_p0/aggregate/exponential_moving_average/exponential_moving_average.out
 
b/regression-test/data/query_p0/aggregate/exponential_moving_average/exponential_moving_average.out
new file mode 100644
index 00000000000..2ca841b794f
--- /dev/null
+++ 
b/regression-test/data/query_p0/aggregate/exponential_moving_average/exponential_moving_average.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !basic --
+92.25779635374204
+
+-- !single_row --
+47.5
+
+-- !group_by --
+1      11.25
+2      50
+
+-- !half_decay_zero --
+0
+
+-- !negative --
+2.196699141100893
+
+-- !null --
+11.25
+
+-- !empty --
+\N
+
+-- !dup_time --
+15.46875
+
+-- !window --
+0      5
+1      7.5
+2      8.75
+
diff --git 
a/regression-test/suites/query_p0/aggregate/exponential_moving_average/exponential_moving_average.groovy
 
b/regression-test/suites/query_p0/aggregate/exponential_moving_average/exponential_moving_average.groovy
new file mode 100644
index 00000000000..18fb38bd2ec
--- /dev/null
+++ 
b/regression-test/suites/query_p0/aggregate/exponential_moving_average/exponential_moving_average.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("exponential_moving_average") {
+    // Prepare main test table (dropped first to keep the environment for 
debugging)
+    sql "drop table if exists ema_test;"
+    sql """
+        create table ema_test (
+            id      int,
+            v       double,
+            t       double
+        )
+        duplicate key (id)
+        distributed by hash(id) buckets 1
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into ema_test values
+            (1,  95, 1), (2,  95, 2), (3,  95, 3), (4,  96, 4), (5,  96, 5),
+            (6,  96, 6), (7,  96, 7), (8,  97, 8), (9,  97, 9), (10, 97, 10),
+            (11, 97, 11), (12, 98, 12), (13, 98, 13), (14, 98, 14), (15, 98, 
15),
+            (16, 99, 16), (17, 99, 17), (18, 99, 18), (19, 100, 19), (20, 100, 
20);
+    """
+
+    // Basic aggregate: result matches ClickHouse's 
exponentialMovingAverage(5)(v, t)
+    qt_basic """
+        select exponential_moving_average(5.0, v, t) from ema_test;
+    """
+
+    // Single-row: EMA of one value (v=95, t=1) with half_decay=1
+    // state={95,1}, sum_weights(1)=1/(1-0.5)=2, result=95/2=47.5
+    qt_single_row """
+        select exponential_moving_average(1.0, v, t) from ema_test where id = 
1;
+    """
+
+    // GROUP BY: two groups, group 1 has two rows, group 2 has one row
+    sql "drop table if exists ema_group_test;"
+    sql """
+        create table ema_group_test (
+            grp int,
+            v   double,
+            t   double
+        )
+        duplicate key (grp)
+        distributed by hash(grp) buckets 1
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into ema_group_test values
+            (1, 10, 1), (1, 20, 3), (2, 100, 1);
+    """
+    order_qt_group_by """
+        select grp, exponential_moving_average(1.0, v, t)
+        from ema_group_test group by grp order by grp;
+    """
+
+    // half_decay = 0 edge case
+    qt_half_decay_zero """
+        select exponential_moving_average(0.0, v, t) from ema_test where id = 
1;
+    """
+
+    // Negative values
+    sql "drop table if exists ema_neg_test;"
+    sql """
+        create table ema_neg_test (
+            id int,
+            v double,
+            t double
+        )
+        duplicate key (id)
+        distributed by hash(id) buckets 1
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into ema_neg_test values (1, -10, 1), (2, 10, 5);
+    """
+    qt_negative """
+        select exponential_moving_average(2.0, v, t) from ema_neg_test;
+    """
+
+    // NULL handling
+    sql "drop table if exists ema_null_test;"
+    sql """
+        create table ema_null_test (
+            id int,
+            v double null,
+            t double null
+        )
+        duplicate key (id)
+        distributed by hash(id) buckets 1
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into ema_null_test values (1, 10, 1), (2, null, 2), (3, 20, 3);
+    """
+    qt_null """
+        select exponential_moving_average(1.0, v, t) from ema_null_test;
+    """
+
+    // Empty result set
+    qt_empty """
+        select exponential_moving_average(1.0, v, t) from ema_null_test where 
v > 100;
+    """
+
+    // Duplicate times: same time values are summed directly
+    sql "drop table if exists ema_dup_time_test;"
+    sql """
+        create table ema_dup_time_test (
+            id int,
+            v double,
+            t double
+        )
+        duplicate key (id)
+        distributed by hash(id) buckets 1
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into ema_dup_time_test values (1, 10, 5), (2, 20, 5), (3, 30, 
10);
+    """
+    qt_dup_time """
+        select exponential_moving_average(1.0, v, t) from ema_dup_time_test;
+    """
+
+    // Cumulative window function
+    sql "drop table if exists ema_window_test;"
+    sql """
+        create table ema_window_test (
+            id int,
+            t double,
+            v double
+        )
+        duplicate key (id)
+        distributed by hash(id) buckets 1
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into ema_window_test values (1, 0, 10), (2, 1, 10), (3, 2, 10);
+    """
+    order_qt_window """
+        select t, exponential_moving_average(1.0, v, t)
+            over (order by t rows between unbounded preceding and current row) 
as ema
+        from ema_window_test order by t;
+    """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to