This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 98ba544 Modify the getAggrResult method, don't destroy polymorphism
too early (#3209)
98ba544 is described below
commit 98ba544f3ac69ae9942f66c7c5ccce0923f3b56e
Author: Dawei Liu <[email protected]>
AuthorDate: Mon May 24 20:12:48 2021 +0800
Modify the getAggrResult method, don't destroy polymorphism too early
(#3209)
* Modify the getAggrResult method, don't destroy polymorphism too early
* spotless
* add vector count test to testcontainer
* add more test
---
.../iotdb/cluster/query/LocalQueryExecutor.java | 58 ++++++++--------
.../cluster/query/aggregate/ClusterAggregator.java | 10 +--
.../test/java/org/apache/iotdb/db/sql/Cases.java | 78 +++++++++++++++++++++-
.../java/org/apache/iotdb/db/sql/ClusterIT.java | 3 +
.../java/org/apache/iotdb/db/sql/SingleNodeIT.java | 3 +
5 files changed, 116 insertions(+), 36 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 0731ce1..dcd00c8 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -592,7 +592,17 @@ public class LocalQueryExecutor {
List<String> aggregations = request.getAggregations();
TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
- String path = request.getPath();
+ PartialPath path = null;
+ try {
+ path = new PartialPath(request.getPath());
+ } catch (IllegalPathException e) {
+ logger.error(
+ "{}: aggregation has error path: {}, queryId: {}",
+ name,
+ request.getPath(),
+ request.getQueryId());
+ throw new QueryProcessException(e);
+ }
Filter timeFilter = null;
if (request.isSetTimeFilterBytes()) {
timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
@@ -643,7 +653,7 @@ public class LocalQueryExecutor {
List<String> aggregations,
Set<String> allSensors,
TSDataType dataType,
- String path,
+ PartialPath path,
Filter timeFilter,
QueryContext context,
boolean ascending)
@@ -662,30 +672,26 @@ public class LocalQueryExecutor {
List<Integer> nodeSlots =
((SlotPartitionTable)
dataGroupMember.getMetaGroupMember().getPartitionTable())
.getNodeSlots(dataGroupMember.getHeader());
- try {
- if (ascending) {
- AggregationExecutor.aggregateOneSeries(
- new PartialPath(path),
- allSensors,
- context,
- timeFilter,
- dataType,
- results,
- null,
- new SlotTsFileFilter(nodeSlots));
- } else {
- AggregationExecutor.aggregateOneSeries(
- new PartialPath(path),
- allSensors,
- context,
- timeFilter,
- dataType,
- null,
- results,
- new SlotTsFileFilter(nodeSlots));
- }
- } catch (IllegalPathException e) {
- // ignore
+ if (ascending) {
+ AggregationExecutor.aggregateOneSeries(
+ path,
+ allSensors,
+ context,
+ timeFilter,
+ dataType,
+ results,
+ null,
+ new SlotTsFileFilter(nodeSlots));
+ } else {
+ AggregationExecutor.aggregateOneSeries(
+ path,
+ allSensors,
+ context,
+ timeFilter,
+ dataType,
+ null,
+ results,
+ new SlotTsFileFilter(nodeSlots));
}
return results;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
index 6c33f50..a14b0cf 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
@@ -133,7 +133,7 @@ public class ClusterAggregator {
* @param timeFilter nullable
*/
private List<AggregateResult> getAggregateResult(
- Path path,
+ PartialPath path,
Set<String> deviceMeasurements,
List<String> aggregations,
TSDataType dataType,
@@ -165,13 +165,7 @@ public class ClusterAggregator {
partitionGroup.getHeader());
List<AggregateResult> aggrResult =
localQueryExecutor.getAggrResult(
- aggregations,
- deviceMeasurements,
- dataType,
- path.getFullPath(),
- timeFilter,
- context,
- ascending);
+ aggregations, deviceMeasurements, dataType, path, timeFilter,
context, ascending);
logger.debug(
"{}: queried aggregation {} of {} in {} locally are {}",
metaGroupMember.getName(),
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
index 4655c61..8b1441b 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
@@ -18,6 +18,17 @@
*/
package org.apache.iotdb.db.sql;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+
import org.junit.Assert;
import org.junit.Test;
@@ -25,8 +36,8 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Stream;
public abstract class Cases {
@@ -34,6 +45,7 @@ public abstract class Cases {
protected Connection writeConnection;
protected Statement[] readStatements;
protected Connection[] readConnections;
+ protected Session session;
/** initialize the writeStatement,writeConnection, readStatements and the
readConnections. */
public abstract void init() throws Exception;
@@ -47,6 +59,7 @@ public abstract class Cases {
for (Connection connection : readConnections) {
connection.close();
}
+ session.close();
}
// if we seperate the test into multiply test() methods, then the docker
container have to be
@@ -146,4 +159,65 @@ public abstract class Cases {
resultSet.close();
}
}
+
+ @Test
+ public void vectorCountTest() throws IoTDBConnectionException,
StatementExecutionException {
+ List<List<String>> measurementList = new ArrayList<>();
+ List<List<TSEncoding>> encodingList = new ArrayList<>();
+ List<List<TSDataType>> dataTypeList = new ArrayList<>();
+ List<CompressionType> compressionTypes = new ArrayList<>();
+ List<TSDataType> dataTypes = new ArrayList<>();
+ List<TSEncoding> encodings = new ArrayList<>();
+ String[] vectorMeasurements = new String[10];
+
+ Stream.iterate(0, i -> i + 1)
+ .limit(10)
+ .forEach(
+ i -> {
+ dataTypes.add(TSDataType.DOUBLE);
+ vectorMeasurements[i] = "vm" + i;
+ encodings.add(TSEncoding.RLE);
+ compressionTypes.add(CompressionType.SNAPPY);
+ });
+ encodingList.add(encodings);
+ dataTypeList.add(dataTypes);
+ measurementList.add(Arrays.asList(vectorMeasurements));
+
+ session.createDeviceTemplate(
+ "testcontainer", measurementList, dataTypeList, encodingList,
compressionTypes);
+ session.setStorageGroup("root.template");
+ session.setDeviceTemplate("testcontainer", "root.template");
+
+ VectorMeasurementSchema vectorMeasurementSchema =
+ new VectorMeasurementSchema(vectorMeasurements, dataTypes.toArray(new
TSDataType[0]));
+
+ Tablet tablet = new Tablet("root.template.device1",
Arrays.asList(vectorMeasurementSchema));
+ for (int i = 0; i < 10; i++) {
+ tablet.addTimestamp(i, i);
+ for (int j = 0; j < 10; j++) {
+ tablet.addValue("vm" + j, i, (double) i);
+ tablet.rowSize++;
+ }
+ }
+ session.insertTablet(tablet);
+
+ SessionDataSet sessionDataSet =
+ session.executeQueryStatement("select count(*) from
root.template.device1");
+ Assert.assertTrue(sessionDataSet.hasNext());
+ RowRecord next = sessionDataSet.next();
+ Assert.assertEquals(10, next.getFields().get(0).getLongV());
+
+ sessionDataSet = session.executeQueryStatement("select count(vm1) from
root.template.device1");
+ Assert.assertTrue(sessionDataSet.hasNext());
+ next = sessionDataSet.next();
+ Assert.assertEquals(10, next.getFields().get(0).getLongV());
+
+ sessionDataSet =
+ session.executeQueryStatement("select count(vm1),count(vm2) from
root.template.device1");
+ Assert.assertTrue(sessionDataSet.hasNext());
+ next = sessionDataSet.next();
+ Assert.assertEquals(2, next.getFields().size());
+ Assert.assertEquals(10, next.getFields().get(0).getLongV());
+ Assert.assertEquals(10, next.getFields().get(1).getLongV());
+ }
}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
index b8b07b1..62541df 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.sql;
import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.session.Session;
import org.junit.After;
import org.junit.Before;
@@ -81,6 +82,8 @@ public abstract class ClusterIT extends Cases {
"jdbc:iotdb://" + readIps[i] + ":" + readPorts[i], "root",
"root");
readStatements[i] = readConnections[i].createStatement();
}
+ session = new Session(getWriteRpcIp(), getWriteRpcPort());
+ session.open();
}
@After
diff --git
a/testcontainer/src/test/java/org/apache/iotdb/db/sql/SingleNodeIT.java
b/testcontainer/src/test/java/org/apache/iotdb/db/sql/SingleNodeIT.java
index 8e7bc4c..0f97563 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/SingleNodeIT.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/SingleNodeIT.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.sql;
import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.session.Session;
import org.junit.*;
import org.slf4j.Logger;
@@ -69,6 +70,8 @@ public class SingleNodeIT extends Cases {
readConnections[0] =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:" + rpcPort,
"root", "root");
writeStatement = readStatements[0] = writeConnection.createStatement();
+ session = new Session("127.0.0.1", rpcPort);
+ session.open();
}
@After