This is an automated email from the ASF dual-hosted git repository.
lancelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 13ff9c8cf01 [IOTDB-6244] Add built-in variance and standard deviation
aggregator. (#11581)
13ff9c8cf01 is described below
commit 13ff9c8cf013624bf6a680ea492014c37a2c3963
Author: Zhihao Shen <[email protected]>
AuthorDate: Tue Nov 21 15:02:05 2023 +0800
[IOTDB-6244] Add built-in variance and standard deviation aggregator.
(#11581)
* Finish basic variance population function.
* Finish variance population function and prepare to add more related
functions.
* Finish all variance and standard deviation functions.
* Add all functions' unit tests.
* Apply `mvn spotless:apply` to run IT separately.
* Add integration test of all variance aggregators.
* Rename method `removeInput` to `removeIntermediate`.
* Format all files and add license header to each new file.
* Add `removeIntermediate` method test.
* Apply `mvn spotless:apply` again in order to open a new PR.
* Change import order as required by the Maven Spotless plugin.
* Resolve name conflict with stddev in library-udf.
* Modify files according to the committer's review feedback.
---
.../constant/BuiltinAggregationFunctionEnum.java | 6 +
.../apache/iotdb/itbase/constant/TestConstant.java | 24 +
.../db/it/aggregation/IoTDBAggregationIT.java | 10 +-
.../aggregation/IoTDBAggregationSmallDataIT.java | 4 +-
.../iotdb/db/it/aggregation/IoTDBVarianceIT.java | 733 +++++++++++++++++++++
.../iotdb/libudf/it/dprofile/DProfileIT.java | 27 -
.../java/org/apache/iotdb/tool/ExportTsFile.java | 8 +-
.../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java | 6 +
.../sql/factory/IoTDBDynamicTableFactory.java | 6 +
.../execution/aggregation/Accumulator.java | 9 +
.../execution/aggregation/AccumulatorFactory.java | 10 +
.../execution/aggregation/AvgAccumulator.java | 13 +
.../execution/aggregation/CountAccumulator.java | 9 +
.../execution/aggregation/SumAccumulator.java | 9 +
.../execution/aggregation/VarianceAccumulator.java | 273 ++++++++
.../slidingwindow/SlidingWindowAggregator.java | 34 -
.../SlidingWindowAggregatorFactory.java | 6 +
.../SmoothQueueSlidingWindowAggregator.java | 4 +-
.../db/queryengine/plan/parser/ASTVisitor.java | 6 +
.../plan/parameter/AggregationDescriptor.java | 18 +
.../org/apache/iotdb/db/utils/SchemaUtils.java | 24 +
.../apache/iotdb/db/utils/TypeInferenceUtils.java | 20 +-
.../iotdb/db/utils/constant/SqlConstant.java | 6 +
.../execution/aggregation/AccumulatorTest.java | 332 ++++++++++
.../udf/builtin/BuiltinAggregationFunction.java | 20 +-
.../thrift-commons/src/main/thrift/common.thrift | 8 +-
library-udf/src/assembly/tools/register-UDF.bat | 1 -
library-udf/src/assembly/tools/register-UDF.sh | 1 -
.../apache/iotdb/library/dprofile/UDAFStddev.java | 66 --
29 files changed, 1551 insertions(+), 142 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java
index 6ee2bc49ba7..a5d73836715 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java
@@ -31,6 +31,12 @@ public enum BuiltinAggregationFunctionEnum {
MIN_VALUE("min_value"),
EXTREME("extreme"),
FIRST_VALUE("first_value"),
+ STDDEV("stddev"),
+ STDDEV_POP("stddev_pop"),
+ STDDEV_SAMP("stddev_samp"),
+ VARIANCE("variance"),
+ VAR_POP("var_pop"),
+ VAR_SAMP("var_samp"),
LAST_VALUE("last_value"),
COUNT("count"),
AVG("avg"),
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/TestConstant.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/TestConstant.java
index 8ab1cc4f831..0300ada1561 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/TestConstant.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/TestConstant.java
@@ -119,6 +119,30 @@ public class TestConstant {
return String.format("mode(%s)", path);
}
+ public static String stddev(String path) {
+ return String.format("stddev(%s)", path);
+ }
+
+ public static String stddevPop(String path) {
+ return String.format("stddev_pop(%s)", path);
+ }
+
+ public static String stddevSamp(String path) {
+ return String.format("stddev_samp(%s)", path);
+ }
+
+ public static String variance(String path) {
+ return String.format("variance(%s)", path);
+ }
+
+ public static String varPop(String path) {
+ return String.format("var_pop(%s)", path);
+ }
+
+ public static String varSamp(String path) {
+ return String.format("var_samp(%s)", path);
+ }
+
public static String recordToInsert(TSRecord record) {
StringBuilder measurements = new StringBuilder();
StringBuilder values = new StringBuilder();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java
index 2721ff595e1..563347bee48 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java
@@ -715,7 +715,7 @@ public class IoTDBAggregationIT {
e.getMessage(),
e.getMessage()
.contains(
- "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE] only support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
}
try {
try (ResultSet resultSet =
@@ -728,7 +728,7 @@ public class IoTDBAggregationIT {
Assert.assertTrue(
e.getMessage()
.contains(
- "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE] only support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
}
try {
try (ResultSet resultSet =
@@ -741,7 +741,7 @@ public class IoTDBAggregationIT {
Assert.assertTrue(
e.getMessage()
.contains(
- "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE] only support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
}
try {
try (ResultSet resultSet =
@@ -755,7 +755,7 @@ public class IoTDBAggregationIT {
e.getMessage(),
e.getMessage()
.contains(
- "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE] only support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
}
try {
try (ResultSet resultSet =
@@ -768,7 +768,7 @@ public class IoTDBAggregationIT {
e.getMessage(),
e.getMessage()
.contains(
- "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE] only support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
}
} catch (Exception e) {
e.printStackTrace();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationSmallDataIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationSmallDataIT.java
index a6e9451f1f0..a08b42a8ff7 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationSmallDataIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationSmallDataIT.java
@@ -212,7 +212,7 @@ public class IoTDBAggregationSmallDataIT {
Assert.assertTrue(
e.toString()
.contains(
- "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE] only support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
}
int cnt = 0;
@@ -251,7 +251,7 @@ public class IoTDBAggregationSmallDataIT {
Assert.assertTrue(
e.toString()
.contains(
- "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE] only support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
}
int cnt = 0;
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBVarianceIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBVarianceIT.java
new file mode 100644
index 00000000000..10210e0f48e
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBVarianceIT.java
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aggregation;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.*;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
+import static org.apache.iotdb.itbase.constant.TestConstant.*;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBVarianceIT {
+ protected static final String[] SQLs =
+ new String[] {
+ "CREATE DATABASE root.db",
+ "CREATE TIMESERIES root.db.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.db.d1.s2 WITH DATATYPE=INT64, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.db.d1.s3 WITH DATATYPE=BOOLEAN,
ENCODING=PLAIN",
+ "CREATE TIMESERIES root.db.d1.s4 WITH DATATYPE=FLOAT, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.db.d1.s5 WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.db.d1.s6 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+ // for group by level use
+ "CREATE TIMESERIES root.db.d2.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.db.d2.s2 WITH DATATYPE=INT64, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.db.d2.s4 WITH DATATYPE=FLOAT, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.db.d2.s5 WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
+ "INSERT INTO root.db.d1(timestamp,s1,s2,s3,s4,s5,s6) values(1, 1, 1,
true, 1, 1, \"1\")",
+ "INSERT INTO root.db.d1(timestamp,s1,s2,s3,s4,s5,s6) values(2, 2, 2,
false, 2, 2, \"2\")",
+ "INSERT INTO root.db.d1(timestamp,s1,s2,s3,s4,s5,s6) values(3, 3, 2,
false, 3, 2, \"2\")",
+ "INSERT INTO root.db.d1(timestamp,s1,s2,s3,s4,s5,s6)
values(10000000000, 4, 1, true, 4, 1, \"1\")",
+ "INSERT INTO root.db.d1(timestamp,s1,s2,s3,s4,s5,s6)
values(10000000001, 5, 1, true, 5, 1, \"1\")",
+ "INSERT INTO root.db.d2(timestamp,s1,s2,s4,s5) values(1, 1, 2, 3, 4)",
+ "INSERT INTO root.db.d2(timestamp,s1,s2,s4,s5) values(2, 1, 2, 3, 4)",
+ "INSERT INTO root.db.d2(timestamp,s1,s2,s4,s5) values(10000000000, 1,
2, 3, 4)",
+ "INSERT INTO root.db.d2(timestamp,s1,s2,s4,s5) values(10000000001, 1,
2, 3, 4)",
+ "INSERT INTO root.db.d2(timestamp,s1,s2,s4,s5) values(10000000002, 1,
2, 3, 4)",
+ "flush"
+ };
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+
EnvFactory.getEnv().getConfig().getCommonConfig().setPartitionInterval(1000);
+ EnvFactory.getEnv().initClusterEnvironment();
+ prepareData(SQLs);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testAllVarianceWithUnsupportedTypes() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try {
+ try (ResultSet resultSet = statement.executeQuery("SELECT stddev(s3)
FROM root.db.d1")) {
+ resultSet.next();
+ fail();
+ }
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage(),
+ e.getMessage()
+ .contains(
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ }
+ try {
+ try (ResultSet resultSet = statement.executeQuery("SELECT stddev(s6)
FROM root.db.d1")) {
+ resultSet.next();
+ fail();
+ }
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage(),
+ e.getMessage()
+ .contains(
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ }
+ try {
+ try (ResultSet resultSet =
+ statement.executeQuery("SELECT stddev_pop(s3) FROM root.db.d1")) {
+ resultSet.next();
+ fail();
+ }
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage(),
+ e.getMessage()
+ .contains(
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ }
+ try {
+ try (ResultSet resultSet =
+ statement.executeQuery("SELECT stddev_pop(s6) FROM root.db.d1")) {
+ resultSet.next();
+ fail();
+ }
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage(),
+ e.getMessage()
+ .contains(
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ }
+ try {
+ try (ResultSet resultSet =
+ statement.executeQuery("SELECT stddev_samp(s3) FROM root.db.d1")) {
+ resultSet.next();
+ fail();
+ }
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage(),
+ e.getMessage()
+ .contains(
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ }
+ try {
+ try (ResultSet resultSet =
+ statement.executeQuery("SELECT stddev_samp(s6) FROM root.db.d1")) {
+ resultSet.next();
+ fail();
+ }
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage(),
+ e.getMessage()
+ .contains(
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ }
+ try {
+ try (ResultSet resultSet = statement.executeQuery("SELECT variance(s3)
FROM root.db.d1")) {
+ resultSet.next();
+ fail();
+ }
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage(),
+ e.getMessage()
+ .contains(
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ }
+ try {
+ try (ResultSet resultSet = statement.executeQuery("SELECT variance(s6)
FROM root.db.d1")) {
+ resultSet.next();
+ fail();
+ }
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage(),
+ e.getMessage()
+ .contains(
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ }
+ try {
+ try (ResultSet resultSet = statement.executeQuery("SELECT var_pop(s3)
FROM root.db.d1")) {
+ resultSet.next();
+ fail();
+ }
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage(),
+ e.getMessage()
+ .contains(
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ }
+ try {
+ try (ResultSet resultSet = statement.executeQuery("SELECT var_pop(s6)
FROM root.db.d1")) {
+ resultSet.next();
+ fail();
+ }
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage(),
+ e.getMessage()
+ .contains(
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ }
+ try {
+ try (ResultSet resultSet = statement.executeQuery("SELECT var_samp(s3)
FROM root.db.d1")) {
+ resultSet.next();
+ fail();
+ }
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage(),
+ e.getMessage()
+ .contains(
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ }
+ try {
+ try (ResultSet resultSet = statement.executeQuery("SELECT var_samp(s6)
FROM root.db.d1")) {
+ resultSet.next();
+ fail();
+ }
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage(),
+ e.getMessage()
+ .contains(
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE,
MAX_VALUE, STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]"));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testStddevWithDifferentTypes() {
+ String[] expectedHeader =
+ new String[] {
+ stddev("root.db.d1.s1"),
+ stddev("root.db.d1.s2"),
+ stddev("root.db.d1.s4"),
+ stddev("root.db.d1.s5"),
+ };
+ String[] retArray =
+ new String[] {
+
"1.5811388300841898,0.5477225575051661,1.5811388300841898,0.5477225575051661,"
+ };
+ resultSetEqualTest(
+ "select stddev(s1),stddev(s2),stddev(s4),stddev(s5) from root.db.d1",
+ expectedHeader,
+ retArray);
+
+ retArray = new String[] {"1.0,0.5773502691896257,1.0,0.5773502691896257,"};
+ resultSetEqualTest(
+ "select stddev(s1),stddev(s2),stddev(s4),stddev(s5) from root.db.d1
where time < 10",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testStddevPopWithDifferentTypes() {
+ String[] expectedHeader =
+ new String[] {
+ stddevPop("root.db.d1.s1"),
+ stddevPop("root.db.d1.s2"),
+ stddevPop("root.db.d1.s4"),
+ stddevPop("root.db.d1.s5"),
+ };
+ String[] retArray =
+ new String[] {
+
"1.4142135623730951,0.4898979485566356,1.4142135623730951,0.4898979485566356,"
+ };
+ resultSetEqualTest(
+ "select stddev_pop(s1),stddev_pop(s2),stddev_pop(s4),stddev_pop(s5)
from root.db.d1",
+ expectedHeader,
+ retArray);
+
+ retArray =
+ new String[]
{"0.816496580927726,0.4714045207910317,0.816496580927726,0.4714045207910317,"};
+ resultSetEqualTest(
+ "select stddev_pop(s1),stddev_pop(s2),stddev_pop(s4),stddev_pop(s5)
from root.db.d1 where time < 10",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testStddevSampWithDifferentTypes() {
+ String[] expectedHeader =
+ new String[] {
+ stddevSamp("root.db.d1.s1"),
+ stddevSamp("root.db.d1.s2"),
+ stddevSamp("root.db.d1.s4"),
+ stddevSamp("root.db.d1.s5"),
+ };
+ String[] retArray =
+ new String[] {
+
"1.5811388300841898,0.5477225575051661,1.5811388300841898,0.5477225575051661,"
+ };
+ resultSetEqualTest(
+ "select
stddev_samp(s1),stddev_samp(s2),stddev_samp(s4),stddev_samp(s5) from
root.db.d1",
+ expectedHeader,
+ retArray);
+
+ retArray = new String[] {"1.0,0.5773502691896257,1.0,0.5773502691896257,"};
+ resultSetEqualTest(
+ "select
stddev_samp(s1),stddev_samp(s2),stddev_samp(s4),stddev_samp(s5) from root.db.d1
where time < 10",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testVarianceWithDifferentTypes() {
+ String[] expectedHeader =
+ new String[] {
+ variance("root.db.d1.s1"),
+ variance("root.db.d1.s2"),
+ variance("root.db.d1.s4"),
+ variance("root.db.d1.s5"),
+ };
+ String[] retArray = new String[] {"2.5,0.3,2.5,0.3,"};
+ resultSetEqualTest(
+ "select variance(s1),variance(s2),variance(s4),variance(s5) from
root.db.d1",
+ expectedHeader,
+ retArray);
+
+ retArray = new String[] {"1.0,0.3333333333333333,1.0,0.3333333333333333,"};
+ resultSetEqualTest(
+ "select variance(s1),variance(s2),variance(s4),variance(s5) from
root.db.d1 where time < 10",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testVarPopWithDifferentTypes() {
+ String[] expectedHeader =
+ new String[] {
+ varPop("root.db.d1.s1"),
+ varPop("root.db.d1.s2"),
+ varPop("root.db.d1.s4"),
+ varPop("root.db.d1.s5"),
+ };
+ String[] retArray = new String[] {"2.0,0.24,2.0,0.24,"};
+ resultSetEqualTest(
+ "select var_pop(s1),var_pop(s2),var_pop(s4),var_pop(s5) from
root.db.d1",
+ expectedHeader,
+ retArray);
+
+ retArray =
+ new String[] {
+
"0.6666666666666666,0.2222222222222222,0.6666666666666666,0.2222222222222222,"
+ };
+ resultSetEqualTest(
+ "select var_pop(s1),var_pop(s2),var_pop(s4),var_pop(s5) from
root.db.d1 where time < 10",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testVarSampWithDifferentTypes() {
+ String[] expectedHeader =
+ new String[] {
+ varSamp("root.db.d1.s1"),
+ varSamp("root.db.d1.s2"),
+ varSamp("root.db.d1.s4"),
+ varSamp("root.db.d1.s5"),
+ };
+ String[] retArray = new String[] {"2.5,0.3,2.5,0.3,"};
+ resultSetEqualTest(
+ "select var_samp(s1),var_samp(s2),var_samp(s4),var_samp(s5) from
root.db.d1",
+ expectedHeader,
+ retArray);
+
+ retArray = new String[] {"1.0,0.3333333333333333,1.0,0.3333333333333333,"};
+ resultSetEqualTest(
+ "select var_samp(s1),var_samp(s2),var_samp(s4),var_samp(s5) from
root.db.d1 where time < 10",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testStddevAlignByDevice() {
+ String[] expectedHeader =
+ new String[] {
+ DEVICE, stddev("s1"), stddev("s2"), stddev("s4"), stddev("s5"),
+ };
+ String[] retArray =
+ new String[] {
+
"root.db.d1,1.5811388300841898,0.5477225575051661,1.5811388300841898,0.5477225575051661,"
+ };
+ resultSetEqualTest(
+ "select stddev(s1),stddev(s2),stddev(s4),stddev(s5) from root.db.d1
align by device",
+ expectedHeader,
+ retArray);
+
+ retArray = new String[]
{"root.db.d1,1.0,0.5773502691896257,1.0,0.5773502691896257,"};
+ resultSetEqualTest(
+ "select stddev(s1),stddev(s2),stddev(s4),stddev(s5) from root.db.d1
where time < 10 align by device",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testStddevPopAlignByDevice() {
+ String[] expectedHeader =
+ new String[] {
+ DEVICE, stddevPop("s1"), stddevPop("s2"), stddevPop("s4"),
stddevPop("s5"),
+ };
+ String[] retArray =
+ new String[] {
+
"root.db.d1,1.4142135623730951,0.4898979485566356,1.4142135623730951,0.4898979485566356,"
+ };
+ resultSetEqualTest(
+ "select stddev_pop(s1),stddev_pop(s2),stddev_pop(s4),stddev_pop(s5)
from root.db.d1 align by device",
+ expectedHeader,
+ retArray);
+
+ retArray =
+ new String[] {
+
"root.db.d1,0.816496580927726,0.4714045207910317,0.816496580927726,0.4714045207910317,"
+ };
+ resultSetEqualTest(
+ "select stddev_pop(s1),stddev_pop(s2),stddev_pop(s4),stddev_pop(s5)
from root.db.d1 where time < 10 align by device",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testStddevSampAlignByDevice() {
+ String[] expectedHeader =
+ new String[] {
+ DEVICE, stddevSamp("s1"), stddevSamp("s2"), stddevSamp("s4"),
stddevSamp("s5"),
+ };
+ String[] retArray =
+ new String[] {
+
"root.db.d1,1.5811388300841898,0.5477225575051661,1.5811388300841898,0.5477225575051661,"
+ };
+ resultSetEqualTest(
+ "select
stddev_samp(s1),stddev_samp(s2),stddev_samp(s4),stddev_samp(s5) from root.db.d1
align by device",
+ expectedHeader,
+ retArray);
+
+ retArray = new String[]
{"root.db.d1,1.0,0.5773502691896257,1.0,0.5773502691896257,"};
+ resultSetEqualTest(
+ "select
stddev_samp(s1),stddev_samp(s2),stddev_samp(s4),stddev_samp(s5) from root.db.d1
where time < 10 align by device",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testVarianceAlignByDevice() {
+ String[] expectedHeader =
+ new String[] {
+ DEVICE, variance("s1"), variance("s2"), variance("s4"),
variance("s5"),
+ };
+ String[] retArray = new String[] {"root.db.d1,2.5,0.3,2.5,0.3,"};
+ resultSetEqualTest(
+ "select variance(s1),variance(s2),variance(s4),variance(s5) from
root.db.d1 align by device",
+ expectedHeader,
+ retArray);
+
+ retArray = new String[]
{"root.db.d1,1.0,0.3333333333333333,1.0,0.3333333333333333,"};
+ resultSetEqualTest(
+ "select variance(s1),variance(s2),variance(s4),variance(s5) from
root.db.d1 where time < 10 align by device",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testVarPopAlignByDevice() {
+ String[] expectedHeader =
+ new String[] {
+ DEVICE, varPop("s1"), varPop("s2"), varPop("s4"), varPop("s5"),
+ };
+ String[] retArray = new String[] {"root.db.d1,2.0,0.24,2.0,0.24,"};
+ resultSetEqualTest(
+ "select var_pop(s1),var_pop(s2),var_pop(s4),var_pop(s5) from
root.db.d1 align by device",
+ expectedHeader,
+ retArray);
+
+ retArray =
+ new String[] {
+
"root.db.d1,0.6666666666666666,0.2222222222222222,0.6666666666666666,0.2222222222222222,"
+ };
+ resultSetEqualTest(
+ "select var_pop(s1),var_pop(s2),var_pop(s4),var_pop(s5) from
root.db.d1 where time < 10 align by device",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testVarSampAlignByDevice() {
+ String[] expectedHeader =
+ new String[] {
+ DEVICE, varSamp("s1"), varSamp("s2"), varSamp("s4"), varSamp("s5"),
+ };
+ String[] retArray = new String[] {"root.db.d1,2.5,0.3,2.5,0.3,"};
+ resultSetEqualTest(
+ "select var_samp(s1),var_samp(s2),var_samp(s4),var_samp(s5) from
root.db.d1 align by device",
+ expectedHeader,
+ retArray);
+
+ retArray = new String[]
{"root.db.d1,1.0,0.3333333333333333,1.0,0.3333333333333333,"};
+ resultSetEqualTest(
+ "select var_samp(s1),var_samp(s2),var_samp(s4),var_samp(s5) from
root.db.d1 where time < 10 align by device",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testStddevInHaving() {
+ String[] expectedHeader = new String[] {stddev("root.db.d1.s1")};
+ String[] retArray = new String[] {"1.5811388300841898,"};
+ resultSetEqualTest(
+ "select stddev(s1) from root.db.d1 having stddev(s2)>0",
expectedHeader, retArray);
+ }
+
+ @Test
+ public void testStddevPopInHaving() {
+ String[] expectedHeader = new String[] {stddevPop("root.db.d1.s1")};
+ String[] retArray = new String[] {"1.4142135623730951,"};
+ resultSetEqualTest(
+ "select stddev_pop(s1) from root.db.d1 having stddev_pop(s2)>0",
expectedHeader, retArray);
+ }
+
+ @Test
+ public void testStddevSampInHaving() {
+ String[] expectedHeader = new String[] {stddevSamp("root.db.d1.s1")};
+ String[] retArray = new String[] {"1.5811388300841898,"};
+ resultSetEqualTest(
+ "select stddev_samp(s1) from root.db.d1 having stddev_samp(s2)>0",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testVarianceInHaving() {
+ String[] expectedHeader = new String[] {variance("root.db.d1.s1")};
+ String[] retArray = new String[] {"2.5,"};
+ resultSetEqualTest(
+ "select variance(s1) from root.db.d1 having variance(s2)>0",
expectedHeader, retArray);
+ }
+
+ @Test
+ public void testVarPopInHaving() {
+ String[] expectedHeader = new String[] {varPop("root.db.d1.s1")};
+ String[] retArray = new String[] {"2.0,"};
+ resultSetEqualTest(
+ "select var_pop(s1) from root.db.d1 having var_pop(s2)>0",
expectedHeader, retArray);
+ }
+
+ @Test
+ public void testVarSampInHaving() {
+ String[] expectedHeader = new String[] {varSamp("root.db.d1.s1")};
+ String[] retArray = new String[] {"2.5,"};
+ resultSetEqualTest(
+ "select var_samp(s1) from root.db.d1 having var_samp(s2)>0",
expectedHeader, retArray);
+ }
+
+ @Test
+ public void testStddevWithGroupByLevel() {
+ String[] expectedHeader =
+ new String[] {
+ stddev("root.*.*.s1"),
+ stddev("root.*.*.s2"),
+ stddev("root.*.*.s4"),
+ stddev("root.*.*.s5"),
+ };
+ String[] retArray =
+ new String[] {
+
"1.4907119849998598,0.48304589153964794,1.0540925533894598,1.4181364924121764,"
+ };
+ resultSetEqualTest(
+ "select stddev(s1),stddev(s2),stddev(s4),stddev(s5) from root.db.*
group by level = 0",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testStddevPopWithGroupByLevel() {
+ String[] expectedHeader =
+ new String[] {
+ stddevPop("root.*.*.s1"),
+ stddevPop("root.*.*.s2"),
+ stddevPop("root.*.*.s4"),
+ stddevPop("root.*.*.s5"),
+ };
+ String[] retArray =
+ new String[]
{"1.4142135623730951,0.45825756949558405,1.0,1.3453624047073711,"};
+ resultSetEqualTest(
+ "select stddev_pop(s1),stddev_pop(s2),stddev_pop(s4),stddev_pop(s5)
from root.db.* group by level = 0",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testStddevSampWithGroupByLevel() {
+ String[] expectedHeader =
+ new String[] {
+ stddevSamp("root.*.*.s1"),
+ stddevSamp("root.*.*.s2"),
+ stddevSamp("root.*.*.s4"),
+ stddevSamp("root.*.*.s5"),
+ };
+ String[] retArray =
+ new String[] {
+
"1.4907119849998598,0.48304589153964794,1.0540925533894598,1.4181364924121764,"
+ };
+ resultSetEqualTest(
+ "select
stddev_samp(s1),stddev_samp(s2),stddev_samp(s4),stddev_samp(s5) from root.db.*
group by level = 0",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testVarianceWithGroupByLevel() {
+ String[] expectedHeader =
+ new String[] {
+ variance("root.*.*.s1"),
+ variance("root.*.*.s2"),
+ variance("root.*.*.s4"),
+ variance("root.*.*.s5"),
+ };
+ String[] retArray =
+ new String[] {
+
"2.2222222222222223,0.23333333333333334,1.1111111111111112,2.011111111111111,"
+ };
+ resultSetEqualTest(
+ "select variance(s1),variance(s2),variance(s4),variance(s5) from
root.db.* group by level = 0",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testVarPopWithGroupByLevel() {
+ String[] expectedHeader =
+ new String[] {
+ varPop("root.*.*.s1"),
+ varPop("root.*.*.s2"),
+ varPop("root.*.*.s4"),
+ varPop("root.*.*.s5"),
+ };
+ String[] retArray = new String[] {"2.0,0.21000000000000002,1.0,1.81,"};
+ resultSetEqualTest(
+ "select var_pop(s1),var_pop(s2),var_pop(s4),var_pop(s5) from root.db.*
group by level = 0",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testVarSampWithGroupByLevel() {
+ String[] expectedHeader =
+ new String[] {
+ varSamp("root.*.*.s1"),
+ varSamp("root.*.*.s2"),
+ varSamp("root.*.*.s4"),
+ varSamp("root.*.*.s5"),
+ };
+ String[] retArray =
+ new String[] {
+
"2.2222222222222223,0.23333333333333334,1.1111111111111112,2.011111111111111,"
+ };
+ resultSetEqualTest(
+ "select var_samp(s1),var_samp(s2),var_samp(s4),var_samp(s5) from
root.db.* group by level = 0",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testStddevWithSlidingWindow() {
+ String[] expectedHeader = new String[] {TIMESTAMP_STR,
stddev("root.db.d1.s1")};
+ String[] retArray = new String[] {"1,1.0,", "3,null,"};
+ resultSetEqualTest(
+ "select stddev(s1) from root.db.d1 group by time([1,4),3ms,2ms)",
expectedHeader, retArray);
+ }
+
+ @Test
+ public void testStddevPopWithSlidingWindow() {
+ String[] expectedHeader = new String[] {TIMESTAMP_STR,
stddevPop("root.db.d1.s1")};
+ String[] retArray = new String[] {"1,0.816496580927726,", "3,0.0,"};
+ resultSetEqualTest(
+ "select stddev_pop(s1) from root.db.d1 group by time([1,4),3ms,2ms)",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testStddevSampWithSlidingWindow() {
+ String[] expectedHeader = new String[] {TIMESTAMP_STR,
stddevSamp("root.db.d1.s1")};
+ String[] retArray = new String[] {"1,1.0,", "3,null,"};
+ resultSetEqualTest(
+ "select stddev_samp(s1) from root.db.d1 group by time([1,4),3ms,2ms)",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testVarianceWithSlidingWindow() {
+ String[] expectedHeader = new String[] {TIMESTAMP_STR,
variance("root.db.d1.s1")};
+ String[] retArray = new String[] {"1,1.0,", "3,null,"};
+ resultSetEqualTest(
+ "select variance(s1) from root.db.d1 group by time([1,4),3ms,2ms)",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testVarPopWithSlidingWindow() {
+ String[] expectedHeader = new String[] {TIMESTAMP_STR,
varPop("root.db.d1.s1")};
+ String[] retArray = new String[] {"1,0.6666666666666666,", "3,0.0,"};
+ resultSetEqualTest(
+ "select var_pop(s1) from root.db.d1 group by time([1,4),3ms,2ms)",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testVarSampWithSlidingWindow() {
+ String[] expectedHeader = new String[] {TIMESTAMP_STR,
varSamp("root.db.d1.s1")};
+ String[] retArray = new String[] {"1,1.0,", "3,null,"};
+ resultSetEqualTest(
+ "select var_samp(s1) from root.db.d1 group by time([1,4),3ms,2ms)",
+ expectedHeader,
+ retArray);
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/libudf/it/dprofile/DProfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/libudf/it/dprofile/DProfileIT.java
index 507dc7390e3..b686766613e 100644
---
a/integration-test/src/test/java/org/apache/iotdb/libudf/it/dprofile/DProfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/libudf/it/dprofile/DProfileIT.java
@@ -136,7 +136,6 @@ public class DProfileIT {
"create function segment as
'org.apache.iotdb.library.dprofile.UDTFSegment'");
statement.execute("create function skew as
'org.apache.iotdb.library.dprofile.UDAFSkew'");
statement.execute("create function spread as
'org.apache.iotdb.library.dprofile.UDAFSpread'");
- statement.execute("create function stddev as
'org.apache.iotdb.library.dprofile.UDAFStddev'");
statement.execute("create function minmax as
'org.apache.iotdb.library.dprofile.UDTFMinMax'");
statement.execute("create function zscore as
'org.apache.iotdb.library.dprofile.UDTFZScore'");
statement.execute("create function spline as
'org.apache.iotdb.library.dprofile.UDTFSpline'");
@@ -368,32 +367,6 @@ public class DProfileIT {
}
}
- @Test
- public void testSddev1() {
- String sqlStr = "select stddev(d1.s2) from root.vehicle";
- try (Connection connection = EnvFactory.getEnv().getConnection();
- Statement statement = connection.createStatement()) {
- ResultSet resultSet = statement.executeQuery(sqlStr);
- resultSet.next();
- Object result = resultSet.getObject(2);
- } catch (SQLException throwable) {
- fail(throwable.getMessage());
- }
- }
-
- @Test
- public void testStddev2() {
- String sqlStr = "select stddev(d2.s2) from root.vehicle";
- try (Connection connection = EnvFactory.getEnv().getConnection();
- Statement statement = connection.createStatement()) {
- ResultSet resultSet = statement.executeQuery(sqlStr);
- resultSet.next();
- Object result = resultSet.getObject(2);
- } catch (SQLException throwable) {
- fail(throwable.getMessage());
- }
- }
-
@Test
public void testACF1() {
String sqlStr = "select acf(d2.s2) from root.vehicle";
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
index 4ebde4c3180..9c9329ead7a 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
@@ -172,7 +172,13 @@ public class ExportTsFile extends AbstractTsFileTool {
|| sqlLower.contains("first_value(")
|| sqlLower.contains("last_value(")
|| sqlLower.contains("max_time(")
- || sqlLower.contains("min_time(")) {
+ || sqlLower.contains("min_time(")
+ || sqlLower.contains("stddev(")
+ || sqlLower.contains("stddev_pop(")
+ || sqlLower.contains("stddev_samp(")
+ || sqlLower.contains("variance(")
+ || sqlLower.contains("var_pop(")
+ || sqlLower.contains("var_samp(")) {
IoTPrinter.println("The sql you entered is invalid, please don't use
aggregate query.");
System.exit(CODE_ERROR);
}
diff --git
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
index cf790b09fd3..01582f32d18 100644
---
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
+++
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
@@ -259,6 +259,9 @@ public class IoTDBDatabaseMetadata implements
DatabaseMetaData {
"SET",
"SLIMIT",
"SOFFSET",
+ "STDDEV",
+ "STDDEV_POP",
+ "STDDEV_SAMP",
"STORAGE",
"SUM",
"SNAPPY",
@@ -286,6 +289,9 @@ public class IoTDBDatabaseMetadata implements
DatabaseMetaData {
"UNSET",
"UNCOMPRESSED",
"VALUES",
+ "VARIANCE",
+ "VAR_POP",
+ "VAR_SAMP",
"VERSION",
"WHERE",
"WITH",
diff --git
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
index a06ce44fc84..705f3c0ac86 100644
---
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
+++
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
@@ -201,6 +201,12 @@ public class IoTDBDynamicTableFactory
|| sqlLower.contains("last_value(")
|| sqlLower.contains("max_time(")
|| sqlLower.contains("min_time(")
+ || sqlLower.contains("stddev(")
+ || sqlLower.contains("stddev_pop(")
+ || sqlLower.contains("stddev_samp(")
+ || sqlLower.contains("variance(")
+ || sqlLower.contains("var_pop(")
+ || sqlLower.contains("var_samp(")
|| sqlLower.contains("group")
|| sqlLower.contains("where")
|| sqlLower.contains("create")
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
index 0a2eff2b221..75091443fd0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
@@ -40,6 +40,15 @@ public interface Accumulator {
*/
void addInput(Column[] column, BitMap bitMap, int lastIndex);
+ /**
+  * Takes a partial result that was previously added back out of this accumulator's state.
+  *
+  * <p>A sliding window keeps adding and removing partial results as it moves, so an aggregation
+  * function has to override this method to be usable with sliding windows. The default
+  * implementation always throws.
+  *
+  * @param partialResult the partial result to remove
+  * @throws UnsupportedOperationException always, unless overridden
+  */
+ default void removeIntermediate(Column[] partialResult) {
+   final String message = "This type of accumulator does not support remove input!";
+   throw new UnsupportedOperationException(message);
+ }
+
/**
* For aggregation function like COUNT, SUM, partialResult should be single;
But for AVG,
* last_value, it should be double column with dictionary order.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
index 9fdf2fbe31b..40d0a2d9bcd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
@@ -72,6 +72,16 @@ public class AccumulatorFactory {
return crateModeAccumulator(tsDataType);
case COUNT_TIME:
return new CountTimeAccumulator();
+ case STDDEV:
+ case STDDEV_SAMP:
+ return new VarianceAccumulator(tsDataType,
VarianceAccumulator.VarianceType.STDDEV_SAMP);
+ case STDDEV_POP:
+ return new VarianceAccumulator(tsDataType,
VarianceAccumulator.VarianceType.STDDEV_POP);
+ case VARIANCE:
+ case VAR_SAMP:
+ return new VarianceAccumulator(tsDataType,
VarianceAccumulator.VarianceType.VAR_SAMP);
+ case VAR_POP:
+ return new VarianceAccumulator(tsDataType,
VarianceAccumulator.VarianceType.VAR_POP);
default:
throw new IllegalArgumentException("Invalid Aggregation function: " +
aggregationType);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
index e310976f589..53f6d50ed3d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
@@ -78,6 +78,19 @@ public class AvgAccumulator implements Accumulator {
}
}
+ /**
+  * Subtracts a (count, sum) partial result from the running state.
+  *
+  * @param input two columns: the count at index 0 and the sum at index 1; a null count means
+  *     there is nothing to remove
+  */
+ @Override
+ public void removeIntermediate(Column[] input) {
+   checkArgument(input.length == 2, "partialResult of Avg should be 2");
+   final Column countColumn = input[0];
+   if (!countColumn.isNull(0)) {
+     countValue -= countColumn.getLong(0);
+     sumValue -= input[1].getDouble(0);
+     // With no values left the accumulator is marked as uninitialised again.
+     if (countValue == 0) {
+       initResult = false;
+     }
+   }
+ }
+
@Override
public void addStatistics(Statistics statistics) {
if (statistics == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountAccumulator.java
index 870b8fbb47b..f1783148514 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountAccumulator.java
@@ -66,6 +66,15 @@ public class CountAccumulator implements Accumulator {
countValue += partialResult[0].getLong(0);
}
+ /**
+  * Subtracts a partial count from the running count.
+  *
+  * @param input a single column holding the partial count; a null entry is ignored
+  */
+ @Override
+ public void removeIntermediate(Column[] input) {
+   checkArgument(input.length == 1, "input of Count should be 1");
+   final Column partialCount = input[0];
+   if (!partialCount.isNull(0)) {
+     countValue -= partialCount.getLong(0);
+   }
+ }
+
@Override
public void addStatistics(Statistics statistics) {
if (statistics == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/SumAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/SumAccumulator.java
index 78e3305e57d..57ae7a56c8a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/SumAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/SumAccumulator.java
@@ -74,6 +74,15 @@ public class SumAccumulator implements Accumulator {
sumValue += partialResult[0].getDouble(0);
}
+ /**
+  * Subtracts a partial sum from the running sum.
+  *
+  * @param input a single column holding the partial sum; a null entry is ignored
+  */
+ @Override
+ public void removeIntermediate(Column[] input) {
+   checkArgument(input.length == 1, "input of Sum should be 1");
+   final Column partialSum = input[0];
+   if (!partialSum.isNull(0)) {
+     sumValue -= partialSum.getDouble(0);
+   }
+ }
+
@Override
public void addStatistics(Statistics statistics) {
if (statistics == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/VarianceAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/VarianceAccumulator.java
new file mode 100644
index 00000000000..6b1d63ba7d6
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/VarianceAccumulator.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+
+import java.util.Arrays;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Accumulator for the variance family of aggregations. The {@link VarianceType} given to the
+ * constructor selects which statistic {@code outputFinal} writes.
+ *
+ * <p>The running state is the triple (count, mean, m2), where m2 is the sum of squared deviations
+ * from the mean. Raw values are folded in one at a time with Welford's update; partial states are
+ * merged and un-merged with the matching pairwise formulas, which is what makes the accumulator
+ * usable with sliding windows.
+ */
+public class VarianceAccumulator implements Accumulator {
+  /** The statistic derived from the accumulated state. */
+  public enum VarianceType {
+    STDDEV_POP,
+    STDDEV_SAMP,
+    VAR_POP,
+    VAR_SAMP,
+  }
+
+  private final TSDataType seriesDataType;
+
+  private final VarianceType varianceType;
+
+  /** Number of values currently represented by the state. */
+  private long count;
+
+  /** Mean of those values. */
+  private double mean;
+
+  /** Sum of squared deviations of those values from {@link #mean}. */
+  private double m2;
+
+  public VarianceAccumulator(TSDataType seriesDataType, VarianceType varianceType) {
+    this.seriesDataType = seriesDataType;
+    this.varianceType = varianceType;
+  }
+
+  /**
+   * Folds the non-null values of {@code column[1]} at rows {@code 0..lastIndex} into the state.
+   *
+   * @param column input columns; the values are read from index 1
+   * @param bitMap when non-null, only rows marked in it are consumed
+   * @param lastIndex last row (inclusive) to consume
+   * @throws UnSupportedDataTypeException if the series type is not INT32, INT64, FLOAT or DOUBLE
+   */
+  @Override
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
+    switch (seriesDataType) {
+      case INT32:
+        addIntInput(column, bitMap, lastIndex);
+        return;
+      case INT64:
+        addLongInput(column, bitMap, lastIndex);
+        return;
+      case FLOAT:
+        addFloatInput(column, bitMap, lastIndex);
+        return;
+      case DOUBLE:
+        addDoubleInput(column, bitMap, lastIndex);
+        return;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in aggregation variance : %s", seriesDataType));
+    }
+  }
+
+  /**
+   * Merges a serialized (count, mean, m2) partial state into this accumulator.
+   *
+   * @param partialResult a single binary column as produced by {@code outputIntermediate}; a null
+   *     entry is ignored
+   */
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 1, "partialResult of variance should be 1");
+    if (partialResult[0].isNull(0)) {
+      return;
+    }
+    // Same field order as serialize(): count, then mean, then m2.
+    byte[] bytes = partialResult[0].getBinary(0).getValues();
+    long intermediateCount = BytesUtils.bytesToLong(bytes, Long.BYTES);
+    double intermediateMean = BytesUtils.bytesToDouble(bytes, Long.BYTES);
+    double intermediateM2 = BytesUtils.bytesToDouble(bytes, (Long.BYTES + Double.BYTES));
+
+    long newCount = count + intermediateCount;
+    double newMean = ((intermediateCount * intermediateMean) + (count * mean)) / newCount;
+    double delta = intermediateMean - mean;
+
+    m2 = m2 + intermediateM2 + delta * delta * intermediateCount * count / newCount;
+    count = newCount;
+    mean = newMean;
+  }
+
+  /**
+   * Removes a previously merged (count, mean, m2) partial state from this accumulator.
+   *
+   * <p>If the removal leaves no values, the state is reset instead of being recomputed: the
+   * formulas below divide by the remaining count, and 0/0 would leave NaN in {@code mean} and
+   * {@code m2}, corrupting every partial result merged afterwards.
+   *
+   * @param input a single binary column as produced by {@code outputIntermediate}; a null entry
+   *     is ignored
+   */
+  @Override
+  public void removeIntermediate(Column[] input) {
+    checkArgument(input.length == 1, "Input of variance should be 1");
+    if (input[0].isNull(0)) {
+      return;
+    }
+    // Deserialize
+    byte[] bytes = input[0].getBinary(0).getValues();
+    long intermediateCount = BytesUtils.bytesToLong(bytes, Long.BYTES);
+    double intermediateMean = BytesUtils.bytesToDouble(bytes, Long.BYTES);
+    double intermediateM2 = BytesUtils.bytesToDouble(bytes, (Long.BYTES + Double.BYTES));
+    // Remove from state
+    long newCount = count - intermediateCount;
+    if (newCount <= 0) {
+      // Nothing is left in the window; start again from a clean state.
+      reset();
+      return;
+    }
+    double newMean = ((count * mean) - (intermediateCount * intermediateMean)) / newCount;
+    double delta = intermediateMean - mean;
+    double newM2 = m2 - intermediateM2 - delta * delta * intermediateCount * count / newCount;
+
+    // m2 is a sum of squares, so a negative value can only be rounding error; clamp it so the
+    // standard-deviation variants never take the square root of a negative number.
+    m2 = Math.max(0.0, newM2);
+    count = newCount;
+    mean = newMean;
+  }
+
+  /** Not supported: this accumulator cannot be fed from pre-computed statistics. */
+  @Override
+  public void addStatistics(Statistics statistics) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * Replaces the state with one derived from a single final value; a null value leaves the state
+   * empty.
+   */
+  @Override
+  public void setFinal(Column finalResult) {
+    reset();
+    if (finalResult.isNull(0)) {
+      return;
+    }
+    count = 1;
+    double value = finalResult.getDouble(0);
+    mean = value;
+    // NOTE(review): m2 is seeded with value^2 although a single sample has zero squared
+    // deviation; kept as-is, confirm the intent with the callers of setFinal.
+    m2 = value * value;
+  }
+
+  /** Writes the serialized state, or null when no value has been accumulated. */
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 1, "partialResult of variance should be 1");
+    if (count == 0) {
+      columnBuilders[0].appendNull();
+    } else {
+      byte[] bytes = serialize();
+      columnBuilders[0].writeBinary(new Binary(bytes));
+    }
+  }
+
+  /** Serializes the state as count, mean and m2, concatenated in that order. */
+  private byte[] serialize() {
+    byte[] countBytes = BytesUtils.longToBytes(count);
+    byte[] meanBytes = BytesUtils.doubleToBytes(mean);
+    byte[] m2Bytes = BytesUtils.doubleToBytes(m2);
+
+    return BytesUtils.concatByteArrayList(Arrays.asList(countBytes, meanBytes, m2Bytes));
+  }
+
+  /**
+   * Writes the selected statistic. The population variants need at least one value and the sample
+   * variants at least two; otherwise null is written.
+   */
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    switch (varianceType) {
+      case STDDEV_POP:
+        if (count == 0) {
+          columnBuilder.appendNull();
+        } else {
+          columnBuilder.writeDouble(Math.sqrt(m2 / count));
+        }
+        break;
+      case STDDEV_SAMP:
+        if (count < 2) {
+          columnBuilder.appendNull();
+        } else {
+          columnBuilder.writeDouble(Math.sqrt(m2 / (count - 1)));
+        }
+        break;
+      case VAR_POP:
+        if (count == 0) {
+          columnBuilder.appendNull();
+        } else {
+          columnBuilder.writeDouble(m2 / count);
+        }
+        break;
+      case VAR_SAMP:
+        if (count < 2) {
+          columnBuilder.appendNull();
+        } else {
+          columnBuilder.writeDouble(m2 / (count - 1));
+        }
+        break;
+      default:
+        throw new EnumConstantNotPresentException(VarianceType.class, varianceType.name());
+    }
+  }
+
+  /** Clears the state back to "no values seen". */
+  @Override
+  public void reset() {
+    count = 0;
+    mean = 0.0;
+    m2 = 0.0;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  /** The intermediate result is a single TEXT (binary) column holding the serialized state. */
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.TEXT};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.DOUBLE;
+  }
+
+  /** Welford's single-value update of (count, mean, m2). */
+  private void update(double value) {
+    count++;
+    double delta = value - mean;
+    mean += delta / count;
+    m2 += delta * (value - mean);
+  }
+
+  /** Returns true when row {@code i} is excluded by the bitmap or holds a null value. */
+  private static boolean isSkipped(Column[] columns, BitMap bitmap, int i) {
+    if (bitmap != null && !bitmap.isMarked(i)) {
+      return true;
+    }
+    return columns[1].isNull(i);
+  }
+
+  private void addIntInput(Column[] columns, BitMap bitmap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (!isSkipped(columns, bitmap, i)) {
+        update(columns[1].getInt(i));
+      }
+    }
+  }
+
+  private void addLongInput(Column[] columns, BitMap bitmap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (!isSkipped(columns, bitmap, i)) {
+        update(columns[1].getLong(i));
+      }
+    }
+  }
+
+  private void addFloatInput(Column[] columns, BitMap bitmap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (!isSkipped(columns, bitmap, i)) {
+        update(columns[1].getFloat(i));
+      }
+    }
+  }
+
+  private void addDoubleInput(Column[] columns, BitMap bitmap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (!isSkipped(columns, bitmap, i)) {
+        update(columns[1].getDouble(i));
+      }
+    }
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregator.java
index 8603db807a1..f54b1d5e9cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregator.java
@@ -23,13 +23,10 @@ import
org.apache.iotdb.db.queryengine.execution.aggregation.Accumulator;
import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import java.util.Arrays;
@@ -108,36 +105,5 @@ public abstract class SlidingWindowAggregator extends
Aggregator {
.map(Column::getDataType)
.collect(Collectors.toList());
}
-
- public Column[] opposite() {
- List<TSDataType> dataTypes = getDataTypes();
- TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(dataTypes);
- ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
- Column[] results = new Column[partialResultColumns.length];
- for (int i = 0; i < partialResultColumns.length; i++) {
- switch (dataTypes.get(i)) {
- case INT32:
- columnBuilders[i].writeInt(partialResultColumns[i].getInt(0) * -1);
- break;
- case INT64:
- columnBuilders[i].writeLong(partialResultColumns[i].getLong(0) *
-1);
- break;
- case FLOAT:
- columnBuilders[i].writeFloat(partialResultColumns[i].getFloat(0) *
-1);
- break;
- case DOUBLE:
- columnBuilders[i].writeDouble(partialResultColumns[i].getDouble(0)
* -1);
- break;
- case TEXT:
- case BOOLEAN:
- throw new UnSupportedDataTypeException(
- String.format("Unsupported data type in opposite : %s",
dataTypes.get(i)));
- default:
- throw new IllegalArgumentException("Unknown data type: " +
dataTypes.get(i));
- }
- results[i] = columnBuilders[i].build();
- }
- return results;
- }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
index 53fba75c504..ac01208d36f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
@@ -133,6 +133,12 @@ public class SlidingWindowAggregatorFactory {
case AVG:
case COUNT:
case COUNT_TIME:
+ case STDDEV:
+ case STDDEV_POP:
+ case STDDEV_SAMP:
+ case VARIANCE:
+ case VAR_POP:
+ case VAR_SAMP:
return new SmoothQueueSlidingWindowAggregator(accumulator,
inputLocationList, step);
case MAX_VALUE:
return new MonotonicQueueSlidingWindowAggregator(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SmoothQueueSlidingWindowAggregator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SmoothQueueSlidingWindowAggregator.java
index 1c2db2b3d84..62295c2be58 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SmoothQueueSlidingWindowAggregator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SmoothQueueSlidingWindowAggregator.java
@@ -27,7 +27,7 @@ import java.util.List;
/**
* The aggregation result is calculated from all pre-aggregation results in
the currently maintained
- * queue when calculating the COUNT, SUM, and AVG.
+ * queue when calculating the COUNT, SUM, AVG and STDDEV/VARIANCE relevant
functions.
*/
public class SmoothQueueSlidingWindowAggregator extends
SlidingWindowAggregator {
public SmoothQueueSlidingWindowAggregator(
@@ -44,7 +44,7 @@ public class SmoothQueueSlidingWindowAggregator extends
SlidingWindowAggregator
}
while (!deque.isEmpty() &&
!curTimeRange.contains(deque.getFirst().getTime())) {
PartialAggregationResult partialResult = deque.removeFirst();
- this.accumulator.addIntermediate(partialResult.opposite());
+ this.accumulator.removeIntermediate(partialResult.getPartialResult());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index c7cc034e44c..52248ff8e87 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -2875,6 +2875,12 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
case SqlConstant.SUM:
case SqlConstant.TIME_DURATION:
case SqlConstant.MODE:
+ case SqlConstant.STDDEV:
+ case SqlConstant.STDDEV_POP:
+ case SqlConstant.STDDEV_SAMP:
+ case SqlConstant.VARIANCE:
+ case SqlConstant.VAR_POP:
+ case SqlConstant.VAR_SAMP:
checkFunctionExpressionInputSize(
functionExpression.getExpressionString(),
functionExpression.getExpressions().size(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
index 680a546bc60..190d3d3767a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -145,6 +145,24 @@ public class AggregationDescriptor {
outputAggregationNames.add(SqlConstant.MAX_TIME);
outputAggregationNames.add(SqlConstant.MIN_TIME);
break;
+ case STDDEV:
+ outputAggregationNames.add(SqlConstant.STDDEV);
+ break;
+ case STDDEV_POP:
+ outputAggregationNames.add(SqlConstant.STDDEV_POP);
+ break;
+ case STDDEV_SAMP:
+ outputAggregationNames.add(SqlConstant.STDDEV_SAMP);
+ break;
+ case VARIANCE:
+ outputAggregationNames.add(SqlConstant.VARIANCE);
+ break;
+ case VAR_POP:
+ outputAggregationNames.add(SqlConstant.VAR_POP);
+ break;
+ case VAR_SAMP:
+ outputAggregationNames.add(SqlConstant.VAR_SAMP);
+ break;
default:
outputAggregationNames.add(aggregationFuncName);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index efae314ecaa..21a8e1070b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -124,6 +124,12 @@ public class SchemaUtils {
return TSDataType.INT64;
case SqlConstant.AVG:
case SqlConstant.SUM:
+ case SqlConstant.STDDEV:
+ case SqlConstant.STDDEV_POP:
+ case SqlConstant.STDDEV_SAMP:
+ case SqlConstant.VARIANCE:
+ case SqlConstant.VAR_POP:
+ case SqlConstant.VAR_SAMP:
return TSDataType.DOUBLE;
case SqlConstant.LAST_VALUE:
case SqlConstant.FIRST_VALUE:
@@ -157,6 +163,12 @@ public class SchemaUtils {
case COUNT_TIME:
case AVG:
case TIME_DURATION:
+ case STDDEV:
+ case STDDEV_POP:
+ case STDDEV_SAMP:
+ case VARIANCE:
+ case VAR_POP:
+ case VAR_SAMP:
return true;
default:
throw new IllegalArgumentException(
@@ -178,6 +190,18 @@ public class SchemaUtils {
return Collections.singletonList(TAggregationType.MIN_TIME);
case LAST_VALUE:
return Collections.singletonList(TAggregationType.MAX_TIME);
+ case STDDEV:
+ return Collections.singletonList(TAggregationType.STDDEV);
+ case STDDEV_POP:
+ return Collections.singletonList(TAggregationType.STDDEV_POP);
+ case STDDEV_SAMP:
+ return Collections.singletonList(TAggregationType.STDDEV_SAMP);
+ case VARIANCE:
+ return Collections.singletonList(TAggregationType.VARIANCE);
+ case VAR_POP:
+ return Collections.singletonList(TAggregationType.VAR_POP);
+ case VAR_SAMP:
+ return Collections.singletonList(TAggregationType.VAR_SAMP);
case AVG:
return Arrays.asList(TAggregationType.COUNT, TAggregationType.SUM);
case TIME_DURATION:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
index 9d23177b20b..a214a27b353 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
@@ -145,6 +145,12 @@ public class TypeInferenceUtils {
return dataType;
case SqlConstant.AVG:
case SqlConstant.SUM:
+ case SqlConstant.STDDEV:
+ case SqlConstant.STDDEV_POP:
+ case SqlConstant.STDDEV_SAMP:
+ case SqlConstant.VARIANCE:
+ case SqlConstant.VAR_POP:
+ case SqlConstant.VAR_SAMP:
return TSDataType.DOUBLE;
default:
throw new IllegalArgumentException("Invalid Aggregation function: " +
aggrFuncName);
@@ -162,11 +168,17 @@ public class TypeInferenceUtils {
case SqlConstant.EXTREME:
case SqlConstant.MIN_VALUE:
case SqlConstant.MAX_VALUE:
+ case SqlConstant.STDDEV:
+ case SqlConstant.STDDEV_POP:
+ case SqlConstant.STDDEV_SAMP:
+ case SqlConstant.VARIANCE:
+ case SqlConstant.VAR_POP:
+ case SqlConstant.VAR_SAMP:
if (dataType.isNumeric()) {
return;
}
throw new SemanticException(
- "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE, MAX_VALUE]
only support numeric data types [INT32, INT64, FLOAT, DOUBLE]");
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE, MAX_VALUE,
STDDEV, STDDEV_POP, STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only support
numeric data types [INT32, INT64, FLOAT, DOUBLE]");
case SqlConstant.COUNT:
case SqlConstant.COUNT_TIME:
case SqlConstant.MIN_TIME:
@@ -213,6 +225,12 @@ public class TypeInferenceUtils {
case SqlConstant.LAST_VALUE:
case SqlConstant.TIME_DURATION:
case SqlConstant.MODE:
+ case SqlConstant.STDDEV:
+ case SqlConstant.STDDEV_POP:
+ case SqlConstant.STDDEV_SAMP:
+ case SqlConstant.VARIANCE:
+ case SqlConstant.VAR_POP:
+ case SqlConstant.VAR_SAMP:
return;
case SqlConstant.COUNT_IF:
Expression keepExpression = inputExpressions.get(1);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
index afd47cc8c51..75d8a04aac1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
@@ -56,6 +56,12 @@ public class SqlConstant {
public static final String COUNT_IF = "count_if";
public static final String TIME_DURATION = "time_duration";
public static final String MODE = "mode";
+ public static final String STDDEV = "stddev";
+ public static final String STDDEV_POP = "stddev_pop";
+ public static final String STDDEV_SAMP = "stddev_samp";
+ public static final String VARIANCE = "variance";
+ public static final String VAR_POP = "var_pop";
+ public static final String VAR_SAMP = "var_samp";
public static final String COUNT_TIME = "count_time";
public static final String COUNT_TIME_HEADER = "count_time(*)";
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
index c5458553b00..941a39dcb53 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
@@ -25,11 +25,14 @@ import
org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.junit.Assert;
import org.junit.Before;
@@ -112,6 +115,13 @@ public class AccumulatorTest {
avgAccumulator.outputFinal(finalResult);
Assert.assertEquals(49.5d, finalResult.build().getDouble(0), 0.001);
+ // test remove partial result interface
+ avgAccumulator.removeIntermediate(
+ new Column[] {intermediateResult[0].build(),
intermediateResult[1].build()});
+ finalResult = new DoubleColumnBuilder(null, 1);
+ avgAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(49.5d, finalResult.build().getDouble(0), 0.001);
+
avgAccumulator.reset();
avgAccumulator.addStatistics(statistics);
finalResult = new DoubleColumnBuilder(null, 1);
@@ -152,6 +162,12 @@ public class AccumulatorTest {
countAccumulator.outputFinal(finalResult);
Assert.assertEquals(200, finalResult.build().getLong(0));
+ // test remove partial result interface
+ countAccumulator.removeIntermediate(new Column[]
{intermediateResult[0].build()});
+ finalResult = new LongColumnBuilder(null, 1);
+ countAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(100, finalResult.build().getLong(0), 0.001);
+
countAccumulator.reset();
countAccumulator.addStatistics(statistics);
finalResult = new LongColumnBuilder(null, 1);
@@ -526,10 +542,326 @@ public class AccumulatorTest {
sumAccumulator.outputFinal(finalResult);
Assert.assertEquals(9900d, finalResult.build().getDouble(0), 0.001);
+ // test remove partial result interface
+ sumAccumulator.removeIntermediate(new Column[]
{intermediateResult[0].build()});
+ finalResult = new DoubleColumnBuilder(null, 1);
+ sumAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(4950d, finalResult.build().getDouble(0), 0.001);
+
sumAccumulator.reset();
sumAccumulator.addStatistics(statistics);
finalResult = new DoubleColumnBuilder(null, 1);
sumAccumulator.outputFinal(finalResult);
Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
}
+
+ @Test
+ public void stddevAccumulatorTest() {
+ Accumulator stddevAccumulator =
+ AccumulatorFactory.createAccumulator(
+ TAggregationType.STDDEV,
+ TSDataType.DOUBLE,
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ true);
+ // check intermediate type (TEXT) and final type (DOUBLE)
+ Assert.assertEquals(TSDataType.TEXT,
stddevAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.DOUBLE, stddevAccumulator.getFinalType());
+ // with no data added, both the intermediate and the final output must be null
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+ intermediateResult[0] = new BinaryColumnBuilder(null, 1);
+ stddevAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertTrue(intermediateResult[0].build().isNull(0));
+ ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+ stddevAccumulator.outputFinal(finalResult);
+ Assert.assertTrue(finalResult.build().isNull(0));
+ // check returning null when there is not enough data. NOTE(review): no input has been added to the accumulator at this point and position 0 of this builder was already asserted null above - confirm this exercises the intended case
+ intermediateResult[0].writeBinary(new Binary(new byte[0]));
+ stddevAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertTrue(intermediateResult[0].build().isNull(0));
+ finalResult = new DoubleColumnBuilder(null, 1);
+ stddevAccumulator.outputFinal(finalResult);
+ Assert.assertTrue(finalResult.build().isNull(0));
+
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ stddevAccumulator.addInput(timeAndValueColumn, null,
rawData.getPositionCount() - 1);
+ Assert.assertFalse(stddevAccumulator.hasFinalResult());
+ intermediateResult[0] = new BinaryColumnBuilder(null, 1);
+ stddevAccumulator.outputIntermediate(intermediateResult);
+ byte[] result = intermediateResult[0].build().getBinary(0).getValues();
+ Assert.assertEquals(100, BytesUtils.bytesToLong(result, Long.BYTES)); // presumably the serialized state is [count | mean | sum of squared deviations] - TODO confirm against the accumulator
+ Assert.assertEquals(49.50, BytesUtils.bytesToDouble(result, Long.BYTES),
0.001);
+ Assert.assertEquals(
+ 83325, BytesUtils.bytesToDouble(result, (Long.BYTES + Double.BYTES)),
0.001);
+
+ stddevAccumulator.addIntermediate(
+ new Column[] {
+ intermediateResult[0].build(),
+ });
+ finalResult = new DoubleColumnBuilder(null, 1);
+ stddevAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(28.938, finalResult.build().getDouble(0), 0.001); // consistent with sqrt(2 * 83325 / (200 - 1)), i.e. a sample standard deviation over the doubled state
+
+ // test the remove partial result interface: removeIntermediate is given the same partial state that was merged above
+ stddevAccumulator.removeIntermediate(new Column[]
{intermediateResult[0].build()});
+ finalResult = new DoubleColumnBuilder(null, 1);
+ stddevAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(29.011491975882016, finalResult.build().getDouble(0),
0.001); // consistent with sqrt(83325 / (100 - 1))
+ }
+
+ @Test
+ public void stddevPopAccumulatorTest() {
+ Accumulator stddevPopAccumulator =
+ AccumulatorFactory.createAccumulator(
+ TAggregationType.STDDEV_POP,
+ TSDataType.DOUBLE,
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ true);
+ // check intermediate type (TEXT) and final type (DOUBLE)
+ Assert.assertEquals(TSDataType.TEXT,
stddevPopAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.DOUBLE,
stddevPopAccumulator.getFinalType());
+ // with no data added, both the intermediate and the final output must be null
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+ intermediateResult[0] = new BinaryColumnBuilder(null, 1);
+ stddevPopAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertTrue(intermediateResult[0].build().isNull(0));
+ ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+ stddevPopAccumulator.outputFinal(finalResult);
+ Assert.assertTrue(finalResult.build().isNull(0));
+
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ stddevPopAccumulator.addInput(timeAndValueColumn, null,
rawData.getPositionCount() - 1);
+ Assert.assertFalse(stddevPopAccumulator.hasFinalResult());
+ intermediateResult[0] = new BinaryColumnBuilder(null, 1);
+ stddevPopAccumulator.outputIntermediate(intermediateResult);
+ byte[] result = intermediateResult[0].build().getBinary(0).getValues();
+ Assert.assertEquals(100, BytesUtils.bytesToLong(result, Long.BYTES)); // presumably the serialized state is [count | mean | sum of squared deviations] - TODO confirm against the accumulator
+ Assert.assertEquals(49.50, BytesUtils.bytesToDouble(result, Long.BYTES),
0.001);
+ Assert.assertEquals(
+ 83325, BytesUtils.bytesToDouble(result, (Long.BYTES + Double.BYTES)),
0.001);
+
+ stddevPopAccumulator.addIntermediate(
+ new Column[] {
+ intermediateResult[0].build(),
+ });
+ finalResult = new DoubleColumnBuilder(null, 1);
+ stddevPopAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(28.866, finalResult.build().getDouble(0), 0.001); // consistent with sqrt(2 * 83325 / 200), i.e. a population standard deviation over the doubled state
+
+ // test the remove partial result interface: removeIntermediate is given the same partial state that was merged above
+ stddevPopAccumulator.removeIntermediate(new Column[]
{intermediateResult[0].build()});
+ finalResult = new DoubleColumnBuilder(null, 1);
+ stddevPopAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(28.86607004772212, finalResult.build().getDouble(0),
0.001); // consistent with sqrt(83325 / 100); the same value as before the removal
+ }
+
+ @Test
+ public void stddevSampAccumulatorTest() {
+ Accumulator stddevSampAccumulator =
+ AccumulatorFactory.createAccumulator(
+ TAggregationType.STDDEV_SAMP,
+ TSDataType.DOUBLE,
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ true);
+ // check intermediate type (TEXT) and final type (DOUBLE)
+ Assert.assertEquals(TSDataType.TEXT,
stddevSampAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.DOUBLE,
stddevSampAccumulator.getFinalType());
+ // with no data added, both the intermediate and the final output must be null
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+ intermediateResult[0] = new BinaryColumnBuilder(null, 1);
+ stddevSampAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertTrue(intermediateResult[0].build().isNull(0));
+ ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+ stddevSampAccumulator.outputFinal(finalResult);
+ Assert.assertTrue(finalResult.build().isNull(0));
+ // check returning null when there is not enough data. NOTE(review): no input has been added to the accumulator at this point and position 0 of this builder was already asserted null above - confirm this exercises the intended case
+ intermediateResult[0].writeBinary(new Binary(new byte[0]));
+ stddevSampAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertTrue(intermediateResult[0].build().isNull(0));
+ finalResult = new DoubleColumnBuilder(null, 1);
+ stddevSampAccumulator.outputFinal(finalResult);
+ Assert.assertTrue(finalResult.build().isNull(0));
+
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ stddevSampAccumulator.addInput(timeAndValueColumn, null,
rawData.getPositionCount() - 1);
+ Assert.assertFalse(stddevSampAccumulator.hasFinalResult());
+ intermediateResult[0] = new BinaryColumnBuilder(null, 1);
+ stddevSampAccumulator.outputIntermediate(intermediateResult);
+ byte[] result = intermediateResult[0].build().getBinary(0).getValues();
+ Assert.assertEquals(100, BytesUtils.bytesToLong(result, Long.BYTES)); // presumably the serialized state is [count | mean | sum of squared deviations] - TODO confirm against the accumulator
+ Assert.assertEquals(49.50, BytesUtils.bytesToDouble(result, Long.BYTES),
0.001);
+ Assert.assertEquals(
+ 83325, BytesUtils.bytesToDouble(result, (Long.BYTES + Double.BYTES)),
0.001);
+
+ stddevSampAccumulator.addIntermediate(
+ new Column[] {
+ intermediateResult[0].build(),
+ });
+ finalResult = new DoubleColumnBuilder(null, 1);
+ stddevSampAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(28.938, finalResult.build().getDouble(0), 0.001); // consistent with sqrt(2 * 83325 / (200 - 1)), i.e. a sample standard deviation over the doubled state
+
+ // test the remove partial result interface: removeIntermediate is given the same partial state that was merged above
+ stddevSampAccumulator.removeIntermediate(new Column[]
{intermediateResult[0].build()});
+ finalResult = new DoubleColumnBuilder(null, 1);
+ stddevSampAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(29.011491975882016, finalResult.build().getDouble(0),
0.001); // consistent with sqrt(83325 / (100 - 1))
+ }
+
+ @Test
+ public void varianceAccumulatorTest() {
+ Accumulator varianceAccumulator =
+ AccumulatorFactory.createAccumulator(
+ TAggregationType.VARIANCE,
+ TSDataType.DOUBLE,
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ true);
+ // check intermediate type (TEXT) and final type (DOUBLE)
+ Assert.assertEquals(TSDataType.TEXT,
varianceAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.DOUBLE, varianceAccumulator.getFinalType());
+ // with no data added, both the intermediate and the final output must be null
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+ intermediateResult[0] = new BinaryColumnBuilder(null, 1);
+ varianceAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertTrue(intermediateResult[0].build().isNull(0));
+ ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+ varianceAccumulator.outputFinal(finalResult);
+ Assert.assertTrue(finalResult.build().isNull(0));
+ // check returning null when there is not enough data. NOTE(review): no input has been added to the accumulator at this point and position 0 of this builder was already asserted null above - confirm this exercises the intended case
+ intermediateResult[0].writeBinary(new Binary(new byte[0]));
+ varianceAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertTrue(intermediateResult[0].build().isNull(0));
+ finalResult = new DoubleColumnBuilder(null, 1);
+ varianceAccumulator.outputFinal(finalResult);
+ Assert.assertTrue(finalResult.build().isNull(0));
+
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ varianceAccumulator.addInput(timeAndValueColumn, null,
rawData.getPositionCount() - 1);
+ Assert.assertFalse(varianceAccumulator.hasFinalResult());
+ intermediateResult[0] = new BinaryColumnBuilder(null, 1);
+ varianceAccumulator.outputIntermediate(intermediateResult);
+ byte[] result = intermediateResult[0].build().getBinary(0).getValues();
+ Assert.assertEquals(100, BytesUtils.bytesToLong(result, Long.BYTES)); // presumably the serialized state is [count | mean | sum of squared deviations] - TODO confirm against the accumulator
+ Assert.assertEquals(49.50, BytesUtils.bytesToDouble(result, Long.BYTES),
0.001);
+ Assert.assertEquals(
+ 83325, BytesUtils.bytesToDouble(result, (Long.BYTES + Double.BYTES)),
0.001);
+
+ varianceAccumulator.addIntermediate(
+ new Column[] {
+ intermediateResult[0].build(),
+ });
+ finalResult = new DoubleColumnBuilder(null, 1);
+ varianceAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(837.437, finalResult.build().getDouble(0), 0.001); // consistent with 2 * 83325 / (200 - 1), i.e. a sample variance over the doubled state
+
+ // test the remove partial result interface: removeIntermediate is given the same partial state that was merged above
+ varianceAccumulator.removeIntermediate(new Column[]
{intermediateResult[0].build()});
+ finalResult = new DoubleColumnBuilder(null, 1);
+ varianceAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(841.6666666666666, finalResult.build().getDouble(0),
0.001); // consistent with 83325 / (100 - 1)
+ }
+
+ @Test
+ public void varPopAccumulatorTest() {
+ Accumulator varPopAccumulator =
+ AccumulatorFactory.createAccumulator(
+ TAggregationType.VAR_POP,
+ TSDataType.DOUBLE,
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ true);
+ // check intermediate type (TEXT) and final type (DOUBLE)
+ Assert.assertEquals(TSDataType.TEXT,
varPopAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.DOUBLE, varPopAccumulator.getFinalType());
+ // with no data added, both the intermediate and the final output must be null
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+ intermediateResult[0] = new BinaryColumnBuilder(null, 1);
+ varPopAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertTrue(intermediateResult[0].build().isNull(0));
+ ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+ varPopAccumulator.outputFinal(finalResult);
+ Assert.assertTrue(finalResult.build().isNull(0));
+
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ varPopAccumulator.addInput(timeAndValueColumn, null,
rawData.getPositionCount() - 1);
+ Assert.assertFalse(varPopAccumulator.hasFinalResult());
+ intermediateResult[0] = new BinaryColumnBuilder(null, 1);
+ varPopAccumulator.outputIntermediate(intermediateResult);
+ byte[] result = intermediateResult[0].build().getBinary(0).getValues();
+ Assert.assertEquals(100, BytesUtils.bytesToLong(result, Long.BYTES)); // presumably the serialized state is [count | mean | sum of squared deviations] - TODO confirm against the accumulator
+ Assert.assertEquals(49.50, BytesUtils.bytesToDouble(result, Long.BYTES),
0.001);
+ Assert.assertEquals(
+ 83325, BytesUtils.bytesToDouble(result, (Long.BYTES + Double.BYTES)),
0.001);
+
+ varPopAccumulator.addIntermediate(
+ new Column[] {
+ intermediateResult[0].build(),
+ });
+ finalResult = new DoubleColumnBuilder(null, 1);
+ varPopAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(833.25, finalResult.build().getDouble(0), 0.001); // consistent with 2 * 83325 / 200, i.e. a population variance over the doubled state
+
+ // test the remove partial result interface: removeIntermediate is given the same partial state that was merged above
+ varPopAccumulator.removeIntermediate(new Column[]
{intermediateResult[0].build()});
+ finalResult = new DoubleColumnBuilder(null, 1);
+ varPopAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(833.25, finalResult.build().getDouble(0), 0.001); // consistent with 83325 / 100; the same value as before the removal
+ }
+
+ @Test
+ public void varSampAccumulatorTest() {
+ Accumulator varSampAccumulator =
+ AccumulatorFactory.createAccumulator(
+ TAggregationType.VAR_SAMP,
+ TSDataType.DOUBLE,
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ true);
+ // check intermediate type (TEXT) and final type (DOUBLE)
+ Assert.assertEquals(TSDataType.TEXT,
varSampAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.DOUBLE, varSampAccumulator.getFinalType());
+ // with no data added, both the intermediate and the final output must be null
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+ intermediateResult[0] = new BinaryColumnBuilder(null, 1);
+ varSampAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertTrue(intermediateResult[0].build().isNull(0));
+ ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+ varSampAccumulator.outputFinal(finalResult);
+ Assert.assertTrue(finalResult.build().isNull(0));
+ // check returning null when there is not enough data. NOTE(review): no input has been added to the accumulator at this point and position 0 of this builder was already asserted null above - confirm this exercises the intended case
+ intermediateResult[0].writeBinary(new Binary(new byte[0]));
+ varSampAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertTrue(intermediateResult[0].build().isNull(0));
+ finalResult = new DoubleColumnBuilder(null, 1);
+ varSampAccumulator.outputFinal(finalResult);
+ Assert.assertTrue(finalResult.build().isNull(0));
+
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ varSampAccumulator.addInput(timeAndValueColumn, null,
rawData.getPositionCount() - 1);
+ Assert.assertFalse(varSampAccumulator.hasFinalResult());
+ intermediateResult[0] = new BinaryColumnBuilder(null, 1);
+ varSampAccumulator.outputIntermediate(intermediateResult);
+ byte[] result = intermediateResult[0].build().getBinary(0).getValues();
+ Assert.assertEquals(100, BytesUtils.bytesToLong(result, Long.BYTES)); // presumably the serialized state is [count | mean | sum of squared deviations] - TODO confirm against the accumulator
+ Assert.assertEquals(49.50, BytesUtils.bytesToDouble(result, Long.BYTES),
0.001);
+ Assert.assertEquals(
+ 83325, BytesUtils.bytesToDouble(result, (Long.BYTES + Double.BYTES)),
0.001);
+
+ varSampAccumulator.addIntermediate(
+ new Column[] {
+ intermediateResult[0].build(),
+ });
+ finalResult = new DoubleColumnBuilder(null, 1);
+ varSampAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(837.437, finalResult.build().getDouble(0), 0.001); // consistent with 2 * 83325 / (200 - 1), i.e. a sample variance over the doubled state
+
+ // test the remove partial result interface: removeIntermediate is given the same partial state that was merged above
+ varSampAccumulator.removeIntermediate(new Column[]
{intermediateResult[0].build()});
+ finalResult = new DoubleColumnBuilder(null, 1);
+ varSampAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(841.6666666666666, finalResult.build().getDouble(0),
0.001); // consistent with 83325 / (100 - 1)
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java
index 1afda7f2a53..8a4f31baa70 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java
@@ -38,7 +38,13 @@ public enum BuiltinAggregationFunction {
COUNT_IF("count_if"),
TIME_DURATION("time_duration"),
MODE("mode"),
- COUNT_TIME("count_time");
+ COUNT_TIME("count_time"),
+ STDDEV("stddev"),
+ STDDEV_POP("stddev_pop"),
+ STDDEV_SAMP("stddev_samp"),
+ VARIANCE("variance"),
+ VAR_POP("var_pop"),
+ VAR_SAMP("var_samp");
private final String functionName;
@@ -79,6 +85,12 @@ public enum BuiltinAggregationFunction {
case "count_if":
case "mode":
case "count_time":
+ case "stddev":
+ case "stddev_pop":
+ case "stddev_samp":
+ case "variance":
+ case "var_pop":
+ case "var_samp":
return false;
default:
throw new IllegalArgumentException("Invalid Aggregation function: " +
name);
@@ -103,6 +115,12 @@ public enum BuiltinAggregationFunction {
case "sum":
case "time_duration":
case "mode":
+ case "stddev":
+ case "stddev_pop":
+ case "stddev_samp":
+ case "variance":
+ case "var_pop":
+ case "var_samp":
return true;
case "count_if":
case "count_time":
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index bb4e7135509..dbdb8bd406f 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -190,5 +190,11 @@ enum TAggregationType {
COUNT_IF,
TIME_DURATION,
MODE,
- COUNT_TIME
+ COUNT_TIME,
+ STDDEV,
+ STDDEV_POP,
+ STDDEV_SAMP,
+ VARIANCE,
+ VAR_POP,
+ VAR_SAMP
}
\ No newline at end of file
diff --git a/library-udf/src/assembly/tools/register-UDF.bat
b/library-udf/src/assembly/tools/register-UDF.bat
index 69a5dd20a3e..d5276ff2a68 100644
--- a/library-udf/src/assembly/tools/register-UDF.bat
+++ b/library-udf/src/assembly/tools/register-UDF.bat
@@ -40,7 +40,6 @@ call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user%
-pw %pass% -e "creat
call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e
"create function segment as 'org.apache.iotdb.library.dprofile.UDTFSegment'"
call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e
"create function skew as 'org.apache.iotdb.library.dprofile.UDAFSkew'"
call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e
"create function spread as 'org.apache.iotdb.library.dprofile.UDAFSpread'"
-call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e
"create function stddev as 'org.apache.iotdb.library.dprofile.UDAFStddev'"
call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e
"create function minmax as 'org.apache.iotdb.library.dprofile.UDTFMinMax'"
call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e
"create function zscore as 'org.apache.iotdb.library.dprofile.UDTFZScore'"
call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e
"create function spline as 'org.apache.iotdb.library.dprofile.UDTFSpline'"
diff --git a/library-udf/src/assembly/tools/register-UDF.sh
b/library-udf/src/assembly/tools/register-UDF.sh
index aa7242a98c3..bb65b496dee 100755
--- a/library-udf/src/assembly/tools/register-UDF.sh
+++ b/library-udf/src/assembly/tools/register-UDF.sh
@@ -41,7 +41,6 @@ pass=root
../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create
function segment as 'org.apache.iotdb.library.dprofile.UDTFSegment'"
../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create
function skew as 'org.apache.iotdb.library.dprofile.UDAFSkew'"
../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create
function spread as 'org.apache.iotdb.library.dprofile.UDAFSpread'"
-../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create
function stddev as 'org.apache.iotdb.library.dprofile.UDAFStddev'"
../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create
function minmax as 'org.apache.iotdb.library.dprofile.UDTFMinMax'"
../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create
function zscore as 'org.apache.iotdb.library.dprofile.UDTFZScore'"
../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create
function spline as 'org.apache.iotdb.library.dprofile.UDTFSpline'"
diff --git
a/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDAFStddev.java
b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDAFStddev.java
deleted file mode 100644
index f2c05968d10..00000000000
---
a/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDAFStddev.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.library.dprofile;
-
-import org.apache.iotdb.library.util.Util;
-import org.apache.iotdb.udf.api.UDTF;
-import org.apache.iotdb.udf.api.access.Row;
-import org.apache.iotdb.udf.api.collector.PointCollector;
-import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
-import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
-import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
-import org.apache.iotdb.udf.api.type.Type;
-
-/** This function is used to calculate the population standard deviation. */
-public class UDAFStddev implements UDTF {
- private long count = 0;
- private double sumX2 = 0.0;
- private double sumX1 = 0.0;
-
- @Override
- public void validate(UDFParameterValidator validator) throws Exception {
- validator
- .validateInputSeriesNumber(1)
- .validateInputSeriesDataType(0, Type.INT32, Type.INT64, Type.FLOAT,
Type.DOUBLE);
- }
-
- @Override
- public void beforeStart(UDFParameters parameters, UDTFConfigurations
configurations)
- throws Exception {
- configurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
- }
-
- @Override
- public void transform(Row row, PointCollector collector) throws Exception {
- double value = Util.getValueAsDouble(row);
- if (Double.isFinite(value)) {
- this.count++;
- this.sumX1 += value;
- this.sumX2 += value * value;
- }
- }
-
- @Override
- public void terminate(PointCollector collector) throws Exception {
- collector.putDouble(
- 0, Math.sqrt(this.sumX2 / this.count - Math.pow(this.sumX1 /
this.count, 2)));
- }
-}