This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 98ba544  Modify the getAggrResult method, don't destroy polymorphism 
too early (#3209)
98ba544 is described below

commit 98ba544f3ac69ae9942f66c7c5ccce0923f3b56e
Author: Dawei Liu <[email protected]>
AuthorDate: Mon May 24 20:12:48 2021 +0800

    Modify the getAggrResult method, don't destroy polymorphism too early 
(#3209)
    
    * Modify the getAggrResult method, don't destroy polymorphism too early
    
    * spotless
    
    * add vector count test to testcontainer
    
    * add more test
---
 .../iotdb/cluster/query/LocalQueryExecutor.java    | 58 ++++++++--------
 .../cluster/query/aggregate/ClusterAggregator.java | 10 +--
 .../test/java/org/apache/iotdb/db/sql/Cases.java   | 78 +++++++++++++++++++++-
 .../java/org/apache/iotdb/db/sql/ClusterIT.java    |  3 +
 .../java/org/apache/iotdb/db/sql/SingleNodeIT.java |  3 +
 5 files changed, 116 insertions(+), 36 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 0731ce1..dcd00c8 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -592,7 +592,17 @@ public class LocalQueryExecutor {
 
     List<String> aggregations = request.getAggregations();
     TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
-    String path = request.getPath();
+    PartialPath path = null;
+    try {
+      path = new PartialPath(request.getPath());
+    } catch (IllegalPathException e) {
+      logger.error(
+          "{}: aggregation has error path: {}, queryId: {}",
+          name,
+          request.getPath(),
+          request.getQueryId());
+      throw new QueryProcessException(e);
+    }
     Filter timeFilter = null;
     if (request.isSetTimeFilterBytes()) {
       timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
@@ -643,7 +653,7 @@ public class LocalQueryExecutor {
       List<String> aggregations,
       Set<String> allSensors,
       TSDataType dataType,
-      String path,
+      PartialPath path,
       Filter timeFilter,
       QueryContext context,
       boolean ascending)
@@ -662,30 +672,26 @@ public class LocalQueryExecutor {
     List<Integer> nodeSlots =
         ((SlotPartitionTable) 
dataGroupMember.getMetaGroupMember().getPartitionTable())
             .getNodeSlots(dataGroupMember.getHeader());
-    try {
-      if (ascending) {
-        AggregationExecutor.aggregateOneSeries(
-            new PartialPath(path),
-            allSensors,
-            context,
-            timeFilter,
-            dataType,
-            results,
-            null,
-            new SlotTsFileFilter(nodeSlots));
-      } else {
-        AggregationExecutor.aggregateOneSeries(
-            new PartialPath(path),
-            allSensors,
-            context,
-            timeFilter,
-            dataType,
-            null,
-            results,
-            new SlotTsFileFilter(nodeSlots));
-      }
-    } catch (IllegalPathException e) {
-      // ignore
+    if (ascending) {
+      AggregationExecutor.aggregateOneSeries(
+          path,
+          allSensors,
+          context,
+          timeFilter,
+          dataType,
+          results,
+          null,
+          new SlotTsFileFilter(nodeSlots));
+    } else {
+      AggregationExecutor.aggregateOneSeries(
+          path,
+          allSensors,
+          context,
+          timeFilter,
+          dataType,
+          null,
+          results,
+          new SlotTsFileFilter(nodeSlots));
     }
     return results;
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
index 6c33f50..a14b0cf 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
@@ -133,7 +133,7 @@ public class ClusterAggregator {
    * @param timeFilter nullable
    */
   private List<AggregateResult> getAggregateResult(
-      Path path,
+      PartialPath path,
       Set<String> deviceMeasurements,
       List<String> aggregations,
       TSDataType dataType,
@@ -165,13 +165,7 @@ public class ClusterAggregator {
             partitionGroup.getHeader());
         List<AggregateResult> aggrResult =
             localQueryExecutor.getAggrResult(
-                aggregations,
-                deviceMeasurements,
-                dataType,
-                path.getFullPath(),
-                timeFilter,
-                context,
-                ascending);
+                aggregations, deviceMeasurements, dataType, path, timeFilter, 
context, ascending);
         logger.debug(
             "{}: queried aggregation {} of {} in {} locally are {}",
             metaGroupMember.getName(),
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java 
b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
index 4655c61..8b1441b 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
@@ -18,6 +18,17 @@
  */
 package org.apache.iotdb.db.sql;
 
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -25,8 +36,8 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Stream;
 
 public abstract class Cases {
 
@@ -34,6 +45,7 @@ public abstract class Cases {
   protected Connection writeConnection;
   protected Statement[] readStatements;
   protected Connection[] readConnections;
+  protected Session session;
 
   /** initialize the writeStatement,writeConnection, readStatements and the 
readConnections. */
   public abstract void init() throws Exception;
@@ -47,6 +59,7 @@ public abstract class Cases {
     for (Connection connection : readConnections) {
       connection.close();
     }
+    session.close();
   }
 
   // if we seperate the test into multiply test() methods, then the docker 
container have to be
@@ -146,4 +159,65 @@ public abstract class Cases {
       resultSet.close();
     }
   }
+
+  @Test
+  public void vectorCountTest() throws IoTDBConnectionException, 
StatementExecutionException {
+    List<List<String>> measurementList = new ArrayList<>();
+    List<List<TSEncoding>> encodingList = new ArrayList<>();
+    List<List<TSDataType>> dataTypeList = new ArrayList<>();
+    List<CompressionType> compressionTypes = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    List<TSEncoding> encodings = new ArrayList<>();
+    String[] vectorMeasurements = new String[10];
+
+    Stream.iterate(0, i -> i + 1)
+        .limit(10)
+        .forEach(
+            i -> {
+              dataTypes.add(TSDataType.DOUBLE);
+              vectorMeasurements[i] = "vm" + i;
+              encodings.add(TSEncoding.RLE);
+              compressionTypes.add(CompressionType.SNAPPY);
+            });
+    encodingList.add(encodings);
+    dataTypeList.add(dataTypes);
+    measurementList.add(Arrays.asList(vectorMeasurements));
+
+    session.createDeviceTemplate(
+        "testcontainer", measurementList, dataTypeList, encodingList, 
compressionTypes);
+    session.setStorageGroup("root.template");
+    session.setDeviceTemplate("testcontainer", "root.template");
+
+    VectorMeasurementSchema vectorMeasurementSchema =
+        new VectorMeasurementSchema(vectorMeasurements, dataTypes.toArray(new 
TSDataType[0]));
+
+    Tablet tablet = new Tablet("root.template.device1", 
Arrays.asList(vectorMeasurementSchema));
+    for (int i = 0; i < 10; i++) {
+      tablet.addTimestamp(i, i);
+      for (int j = 0; j < 10; j++) {
+        tablet.addValue("vm" + j, i, (double) i);
+        tablet.rowSize++;
+      }
+    }
+    session.insertTablet(tablet);
+
+    SessionDataSet sessionDataSet =
+        session.executeQueryStatement("select count(*) from 
root.template.device1");
+    Assert.assertTrue(sessionDataSet.hasNext());
+    RowRecord next = sessionDataSet.next();
+    Assert.assertEquals(10, next.getFields().get(0).getLongV());
+
+    sessionDataSet = session.executeQueryStatement("select count(vm1) from 
root.template.device1");
+    Assert.assertTrue(sessionDataSet.hasNext());
+    next = sessionDataSet.next();
+    Assert.assertEquals(10, next.getFields().get(0).getLongV());
+
+    sessionDataSet =
+        session.executeQueryStatement("select count(vm1),count(vm2) from 
root.template.device1");
+    Assert.assertTrue(sessionDataSet.hasNext());
+    next = sessionDataSet.next();
+    Assert.assertEquals(2, next.getFields().size());
+    Assert.assertEquals(10, next.getFields().get(0).getLongV());
+    Assert.assertEquals(10, next.getFields().get(1).getLongV());
+  }
 }
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java 
b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
index b8b07b1..62541df 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.sql;
 
 import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.session.Session;
 
 import org.junit.After;
 import org.junit.Before;
@@ -81,6 +82,8 @@ public abstract class ClusterIT extends Cases {
               "jdbc:iotdb://" + readIps[i] + ":" + readPorts[i], "root", 
"root");
       readStatements[i] = readConnections[i].createStatement();
     }
+    session = new Session(getWriteRpcIp(), getWriteRpcPort());
+    session.open();
   }
 
   @After
diff --git 
a/testcontainer/src/test/java/org/apache/iotdb/db/sql/SingleNodeIT.java 
b/testcontainer/src/test/java/org/apache/iotdb/db/sql/SingleNodeIT.java
index 8e7bc4c..0f97563 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/SingleNodeIT.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/SingleNodeIT.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.sql;
 
 import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.session.Session;
 
 import org.junit.*;
 import org.slf4j.Logger;
@@ -69,6 +70,8 @@ public class SingleNodeIT extends Cases {
         readConnections[0] =
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:" + rpcPort, 
"root", "root");
     writeStatement = readStatements[0] = writeConnection.createStatement();
+    session = new Session("127.0.0.1", rpcPort);
+    session.open();
   }
 
   @After

Reply via email to