This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch min_by in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c64a5c037c8cc978b10678b74ac9e093809fe523 Author: lancelly <[email protected]> AuthorDate: Thu Feb 22 19:26:00 2024 +0800 minby --- .../constant/BuiltinAggregationFunctionEnum.java | 3 +- .../db/it/aggregation/IoTDBAggregationIT.java | 52 +++ .../db/it/aggregation/maxby/IoTDBMaxByIT.java | 3 +- .../db/it/aggregation/minby/IoTDBMinBy2IT.java | 41 +++ .../db/it/aggregation/minby/IoTDBMinBy3IT.java | 48 +++ .../minby/IoTDBMinByAlignedSeriesIT.java | 71 ++++ .../IoTDBMaxByIT.java => minby/IoTDBMinByIT.java} | 131 ++++--- .../java/org/apache/iotdb/tool/ExportTsFile.java | 3 +- .../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java | 1 + .../sql/factory/IoTDBDynamicTableFactory.java | 1 + .../execution/aggregation/AccumulatorFactory.java | 4 + .../execution/aggregation/MaxByAccumulator.java | 404 +-------------------- ...cumulator.java => MaxMinByBaseAccumulator.java} | 63 ++-- .../execution/aggregation/MinByAccumulator.java | 48 +++ .../SlidingWindowAggregatorFactory.java | 34 ++ .../plan/analyze/ExpressionTypeAnalyzer.java | 1 + .../db/queryengine/plan/parser/ASTVisitor.java | 1 + .../plan/parameter/AggregationDescriptor.java | 3 + .../org/apache/iotdb/db/utils/SchemaUtils.java | 5 + .../apache/iotdb/db/utils/TypeInferenceUtils.java | 3 + .../iotdb/db/utils/constant/SqlConstant.java | 1 + .../iotdb/db/utils/constant/TestConstant.java | 4 + .../execution/aggregation/AccumulatorTest.java | 33 ++ .../udf/builtin/BuiltinAggregationFunction.java | 5 +- .../thrift-commons/src/main/thrift/common.thrift | 1 + 25 files changed, 476 insertions(+), 488 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java index bf41380cfe4..7c2c283d30e 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java @@ -41,7 +41,8 @@ public enum BuiltinAggregationFunctionEnum { COUNT("count"), AVG("avg"), SUM("sum"), - MAX_BY("max_by"); + MAX_BY("max_by"), + MIN_BY("min_by"); private final String functionName; diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java index b209d05b3a5..aff7582bf26 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java @@ -46,6 +46,7 @@ import static org.apache.iotdb.db.utils.constant.TestConstant.lastValue; import static org.apache.iotdb.db.utils.constant.TestConstant.maxBy; import static org.apache.iotdb.db.utils.constant.TestConstant.maxTime; import static org.apache.iotdb.db.utils.constant.TestConstant.maxValue; +import static org.apache.iotdb.db.utils.constant.TestConstant.minBy; import static org.apache.iotdb.db.utils.constant.TestConstant.minTime; import static org.apache.iotdb.db.utils.constant.TestConstant.minValue; import static org.apache.iotdb.db.utils.constant.TestConstant.sum; @@ -1033,4 +1034,55 @@ public class IoTDBAggregationIT { fail(e.getMessage()); } } + + @Test + public void minByTest() { + String[] retArray = new String[] {"0,500", "0,500", "0,500"}; + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + + int cnt; + try (ResultSet resultSet = + statement.executeQuery( + "SELECT min_by(time, s0) " + + "FROM root.vehicle.d0 WHERE time >= 100 AND time < 9000")) { + cnt = 0; + while (resultSet.next()) { + String ans = + resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(minBy("Time", d0s0)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(1, cnt); + } + + try (ResultSet resultSet = + statement.executeQuery("SELECT min_by(time,s0) FROM root.vehicle.d0 WHERE time < 2500")) { + while (resultSet.next()) { + String ans = + resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(minBy("Time", d0s0)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(2, cnt); + } + + // keep the correctness of `order by time desc` + cnt = 0; + try (ResultSet resultSet = + statement.executeQuery( + "SELECT min_by(time,s0) FROM root.vehicle.d0 WHERE time >= 100 AND time < 9000 order by time desc")) { + while (resultSet.next()) { + String ans = + resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(minBy("Time", d0s0)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(1, cnt); + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java index b73f35bbf1e..5b706720b0c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java @@ -96,7 +96,6 @@ public class IoTDBMaxByIT { "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")", "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")", "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")", - "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 4, 4, 4, 4, true, \"1\")", "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 2, 2, true, \"4\")", "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 3, 3, false, \"3\")", "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 1, 1, false, \"1\")", @@ -107,7 +106,7 @@ public class IoTDBMaxByIT { "flush" }; - protected static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy:"; + protected static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy/MinBy:"; @BeforeClass public static void setUp() throws Exception { diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinBy2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinBy2IT.java new file mode 100644 index 00000000000..9efa6ff1906 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinBy2IT.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.aggregation.minby; + +import org.apache.iotdb.it.env.EnvFactory; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import static org.apache.iotdb.db.it.utils.TestUtils.prepareData; + +public class IoTDBMinBy2IT extends IoTDBMinByIT { + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().getConfig().getCommonConfig().setDegreeOfParallelism(4); + EnvFactory.getEnv().initClusterEnvironment(); + prepareData(NON_ALIGNED_DATASET); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinBy3IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinBy3IT.java new file mode 100644 index 00000000000..e4c6523ac4d --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinBy3IT.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.aggregation.minby; + +import org.apache.iotdb.it.env.EnvFactory; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import static org.apache.iotdb.db.it.utils.TestUtils.prepareData; + +public class IoTDBMinBy3IT extends IoTDBMinByIT { + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setEnableSeqSpaceCompaction(false) + .setEnableUnseqSpaceCompaction(false) + .setEnableCrossSpaceCompaction(false) + .setMaxTsBlockLineNumber(1) + .setMaxNumberOfPointsInPage(1); + EnvFactory.getEnv().initClusterEnvironment(); + prepareData(NON_ALIGNED_DATASET); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinByAlignedSeriesIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinByAlignedSeriesIT.java new file mode 100644 index 00000000000..56f7b5ecb09 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinByAlignedSeriesIT.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.aggregation.minby; + +import org.apache.iotdb.it.env.EnvFactory; + +import org.junit.BeforeClass; + +import static org.apache.iotdb.db.it.utils.TestUtils.prepareData; + +public class IoTDBMinByAlignedSeriesIT extends IoTDBMinByIT { + protected static final String[] ALIGNED_DATASET = + new String[] { + // x input + "CREATE ALIGNED TIMESERIES root.db.d1(x1 INT32, x2 INT64, x3 FLOAT, x4 DOUBLE, x5 BOOLEAN, x6 TEXT)", + // y input + "CREATE ALIGNED TIMESERIES root.db.d1(y1 INT32, y2 INT64, y3 FLOAT, y4 DOUBLE, y5 BOOLEAN, y6 TEXT)", + "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")", + "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")", + "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(2, 3, 3, 3, 3, true, \"4\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(3, 2, 2, 2, 2, false, \"3\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 1, 1, false, \"4\")", + "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")", + "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 3, 3, false, \"3\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(12, 0, 0, 0, 0, false, \"4\")", + "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(13, 4, 4, 4, 4, false, \"4\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(13, 0, 0, 0, 0, false, \"4\")", + "flush", + // For Align By Device + "CREATE ALIGNED TIMESERIES root.db.d2(x1 INT32, x2 INT64, x3 FLOAT, x4 DOUBLE, x5 BOOLEAN, x6 TEXT)", + "CREATE ALIGNED TIMESERIES root.db.d2(y1 INT32, y2 INT64, y3 FLOAT, y4 DOUBLE, y5 BOOLEAN, y6 TEXT)", + "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")", + "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")", + "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")", + "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 3, 3, 3, 3, true, \"4\")", + "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 2, 2, 2, 2, false, \"3\")", + "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 1, 1, false, \"4\")", + "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")", + "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")", + "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 3, 3, false, \"3\")", + "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(12, 0, 0, 0, 0, false, \"4\")", + "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(13, 4, 4, 4, 4, false, \"4\")", + "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(13, 0, 0, 0, 0, false, \"4\")", + }; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().getConfig().getCommonConfig().setPartitionInterval(1000); + EnvFactory.getEnv().initClusterEnvironment(); + prepareData(ALIGNED_DATASET); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinByIT.java similarity index 78% copy from integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java copy to integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinByIT.java index b73f35bbf1e..f33236839d9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinByIT.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.it.aggregation.maxby; +package org.apache.iotdb.db.it.aggregation.minby; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; @@ -41,13 +41,13 @@ import java.util.Map; import static org.apache.iotdb.db.it.utils.TestUtils.prepareData; import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest; import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR; -import static org.apache.iotdb.db.utils.constant.TestConstant.maxBy; +import static org.apache.iotdb.db.utils.constant.TestConstant.minBy; import static org.apache.iotdb.itbase.constant.TestConstant.DEVICE; import static org.junit.Assert.fail; @RunWith(IoTDBTestRunner.class) @Category({LocalStandaloneIT.class, ClusterIT.class}) -public class IoTDBMaxByIT { +public class IoTDBMinByIT { protected static final String[] NON_ALIGNED_DATASET = new String[] { "CREATE DATABASE root.db", @@ -68,9 +68,9 @@ public class IoTDBMaxByIT { "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")", "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")", "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")", - "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 2, 2, true, \"4\")", - "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 3, 3, false, \"3\")", - "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 4, 4, false, \"4\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(2, 3, 3, 3, 3, true, \"4\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(3, 2, 2, 2, 2, false, \"3\")", + "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 1, 1, false, \"4\")", "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")", "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")", "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 3, 3, false, \"3\")", @@ -96,18 +96,17 @@ public class IoTDBMaxByIT { "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")", "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")", "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")", - "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 4, 4, 4, 4, true, \"1\")", - "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 2, 2, true, \"4\")", - "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 3, 3, false, \"3\")", + "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 3, 3, 3, 3, true, \"4\")", + "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 2, 2, 2, 2, false, \"3\")", "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 1, 1, false, \"1\")", "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 3, 3, false, \"3\")", - "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(12, 9, 9, 9, 9, false, \"1\")", + "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(12, 0, 0, 0, 0, false, \"1\")", "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(13, 4, 4, 4, 4, false, \"4\")", - "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(13, 9, 9, 9, 9, false, \"1\")", + "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(13, 0, 0, 0, 0, false, \"1\")", "flush" }; - protected static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy:"; + protected static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy/MinBy:"; @BeforeClass public static void setUp() throws Exception { @@ -127,7 +126,7 @@ public class IoTDBMaxByIT { Statement statement = connection.createStatement()) { try { try (ResultSet resultSet = - statement.executeQuery("SELECT max_by(x1, y5) FROM root.db.d1")) { + statement.executeQuery("SELECT min_by(x1, y5) FROM root.db.d1")) { resultSet.next(); fail(); } @@ -136,7 +135,7 @@ public class IoTDBMaxByIT { } try { try (ResultSet resultSet = - statement.executeQuery("SELECT max_by(x1, y6) FROM root.db.d1")) { + statement.executeQuery("SELECT min_by(x1, y6) FROM root.db.d1")) { resultSet.next(); fail(); } @@ -145,7 +144,7 @@ public class IoTDBMaxByIT { } try { try (ResultSet resultSet = - statement.executeQuery("SELECT max_by(x5, y5) FROM root.db.d1")) { + statement.executeQuery("SELECT min_by(x5, y5) FROM root.db.d1")) { resultSet.next(); fail(); } @@ -154,7 +153,7 @@ public class IoTDBMaxByIT { } try { try (ResultSet resultSet = - statement.executeQuery("SELECT max_by(x5, y6) FROM root.db.d1")) { + statement.executeQuery("SELECT min_by(x5, y6) FROM root.db.d1")) { resultSet.next(); fail(); } @@ -163,7 +162,7 @@ public class IoTDBMaxByIT { } try { try (ResultSet resultSet = - statement.executeQuery("SELECT max_by(x6, y5) FROM root.db.d1")) { + statement.executeQuery("SELECT min_by(x6, y5) FROM root.db.d1")) { resultSet.next(); fail(); } @@ -172,7 +171,7 @@ public class IoTDBMaxByIT { } try { try (ResultSet resultSet = - statement.executeQuery("SELECT max_by(x6, y6) FROM root.db.d1")) { + statement.executeQuery("SELECT min_by(x6, y6) FROM root.db.d1")) { resultSet.next(); fail(); } @@ -199,7 +198,7 @@ public class IoTDBMaxByIT { String y = expectedHeader.getKey(); resultSetEqualTest( String.format( - "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 where time <= 3", + "select min_by(x1,%s),min_by(x2,%s),min_by(x3,%s),min_by(x4,%s),min_by(x5,%s),min_by(x6,%s) from root.db.d1 where time <= 3", y, y, y, y, y, y), expectedHeader.getValue(), retArray); @@ -224,7 +223,7 @@ public class IoTDBMaxByIT { String y = expectedHeader.getKey(); resultSetEqualTest( String.format( - "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 where time <= 4", + "select min_by(x1,%s),min_by(x2,%s),min_by(x3,%s),min_by(x4,%s),min_by(x5,%s),min_by(x6,%s) from root.db.d1 where time <= 4", y, y, y, y, y, y), expectedHeader.getValue(), retArray); @@ -249,7 +248,7 @@ public class IoTDBMaxByIT { String y = expectedHeader.getKey(); resultSetEqualTest( String.format( - "select max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s) from root.db.d1 where time <= 3", + "select min_by(time,%s),min_by(time,%s),min_by(time,%s),min_by(time,%s),min_by(time,%s),min_by(time,%s) from root.db.d1 where time <= 3", y, y, y, y, y, y), expectedHeader.getValue(), retArray); @@ -259,7 +258,7 @@ public class IoTDBMaxByIT { String y = expectedHeader.getKey(); resultSetEqualTest( String.format( - "select max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s) from root.db.d1 where time <= 4", + "select min_by(time,%s),min_by(time,%s),min_by(time,%s),min_by(time,%s),min_by(time,%s),min_by(time,%s) from root.db.d1 where time <= 4", y, y, y, y, y, y), expectedHeader.getValue(), retArray1); @@ -276,18 +275,18 @@ public class IoTDBMaxByIT { Statement statement = connection.createStatement()) { String[] expectedHeader = new String[] { - "max_by(root.db.d1.x1 + 1 - 3, -cos(sin(root.db.d1.y2 / 10)))", - "max_by(root.db.d1.x2 * 2 / 3, -cos(sin(root.db.d1.y2 / 10)))", - "max_by(floor(root.db.d1.x3), -cos(sin(root.db.d1.y2 / 10)))", - "max_by(ceil(root.db.d1.x4), -cos(sin(root.db.d1.y2 / 10)))", - "max_by(root.db.d1.x5, -cos(sin(root.db.d1.y2 / 10)))", - "max_by(REPLACE(root.db.d1.x6, '3', '4'), -cos(sin(root.db.d1.y2 / 10)))", + "min_by(root.db.d1.x1 + 1 - 3, -cos(sin(root.db.d1.y2 / 10)))", + "min_by(root.db.d1.x2 * 2 / 3, -cos(sin(root.db.d1.y2 / 10)))", + "min_by(floor(root.db.d1.x3), -cos(sin(root.db.d1.y2 / 10)))", + "min_by(ceil(root.db.d1.x4), -cos(sin(root.db.d1.y2 / 10)))", + "min_by(root.db.d1.x5, -cos(sin(root.db.d1.y2 / 10)))", + "min_by(REPLACE(root.db.d1.x6, '3', '4'), -cos(sin(root.db.d1.y2 / 10)))", }; String[] retArray = new String[] {"1.0,2.0,3.0,3.0,false,4,"}; String y = "-cos(sin(y2 / 10))"; resultSetEqualTest( String.format( - "select max_by(x1 + 1 - 3,%s),max_by(x2 * 2 / 3,%s),max_by(floor(x3),%s),max_by(ceil(x4),%s),max_by(x5,%s),max_by(replace(x6, '3', '4'),%s) from root.db.d1 where time <= 3", + "select min_by(x1 + 1 - 3,%s),min_by(x2 * 2 / 3,%s),min_by(floor(x3),%s),min_by(ceil(x4),%s),min_by(x5,%s),min_by(replace(x6, '3', '4'),%s) from root.db.d1 where time <= 3", y, y, y, y, y, y), expectedHeader, retArray); @@ -304,12 +303,12 @@ public class IoTDBMaxByIT { String[] expectedHeader = new String[] { DEVICE, - "max_by(x1 + 1 - 3, -cos(sin(y2 / 10)))", - "max_by(x2 * 2 / 3, -cos(sin(y2 / 10)))", - "max_by(floor(x3), -cos(sin(y2 / 10)))", - "max_by(ceil(x4), -cos(sin(y2 / 10)))", - "max_by(x5, -cos(sin(y2 / 10)))", - "max_by(REPLACE(x6, '3', '4'), -cos(sin(y2 / 10)))", + "min_by(x1 + 1 - 3, -cos(sin(y2 / 10)))", + "min_by(x2 * 2 / 3, -cos(sin(y2 / 10)))", + "min_by(floor(x3), -cos(sin(y2 / 10)))", + "min_by(ceil(x4), -cos(sin(y2 / 10)))", + "min_by(x5, -cos(sin(y2 / 10)))", + "min_by(REPLACE(x6, '3', '4'), -cos(sin(y2 / 10)))", }; String[] retArray = new String[] { @@ -318,7 +317,7 @@ public class IoTDBMaxByIT { String y = "-cos(sin(y2 / 10))"; resultSetEqualTest( String.format( - "select max_by(x1 + 1 - 3,%s),max_by(x2 * 2 / 3,%s),max_by(floor(x3),%s),max_by(ceil(x4),%s),max_by(x5,%s),max_by(replace(x6, '3', '4'),%s) from root.db.** where time <= 3 align by device", + "select min_by(x1 + 1 - 3,%s),min_by(x2 * 2 / 3,%s),min_by(floor(x3),%s),min_by(ceil(x4),%s),min_by(x5,%s),min_by(replace(x6, '3', '4'),%s) from root.db.** where time <= 3 align by device", y, y, y, y, y, y), expectedHeader, retArray); @@ -335,12 +334,12 @@ public class IoTDBMaxByIT { String[] expectedHeader = new String[] { TIMESTAMP_STR, - "max_by(root.db.d1.x1, root.db.d1.y2)", - "max_by(root.db.d1.x2, root.db.d1.y2)", - "max_by(root.db.d1.x3, root.db.d1.y2)", - "max_by(root.db.d1.x4, root.db.d1.y2)", - "max_by(root.db.d1.x5, root.db.d1.y2)", - "max_by(root.db.d1.x6, root.db.d1.y2)", + "min_by(root.db.d1.x1, root.db.d1.y2)", + "min_by(root.db.d1.x2, root.db.d1.y2)", + "min_by(root.db.d1.x3, root.db.d1.y2)", + "min_by(root.db.d1.x4, root.db.d1.y2)", + "min_by(root.db.d1.x5, root.db.d1.y2)", + "min_by(root.db.d1.x6, root.db.d1.y2)", }; String y = "y2"; // order by time ASC @@ -350,7 +349,7 @@ public class IoTDBMaxByIT { }; resultSetEqualTest( String.format( - "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 where time <= 10 group by ([0,9),4ms) ", + "select min_by(x1,%s),min_by(x2,%s),min_by(x3,%s),min_by(x4,%s),min_by(x5,%s),min_by(x6,%s) from root.db.d1 where time <= 10 group by ([0,9),4ms) ", y, y, y, y, y, y), expectedHeader, retArray1); @@ -365,7 +364,7 @@ public class IoTDBMaxByIT { }; resultSetEqualTest( String.format( - "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 group by ([0,14),4ms) order by time desc", + "select min_by(x1,%s),min_by(x2,%s),min_by(x3,%s),min_by(x4,%s),min_by(x5,%s),min_by(x6,%s) from root.db.d1 group by ([0,14),4ms) order by time desc", y, y, y, y, y, y), expectedHeader, retArray2); @@ -392,16 +391,16 @@ public class IoTDBMaxByIT { String[] expectedHeader = new String[] { TIMESTAMP_STR, - String.format("max_by(root.db.d1.x1, root.db.d1.%s)", y), - String.format("max_by(root.db.d1.x2, root.db.d1.%s)", y), - String.format("max_by(root.db.d1.x3, root.db.d1.%s)", y), - String.format("max_by(root.db.d1.x4, root.db.d1.%s)", y), - String.format("max_by(root.db.d1.x5, root.db.d1.%s)", y), - String.format("max_by(root.db.d1.x6, root.db.d1.%s)", y), + String.format("min_by(root.db.d1.x1, root.db.d1.%s)", y), + String.format("min_by(root.db.d1.x2, root.db.d1.%s)", y), + String.format("min_by(root.db.d1.x3, root.db.d1.%s)", y), + String.format("min_by(root.db.d1.x4, root.db.d1.%s)", y), + String.format("min_by(root.db.d1.x5, root.db.d1.%s)", y), + String.format("min_by(root.db.d1.x6, root.db.d1.%s)", y), }; resultSetEqualTest( String.format( - "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 where time <= 10 group by ([0,9),4ms,2ms) ", + "select min_by(x1,%s),min_by(x2,%s),min_by(x3,%s),min_by(x4,%s),min_by(x5,%s),min_by(x6,%s) from root.db.d1 where time <= 10 group by ([0,9),4ms,2ms) ", y, y, y, y, y, y), expectedHeader, retArray); @@ -419,18 +418,18 @@ public class IoTDBMaxByIT { String[] expectedHeader = new String[] { TIMESTAMP_STR, - "max_by(root.db.d1.x1, root.db.d1.y2)", - "max_by(root.db.d1.x2, root.db.d1.y2)", - "max_by(root.db.d1.x3, root.db.d1.y2)", - "max_by(root.db.d1.x4, root.db.d1.y2)", - "max_by(root.db.d1.x5, root.db.d1.y2)", - "max_by(root.db.d1.x6, root.db.d1.y2)", + "min_by(root.db.d1.x1, root.db.d1.y2)", + "min_by(root.db.d1.x2, root.db.d1.y2)", + "min_by(root.db.d1.x3, root.db.d1.y2)", + "min_by(root.db.d1.x4, root.db.d1.y2)", + "min_by(root.db.d1.x5, root.db.d1.y2)", + "min_by(root.db.d1.x6, root.db.d1.y2)", }; String[] retArray = new String[] {"8,3,3,3.0,3.0,false,3,"}; String y = "y2"; resultSetEqualTest( String.format( - "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 group by ([0,9),4ms) having max_by(time, %s) > 4", + "select min_by(x1,%s),min_by(x2,%s),min_by(x3,%s),min_by(x4,%s),min_by(x5,%s),min_by(x6,%s) from root.db.d1 group by ([0,9),4ms) having min_by(time, %s) > 4", y, y, y, y, y, y, y), expectedHeader, retArray); @@ -449,16 +448,16 @@ public class IoTDBMaxByIT { for (String y : yArray) { String[] expectedHeader = new String[] { - String.format("max_by(root.*.*.x1, root.*.*.%s)", y), - String.format("max_by(root.*.*.x2, root.*.*.%s)", y), - String.format("max_by(root.*.*.x3, root.*.*.%s)", y), - String.format("max_by(root.*.*.x4, root.*.*.%s)", y), - String.format("max_by(root.*.*.x5, root.*.*.%s)", y), - String.format("max_by(root.*.*.x6, root.*.*.%s)", y), + String.format("min_by(root.*.*.x1, root.*.*.%s)", y), + String.format("min_by(root.*.*.x2, root.*.*.%s)", y), + String.format("min_by(root.*.*.x3, root.*.*.%s)", y), + String.format("min_by(root.*.*.x4, root.*.*.%s)", y), + String.format("min_by(root.*.*.x5, root.*.*.%s)", y), + String.format("min_by(root.*.*.x6, root.*.*.%s)", y), }; resultSetEqualTest( String.format( - "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.** group by level = 0", + "select min_by(x1,%s),min_by(x2,%s),min_by(x3,%s),min_by(x4,%s),min_by(x5,%s),min_by(x6,%s) from root.db.** group by level = 0", y, y, y, y, y, y), expectedHeader, retArray); @@ -479,7 +478,7 @@ public class IoTDBMaxByIT { res.put( y, Arrays.stream(xInput) - .map(x -> maxBy("Time".equals(x) ? x : device + "." + x, device + "." + y)) + .map(x -> minBy("Time".equals(x) ? x : device + "." + x, device + "." + y)) .toArray(String[]::new)); }); return res; diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java index 4f193ceff0d..7eb12390d21 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java @@ -188,7 +188,8 @@ public class ExportTsFile extends AbstractTsFileTool { || sqlLower.contains("variance(") || sqlLower.contains("var_pop(") || sqlLower.contains("var_samp(") - || sqlLower.contains("max_by(")) { + || sqlLower.contains("max_by(") + || sqlLower.contains("min_by(")) { ioTPrinter.println("The sql you entered is invalid, please don't use aggregate query."); System.exit(CODE_ERROR); } diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java index e20a007f096..3de83632122 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java @@ -224,6 +224,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { "LATEST", "LIKE", "MAX_BY", + "MIN_BY", "METADATA", "MERGE", "MOVE", diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java index f5305180e95..363750f2c81 100644 --- a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java +++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java @@ -203,6 +203,7 @@ public class IoTDBDynamicTableFactory || sqlLower.contains("max_time(") || sqlLower.contains("min_time(") || sqlLower.contains("max_by(") + || sqlLower.contains("min_by(") || sqlLower.contains("stddev(") || sqlLower.contains("stddev_pop(") || sqlLower.contains("stddev_samp(") diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java index 99c8f3fe9a2..1f8a8e8154b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java @@ -67,6 +67,7 @@ public class AccumulatorFactory { public static boolean isMultiInputAggregation(TAggregationType aggregationType) { switch (aggregationType) { case MAX_BY: + case MIN_BY: return true; default: return false; @@ -79,6 +80,9 @@ public class AccumulatorFactory { case MAX_BY: checkState(inputDataTypes.size() == 2, "Wrong inputDataTypes size."); return new MaxByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1)); + case MIN_BY: + checkState(inputDataTypes.size() == 2, "Wrong inputDataTypes size."); + return new MinByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1)); default: throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java index b2226238016..b5d1ee92da5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -19,410 +19,30 @@ package org.apache.iotdb.db.queryengine.execution.aggregation; -import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; -import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; -import org.apache.iotdb.tsfile.read.common.block.column.Column; -import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; -import org.apache.iotdb.tsfile.utils.Binary; -import org.apache.iotdb.tsfile.utils.BitMap; -import org.apache.iotdb.tsfile.utils.BytesUtils; -import org.apache.iotdb.tsfile.utils.TsPrimitiveType; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Collections; - -import static com.google.common.base.Preconditions.checkArgument; - -/** max(x,y) returns the value of x associated with the maximum value of y over all input values. */ -public class MaxByAccumulator implements Accumulator { - - private final TSDataType xDataType; - - private final TSDataType yDataType; - - private final TsPrimitiveType yMaxValue; - - private final TsPrimitiveType xResult; - - private boolean xNull = true; - - private boolean initResult; - - private long yTimeStamp = Long.MAX_VALUE; - - private static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy: %s"; - - public MaxByAccumulator(TSDataType xDataType, TSDataType yDataType) { - this.xDataType = xDataType; - this.yDataType = yDataType; - this.xResult = TsPrimitiveType.getByType(xDataType); - this.yMaxValue = TsPrimitiveType.getByType(yDataType); - } - - // Column should be like: | Time | x | y | - @Override - public void addInput(Column[] column, BitMap bitMap) { - checkArgument(column.length == 3, "Length of input Column[] for MaxBy should be 3"); - switch (yDataType) { - case INT32: - addIntInput(column, bitMap); - return; - case INT64: - addLongInput(column, bitMap); - return; - case FLOAT: - addFloatInput(column, bitMap); - return; - case DOUBLE: - addDoubleInput(column, bitMap); - return; - case TEXT: - case BOOLEAN: - default: - throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, yDataType)); - } - } - - // partialResult should be like: | partialMaxByBinary | - @Override - public void addIntermediate(Column[] partialResult) { - checkArgument(partialResult.length == 1, "partialResult of MaxBy should be 1"); - // Return if y is null. - if (partialResult[0].isNull(0)) { - return; - } - byte[] bytes = partialResult[0].getBinary(0).getValues(); - updateFromBytesIntermediateInput(bytes); - } - - @Override - public void addStatistics(Statistics statistics) { - throw new UnsupportedOperationException(getClass().getName()); - } - - // finalResult should be single column, like: | finalXValue | - @Override - public void setFinal(Column finalResult) { - if (finalResult.isNull(0)) { - return; - } - initResult = true; - updateX(finalResult, 0); - } - - // columnBuilders should be like | TextIntermediateColumnBuilder | - @Override - public void outputIntermediate(ColumnBuilder[] columnBuilders) { - checkArgument(columnBuilders.length == 1, "partialResult of MaxValue should be 1"); - if (!initResult) { - columnBuilders[0].appendNull(); - return; - } - columnBuilders[0].writeBinary(new Binary(serialize())); - } - - @Override - public void outputFinal(ColumnBuilder columnBuilder) { - if (!initResult) { - columnBuilder.appendNull(); - return; - } - writeX(columnBuilder); +public class MaxByAccumulator extends MaxMinByBaseAccumulator { + protected MaxByAccumulator(TSDataType xDataType, TSDataType yDataType) { + super(xDataType, yDataType); } @Override - public void reset() { - initResult = false; - xNull = true; - this.xResult.reset(); - this.yMaxValue.reset(); - yTimeStamp = Long.MAX_VALUE; + protected boolean check(int yValue, int yExtremeValue) { + return yValue > yExtremeValue; } @Override - public boolean hasFinalResult() { - return false; + protected boolean check(long yValue, long yExtremeValue) { + return yValue > yExtremeValue; } @Override - public TSDataType[] getIntermediateType() { - return new TSDataType[] {TSDataType.TEXT}; + protected boolean check(float yValue, float yExtremeValue) { + return yValue > yExtremeValue; } @Override - public TSDataType getFinalType() { - return xDataType; - } - - private void addIntInput(Column[] column, BitMap bitMap) { - int count = column[0].getPositionCount(); - for (int i = 0; i < count; i++) { - if (bitMap != null && !bitMap.isMarked(i)) { - continue; - } - if (!column[2].isNull(i)) { - updateIntResult(column[0].getLong(i), column[2].getInt(i), column[1], i); - } - } - } - - private void updateIntResult(long time, int yMaxVal, Column xColumn, int xIndex) { - if (!initResult - || yMaxVal > yMaxValue.getInt() - || (yMaxVal == yMaxValue.getInt() && time < yTimeStamp)) { - initResult = true; - yTimeStamp = time; - yMaxValue.setInt(yMaxVal); - updateX(xColumn, xIndex); - } - } - - private void addLongInput(Column[] column, BitMap bitMap) { - int count = column[0].getPositionCount(); - for (int i = 0; i < count; i++) { - if (bitMap != null && !bitMap.isMarked(i)) { - continue; - } - if (!column[2].isNull(i)) { - updateLongResult(column[0].getLong(i), column[2].getLong(i), column[1], i); - } - } - } - - private void updateLongResult(long time, long yMaxVal, Column xColumn, int xIndex) { - if (!initResult - || yMaxVal > yMaxValue.getLong() - || (yMaxVal == yMaxValue.getLong() && time < yTimeStamp)) { - initResult = true; - yTimeStamp = time; - yMaxValue.setLong(yMaxVal); - updateX(xColumn, xIndex); - } - } - - private void addFloatInput(Column[] column, BitMap bitMap) { - int count = column[0].getPositionCount(); - for (int i = 0; i < count; i++) { - if (bitMap != null && !bitMap.isMarked(i)) { - continue; - } - if (!column[2].isNull(i)) { - updateFloatResult(column[0].getLong(i), column[2].getFloat(i), column[1], i); - } - } - } - - private void updateFloatResult(long time, float yMaxVal, Column xColumn, int xIndex) { - if (!initResult - || yMaxVal > yMaxValue.getFloat() - || (yMaxVal == yMaxValue.getFloat() && time < yTimeStamp)) { - initResult = true; - yTimeStamp = time; - yMaxValue.setFloat(yMaxVal); - updateX(xColumn, xIndex); - } - } - - private void addDoubleInput(Column[] column, BitMap bitMap) { - int count = column[0].getPositionCount(); - for (int i = 0; i < count; i++) { - if (bitMap != null && !bitMap.isMarked(i)) { - continue; - } - if (!column[2].isNull(i)) { - updateDoubleResult(column[0].getLong(i), column[2].getDouble(i), column[1], i); - } - } - } - - private void updateDoubleResult(long time, double yMaxVal, Column xColumn, int xIndex) { - if (!initResult - || yMaxVal > yMaxValue.getDouble() - || (yMaxVal == yMaxValue.getDouble() && time < yTimeStamp)) { - initResult = true; - yTimeStamp = time; - yMaxValue.setDouble(yMaxVal); - updateX(xColumn, xIndex); - } - } - - private void writeX(ColumnBuilder columnBuilder) { - if (xNull) { - columnBuilder.appendNull(); - return; - } - switch (xDataType) { - case INT32: - columnBuilder.writeInt(xResult.getInt()); - break; - case INT64: - columnBuilder.writeLong(xResult.getLong()); - break; - case FLOAT: - columnBuilder.writeFloat(xResult.getFloat()); - break; - case DOUBLE: - columnBuilder.writeDouble(xResult.getDouble()); - break; - case TEXT: - columnBuilder.writeBinary(xResult.getBinary()); - break; - case BOOLEAN: - columnBuilder.writeBoolean(xResult.getBoolean()); - break; - default: - throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType)); - } - } - - private void updateX(Column xColumn, int xIndex) { - if (xColumn.isNull(xIndex)) { - xNull = true; - } else { - xNull = false; - switch (xDataType) { - case INT32: - xResult.setInt(xColumn.getInt(xIndex)); - break; - case INT64: - xResult.setLong(xColumn.getLong(xIndex)); - break; - case FLOAT: - xResult.setFloat(xColumn.getFloat(xIndex)); - break; - case DOUBLE: - xResult.setDouble(xColumn.getDouble(xIndex)); - break; - case TEXT: - xResult.setBinary(xColumn.getBinary(xIndex)); - break; - case BOOLEAN: - xResult.setBoolean(xColumn.getBoolean(xIndex)); - break; - default: - throw new UnSupportedDataTypeException( - String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType)); - } - } - } - - private byte[] serialize() { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); - try { - dataOutputStream.writeLong(yTimeStamp); - writeIntermediateToStream(yDataType, yMaxValue, dataOutputStream); - dataOutputStream.writeBoolean(xNull); - if (!xNull) { - writeIntermediateToStream(xDataType, xResult, dataOutputStream); - } - } catch (IOException e) { - throw new UnsupportedOperationException( - "Failed to serialize intermediate result for MaxByAccumulator.", e); - } - return byteArrayOutputStream.toByteArray(); - } - - private void writeIntermediateToStream( - TSDataType dataType, TsPrimitiveType value, DataOutputStream dataOutputStream) - throws IOException { - switch (dataType) { - case INT32: - dataOutputStream.writeInt(value.getInt()); - break; - case INT64: - dataOutputStream.writeLong(value.getLong()); - break; - case FLOAT: - dataOutputStream.writeFloat(value.getFloat()); - break; - case DOUBLE: - dataOutputStream.writeDouble(value.getDouble()); - break; - case TEXT: - dataOutputStream.writeBytes(value.getBinary().toString()); - break; - case BOOLEAN: - dataOutputStream.writeBoolean(value.getBoolean()); - break; - default: - throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, dataType)); - } - } - - private void updateFromBytesIntermediateInput(byte[] bytes) { - long time = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 0); - int offset = Long.BYTES; - // Use Column to store x value - TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(xDataType)); - ColumnBuilder columnBuilder = builder.getValueColumnBuilders()[0]; - switch (yDataType) { - case INT32: - int intMaxVal = BytesUtils.bytesToInt(bytes, offset); - offset += Integer.BYTES; - readXFromBytesIntermediateInput(bytes, offset, columnBuilder); - updateIntResult(time, intMaxVal, columnBuilder.build(), 0); - break; - case INT64: - long longMaxVal = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, offset); - offset += Long.BYTES; - readXFromBytesIntermediateInput(bytes, offset, columnBuilder); - updateLongResult(time, longMaxVal, columnBuilder.build(), 0); - break; - case FLOAT: - float floatMaxVal = BytesUtils.bytesToFloat(bytes, offset); - offset += Float.BYTES; - readXFromBytesIntermediateInput(bytes, offset, columnBuilder); - updateFloatResult(time, floatMaxVal, columnBuilder.build(), 0); - break; - case DOUBLE: - double doubleMaxVal = BytesUtils.bytesToDouble(bytes, offset); - offset += Long.BYTES; - readXFromBytesIntermediateInput(bytes, offset, columnBuilder); - updateDoubleResult(time, doubleMaxVal, columnBuilder.build(), 0); - break; - case TEXT: - case BOOLEAN: - default: - throw new UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, yDataType)); - } - } - - private void readXFromBytesIntermediateInput( - byte[] bytes, int offset, ColumnBuilder columnBuilder) { - boolean isXNull = BytesUtils.bytesToBool(bytes, offset); - offset += 1; - if (isXNull) { - columnBuilder.appendNull(); - } else { - switch (xDataType) { - case INT32: - columnBuilder.writeInt(BytesUtils.bytesToInt(bytes, offset)); - break; - case INT64: - columnBuilder.writeLong(BytesUtils.bytesToLongFromOffset(bytes, 8, offset)); - break; - case FLOAT: - columnBuilder.writeFloat(BytesUtils.bytesToFloat(bytes, offset)); - break; - case DOUBLE: - columnBuilder.writeDouble(BytesUtils.bytesToDouble(bytes, offset)); - break; - case TEXT: - columnBuilder.writeBinary( - new Binary(BytesUtils.subBytes(bytes, offset, bytes.length - offset))); - break; - case BOOLEAN: - columnBuilder.writeBoolean(BytesUtils.bytesToBool(bytes, offset)); - break; - default: - throw new UnSupportedDataTypeException( - String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType)); - } - } + protected boolean check(double yValue, double yExtremeValue) { + return yValue > yExtremeValue; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxMinByBaseAccumulator.java similarity index 86% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxMinByBaseAccumulator.java index b2226238016..a4ea49be634 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxMinByBaseAccumulator.java @@ -38,13 +38,13 @@ import java.util.Collections; import static com.google.common.base.Preconditions.checkArgument; /** max(x,y) returns the value of x associated with the maximum value of y over all input values. */ -public class MaxByAccumulator implements Accumulator { +public abstract class MaxMinByBaseAccumulator implements Accumulator { private final TSDataType xDataType; private final TSDataType yDataType; - private final TsPrimitiveType yMaxValue; + private final TsPrimitiveType yExtremeValue; private final TsPrimitiveType xResult; @@ -54,19 +54,19 @@ public class MaxByAccumulator implements Accumulator { private long yTimeStamp = Long.MAX_VALUE; - private static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy: %s"; + private static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy/MinBy: %s"; - public MaxByAccumulator(TSDataType xDataType, TSDataType yDataType) { + protected MaxMinByBaseAccumulator(TSDataType xDataType, TSDataType yDataType) { this.xDataType = xDataType; this.yDataType = yDataType; this.xResult = TsPrimitiveType.getByType(xDataType); - this.yMaxValue = TsPrimitiveType.getByType(yDataType); + this.yExtremeValue = TsPrimitiveType.getByType(yDataType); } // Column should be like: | Time | x | y | @Override public void addInput(Column[] column, BitMap bitMap) { - checkArgument(column.length == 3, "Length of input Column[] for MaxBy should be 3"); + checkArgument(column.length == 3, "Length of input Column[] for MaxBy/MinBy should be 3"); switch (yDataType) { case INT32: addIntInput(column, bitMap); @@ -90,7 +90,7 @@ public class MaxByAccumulator implements Accumulator { // partialResult should be like: | partialMaxByBinary | @Override public void addIntermediate(Column[] partialResult) { - checkArgument(partialResult.length == 1, "partialResult of MaxBy should be 1"); + checkArgument(partialResult.length == 1, "partialResult of MaxBy/MinBy should be 1"); // Return if y is null. if (partialResult[0].isNull(0)) { return; @@ -139,7 +139,7 @@ public class MaxByAccumulator implements Accumulator { initResult = false; xNull = true; this.xResult.reset(); - this.yMaxValue.reset(); + this.yExtremeValue.reset(); yTimeStamp = Long.MAX_VALUE; } @@ -170,13 +170,13 @@ public class MaxByAccumulator implements Accumulator { } } - private void updateIntResult(long time, int yMaxVal, Column xColumn, int xIndex) { + private void updateIntResult(long time, int yValue, Column xColumn, int xIndex) { if (!initResult - || yMaxVal > yMaxValue.getInt() - || (yMaxVal == yMaxValue.getInt() && time < yTimeStamp)) { + || check(yValue, yExtremeValue.getInt()) + || (yValue == yExtremeValue.getInt() && time < yTimeStamp)) { initResult = true; yTimeStamp = time; - yMaxValue.setInt(yMaxVal); + yExtremeValue.setInt(yValue); updateX(xColumn, xIndex); } } @@ -193,13 +193,13 @@ public class MaxByAccumulator implements Accumulator { } } - private void updateLongResult(long time, long yMaxVal, Column xColumn, int xIndex) { + private void updateLongResult(long time, long yValue, Column xColumn, int xIndex) { if (!initResult - || yMaxVal > yMaxValue.getLong() - || (yMaxVal == yMaxValue.getLong() && time < yTimeStamp)) { + || check(yValue, yExtremeValue.getLong()) + || (yValue == yExtremeValue.getLong() && time < yTimeStamp)) { initResult = true; yTimeStamp = time; - yMaxValue.setLong(yMaxVal); + yExtremeValue.setLong(yValue); updateX(xColumn, xIndex); } } @@ -216,13 +216,13 @@ public class MaxByAccumulator implements Accumulator { } } - private void updateFloatResult(long time, float yMaxVal, Column xColumn, int xIndex) { + private void updateFloatResult(long time, float yValue, Column xColumn, int xIndex) { if (!initResult - || yMaxVal > yMaxValue.getFloat() - || (yMaxVal == yMaxValue.getFloat() && time < yTimeStamp)) { + || check(yValue, yExtremeValue.getFloat()) + || (yValue == yExtremeValue.getFloat() && time < yTimeStamp)) { initResult = true; yTimeStamp = time; - yMaxValue.setFloat(yMaxVal); + yExtremeValue.setFloat(yValue); updateX(xColumn, xIndex); } } @@ -239,13 +239,13 @@ public class MaxByAccumulator implements Accumulator { } } - private void updateDoubleResult(long time, double yMaxVal, Column xColumn, int xIndex) { + private void updateDoubleResult(long time, double yValue, Column xColumn, int xIndex) { if (!initResult - || yMaxVal > yMaxValue.getDouble() - || (yMaxVal == yMaxValue.getDouble() && time < yTimeStamp)) { + || check(yValue, yExtremeValue.getDouble()) + || (yValue == yExtremeValue.getDouble() && time < yTimeStamp)) { initResult = true; yTimeStamp = time; - yMaxValue.setDouble(yMaxVal); + yExtremeValue.setDouble(yValue); updateX(xColumn, xIndex); } } @@ -315,7 +315,7 @@ public class MaxByAccumulator implements Accumulator { DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); try { dataOutputStream.writeLong(yTimeStamp); - writeIntermediateToStream(yDataType, yMaxValue, dataOutputStream); + writeIntermediateToStream(yDataType, yExtremeValue, dataOutputStream); dataOutputStream.writeBoolean(xNull); if (!xNull) { writeIntermediateToStream(xDataType, xResult, dataOutputStream); @@ -425,4 +425,17 @@ public class MaxByAccumulator implements Accumulator { } } } + + /** + * @param yValue Input y. + * @param yExtremeValue Current extreme value of y. + * @return True if yValue is the new extreme value. + */ + protected abstract boolean check(int yValue, int yExtremeValue); + + protected abstract boolean check(long yValue, long yExtremeValue); + + protected abstract boolean check(float yValue, float yExtremeValue); + + protected abstract boolean check(double yValue, double yExtremeValue); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinByAccumulator.java new file mode 100644 index 00000000000..0571efc6a34 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinByAccumulator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.aggregation; + +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +public class MinByAccumulator extends MaxMinByBaseAccumulator { + protected MinByAccumulator(TSDataType xDataType, TSDataType yDataType) { + super(xDataType, yDataType); + } + + @Override + protected boolean check(int yValue, int yExtremeValue) { + return yValue < yExtremeValue; + } + + @Override + protected boolean check(long yValue, long yExtremeValue) { + return yValue < yExtremeValue; + } + + @Override + protected boolean check(float yValue, float yExtremeValue) { + return yValue < yExtremeValue; + } + + @Override + protected boolean check(double yValue, double yExtremeValue) { + return yValue < yExtremeValue; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java index 5d958cfbe25..2293c3b38ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java @@ -51,6 +51,9 @@ public class SlidingWindowAggregatorFactory { private static final Map<TSDataType, Comparator<Column>> maxByComparators = new EnumMap<>(TSDataType.class); + private static final Map<TSDataType, Comparator<Column>> minByComparators = + new EnumMap<>(TSDataType.class); + static { // return a value greater than 0 if o1 is numerically greater than o2 maxComparators.put(TSDataType.INT32, Comparator.comparingInt(o -> o.getInt(0))); @@ -137,6 +140,34 @@ public class SlidingWindowAggregatorFactory { TSDataType.DOUBLE, Comparator.comparingDouble( o -> BytesUtils.bytesToDouble(o.getBinary(0).getValues(), Long.BYTES))); + + // return a value greater than 0 if o1 is numerically less than o2 + minByComparators.put( + TSDataType.INT32, + (o1, o2) -> + Integer.compare( + BytesUtils.bytesToInt(o2.getBinary(0).getValues(), Long.BYTES), + BytesUtils.bytesToInt(o1.getBinary(0).getValues(), Long.BYTES))); + minByComparators.put( + TSDataType.INT64, + (o1, o2) -> + Long.compare( + BytesUtils.bytesToLongFromOffset( + o2.getBinary(0).getValues(), Long.BYTES, Long.BYTES), + BytesUtils.bytesToLongFromOffset( + o1.getBinary(0).getValues(), Long.BYTES, Long.BYTES))); + minByComparators.put( + TSDataType.FLOAT, + (o1, o2) -> + Float.compare( + BytesUtils.bytesToFloat(o2.getBinary(0).getValues(), Long.BYTES), + BytesUtils.bytesToFloat(o1.getBinary(0).getValues(), Long.BYTES))); + minByComparators.put( + TSDataType.DOUBLE, + (o1, o2) -> + Double.compare( + BytesUtils.bytesToDouble(o2.getBinary(0).getValues(), Long.BYTES), + BytesUtils.bytesToDouble(o1.getBinary(0).getValues(), Long.BYTES))); } public static SlidingWindowAggregator createSlidingWindowAggregator( @@ -192,6 +223,9 @@ public class SlidingWindowAggregatorFactory { case MAX_BY: return new MonotonicQueueSlidingWindowAggregator( accumulator, inputLocationList, step, maxByComparators.get(dataTypes.get(1))); + case MIN_BY: + return new MonotonicQueueSlidingWindowAggregator( + accumulator, inputLocationList, step, minByComparators.get(dataTypes.get(1))); case COUNT_IF: throw new SemanticException("COUNT_IF with slidingWindow is not supported now"); case TIME_DURATION: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java index 3e3da458b7a..66dcc539851 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java @@ -443,6 +443,7 @@ public class ExpressionTypeAnalyzer { case SqlConstant.VAR_POP: case SqlConstant.VAR_SAMP: case SqlConstant.MAX_BY: + case SqlConstant.MIN_BY: return expressionTypes.get(NodeRef.of(inputExpressions.get(0))); default: throw new IllegalArgumentException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index d39498a70d7..3b93613c294 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -2920,6 +2920,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { return; case SqlConstant.COUNT_IF: case SqlConstant.MAX_BY: + case SqlConstant.MIN_BY: checkFunctionExpressionInputSize( functionExpression.getExpressionString(), functionExpression.getExpressions().size(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java index 806d5ced160..69eb807711e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java @@ -184,6 +184,9 @@ public class AggregationDescriptor { case MAX_BY: outputAggregationNames.add(addPartialSuffix(SqlConstant.MAX_BY)); break; + case MIN_BY: + outputAggregationNames.add(addPartialSuffix(SqlConstant.MIN_BY)); + break; case UDAF: outputAggregationNames.add(addPartialSuffix(aggregationFuncName)); break; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java index 4cd94334460..533e1c7ab34 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java @@ -141,6 +141,7 @@ public class SchemaUtils { case SqlConstant.VAR_POP + "_partial": case SqlConstant.VAR_SAMP + "_partial": case SqlConstant.MAX_BY + "_partial": + case SqlConstant.MIN_BY + "_partial": return TSDataType.TEXT; case SqlConstant.LAST_VALUE: case SqlConstant.FIRST_VALUE: @@ -148,6 +149,7 @@ public class SchemaUtils { case SqlConstant.MAX_VALUE: case SqlConstant.MODE: case SqlConstant.MAX_BY: + case SqlConstant.MIN_BY: default: return null; } @@ -239,6 +241,7 @@ public class SchemaUtils { case VAR_POP: case VAR_SAMP: case MAX_BY: + case MIN_BY: case UDAF: return true; default: @@ -275,6 +278,8 @@ public class SchemaUtils { return Collections.singletonList(addPartialSuffix(SqlConstant.VAR_SAMP)); case MAX_BY: return Collections.singletonList(addPartialSuffix(SqlConstant.MAX_BY)); + case MIN_BY: + return Collections.singletonList(addPartialSuffix(SqlConstant.MIN_BY)); case AVG: return Arrays.asList(SqlConstant.COUNT, SqlConstant.SUM); case TIME_DURATION: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java index b80ae5a7a8f..187225f622a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java @@ -144,6 +144,7 @@ public class TypeInferenceUtils { case SqlConstant.EXTREME: case SqlConstant.MODE: case SqlConstant.MAX_BY: + case SqlConstant.MIN_BY: return dataType; case SqlConstant.AVG: case SqlConstant.SUM: @@ -191,6 +192,7 @@ public class TypeInferenceUtils { case SqlConstant.TIME_DURATION: case SqlConstant.MODE: case SqlConstant.MAX_BY: + case SqlConstant.MIN_BY: return; case SqlConstant.COUNT_IF: if (dataType != TSDataType.BOOLEAN) { @@ -236,6 +238,7 @@ public class TypeInferenceUtils { case SqlConstant.VAR_POP: case SqlConstant.VAR_SAMP: case SqlConstant.MAX_BY: + case SqlConstant.MIN_BY: return; case SqlConstant.COUNT_IF: Expression keepExpression = inputExpressions.get(1); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java index 686476ff6a0..8521e3ad2d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java @@ -48,6 +48,7 @@ public class SqlConstant { public static final String MAX_VALUE = "max_value"; public static final String MIN_VALUE = "min_value"; public static final String MAX_BY = "max_by"; + public static final String MIN_BY = "min_by"; public static final String EXTREME = "extreme"; public static final String FIRST_VALUE = "first_value"; public static final String LAST_VALUE = "last_value"; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/TestConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/TestConstant.java index 12d881ad760..265e9184e1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/TestConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/TestConstant.java @@ -79,6 +79,10 @@ public class TestConstant { return String.format("max_by(%s, %s)", x, y); } + public static String minBy(String x, String y) { + return String.format("min_by(%s, %s)", x, y); + } + private TestConstant() {} public static String getTestTsFilePath( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java index 68326f23958..ca8d5eb0afd 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java @@ -909,4 +909,37 @@ public class AccumulatorTest { maxByAccumulator.outputFinal(finalResult); Assert.assertEquals(-99, finalResult.build().getInt(0)); } + + @Test + public void minByAccumulatorTest() { + Accumulator minByAccumulator = + AccumulatorFactory.createBuiltinAccumulator( + TAggregationType.MIN_BY, + Arrays.asList(TSDataType.INT32, TSDataType.DOUBLE), + Collections.emptyList(), + Collections.emptyMap(), + true); + Assert.assertEquals(TSDataType.TEXT, minByAccumulator.getIntermediateType()[0]); + Assert.assertEquals(TSDataType.INT32, minByAccumulator.getFinalType()); + // Returns null if there's no data + ColumnBuilder[] intermediateResult = new ColumnBuilder[1]; + intermediateResult[0] = new BinaryColumnBuilder(null, 1); + minByAccumulator.outputIntermediate(intermediateResult); + Assert.assertTrue(intermediateResult[0].build().isNull(0)); + ColumnBuilder finalResult = new IntColumnBuilder(null, 1); + minByAccumulator.outputFinal(finalResult); + Assert.assertTrue(finalResult.build().isNull(0)); + + Column[] timeAndValueColumn = getTimeAndTwoValueColumns(1, 0); + minByAccumulator.addInput(timeAndValueColumn, null); + Assert.assertFalse(minByAccumulator.hasFinalResult()); + intermediateResult[0] = new BinaryColumnBuilder(null, 1); + minByAccumulator.outputIntermediate(intermediateResult); + + // add intermediate result as input + minByAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()}); + finalResult = new IntColumnBuilder(null, 1); + minByAccumulator.outputFinal(finalResult); + Assert.assertEquals(0, finalResult.build().getInt(0)); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java index c551b02bd8e..72bd4ac51af 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java @@ -45,7 +45,8 @@ public enum BuiltinAggregationFunction { VARIANCE("variance"), VAR_POP("var_pop"), VAR_SAMP("var_samp"), - MAX_BY("max_by"); + MAX_BY("max_by"), + MIN_BY("min_by"); private final String functionName; @@ -93,6 +94,7 @@ public enum BuiltinAggregationFunction { case "var_pop": case "var_samp": case "max_by": + case "min_by": return false; default: throw new IllegalArgumentException("Invalid Aggregation function: " + name); @@ -124,6 +126,7 @@ public enum BuiltinAggregationFunction { case "var_pop": case "var_samp": case "max_by": + case "min_by": return true; case "count_if": case "count_time": diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift index b0ad612bdd1..ba5a2da6b09 100644 --- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift +++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift @@ -199,5 +199,6 @@ enum TAggregationType { VAR_POP, VAR_SAMP, MAX_BY, + MIN_BY, UDAF } \ No newline at end of file
