This is an automated email from the ASF dual-hosted git repository.
HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1e75bfc1f4e [feature](be/fe) Add exponential_moving_average aggregate
function (#63499)
1e75bfc1f4e is described below
commit 1e75bfc1f4e6b744ebcf16fb0639248b10aedbb3
Author: HappenLee <[email protected]>
AuthorDate: Fri May 22 16:10:56 2026 +0800
[feature](be/fe) Add exponential_moving_average aggregate function (#63499)
Problem Summary:
Doris lacked an exponential moving average aggregate function. This
implements `exponential_moving_average(half_decay, value, time)` ported
from ClickHouse's ExponentialMovingAverage aggregate.
Algorithm:
- State stores (value, time, half_decay) as doubles.
- On add(): bring the existing state and the new point to the later of
their two timestamps by scaling the older one by `2^(-dt/half_decay)`,
then accumulate. Store half_decay in state so it is available during
merge().
- On merge(): advance both states to the later timestamp, then sum.
This is commutative and associative, so row order doesn't matter.
- Result: `value * (1 - 2^(-1/half_decay))` (normalised by sum of
weights).
Changes:
- BE: `aggregate_function_ema.h/cpp` — state struct + function class,
registered in `aggregate_function_simple_factory.cpp`.
- FE: `ExponentialMovingAverage.java` — 3-arg DOUBLE signature, visitor
method added to `AggregateFunctionVisitor`, registered in
`BuiltinAggregateFunctions`.
- Regression test: `query_p0/aggregate/exponential_moving_average/`.
### Release note
New aggregate function `exponential_moving_average(half_decay, value, time)`
computes the exponential moving average over a stream of (value, time)
pairs with the given half-decay parameter.
### Check List (For Author)
- Test: Regression test added
(query_p0/aggregate/exponential_moving_average)
- Behavior changed: No (new function)
- Does this need documentation:
[docs](https://github.com/apache/doris-website/pull/3726)
---------
Co-authored-by: Copilot <[email protected]>
---
be/src/exprs/aggregate/aggregate_function_ema.cpp | 31 ++++
be/src/exprs/aggregate/aggregate_function_ema.h | 173 +++++++++++++++++++++
.../aggregate_function_simple_factory.cpp | 2 +
.../doris/catalog/BuiltinAggregateFunctions.java | 2 +
.../functions/agg/ExponentialMovingAverage.java | 122 +++++++++++++++
.../visitor/AggregateFunctionVisitor.java | 5 +
.../exponential_moving_average.out | 31 ++++
.../exponential_moving_average.groovy | 158 +++++++++++++++++++
8 files changed, 524 insertions(+)
diff --git a/be/src/exprs/aggregate/aggregate_function_ema.cpp
b/be/src/exprs/aggregate/aggregate_function_ema.cpp
new file mode 100644
index 00000000000..b3dde6c06bb
--- /dev/null
+++ b/be/src/exprs/aggregate/aggregate_function_ema.cpp
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/aggregate/aggregate_function_ema.h"
+
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+#include "exprs/aggregate/helpers.h"
+
+namespace doris {
+
+// Registers the "exponential_moving_average" aggregate with the simple factory through
+// register_function_both, using AggregateFunctionExponentialMovingAverage as the
+// implementation class.
+void register_aggregate_function_ema(AggregateFunctionSimpleFactory& factory) {
+    factory.register_function_both(
+            "exponential_moving_average",
+            creator_without_type::creator<AggregateFunctionExponentialMovingAverage>);
+}
+
+} // namespace doris
diff --git a/be/src/exprs/aggregate/aggregate_function_ema.h
b/be/src/exprs/aggregate/aggregate_function_ema.h
new file mode 100644
index 00000000000..edafb973bb0
--- /dev/null
+++ b/be/src/exprs/aggregate/aggregate_function_ema.h
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This file is adapted from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/AggregateFunctions/AggregateFunctionExponentialMovingAverage.cpp
+
+#pragma once
+
+#include <cmath>
+#include <memory>
+
+#include "core/assert_cast.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_number.h"
+#include "core/types.h"
+#include "exprs/aggregate/aggregate_function.h"
+
+namespace doris {
+class Arena;
+class BufferReadable;
+class BufferWritable;
+class IColumn;
+
+/**
+ * Exponentially smoothed moving average over time.
+ *
+ * Each value corresponds to a timeunit index. The half_decay parameter is the
+ * time lag at which exponential weights decay by one-half.
+ *
+ * State is a (value, time, half_decay) triple: `value` is the exponentially
+ * accumulated sum at reference time `time`, and `half_decay` is stored alongside
+ * so that merge() and get() can use it. To get the average, divide `value` by
+ * sum_weights(half_decay).
+ *
+ * Formula:
+ *   scale(dt, x)   = 2^(-dt/x)
+ *   sum_weights(x) = 1 / (1 - 2^(-1/x))
+ *   add(v, t):     merge current state with point (v, t)
+ *   merge(a, b):   move both to the later time, then sum values
+ *   get():         value / sum_weights(half_decay)
+ *
+ * A default-constructed state has time = 0 and the reference time is only ever
+ * replaced by a later one, so points with a negative timeunit are decayed
+ * towards time 0.
+ *
+ * Usage: exponential_moving_average(half_decay, value, timeunit)
+ *   - half_decay: constant double, the half-life period in timeunit units
+ *   - value: numeric column to average
+ *   - timeunit: numeric time index (not a raw timestamp; bucket timestamps into
+ *     integer intervals first if needed)
+ * Returns DOUBLE.
+ */
+struct ExponentialMovingAverageData {
+    // Decayed sum of every point merged so far, expressed at reference time `time`.
+    double value = 0.0;
+    // Reference time of `value`; starts at 0 and is only ever replaced by a later time.
+    double time = 0.0;
+    // Half-life recorded by the last add(); 0 marks a state that has no half_decay yet.
+    double half_decay = 0.0;
+
+    // Weight left on a value after `time_passed` time units: 2^(-time_passed / hd).
+    static double scale(double time_passed, double hd) { return std::exp2(-time_passed / hd); }
+
+    // Normalisation constant 1 / (1 - 2^(-1/hd)); get() divides the accumulated sum by it.
+    static double sum_weights(double hd) { return 1.0 / (1.0 - std::exp2(-1.0 / hd)); }
+
+    // Folds one observation (new_value at current_time) into the state and records hd so
+    // that merge() and get() can use it later.
+    void add(double new_value, double current_time, double hd) {
+        half_decay = hd;
+        ExponentialMovingAverageData other;
+        other.value = new_value;
+        other.time = current_time;
+        merge_point(other, hd);
+    }
+
+    // Combines `other` into this state: whichever side is older is decayed to the later
+    // of the two times before the values are summed. Does not touch half_decay.
+    void merge_point(const ExponentialMovingAverageData& other, double hd) {
+        if (time > other.time) {
+            // `other` is older: decay it to our reference time.
+            value = value + other.value * scale(time - other.time, hd);
+        } else if (time < other.time) {
+            // This state is older: decay our sum and adopt the later reference time.
+            value = other.value + value * scale(other.time - time, hd);
+            time = other.time;
+        } else {
+            // Same reference time: no decay needed.
+            value = value + other.value;
+        }
+    }
+
+    // Merges a partial state. Uses this state's half_decay, or rhs's when ours is still 0;
+    // if both are 0 the state is left unchanged.
+    void merge(const ExponentialMovingAverageData& rhs) {
+        double hd = half_decay != 0.0 ? half_decay : rhs.half_decay;
+        if (hd == 0.0) {
+            return;
+        }
+        half_decay = hd;
+        merge_point(rhs, hd);
+    }
+
+    // Returns the normalised average value / sum_weights(half_decay), or 0 when
+    // half_decay is 0.
+    double get() const {
+        if (half_decay == 0.0) {
+            return 0.0;
+        }
+        return value / sum_weights(half_decay);
+    }
+
+    // Writes the fields in the order value, time, half_decay; read() mirrors this order.
+    void write(BufferWritable& buf) const {
+        buf.write_binary(value);
+        buf.write_binary(time);
+        buf.write_binary(half_decay);
+    }
+
+    // Reads the fields back in the order used by write().
+    void read(BufferReadable& buf) {
+        buf.read_binary(value);
+        buf.read_binary(time);
+        buf.read_binary(half_decay);
+    }
+
+    // Restores the default-constructed (all-zero) state.
+    void reset() {
+        value = 0.0;
+        time = 0.0;
+        half_decay = 0.0;
+    }
+};
+
+// Aggregate function wrapper around ExponentialMovingAverageData.
+// Arguments, in order: half_decay, value, timeunit - each read as a Float64 column.
+// The result is a Float64 produced by ExponentialMovingAverageData::get().
+class AggregateFunctionExponentialMovingAverage final
+        : public IAggregateFunctionDataHelper<ExponentialMovingAverageData,
+                                              AggregateFunctionExponentialMovingAverage>,
+          MultiExpression,
+          NullableAggregateFunction {
+public:
+    AggregateFunctionExponentialMovingAverage(const DataTypes& argument_types_)
+            : IAggregateFunctionDataHelper<ExponentialMovingAverageData,
+                                           AggregateFunctionExponentialMovingAverage>(
+                      argument_types_) {}
+
+    String get_name() const override { return "exponential_moving_average"; }
+
+    DataTypePtr get_return_type() const override { return std::make_shared<DataTypeFloat64>(); }
+
+    // Puts the state at `place` back to its default-constructed values.
+    void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); }
+
+    // Feeds row `row_num` into the state: columns[0] is half_decay, columns[1] the value and
+    // columns[2] the timeunit. All three are cast to ColumnFloat64 without conversion.
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
+             Arena&) const override {
+        const double half_decay =
+                assert_cast<const ColumnFloat64&, TypeCheckOnRelease::DISABLE>(*columns[0])
+                        .get_data()[row_num];
+        const double new_value =
+                assert_cast<const ColumnFloat64&, TypeCheckOnRelease::DISABLE>(*columns[1])
+                        .get_data()[row_num];
+        const double current_time =
+                assert_cast<const ColumnFloat64&, TypeCheckOnRelease::DISABLE>(*columns[2])
+                        .get_data()[row_num];
+        this->data(place).add(new_value, current_time, half_decay);
+    }
+
+    // Merges the partial state `rhs` into the state at `place`.
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
+               Arena&) const override {
+        this->data(place).merge(this->data(rhs));
+    }
+
+    // Writes the state's three doubles (value, time, half_decay) to `buf`.
+    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
+        this->data(place).write(buf);
+    }
+
+    // Reads a state previously produced by serialize() into `place`.
+    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
+                     Arena&) const override {
+        this->data(place).read(buf);
+    }
+
+    // Appends the final average for `place` to the Float64 result column `to`.
+    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
+        assert_cast<ColumnFloat64&>(to).get_data().push_back(this->data(place).get());
+    }
+};
+
+} // namespace doris
diff --git a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
index b9851bd4752..7083539de6b 100644
--- a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
+++ b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
@@ -79,6 +79,7 @@ void
register_aggregate_function_percentile_reservoir(AggregateFunctionSimpleFac
void register_aggregate_function_ai_agg(AggregateFunctionSimpleFactory&
factory);
void register_aggregate_function_bool_union(AggregateFunctionSimpleFactory&
factory);
void register_aggregate_function_sem(AggregateFunctionSimpleFactory& factory);
+void register_aggregate_function_ema(AggregateFunctionSimpleFactory& factory);
AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() {
static std::once_flag oc;
@@ -136,6 +137,7 @@ AggregateFunctionSimpleFactory&
AggregateFunctionSimpleFactory::instance() {
register_aggregate_function_ai_agg(instance);
register_aggregate_function_bool_union(instance);
register_aggregate_function_sem(instance);
+ register_aggregate_function_ema(instance);
// Register foreach and foreachv2 functions
register_aggregate_function_combinator_foreach(instance);
register_aggregate_function_combinator_foreachv2(instance);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
index 3104a89eb8d..aa136f3656a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
@@ -38,6 +38,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.agg.Count;
import org.apache.doris.nereids.trees.expressions.functions.agg.CountByEnum;
import org.apache.doris.nereids.trees.expressions.functions.agg.Covar;
import org.apache.doris.nereids.trees.expressions.functions.agg.CovarSamp;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.ExponentialMovingAverage;
import
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayIntersect;
import
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayUnion;
import org.apache.doris.nereids.trees.expressions.functions.agg.GroupBitAnd;
@@ -139,6 +140,7 @@ public class BuiltinAggregateFunctions implements
FunctionHelper {
agg(CollectSet.class, "collect_set", "group_uniq_array"),
agg(Corr.class, "corr"),
agg(CorrWelford.class, "corr_welford"),
+ agg(ExponentialMovingAverage.class,
"exponential_moving_average"),
agg(Count.class, "count"),
agg(CountByEnum.class, "count_by_enum"),
agg(Covar.class, "covar", "covar_pop"),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ExponentialMovingAverage.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ExponentialMovingAverage.java
new file mode 100644
index 00000000000..caeb3b82dc7
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ExponentialMovingAverage.java
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.agg;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.DoubleType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/**
+ * Exponential Moving Average aggregate function.
+ *
+ * <p>Computes the exponentially smoothed moving average over time-indexed values.
+ * The half_decay parameter controls the half-life period: the time after which the
+ * exponential weight of a past value decays by a factor of 1/2.
+ *
+ * <p>Signature: {@code exponential_moving_average(half_decay DOUBLE, value DOUBLE,
+ * timeunit DOUBLE) -> DOUBLE}
+ *
+ * <p>The timeunit argument is a numeric time index, not a raw timestamp. For
+ * timestamp columns, bucket the timestamp into an interval index first, e.g.
+ * {@code floor(unix_timestamp(ts) / interval_seconds)}.
+ */
+public class ExponentialMovingAverage extends NullableAggregateFunction
+        implements ExplicitlyCastableSignature {
+
+    /** The single supported signature: (DOUBLE, DOUBLE, DOUBLE) -> DOUBLE. */
+    public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
+            FunctionSignature.ret(DoubleType.INSTANCE)
+                    .args(DoubleType.INSTANCE, DoubleType.INSTANCE, DoubleType.INSTANCE)
+    );
+
+    /**
+     * Constructor with 3 arguments: (half_decay, value, timeunit).
+     */
+    public ExponentialMovingAverage(Expression halfDecay, Expression value, Expression timeunit) {
+        this(false, halfDecay, value, timeunit);
+    }
+
+    /**
+     * Constructor with distinct flag and 3 arguments.
+     */
+    public ExponentialMovingAverage(boolean distinct, Expression halfDecay,
+            Expression value, Expression timeunit) {
+        this(distinct, false, halfDecay, value, timeunit);
+    }
+
+    /**
+     * Full constructor.
+     */
+    public ExponentialMovingAverage(boolean distinct, boolean alwaysNullable,
+            Expression halfDecay, Expression value, Expression timeunit) {
+        super("exponential_moving_average", distinct, alwaysNullable, halfDecay, value, timeunit);
+    }
+
+    /** Constructor for withChildren and reuse signature. */
+    private ExponentialMovingAverage(NullableAggregateFunctionParams functionParams) {
+        super(functionParams);
+    }
+
+    /**
+     * Rejects calls whose half_decay argument is not a constant, or whose half_decay,
+     * value or timeunit argument is not of a numeric type.
+     *
+     * @throws AnalysisException if any of these checks fails
+     */
+    @Override
+    public void checkLegalityBeforeTypeCoercion() {
+        if (!getArgument(0).isConstant()) {
+            throw new AnalysisException("The half_decay argument of "
+                    + getName() + " must be a constant");
+        }
+        if (!getArgumentType(0).isNumericType()) {
+            throw new AnalysisException("The half_decay argument of "
+                    + getName() + " must be numeric");
+        }
+        if (!getArgumentType(1).isNumericType()) {
+            throw new AnalysisException("The value argument of "
+                    + getName() + " must be numeric");
+        }
+        if (!getArgumentType(2).isNumericType()) {
+            throw new AnalysisException("The timeunit argument of "
+                    + getName() + " must be numeric");
+        }
+    }
+
+    /** Returns a copy with the given distinct flag and exactly three children. */
+    @Override
+    public ExponentialMovingAverage withDistinctAndChildren(boolean distinct,
+            List<Expression> children) {
+        Preconditions.checkArgument(children.size() == 3);
+        return new ExponentialMovingAverage(getFunctionParams(distinct, children));
+    }
+
+    /** Returns a copy with the given alwaysNullable flag. */
+    @Override
+    public ExponentialMovingAverage withAlwaysNullable(boolean alwaysNullable) {
+        return new ExponentialMovingAverage(getAlwaysNullableFunctionParams(alwaysNullable));
+    }
+
+    @Override
+    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+        return visitor.visitExponentialMovingAverage(this, context);
+    }
+
+    @Override
+    public List<FunctionSignature> getSignatures() {
+        return SIGNATURES;
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
index 37bc0697268..2a09e907fc3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
@@ -39,6 +39,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.agg.Count;
import org.apache.doris.nereids.trees.expressions.functions.agg.CountByEnum;
import org.apache.doris.nereids.trees.expressions.functions.agg.Covar;
import org.apache.doris.nereids.trees.expressions.functions.agg.CovarSamp;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.ExponentialMovingAverage;
import
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayIntersect;
import
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayUnion;
import org.apache.doris.nereids.trees.expressions.functions.agg.GroupBitAnd;
@@ -133,6 +134,10 @@ public interface AggregateFunctionVisitor<R, C> {
return visitAggregateFunction(bitmapAgg, context);
}
+    // Default handling for ExponentialMovingAverage: delegate to
+    // visitNullableAggregateFunction.
+    default R visitExponentialMovingAverage(ExponentialMovingAverage ema, C context) {
+        return visitNullableAggregateFunction(ema, context);
+    }
+
default R visitBitmapIntersect(BitmapIntersect bitmapIntersect, C context)
{
return visitAggregateFunction(bitmapIntersect, context);
}
diff --git
a/regression-test/data/query_p0/aggregate/exponential_moving_average/exponential_moving_average.out
b/regression-test/data/query_p0/aggregate/exponential_moving_average/exponential_moving_average.out
new file mode 100644
index 00000000000..2ca841b794f
--- /dev/null
+++
b/regression-test/data/query_p0/aggregate/exponential_moving_average/exponential_moving_average.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !basic --
+92.25779635374204
+
+-- !single_row --
+47.5
+
+-- !group_by --
+1 11.25
+2 50
+
+-- !half_decay_zero --
+0
+
+-- !negative --
+2.196699141100893
+
+-- !null --
+11.25
+
+-- !empty --
+\N
+
+-- !dup_time --
+15.46875
+
+-- !window --
+0 5
+1 7.5
+2 8.75
+
diff --git
a/regression-test/suites/query_p0/aggregate/exponential_moving_average/exponential_moving_average.groovy
b/regression-test/suites/query_p0/aggregate/exponential_moving_average/exponential_moving_average.groovy
new file mode 100644
index 00000000000..18fb38bd2ec
--- /dev/null
+++
b/regression-test/suites/query_p0/aggregate/exponential_moving_average/exponential_moving_average.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression suite for exponential_moving_average(half_decay, value, time).
+// Expected values follow from: state = sum of value * 2^(-dt/half_decay) at the latest
+// time, result = state * (1 - 2^(-1/half_decay)).
+suite("exponential_moving_average") {
+    // Prepare main test table (dropped before creation rather than after the test, so the
+    // environment is kept for debugging)
+    sql "drop table if exists ema_test;"
+    sql """
+        create table ema_test (
+            id int,
+            v double,
+            t double
+        )
+        duplicate key (id)
+        distributed by hash(id) buckets 1
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into ema_test values
+        (1, 95, 1), (2, 95, 2), (3, 95, 3), (4, 96, 4), (5, 96, 5),
+        (6, 96, 6), (7, 96, 7), (8, 97, 8), (9, 97, 9), (10, 97, 10),
+        (11, 97, 11), (12, 98, 12), (13, 98, 13), (14, 98, 14), (15, 98, 15),
+        (16, 99, 16), (17, 99, 17), (18, 99, 18), (19, 100, 19), (20, 100, 20);
+    """
+
+    // Basic aggregate: result matches ClickHouse's exponentialMovingAverage(5)(v, t)
+    qt_basic """
+        select exponential_moving_average(5.0, v, t) from ema_test;
+    """
+
+    // Single-row: EMA of one value (v=95, t=1) with half_decay=1
+    // state={95,1}, sum_weights(1)=1/(1-0.5)=2, result=95/2=47.5
+    qt_single_row """
+        select exponential_moving_average(1.0, v, t) from ema_test where id = 1;
+    """
+
+    // GROUP BY: two groups, group 1 has two rows, group 2 has one row
+    // grp 1: 20 + 10 * 2^-2 = 22.5 -> 22.5 / 2 = 11.25; grp 2: 100 / 2 = 50
+    sql "drop table if exists ema_group_test;"
+    sql """
+        create table ema_group_test (
+            grp int,
+            v double,
+            t double
+        )
+        duplicate key (grp)
+        distributed by hash(grp) buckets 1
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into ema_group_test values
+        (1, 10, 1), (1, 20, 3), (2, 100, 1);
+    """
+    order_qt_group_by """
+        select grp, exponential_moving_average(1.0, v, t)
+        from ema_group_test group by grp order by grp;
+    """
+
+    // half_decay = 0 edge case: the expected result is 0
+    qt_half_decay_zero """
+        select exponential_moving_average(0.0, v, t) from ema_test where id = 1;
+    """
+
+    // Negative values
+    // 10 + (-10) * 2^(-4/2) = 7.5 -> 7.5 * (1 - 2^(-1/2)) = 2.1966...
+    sql "drop table if exists ema_neg_test;"
+    sql """
+        create table ema_neg_test (
+            id int,
+            v double,
+            t double
+        )
+        duplicate key (id)
+        distributed by hash(id) buckets 1
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into ema_neg_test values (1, -10, 1), (2, 10, 5);
+    """
+    qt_negative """
+        select exponential_moving_average(2.0, v, t) from ema_neg_test;
+    """
+
+    // NULL handling: the expected result (11.25) equals the result for rows 1 and 3 alone,
+    // i.e. the row with a NULL value contributes nothing
+    sql "drop table if exists ema_null_test;"
+    sql """
+        create table ema_null_test (
+            id int,
+            v double null,
+            t double null
+        )
+        duplicate key (id)
+        distributed by hash(id) buckets 1
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into ema_null_test values (1, 10, 1), (2, null, 2), (3, 20, 3);
+    """
+    qt_null """
+        select exponential_moving_average(1.0, v, t) from ema_null_test;
+    """
+
+    // Empty result set: no row matches, the expected result is NULL
+    qt_empty """
+        select exponential_moving_average(1.0, v, t) from ema_null_test where v > 100;
+    """
+
+    // Duplicate times: same time values are summed directly
+    // (10 + 20) at t=5, then 30 + 30 * 2^-5 = 30.9375 at t=10 -> 30.9375 / 2 = 15.46875
+    sql "drop table if exists ema_dup_time_test;"
+    sql """
+        create table ema_dup_time_test (
+            id int,
+            v double,
+            t double
+        )
+        duplicate key (id)
+        distributed by hash(id) buckets 1
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into ema_dup_time_test values (1, 10, 5), (2, 20, 5), (3, 30, 10);
+    """
+    qt_dup_time """
+        select exponential_moving_average(1.0, v, t) from ema_dup_time_test;
+    """
+
+    // Cumulative window function
+    // running state 10 -> 15 -> 17.5, each divided by 2 -> 5, 7.5, 8.75
+    sql "drop table if exists ema_window_test;"
+    sql """
+        create table ema_window_test (
+            id int,
+            t double,
+            v double
+        )
+        duplicate key (id)
+        distributed by hash(id) buckets 1
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into ema_window_test values (1, 0, 10), (2, 1, 10), (3, 2, 10);
+    """
+    order_qt_window """
+        select t, exponential_moving_average(1.0, v, t)
+            over (order by t rows between unbounded preceding and current row) as ema
+        from ema_window_test order by t;
+    """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]