This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fd4c14b [IOTDB-1773] Aligned timeseries support group by query with
value filter for standalone mode (#4511)
fd4c14b is described below
commit fd4c14bd100be387f91be7d9dcd9daf4069b0ac0
Author: liuminghui233 <[email protected]>
AuthorDate: Sat Dec 4 10:42:34 2021 +0800
[IOTDB-1773] Aligned timeseries support group by query with value filter
for standalone mode (#4511)
---
.../groupby/ClusterGroupByVFilterDataSet.java | 10 +-
.../db/integration/aligned/AlignedWriteUtil.java | 7 +
.../IoTDBGroupByQueryWithValueFilter2IT.java | 65 ++
.../IoTDBGroupByQueryWithValueFilterIT.java | 1094 ++++++++++++++++++++
...GroupByQueryWithValueFilterWithDeletion2IT.java | 76 ++
...BGroupByQueryWithValueFilterWithDeletionIT.java | 826 +++++++++++++++
.../dataset/groupby/GroupByEngineDataSet.java | 3 -
.../dataset/groupby/GroupByFillEngineDataSet.java | 6 -
.../groupby/GroupByWithValueFilterDataSet.java | 127 ++-
.../groupby/GroupByWithoutValueFilterDataSet.java | 19 -
.../db/query/executor/AggregationExecutor.java | 28 +-
.../iotdb/db/utils/AlignedValueIterator.java | 5 +-
.../java/org/apache/iotdb/db/utils/QueryUtils.java | 21 +
13 files changed, 2165 insertions(+), 122 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java
index bd1fb12..6f9414f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java
@@ -30,9 +30,7 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
-import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import java.util.ArrayList;
@@ -64,16 +62,12 @@ public class ClusterGroupByVFilterDataSet extends GroupByWithValueFilterDataSet
@Override
protected IReaderByTimestamp getReaderByTime(
- PartialPath path,
- RawDataQueryPlan dataQueryPlan,
- TSDataType dataType,
- QueryContext context,
- TsFileFilter fileFilter)
+ PartialPath path, RawDataQueryPlan dataQueryPlan, QueryContext context)
throws StorageEngineException, QueryProcessException {
return readerFactory.getReaderByTimestamp(
path,
dataQueryPlan.getAllMeasurementsInDevice(path.getDevice()),
- dataType,
+ path.getSeriesType(),
context,
dataQueryPlan.isAscending(),
null);
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/AlignedWriteUtil.java b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/AlignedWriteUtil.java
index f0e8e55..7c509f0 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/AlignedWriteUtil.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/AlignedWriteUtil.java
@@ -24,6 +24,13 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
+/**
+ * This class generates data for test cases in aligned time series scenarios.
+ *
+ * <p>You can comprehensively view the generated data in the following online doc:
+ *
+ * <p>https://docs.google.com/spreadsheets/d/1kfrSR1_paSd9B1Z0jnPBD3WQIMDslDuNm4R0mpWx9Ms/edit?usp=sharing
+ */
public class AlignedWriteUtil {
private static final String[] sqls =
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithValueFilter2IT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithValueFilter2IT.java
new file mode 100644
index 0000000..e0ca4a0
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithValueFilter2IT.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.integration.env.ConfigFactory;
+import org.apache.iotdb.integration.env.EnvFactory;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({LocalStandaloneTest.class})
+public class IoTDBGroupByQueryWithValueFilter2IT extends
IoTDBGroupByQueryWithValueFilterIT {
+
+ private static int numOfPointsPerPage;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().initBeforeClass();
+ // TODO When the aligned time series support compaction, we need to set compaction to true
+ enableSeqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+ enableUnseqSpaceCompaction =
+
IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+ enableCrossSpaceCompaction =
+
IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+ prevPartitionInterval =
IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+ numOfPointsPerPage =
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
+ ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
+ ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(3);
+ AlignedWriteUtil.insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+
ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+
ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+
ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+ ConfigFactory.getConfig().setPartitionInterval(prevPartitionInterval);
+
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(numOfPointsPerPage);
+ EnvironmentUtils.cleanEnv();
+ }
+}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithValueFilterIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithValueFilterIT.java
new file mode 100644
index 0000000..df9d628
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithValueFilterIT.java
@@ -0,0 +1,1094 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.integration.env.ConfigFactory;
+import org.apache.iotdb.integration.env.EnvFactory;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.constant.TestConstant.avg;
+import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.apache.iotdb.db.constant.TestConstant.firstValue;
+import static org.apache.iotdb.db.constant.TestConstant.lastValue;
+import static org.apache.iotdb.db.constant.TestConstant.maxTime;
+import static org.apache.iotdb.db.constant.TestConstant.maxValue;
+import static org.apache.iotdb.db.constant.TestConstant.minTime;
+import static org.apache.iotdb.db.constant.TestConstant.minValue;
+import static org.apache.iotdb.db.constant.TestConstant.sum;
+
+@Category({LocalStandaloneTest.class})
+public class IoTDBGroupByQueryWithValueFilterIT {
+
+ protected static boolean enableSeqSpaceCompaction;
+ protected static boolean enableUnseqSpaceCompaction;
+ protected static boolean enableCrossSpaceCompaction;
+ protected static long prevPartitionInterval;
+
+ private static final String TIMESTAMP_STR = "Time";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().initBeforeClass();
+ // TODO When the aligned time series support compaction, we need to set compaction to true
+ enableSeqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+ enableUnseqSpaceCompaction =
+
IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+ enableCrossSpaceCompaction =
+
IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+ prevPartitionInterval =
IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+ ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
+ ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
+ ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
+ AlignedWriteUtil.insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+
ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+
ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+
ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+ ConfigFactory.getConfig().setPartitionInterval(prevPartitionInterval);
+ EnvFactory.getEnv().cleanAfterClass();
+ }
+
+ @Test
+ public void countSumAvgTest1() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,5,30.0,6006.0", "11,10,130142.0,13014.2", "21,1,null,230000.0",
"31,0,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s1) from root.sg1.d1 "
+ + "where s1 > 5 and time < 35 GROUP BY ([1, 41), 10ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(sum("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d1.s1"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s1) from root.sg1.d1 "
+ + " where s1 > 5 and time < 35 GROUP BY ([1, 41), 10ms)
order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(sum("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d1.s1"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void countSumAvgTest2() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,1,null,30000.0",
+ "6,4,40.0,7.5",
+ "11,5,130052.0,26010.4",
+ "16,5,90.0,18.0",
+ "21,1,null,230000.0",
+ "26,0,null,null",
+ "31,0,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s1) from root.sg1.d1 "
+ + "where s3 > 5 and time < 30 GROUP BY ([1, 36), 5ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(sum("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d1.s1"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s1) from root.sg1.d1 "
+ + " where s3 > 5 and time < 30 GROUP BY ([1, 36), 5ms) order
by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(sum("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d1.s1"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void countSumAvgWithSlidingStepTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,1,null,30000.0",
+ "7,3,34.0,8.0",
+ "13,4,130045.0,32511.25",
+ "19,2,39.0,19.5",
+ "25,0,null,null",
+ "31,0,null,null",
+ "37,0,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s1) from root.sg1.d1 "
+ + "where s3 > 5 GROUP BY ([1, 41), 4ms, 6ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(sum("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d1.s1"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s1) from root.sg1.d1 "
+ + " where s3 > 5 GROUP BY ([1, 41), 4ms, 6ms) order by time
desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(sum("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d1.s1"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void countSumAvgWithNonAlignedTimeseriesTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,0,null,null,0,null,null",
+ "7,3,34.0,8.0,4,34.0,8.5",
+ "13,3,45.0,15.0,3,45.0,15.0",
+ "19,2,39.0,19.5,4,39.0,20.5",
+ "25,0,null,null,0,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(d1.s1), sum(d2.s2), avg(d2.s1), count(d1.s3),
sum(d1.s2), avg(d2.s3) "
+ + "from root.sg1 where d2.s3 > 5 and d1.s3 < 25 GROUP BY
([1, 31), 4ms, 6ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(sum("root.sg1.d2.s2"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d2.s1"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(sum("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d2.s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select count(d1.s1), sum(d2.s2), avg(d2.s1), count(d1.s3),
sum(d1.s2), avg(d2.s3) "
+ + "from root.sg1 where d2.s3 > 5 and d1.s3 < 25 GROUP BY
([1, 31), 4ms, 6ms) "
+ + "order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(sum("root.sg1.d2.s2"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d2.s1"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(sum("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d2.s3"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void maxMinValueTimeTest1() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,30000,6.0,9,3",
+ "11,130000,11.0,20,11",
+ "21,230000,230000.0,null,23",
+ "31,null,null,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3)
from root.sg1.d1 "
+ + "where s1 > 5 and time < 35 GROUP BY ([1, 41), 10ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(maxTime("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3)
from root.sg1.d1 "
+ + " where s1 > 5 and time < 35 GROUP BY ([1, 41), 10ms)
order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(maxTime("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void maxMinValueTimeTest2() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,30000,30000.0,null,3",
+ "6,10,6.0,10,6",
+ "11,130000,11.0,15,11",
+ "16,20,16.0,20,16",
+ "21,230000,230000.0,null,21",
+ "26,29,null,null,26"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3)
from root.sg1.d1 "
+ + "where s3 > 5 and time < 30 GROUP BY ([1, 31), 5ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(maxTime("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3)
from root.sg1.d1 "
+ + " where s3 > 5 and time < 30 GROUP BY ([1, 31), 5ms) order
by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(maxTime("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void maxMinValueTimeWithSlidingStepTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,30000,30000.0,null,3",
+ "7,10,7.0,10,7",
+ "13,130000,14.0,16,13",
+ "19,22,19.0,20,19",
+ "25,28,null,null,25",
+ "31,null,null,null,null",
+ "37,null,null,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3)
from root.sg1.d1 "
+ + "where s3 > 5 and time < 30 GROUP BY ([1, 41), 4ms, 6ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(maxTime("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3)
from root.sg1.d1 "
+ + " where s3 > 5 and time < 30 GROUP BY ([1, 41), 4ms, 6ms)
order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(maxTime("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void maxMinValueTimeWithNonAlignedTimeseriesTest() throws
SQLException {
+ String[] retArray =
+ new String[] {
+ "1,null,null,null,null,null,null,null,null",
+ "7,10,7.0,10,7,10,7.0,10,7",
+ "13,16,14.0,16,14,16,14.0,16,14",
+ "19,22,19.0,20,19,22,19.0,20,19",
+ "25,null,null,null,null,null,null,null,null",
+ "31,null,null,null,null,null,null,null,null",
+ "37,null,null,null,null,null,null,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select max_value(d2.s3), min_value(d1.s1), max_time(d2.s2),
min_time(d1.s3), "
+ + "max_value(d1.s3), min_value(d2.s1), max_time(d1.s2),
min_time(d2.s3) "
+ + "from root.sg1 where d2.s3 > 5 and d1.s3 < 25 GROUP BY
([1, 41), 4ms, 6ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d2.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(maxTime("root.sg1.d2.s2"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d2.s1"))
+ + ","
+ + resultSet.getString(maxTime("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d2.s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select max_value(d2.s3), min_value(d1.s1), max_time(d2.s2),
min_time(d1.s3), "
+ + "max_value(d1.s3), min_value(d2.s1), max_time(d1.s2),
min_time(d2.s3) "
+ + "from root.sg1 where d2.s3 > 5 and d1.s3 < 25 GROUP BY
([1, 41), 4ms, 6ms) "
+ + " order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d2.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(maxTime("root.sg1.d2.s2"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d2.s1"))
+ + ","
+ + resultSet.getString(maxTime("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d2.s3"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void firstLastTest1() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,true,aligned_test1", "11,true,aligned_unseq_test13",
"21,true,null", "31,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select last_value(s4), first_value(s5) from root.sg1.d1 "
+ + "where s4 = true GROUP BY ([1, 41), 10ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d1.s5"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select last_value(s4), first_value(s5) from root.sg1.d1 "
+ + " where s4 = true GROUP BY ([1, 41), 10ms) order by time
desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d1.s5"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void firstLastTest2() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,null,null", "6,false,aligned_test7", "11,null,null",
"16,null,null",
+ "21,false,null", "26,false,null", "31,null,null", "36,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select last_value(s4), first_value(s5) from root.sg1.d1 "
+ + "where s4 = false GROUP BY ([1, 41), 5ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d1.s5"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select last_value(s4), first_value(s5) from root.sg1.d1 "
+ + " where s4 = false GROUP BY ([1, 41), 5ms) order by time
desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d1.s5"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void firstLastWithSlidingStepTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,true,aligned_test1",
+ "7,true,aligned_test10",
+ "13,true,aligned_unseq_test13",
+ "19,true,null",
+ "25,true,null",
+ "31,null,null",
+ "37,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select last_value(s4), first_value(s5) from root.sg1.d1 "
+ + "where s4 != false GROUP BY ([1, 41), 4ms, 6ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d1.s5"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select last_value(s4), first_value(s5) from root.sg1.d1 "
+ + " where s4 != false GROUP BY ([1, 41), 4ms, 6ms) order by
time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d1.s5"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void firstLastWithNonAlignedTimeseriesTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,non_aligned_test4,true,aligned_test4,true",
+ "7,non_aligned_test10,false,aligned_test10,false",
+ "13,null,true,aligned_unseq_test13,null",
+ "19,null,null,null,null",
+ "25,null,null,null,null",
+ "31,non_aligned_test34,null,aligned_test34,null",
+ "37,non_aligned_test40,null,aligned_test40,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select last_value(d2.s5), first_value(d1.s4), last_value(d1.s5), first_value(d2.s4) "
+ + "from root.sg1 where d1.s5 like 'aligned_unseq_test%' or d2.s5 like 'non_aligned_test%' "
+ + "GROUP BY ([1, 41), 4ms, 6ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d2.s5"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s5"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d2.s4"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select last_value(d2.s5), first_value(d1.s4), last_value(d1.s5), first_value(d2.s4) "
+ + "from root.sg1 where d1.s5 like 'aligned_unseq_test%' or d2.s5 like 'non_aligned_test%' "
+ + "GROUP BY ([1, 41), 4ms, 6ms) order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d2.s5"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s5"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d2.s4"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void groupByWithWildcardTest1() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,5,4,5,4,4,9.0,9,9,false,aligned_test9",
+ "11,10,10,10,1,1,20.0,20,20,true,aligned_unseq_test13",
+ "21,1,0,1,1,0,230000.0,null,230000,false,null",
+ "31,0,0,0,0,0,null,null,null,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(*), last_value(*) from root.sg1.d1 "
+ + " where s1 > 5 and time < 35 GROUP BY ([1, 41), 10ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s5"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s5"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select count(*), last_value(*) from root.sg1.d1 "
+ + " where s1 > 5 and time < 35 GROUP BY ([1, 41), 10ms) order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s5"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s5"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void groupByWithWildcardTest2() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,1,0,1,1,1",
+ "5,2,2,2,2,1",
+ "9,2,3,3,2,2",
+ "13,3,3,3,1,1",
+ "17,3,3,3,0,0",
+ "21,1,0,1,1,0",
+ "25,0,0,0,0,0",
+ "29,0,1,0,0,1",
+ "33,0,3,0,0,3",
+ "37,0,1,0,0,1"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(*) from root.sg1.d1 "
+ + "where s1 > 5 or s2 > 5 and time < 38 GROUP BY ([1, 41), 3ms, 4ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s5"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select count(*) from root.sg1.d1 "
+ + " where s1 > 5 or s2 > 5 and time < 38 GROUP BY ([1, 41), 3ms, 4ms) "
+ + " order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s5"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void groupByWithWildcardTest3() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,30000.0,null,30000,true,aligned_unseq_test3",
+ "5,7.0,7,7,false,aligned_test7",
+ "9,11.0,11,11,true,aligned_test10",
+ "13,15.0,15,15,true,aligned_unseq_test13",
+ "17,19.0,19,19,null,null",
+ "21,230000.0,null,230000,false,null",
+ "25,null,null,null,null,null",
+ "29,null,31,null,null,aligned_test31",
+ "33,null,35,null,null,aligned_test35",
+ "37,null,37,null,null,aligned_test37"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select last_value(*) from root.sg1.d1 "
+ + "where s1 > 5 or s2 > 5 and time < 38 GROUP BY ([1, 41), 3ms, 4ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s5"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select last_value(*) from root.sg1.d1 "
+ + " where s1 > 5 or s2 > 5 and time < 38 GROUP BY ([1, 41), 3ms, 4ms) order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s2"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s4"))
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s5"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithValueFilterWithDeletion2IT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithValueFilterWithDeletion2IT.java
new file mode 100644
index 0000000..6980890
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithValueFilterWithDeletion2IT.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.integration.env.ConfigFactory;
+import org.apache.iotdb.integration.env.EnvFactory;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+@Category({LocalStandaloneTest.class})
+public class IoTDBGroupByQueryWithValueFilterWithDeletion2IT
+ extends IoTDBGroupByQueryWithValueFilterWithDeletionIT {
+
+ private static int numOfPointsPerPage;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().initBeforeClass();
+ enableSeqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+ enableUnseqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+ enableCrossSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+ prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+ numOfPointsPerPage = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
+ ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
+ ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(3);
+ AlignedWriteUtil.insertData();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("delete from root.sg1.d1.s1 where time <= 15");
+ statement.execute("delete timeseries root.sg1.d1.s2");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+ ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+ ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+ ConfigFactory.getConfig().setPartitionInterval(prevPartitionInterval);
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(numOfPointsPerPage);
+ EnvFactory.getEnv().cleanAfterClass();
+ }
+}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithValueFilterWithDeletionIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithValueFilterWithDeletionIT.java
new file mode 100644
index 0000000..c47915d
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithValueFilterWithDeletionIT.java
@@ -0,0 +1,826 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.integration.env.ConfigFactory;
+import org.apache.iotdb.integration.env.EnvFactory;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.constant.TestConstant.avg;
+import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.apache.iotdb.db.constant.TestConstant.firstValue;
+import static org.apache.iotdb.db.constant.TestConstant.lastValue;
+import static org.apache.iotdb.db.constant.TestConstant.maxValue;
+import static org.apache.iotdb.db.constant.TestConstant.minTime;
+import static org.apache.iotdb.db.constant.TestConstant.minValue;
+
+@Category({LocalStandaloneTest.class})
+public class IoTDBGroupByQueryWithValueFilterWithDeletionIT {
+
+ protected static boolean enableSeqSpaceCompaction;
+ protected static boolean enableUnseqSpaceCompaction;
+ protected static boolean enableCrossSpaceCompaction;
+ protected static long prevPartitionInterval;
+
+ private static final String TIMESTAMP_STR = "Time";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().initBeforeClass();
+ enableSeqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+ enableUnseqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+ enableCrossSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+ prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+ ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
+ ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
+ ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
+ AlignedWriteUtil.insertData();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("delete from root.sg1.d1.s1 where time <= 15");
+ statement.execute("delete timeseries root.sg1.d1.s2");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+ ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+ ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+ ConfigFactory.getConfig().setPartitionInterval(prevPartitionInterval);
+ EnvFactory.getEnv().cleanAfterClass();
+ }
+
+ @Test
+ public void countSumAvgTest1() throws SQLException {
+ String[] retArray =
+ new String[] {"1,0,5006.666666666667", "11,5,13014.2", "21,1,25578.0", "31,0,null"};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s3) from root.sg1.d1 "
+ + "where s3 > 5 and time < 30 GROUP BY ([1, 41), 10ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s3) from root.sg1.d1 "
+ + " where s3 > 5 and time < 30 GROUP BY ([1, 41), 10ms) order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void countSumAvgTest2() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,0,30000.0", "6,0,8.0", "11,0,26010.4", "16,5,18.0",
+ "21,1,46018.4", "26,0,27.5", "31,0,null", "36,0,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s3) from root.sg1.d1 "
+ + "where s3 > 5 and time < 30 GROUP BY ([1, 41), 5ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s3) from root.sg1.d1 "
+ + " where s3 > 5 and time < 30 GROUP BY ([1, 41), 5ms) order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void countSumAvgWithSlidingStepTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,0,30000.0",
+ "7,0,8.5",
+ "13,1,32511.25",
+ "19,2,20.5",
+ "25,0,26.5",
+ "31,0,null",
+ "37,0,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s3) from root.sg1.d1 "
+ + "where s3 > 5 GROUP BY ([1, 41), 4ms, 6ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s2), avg(s3) from root.sg1.d1 "
+ + " where s3 > 5 GROUP BY ([1, 41), 4ms, 6ms) order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(avg("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void maxMinValueTimeTest1() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,30000,null,3", "11,130000,16.0,11", "21,230000,230000.0,21", "31,null,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3) from root.sg1.d1 "
+ + "where s3 > 5 GROUP BY ([1, 41), 10ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3) from root.sg1.d1 "
+ + " where s3 > 5 GROUP BY ([1, 41), 10ms) order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void maxMinValueTimeTest2() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,30000,null,3",
+ "6,10,null,6",
+ "11,130000,null,11",
+ "16,20,16.0,16",
+ "21,230000,230000.0,21",
+ "26,30,null,26",
+ "31,null,null,null",
+ "36,null,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3) from root.sg1.d1 "
+ + "where s3 > 5 and time < 38 GROUP BY ([1, 41), 5ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3) from root.sg1.d1 "
+ + " where s3 > 5 and time < 38 GROUP BY ([1, 41), 5ms) order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void maxMinValueTimeWithSlidingStepTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,30000,null,3",
+ "7,10,null,7",
+ "13,130000,16.0,13",
+ "19,22,19.0,19",
+ "25,28,null,25",
+ "31,null,null,null",
+ "37,null,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3) from root.sg1.d1 "
+ + "where s3 > 5 GROUP BY ([1, 41), 4ms, 6ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select max_value(s3), min_value(s1), max_time(s2), min_time(s3) from root.sg1.d1 "
+ + " where s3 > 5 GROUP BY ([1, 41), 4ms, 6ms) order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(maxValue("root.sg1.d1.s3"))
+ + ","
+ + resultSet.getString(minValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(minTime("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void firstLastTest1() throws SQLException {
+ String[] retArray =
+ new String[] {"1,null,30000", "11,20.0,11", "21,230000.0,21", "31,null,null"};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select last_value(s1), first_value(s3) from root.sg1.d1 "
+ + "where s3 > 5 GROUP BY ([1, 41), 10ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select last_value(s1), first_value(s3) from root.sg1.d1 "
+ + " where s3 > 5 GROUP BY ([1, 41), 10ms) order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void firstLastTest2() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,null,30000",
+ "6,null,6",
+ "11,null,11",
+ "16,20.0,16",
+ "21,230000.0,21",
+ "26,null,26",
+ "31,null,null",
+ "36,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select last_value(s1), first_value(s3) from root.sg1.d1 "
+ + "where s3 > 5 and time < 30 GROUP BY ([1, 41), 5ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select last_value(s1), first_value(s3) from root.sg1.d1 "
+ + " where s3 > 5 and time < 30 GROUP BY ([1, 41), 5ms) order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ @Test
+ public void firstLastWithSlidingStepTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "1,null,30000",
+ "7,null,7",
+ "13,16.0,130000",
+ "19,20.0,19",
+ "25,null,25",
+ "31,null,null",
+ "37,null,null"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select last_value(s1), first_value(s3) from root.sg1.d1 "
+ + "where s3 > 5 and time < 30 GROUP BY ([1, 41), 4ms, 6ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select last_value(s1), first_value(s3) from root.sg1.d1 "
+ + " where s3 > 5 and time < 30 GROUP BY ([1, 41), 4ms, 6ms) order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = retArray.length;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(lastValue("root.sg1.d1.s1"))
+ + ","
+ + resultSet.getString(firstValue("root.sg1.d1.s3"));
+ Assert.assertEquals(retArray[cnt - 1], ans);
+ cnt--;
+ }
+ Assert.assertEquals(0, cnt);
+ }
+ }
+ }
+
+ /**
+  * Runs {@code count(*)} and {@code last_value(*)} over root.sg1.d1 with the value filter
+  * {@code s3 > 5 or s4 = true}, grouped by {@code ([1, 41), 10ms)}. The query is executed twice:
+  * once in the default order, where rows are compared against {@code retArray} front to back,
+  * and once with {@code order by time desc}, where they are compared back to front.
+  */
+ @Test
+ public void groupByWithWildcardTest1() throws SQLException {
+   String[] retArray =
+       new String[] {
+         "1,0,7,8,8,null,10,true,aligned_test10",
+         "11,5,10,1,1,20.0,20,true,aligned_unseq_test13",
+         "21,1,10,10,0,230000.0,30,false,null",
+         "31,0,0,0,0,null,null,null,null"
+       };
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     boolean hasResultSet =
+         statement.execute(
+             "select count(*), last_value(*) from root.sg1.d1 "
+                 + " where s3 > 5 or s4 = true GROUP BY ([1, 41), 10ms)");
+     Assert.assertTrue(hasResultSet);
+
+     int cnt;
+     try (ResultSet resultSet = statement.getResultSet()) {
+       cnt = 0;
+       while (resultSet.next()) {
+         String ans =
+             resultSet.getString(TIMESTAMP_STR)
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s1"))
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s3"))
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s4"))
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s5"))
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s1"))
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s3"))
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s4"))
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s5"));
+         Assert.assertEquals(retArray[cnt], ans);
+         cnt++;
+       }
+       // every expected row must have been produced
+       Assert.assertEquals(retArray.length, cnt);
+     }
+
+     hasResultSet =
+         statement.execute(
+             "select count(*), last_value(*) from root.sg1.d1 "
+                 + "where s3 > 5 or s4 = true GROUP BY ([1, 41), 10ms) order by time desc");
+     Assert.assertTrue(hasResultSet);
+
+     try (ResultSet resultSet = statement.getResultSet()) {
+       // descending output: walk the expected rows from the last one down to the first
+       cnt = retArray.length;
+       while (resultSet.next()) {
+         String ans =
+             resultSet.getString(TIMESTAMP_STR)
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s1"))
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s3"))
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s4"))
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s5"))
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s1"))
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s3"))
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s4"))
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s5"));
+         Assert.assertEquals(retArray[cnt - 1], ans);
+         cnt--;
+       }
+       Assert.assertEquals(0, cnt);
+     }
+   }
+ }
+
+ /**
+  * Runs {@code count(*)} over root.sg1.d1 with the value filter
+  * {@code s3 > 5 or s5 like 'aligned_test3%'}, grouped by {@code ([1, 41), 3ms, 4ms)}; the
+  * expected rows start at timestamps 1, 5, 9, ... 37. The query is executed in the default
+  * order and again with {@code order by time desc}, which must return the same rows reversed.
+  */
+ @Test
+ public void groupByWithWildcardTest2() throws SQLException {
+   String[] retArray =
+       new String[] {
+         "1,0,1,1,1",
+         "5,0,2,2,1",
+         "9,0,3,2,2",
+         "13,0,3,1,1",
+         "17,3,3,0,0",
+         "21,1,3,3,0",
+         "25,0,3,3,0",
+         "29,0,2,2,1",
+         "33,0,0,0,3",
+         "37,0,0,0,3"
+       };
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     boolean hasResultSet =
+         statement.execute(
+             "select count(*) from root.sg1.d1 "
+                 + "where s3 > 5 or s5 like 'aligned_test3%' GROUP BY ([1, 41), 3ms, 4ms)");
+     Assert.assertTrue(hasResultSet);
+
+     int cnt;
+     try (ResultSet resultSet = statement.getResultSet()) {
+       cnt = 0;
+       while (resultSet.next()) {
+         String ans =
+             resultSet.getString(TIMESTAMP_STR)
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s1"))
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s3"))
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s4"))
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s5"));
+         Assert.assertEquals(retArray[cnt], ans);
+         cnt++;
+       }
+       // every expected row must have been produced
+       Assert.assertEquals(retArray.length, cnt);
+     }
+
+     hasResultSet =
+         statement.execute(
+             "select count(*) from root.sg1.d1 where s3 > 5 or s5 like 'aligned_test3%' "
+                 + " GROUP BY ([1, 41), 3ms, 4ms) order by time desc");
+     Assert.assertTrue(hasResultSet);
+
+     try (ResultSet resultSet = statement.getResultSet()) {
+       // descending output: walk the expected rows from the last one down to the first
+       cnt = retArray.length;
+       while (resultSet.next()) {
+         String ans =
+             resultSet.getString(TIMESTAMP_STR)
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s1"))
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s3"))
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s4"))
+                 + ","
+                 + resultSet.getString(count("root.sg1.d1.s5"));
+         Assert.assertEquals(retArray[cnt - 1], ans);
+         cnt--;
+       }
+       Assert.assertEquals(0, cnt);
+     }
+   }
+ }
+
+ /**
+  * Runs {@code last_value(*)} over root.sg1.d1 with the value filter
+  * {@code s3 > 5 or s5 like 'aligned_test3%'}, grouped by {@code ([1, 41), 3ms, 4ms)}; the
+  * expected rows start at timestamps 1, 5, 9, ... 37. The query is executed in the default
+  * order and again with {@code order by time desc}, which must return the same rows reversed.
+  */
+ @Test
+ public void groupByWithWildcardTest3() throws SQLException {
+   String[] retArray =
+       new String[] {
+         "1,null,30000,true,aligned_unseq_test3",
+         "5,null,7,false,aligned_test7",
+         "9,null,11,true,aligned_test10",
+         "13,null,15,true,aligned_unseq_test13",
+         "17,19.0,19,null,null",
+         "21,230000.0,230000,false,null",
+         "25,null,27,false,null",
+         "29,null,30,false,aligned_test31",
+         "33,null,null,null,aligned_test35",
+         "37,null,null,null,aligned_test39"
+       };
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     boolean hasResultSet =
+         statement.execute(
+             "select last_value(*) from root.sg1.d1 "
+                 + "where s3 > 5 or s5 like 'aligned_test3%' GROUP BY ([1, 41), 3ms, 4ms)");
+     Assert.assertTrue(hasResultSet);
+
+     int cnt;
+     try (ResultSet resultSet = statement.getResultSet()) {
+       cnt = 0;
+       while (resultSet.next()) {
+         String ans =
+             resultSet.getString(TIMESTAMP_STR)
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s1"))
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s3"))
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s4"))
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s5"));
+         Assert.assertEquals(retArray[cnt], ans);
+         cnt++;
+       }
+       // every expected row must have been produced
+       Assert.assertEquals(retArray.length, cnt);
+     }
+
+     hasResultSet =
+         statement.execute(
+             "select last_value(*) from root.sg1.d1 where s3 > 5 or s5 like 'aligned_test3%' "
+                 + " GROUP BY ([1, 41), 3ms, 4ms) order by time desc");
+     Assert.assertTrue(hasResultSet);
+
+     try (ResultSet resultSet = statement.getResultSet()) {
+       // descending output: walk the expected rows from the last one down to the first
+       cnt = retArray.length;
+       while (resultSet.next()) {
+         String ans =
+             resultSet.getString(TIMESTAMP_STR)
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s1"))
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s3"))
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s4"))
+                 + ","
+                 + resultSet.getString(lastValue("root.sg1.d1.s5"));
+         Assert.assertEquals(retArray[cnt - 1], ans);
+         cnt--;
+       }
+       Assert.assertEquals(0, cnt);
+     }
+   }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index 39aa84f..1019b11 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -231,6 +230,4 @@ public abstract class GroupByEngineDataSet extends QueryDataSet {
hasCachedTimeInterval = false;
return new Pair<>(curStartTime, curEndTime);
}
-
- public abstract Pair<Long, Object> peekNextNotNullValue(Path path, int i)
throws IOException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillEngineDataSet.java
index 6202fc9..e3684a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillEngineDataSet.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.query.executor.fill.PreviousFill;
import org.apache.iotdb.db.query.executor.fill.ValueFill;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -404,9 +403,4 @@ public abstract class GroupByFillEngineDataSet extends GroupByEngineDataSet {
}
}
}
-
- @Override
- public Pair<Long, Object> peekNextNotNullValue(Path path, int i) throws
IOException {
- throw new IOException("Group by fill doesn't support peek next");
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 4ea2035..d052381 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -24,33 +24,39 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
-import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
-import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.stream.Collectors;
public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
- protected List<IReaderByTimestamp> allDataReaderList;
+ private Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap;
+
protected GroupByTimePlan groupByTimePlan;
private TimeGenerator timestampGenerator;
/** cached timestamp for next group by partition. */
@@ -71,7 +77,7 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
@TestOnly
public GroupByWithValueFilterDataSet(long queryId, GroupByTimePlan
groupByTimePlan) {
super(new QueryContext(queryId), groupByTimePlan);
- this.allDataReaderList = new ArrayList<>();
+ this.readerToAggrIndexesMap = new HashMap<>();
this.timeStampFetchSize =
IoTDBDescriptor.getInstance().getConfig().getBatchSize();
}
@@ -79,20 +85,43 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
public void initGroupBy(QueryContext context, GroupByTimePlan
groupByTimePlan)
throws StorageEngineException, QueryProcessException {
this.timestampGenerator = getTimeGenerator(context, groupByTimePlan);
- this.allDataReaderList = new ArrayList<>();
+ this.readerToAggrIndexesMap = new HashMap<>();
this.groupByTimePlan = groupByTimePlan;
+ List<PartialPath> selectedSeries = new ArrayList<>();
+ groupByTimePlan
+ .getDeduplicatedPaths()
+ .forEach(k -> selectedSeries.add(((MeasurementPath)
k).transformToExactPath()));
+
+ Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
+ MetaUtils.groupAggregationsBySeries(selectedSeries);
+ Map<AlignedPath, List<List<Integer>>> alignedPathToAggrIndexesMap =
+ MetaUtils.groupAlignedSeriesWithAggregations(pathToAggrIndexesMap);
+
List<StorageGroupProcessor> list =
StorageEngine.getInstance()
.mergeLock(paths.stream().map(p -> (PartialPath)
p).collect(Collectors.toList()));
try {
- for (int i = 0; i < paths.size(); i++) {
- PartialPath path = (PartialPath) paths.get(i);
- allDataReaderList.add(
- getReaderByTime(path, groupByTimePlan, dataTypes.get(i), context,
null));
+ // init non-aligned series reader
+ for (PartialPath path : pathToAggrIndexesMap.keySet()) {
+ IReaderByTimestamp seriesReaderByTimestamp =
+ getReaderByTime(path, groupByTimePlan, context);
+ readerToAggrIndexesMap.put(
+ seriesReaderByTimestamp,
Collections.singletonList(pathToAggrIndexesMap.get(path)));
+ }
+ // init aligned series reader
+ for (PartialPath alignedPath : alignedPathToAggrIndexesMap.keySet()) {
+ IReaderByTimestamp seriesReaderByTimestamp =
+ getReaderByTime(alignedPath, groupByTimePlan, context);
+ readerToAggrIndexesMap.put(
+ seriesReaderByTimestamp,
alignedPathToAggrIndexesMap.get(alignedPath));
}
} finally {
StorageEngine.getInstance().mergeUnLock(list);
+
+ // assign null to be friendly for GC
+ pathToAggrIndexesMap = null;
+ alignedPathToAggrIndexesMap = null;
}
}
@@ -102,19 +131,15 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
}
protected IReaderByTimestamp getReaderByTime(
- PartialPath path,
- RawDataQueryPlan queryPlan,
- TSDataType dataType,
- QueryContext context,
- TsFileFilter fileFilter)
+ PartialPath path, RawDataQueryPlan queryPlan, QueryContext context)
throws StorageEngineException, QueryProcessException {
return new SeriesReaderByTimestamp(
path,
queryPlan.getAllMeasurementsInDevice(path.getDevice()),
- dataType,
+ path.getSeriesType(),
context,
QueryResourceManager.getInstance().getQueryDataSource(path, context,
null),
- fileFilter,
+ null,
ascending);
}
@@ -159,10 +184,7 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
timeArrayLength = constructTimeArrayForOneCal(timestampArray,
timeArrayLength);
// cal result using timestamp array
- for (int i = 0; i < paths.size(); i++) {
- curAggregateResults[i].updateResultUsingTimestamps(
- timestampArray, timeArrayLength, allDataReaderList.get(i));
- }
+ calcUsingTimestampArray(timestampArray, timeArrayLength);
timeArrayLength = 0;
// judge if it's end
@@ -174,58 +196,31 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
if (timeArrayLength > 0) {
// cal result using timestamp array
- for (int i = 0; i < paths.size(); i++) {
- curAggregateResults[i].updateResultUsingTimestamps(
- timestampArray, timeArrayLength, allDataReaderList.get(i));
- }
+ calcUsingTimestampArray(timestampArray, timeArrayLength);
}
return constructRowRecord(curAggregateResults);
}
- @Override
- @SuppressWarnings("squid:S3776")
- public Pair<Long, Object> peekNextNotNullValue(Path path, int i) throws
IOException {
- if ((!timestampGenerator.hasNext() && cachedTimestamps.isEmpty())
- || allDataReaderList.get(i).readerIsEmpty()) {
- return null;
- }
-
- long[] timestampArray = new long[1];
- AggregateResult aggrResultByName =
- AggregateResultFactory.getAggrResultByName(
- groupByTimePlan.getDeduplicatedAggregations().get(i),
- groupByTimePlan.getDeduplicatedDataTypes().get(i),
- ascending);
+ private void calcUsingTimestampArray(long[] timestampArray, int
timeArrayLength)
+ throws IOException {
+ for (Entry<IReaderByTimestamp, List<List<Integer>>> entry :
readerToAggrIndexesMap.entrySet()) {
+ IReaderByTimestamp reader = entry.getKey();
+ List<List<Integer>> subIndexes = entry.getValue();
+ int subSensorSize = subIndexes.size();
- long tmpStartTime = curStartTime - slidingStep;
- int index = 0;
- while (tmpStartTime >= startTime
- && (timestampGenerator.hasNext() || !cachedTimestamps.isEmpty())) {
- long timestamp = Long.MIN_VALUE;
- if (timestampGenerator.hasNext()) {
- cachedTimestamps.add(timestampGenerator.next());
- }
- if (!cachedTimestamps.isEmpty() && index < cachedTimestamps.size()) {
- timestamp = cachedTimestamps.get(index++);
- }
- if (timestamp >= tmpStartTime) {
- timestampArray[0] = timestamp;
- } else {
- do {
- tmpStartTime -= slidingStep;
- if (timestamp >= tmpStartTime) {
- timestampArray[0] = timestamp;
- break;
+ Object[] values = reader.getValuesInTimestamps(timestampArray,
timeArrayLength);
+ ValueIterator valueIterator = QueryUtils.generateValueIterator(values);
+ if (valueIterator != null) {
+ for (int curIndex = 0; curIndex < subSensorSize; curIndex++) {
+ valueIterator.setSubMeasurementIndex(curIndex);
+ for (Integer index : subIndexes.get(curIndex)) {
+ curAggregateResults[index].updateResultUsingValues(
+ timestampArray, timeArrayLength, valueIterator);
+ valueIterator.reset();
}
- } while (tmpStartTime >= startTime);
- }
- aggrResultByName.updateResultUsingTimestamps(timestampArray, 1,
allDataReaderList.get(i));
-
- if (aggrResultByName.getResult() != null) {
- return new Pair<>(tmpStartTime, aggrResultByName.getResult());
+ }
}
}
- return null;
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index ea3f1a4..8de5276 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -32,12 +32,10 @@ import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.filter.TsFileFilter;
-import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -224,23 +222,6 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
return curAggregateResults;
}
- @Override
- public Pair<Long, Object> peekNextNotNullValue(Path path, int i) throws
IOException {
- Pair<Long, Object> result = null;
- long nextStartTime = curStartTime;
- long nextEndTime;
- do {
- nextStartTime -= slidingStep;
- if (nextStartTime >= startTime) {
- nextEndTime = Math.min(nextStartTime + interval, endTime);
- } else {
- return null;
- }
- result = pathExecutors.get(path).peekNextNotNullValue(nextStartTime,
nextEndTime);
- } while (result == null);
- return result;
- }
-
protected GroupByExecutor getGroupByExecutor(
PartialPath path,
Set<String> allSensors,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 0779a20..fdb0b88 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -45,7 +45,6 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
-import org.apache.iotdb.db.utils.AlignedValueIterator;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -58,7 +57,6 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
-import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
import java.util.ArrayList;
@@ -696,13 +694,15 @@ public class AggregationExecutor {
// if cached in timeGenerator
if (cached.get(pathId)) {
Object[] values =
timestampGenerator.getValues(selectedSeries.get(pathId));
- ValueIterator valueIterator = generateValueIterator(values);
- for (Integer index : subIndexes) {
- aggregateResultList[index].updateResultUsingValues(
- timeArray, timeArrayLength, valueIterator);
- valueIterator.reset();
+ ValueIterator valueIterator =
QueryUtils.generateValueIterator(values);
+ if (valueIterator != null) {
+ for (Integer index : subIndexes) {
+ aggregateResultList[index].updateResultUsingValues(
+ timeArray, timeArrayLength, valueIterator);
+ valueIterator.reset();
+ }
+ cachedOrNot[i] = true;
}
- cachedOrNot[i] = true;
}
}
@@ -710,8 +710,8 @@ public class AggregationExecutor {
// TODO: if we only need to get firstValue/minTime that's not need
to traverse all values,
// it's enough to get the exact number of values for these specific
aggregate func
Object[] values = entry.getKey().getValuesInTimestamps(timeArray,
timeArrayLength);
- if (values != null) {
- ValueIterator valueIterator = generateValueIterator(values);
+ ValueIterator valueIterator =
QueryUtils.generateValueIterator(values);
+ if (valueIterator != null) {
for (int i = 0; i < entry.getValue().size(); i++) {
if (!cachedOrNot[i]) {
valueIterator.setSubMeasurementIndex(i);
@@ -728,14 +728,6 @@ public class AggregationExecutor {
}
}
- private ValueIterator generateValueIterator(Object[] values) {
- if (values[0] instanceof TsPrimitiveType[]) {
- return new AlignedValueIterator(values);
- } else {
- return new ValueIterator(values);
- }
- }
-
/** Return whether there is result that has not been cached */
private boolean hasRemaining(boolean[] cachedOrNot) {
for (int i = 0; i < cachedOrNot.length; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java
index 5d34d4d..60ca8d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java
@@ -37,7 +37,8 @@ public class AlignedValueIterator extends ValueIterator {
@Override
public boolean hasNext() {
while (curPos < values.length
- && ((TsPrimitiveType[]) values[curPos])[subMeasurementIndex] == null) {
+ && (values[curPos] == null
+ || ((TsPrimitiveType[]) values[curPos])[subMeasurementIndex] ==
null)) {
curPos++;
}
return curPos < values.length;
@@ -50,7 +51,7 @@ public class AlignedValueIterator extends ValueIterator {
@Override
public Object get(int index) {
- if (((TsPrimitiveType[]) values[index])[subMeasurementIndex] == null) {
+ if (values[index] == null || ((TsPrimitiveType[])
values[index])[subMeasurementIndex] == null) {
return null;
}
return ((TsPrimitiveType[]) values[index])[subMeasurementIndex].getValue();
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
index 2feaa2f..90b4dab 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.util.List;
@@ -173,4 +174,24 @@ public class QueryUtils {
seqResources.removeIf(fileFilter::fileNotSatisfy);
unseqResources.removeIf(fileFilter::fileNotSatisfy);
}
+
+ /**
+  * Builds the value iterator matching the layout of {@code values}.
+  *
+  * <p>The first non-null element decides the flavour: a {@code TsPrimitiveType[]} element yields
+  * an {@link AlignedValueIterator}, anything else a plain {@link ValueIterator}.
+  *
+  * @param values values to iterate over; may be null and may contain null elements
+  * @return an iterator over {@code values}, or null when {@code values} is null or holds no
+  *     non-null element
+  */
+ public static ValueIterator generateValueIterator(Object[] values) {
+   if (values == null) {
+     return null;
+   }
+   for (Object candidate : values) {
+     if (candidate == null) {
+       // leading NULLs carry no type information, keep scanning
+       continue;
+     }
+     if (candidate instanceof TsPrimitiveType[]) {
+       return new AlignedValueIterator(values);
+     }
+     return new ValueIterator(values);
+   }
+   // empty array, or all elements are NULL
+   return null;
+ }
}