This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dcc82a  [IOTDB-1764] Support vector timeseries in raw data query in 
cluster (#4182)
1dcc82a is described below

commit 1dcc82aad34bfc0820ac28f6a2e70757fef7d219
Author: Xiangwei Wei <[email protected]>
AuthorDate: Thu Oct 21 13:24:15 2021 +0800

    [IOTDB-1764] Support vector timeseries in raw data query in cluster (#4182)
---
 .../apache/iotdb/cluster/metadata/CMManager.java   | 71 ++++++++--------------
 .../iotdb/cluster/query/LocalQueryExecutor.java    | 39 ++----------
 .../cluster/query/reader/ClusterReaderFactory.java | 24 ++------
 .../iotdb/cluster/utils/ClusterQueryUtils.java     | 43 +++++++++++++
 .../cluster/client/sync/SyncClientAdaptorTest.java | 14 +++--
 .../cluster/server/member/DataGroupMemberTest.java | 11 ++--
 thrift-cluster/src/main/thrift/cluster.thrift      |  6 +-
 7 files changed, 96 insertions(+), 112 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index bca7810..ecd9736 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -117,6 +117,8 @@ import java.util.stream.Collectors;
 import static 
org.apache.iotdb.cluster.query.ClusterPlanExecutor.LOG_FAIL_CONNECT;
 import static 
org.apache.iotdb.cluster.query.ClusterPlanExecutor.THREAD_POOL_SIZE;
 import static 
org.apache.iotdb.cluster.query.ClusterPlanExecutor.waitForThreadPool;
+import static 
org.apache.iotdb.cluster.utils.ClusterQueryUtils.getAssembledPathFromRequest;
+import static 
org.apache.iotdb.cluster.utils.ClusterQueryUtils.getPathStrListForRequest;
 import static 
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
 
 @SuppressWarnings("java:S1135") // ignore todos
@@ -229,18 +231,16 @@ public class CMManager extends MManager {
             MeasurementMNode.getMeasurementMNode(
                 null, measurementSchema.getMeasurementId(), measurementSchema, 
null);
         if (measurementSchema instanceof VectorMeasurementSchema) {
-          for (int i = 0; i < 
measurementSchema.getSubMeasurementsList().size(); i++) {
+          for (String subMeasurement : 
measurementSchema.getSubMeasurementsList()) {
             cacheMeta(
-                ((VectorPartialPath) fullPath).getPathWithSubSensor(i), 
measurementMNode, false);
+                new VectorPartialPath(fullPath.getDevice(), subMeasurement),
+                measurementMNode,
+                false);
           }
-          cacheMeta(
-              new PartialPath(fullPath.getDevice(), 
measurementSchema.getMeasurementId()),
-              measurementMNode,
-              true);
         } else {
           cacheMeta(fullPath, measurementMNode, true);
         }
-        return measurementMNode.getDataType(fullPath.getMeasurement());
+        return measurementMNode.getDataType(measurement);
       } else {
         throw e;
       }
@@ -974,7 +974,7 @@ public class CMManager extends MManager {
       throws MetadataException {
     List<PartialPath> result = new ArrayList<>();
     // split the paths by the data group they belong to
-    Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>();
+    Map<PartitionGroup, List<String>> remoteGroupPathMap = new HashMap<>();
     for (Entry<String, String> sgPathEntry : sgPathMap.entrySet()) {
       String storageGroupName = sgPathEntry.getKey();
       PartialPath pathUnderSG = new PartialPath(sgPathEntry.getValue());
@@ -1000,14 +1000,15 @@ public class CMManager extends MManager {
         result.addAll(allTimeseriesName);
       } else {
         // batch the queries of the same group to reduce communication
-        groupPathMap
+        remoteGroupPathMap
             .computeIfAbsent(partitionGroup, p -> new ArrayList<>())
             .add(pathUnderSG.getFullPath());
       }
     }
 
     // query each data group separately
-    for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry : 
groupPathMap.entrySet()) {
+    for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry :
+        remoteGroupPathMap.entrySet()) {
       PartitionGroup partitionGroup = partitionGroupPathEntry.getKey();
       List<String> pathsToQuery = partitionGroupPathEntry.getValue();
       result.addAll(getMatchedPaths(partitionGroup, pathsToQuery, withAlias));
@@ -1090,14 +1091,10 @@ public class CMManager extends MManager {
       // need to query other nodes in the group
       List<PartialPath> partialPaths = new ArrayList<>();
       for (int i = 0; i < result.paths.size(); i++) {
-        try {
-          PartialPath partialPath = new PartialPath(result.paths.get(i));
-          if (withAlias) {
-            partialPath.setMeasurementAlias(result.aliasList.get(i));
-          }
-          partialPaths.add(partialPath);
-        } catch (IllegalPathException e) {
-          // ignore
+        PartialPath matchedPath = 
getAssembledPathFromRequest(result.paths.get(i));
+        partialPaths.add(matchedPath);
+        if (withAlias) {
+          matchedPath.setMeasurementAlias(result.aliasList.get(i));
         }
       }
       return partialPaths;
@@ -1298,21 +1295,6 @@ public class CMManager extends MManager {
   }
 
   /**
-   * Get the local paths that match any path in "paths". The result is not 
deduplicated.
-   *
-   * @param paths paths potentially contain wildcards
-   */
-  public List<String> getAllPaths(List<String> paths) throws MetadataException 
{
-    List<String> ret = new ArrayList<>();
-    for (String path : paths) {
-      getFlatMeasurementPaths(new PartialPath(path)).stream()
-          .map(PartialPath::getFullPath)
-          .forEach(ret::add);
-    }
-    return ret;
-  }
-
-  /**
    * Get the local devices that match any path in "paths". The result is 
deduplicated.
    *
    * @param paths paths potentially contain wildcards
@@ -1712,23 +1694,18 @@ public class CMManager extends MManager {
 
   public GetAllPathsResult getAllPaths(List<String> paths, boolean withAlias)
       throws MetadataException {
-    List<String> retPaths = new ArrayList<>();
-    List<String> alias = null;
-    if (withAlias) {
-      alias = new ArrayList<>();
-    }
-
-    if (withAlias) {
-      for (String path : paths) {
-        List<PartialPath> allTimeseriesPathWithAlias =
-            super.getFlatMeasurementPathsWithAlias(new PartialPath(path), -1, 
-1).left;
-        for (PartialPath timeseriesPathWithAlias : allTimeseriesPathWithAlias) 
{
-          retPaths.add(timeseriesPathWithAlias.getFullPath());
+    List<List<String>> retPaths = new ArrayList<>();
+    List<String> alias = withAlias ? new ArrayList<>() : null;
+
+    for (String path : paths) {
+      List<PartialPath> allTimeseriesPathWithAlias =
+          super.getFlatMeasurementPathsWithAlias(new PartialPath(path), -1, 
-1).left;
+      for (PartialPath timeseriesPathWithAlias : allTimeseriesPathWithAlias) {
+        retPaths.add(getPathStrListForRequest(timeseriesPathWithAlias));
+        if (withAlias) {
           alias.add(timeseriesPathWithAlias.getMeasurementAlias());
         }
       }
-    } else {
-      retPaths = getAllPaths(paths);
     }
 
     GetAllPathsResult getAllPathsResult = new GetAllPathsResult();
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 7300827..880866c 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -47,7 +47,6 @@ import 
org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.VectorPartialPath;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
@@ -93,6 +92,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.iotdb.cluster.utils.ClusterQueryUtils.getAssembledPathFromRequest;
+
 public class LocalQueryExecutor {
 
   private static final Logger logger = 
LoggerFactory.getLogger(LocalQueryExecutor.class);
@@ -217,12 +218,7 @@ public class LocalQueryExecutor {
         request.getQueryId());
     dataGroupMember.syncLeaderWithConsistencyCheck(false);
 
-    PartialPath path = null;
-    try {
-      path = new PartialPath(request.getPath());
-    } catch (IllegalPathException e) {
-      // ignore
-    }
+    PartialPath path = getAssembledPathFromRequest(request.getPath());
     TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
     Filter timeFilter = null;
     Filter valueFilter = null;
@@ -301,25 +297,7 @@ public class LocalQueryExecutor {
     dataGroupMember.syncLeaderWithConsistencyCheck(false);
 
     List<PartialPath> paths = Lists.newArrayList();
-    request
-        .getPath()
-        .forEach(
-            fullPath -> {
-              try {
-                if (fullPath.contains("$#$")) {
-                  String[] array = fullPath.split(":");
-                  List<String> subSensorsList = new ArrayList<>();
-                  for (int i = 1; i < array.length; i++) {
-                    subSensorsList.add(array[i]);
-                  }
-                  paths.add(new VectorPartialPath(array[0], subSensorsList));
-                } else {
-                  paths.add(new PartialPath(fullPath));
-                }
-              } catch (IllegalPathException e) {
-                logger.warn("Failed to create partial path, fullPath is {}.", 
fullPath, e);
-              }
-            });
+    request.getPath().forEach(path -> 
paths.add(getAssembledPathFromRequest(path)));
 
     List<TSDataType> dataTypes = Lists.newArrayList();
     request.getDataTypeOrdinal().forEach(dataType -> 
dataTypes.add(TSDataType.values()[dataType]));
@@ -549,8 +527,6 @@ public class LocalQueryExecutor {
    * Create an IReaderByTime of a path, register it in the query manager to 
get a reader id for it
    * and send the id back to the requester. If the reader does not have any 
data, an id of -1 will
    * be returned.
-   *
-   * @param request
    */
   public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request)
       throws CheckConsistencyException, QueryProcessException, 
StorageEngineException {
@@ -562,12 +538,7 @@ public class LocalQueryExecutor {
         request.getQueryId());
     dataGroupMember.syncLeaderWithConsistencyCheck(false);
 
-    PartialPath path = null;
-    try {
-      path = new PartialPath(request.getPath());
-    } catch (IllegalPathException e) {
-      // ignore
-    }
+    PartialPath path = getAssembledPathFromRequest(request.getPath());
     TSDataType dataType = TSDataType.values()[request.dataTypeOrdinal];
     Set<String> deviceMeasurements = request.getDeviceMeasurements();
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index d692830..1de13a2 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -52,7 +52,6 @@ import 
org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.VectorPartialPath;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -91,6 +90,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import static 
org.apache.iotdb.cluster.utils.ClusterQueryUtils.getPathStrListForRequest;
+
 @SuppressWarnings("java:S107")
 public class ClusterReaderFactory {
 
@@ -704,23 +705,8 @@ public class ClusterReaderFactory {
       request.setValueFilterBytes(SerializeUtils.serializeFilter(valueFilter));
     }
 
-    List<String> fullPaths = Lists.newArrayList();
-    paths.forEach(
-        path -> {
-          if (path instanceof VectorPartialPath) {
-            StringBuilder builder = new StringBuilder(path.getFullPath());
-            List<String> subSensorsList = ((VectorPartialPath) 
path).getSubSensorsList();
-            for (String subSensor : subSensorsList) {
-              builder.append(":");
-              builder.append(path.getFullPath());
-              builder.append(".");
-              builder.append(subSensor);
-            }
-            fullPaths.add(builder.toString());
-          } else {
-            fullPaths.add(path.getFullPath());
-          }
-        });
+    List<List<String>> fullPaths = Lists.newArrayList();
+    paths.forEach(path -> fullPaths.add(getPathStrListForRequest(path)));
 
     List<Integer> dataTypeOrdinals = Lists.newArrayList();
     dataTypes.forEach(dataType -> dataTypeOrdinals.add(dataType.ordinal()));
@@ -752,7 +738,7 @@ public class ClusterReaderFactory {
     if (valueFilter != null) {
       request.setValueFilterBytes(SerializeUtils.serializeFilter(valueFilter));
     }
-    request.setPath(path.getFullPath());
+    request.setPath(ClusterQueryUtils.getPathStrListForRequest(path));
     request.setHeader(partitionGroup.getHeader());
     request.setQueryId(context.getQueryId());
     request.setRequester(metaGroupMember.getThisNode());
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterQueryUtils.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterQueryUtils.java
index 9b6b6c8..e6fc88f 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterQueryUtils.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterQueryUtils.java
@@ -24,13 +24,21 @@ import 
org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
 import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.read.common.Path;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 public class ClusterQueryUtils {
 
+  private static final Logger logger = 
LoggerFactory.getLogger(ClusterQueryUtils.class);
+
   private ClusterQueryUtils() {
     // util class
   }
@@ -64,4 +72,39 @@ public class ClusterQueryUtils {
       checkPathExistence(path);
     }
   }
+
+  /**
+   * Generate path string list for RPC request.
+   *
+   * <p>For a vector path, return its full path followed by all subSensors; else just the path.
+   */
+  public static List<String> getPathStrListForRequest(Path path) {
+    if (path instanceof VectorPartialPath) {
+      List<String> pathWithSubSensors =
+          new ArrayList<>(((VectorPartialPath) 
path).getSubSensorsList().size() + 1);
+      pathWithSubSensors.add(path.getFullPath());
+      pathWithSubSensors.addAll(((VectorPartialPath) 
path).getSubSensorsList());
+      return pathWithSubSensors;
+    } else {
+      return Collections.singletonList(path.getFullPath());
+    }
+  }
+
+  /**
+   * Deserialize an assembled Path from path string list that's from RPC 
request.
+   *
+   * <p>This method is the inverse of {@link #getPathStrListForRequest(Path)}.
+   */
+  public static PartialPath getAssembledPathFromRequest(List<String> 
pathString) {
+    try {
+      if (pathString.size() == 1) {
+        return new PartialPath(pathString.get(0));
+      } else {
+        return new VectorPartialPath(pathString.get(0), pathString.subList(1, 
pathString.size()));
+      }
+    } catch (IllegalPathException e) {
+      logger.error("Failed to create partial path, fullPath is {}.", 
pathString, e);
+      return null;
+    }
+  }
 }
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index 7c11fce..ca8c677 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -245,7 +245,11 @@ public class SyncClientAdaptorTest {
               List<String> path,
               boolean withAlias,
               AsyncMethodCallback<GetAllPathsResult> resultHandler) {
-            resultHandler.onComplete(new GetAllPathsResult(path));
+            List<List<String>> pathString = new ArrayList<>();
+            for (String s : path) {
+              pathString.add(Collections.singletonList(s));
+            }
+            resultHandler.onComplete(new GetAllPathsResult(pathString));
           }
 
           @Override
@@ -391,9 +395,11 @@ public class SyncClientAdaptorTest {
         paths.subList(0, paths.size() / 2),
         SyncClientAdaptor.getUnregisteredMeasurements(
             dataClient, TestUtils.getRaftNode(0, 0), paths));
-    assertEquals(
-        paths,
-        SyncClientAdaptor.getAllPaths(dataClient, TestUtils.getRaftNode(0, 0), 
paths, false).paths);
+    List<String> result = new ArrayList<>();
+    SyncClientAdaptor.getAllPaths(dataClient, TestUtils.getRaftNode(0, 0), 
paths, false)
+        .paths
+        .forEach(p -> result.add(p.get(0)));
+    assertEquals(paths, result);
     assertEquals(
         paths.size(),
         (int) SyncClientAdaptor.getPathCount(dataClient, 
TestUtils.getRaftNode(0, 0), paths, 0));
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index a925ada..f018bfc 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -740,7 +740,7 @@ public class DataGroupMemberTest extends BaseMember {
         partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10), 
raftId)));
     dataGroupMember.setCharacter(NodeCharacter.LEADER);
     SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
-    request.setPath(TestUtils.getTestSeries(0, 0));
+    request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0)));
     request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
     request.setRequester(TestUtils.getNode(1));
     request.setQueryId(0);
@@ -808,7 +808,7 @@ public class DataGroupMemberTest extends BaseMember {
         partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10), 
raftId)));
     dataGroupMember.setCharacter(NodeCharacter.LEADER);
     SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
-    request.setPath(TestUtils.getTestSeries(0, 0));
+    request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0)));
     request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
     request.setRequester(TestUtils.getNode(1));
     request.setQueryId(0);
@@ -876,7 +876,7 @@ public class DataGroupMemberTest extends BaseMember {
         partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10), 0)));
     dataGroupMember.setCharacter(NodeCharacter.LEADER);
     SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
-    request.setPath(TestUtils.getTestSeries(0, 0));
+    request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0)));
     request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
     request.setRequester(TestUtils.getNode(1));
     request.setQueryId(0);
@@ -944,7 +944,7 @@ public class DataGroupMemberTest extends BaseMember {
         partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10), 0)));
     dataGroupMember.setCharacter(NodeCharacter.LEADER);
     SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
-    request.setPath(TestUtils.getTestSeries(0, 0));
+    request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0)));
     request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
     request.setRequester(TestUtils.getNode(10));
     request.setQueryId(0);
@@ -996,7 +996,8 @@ public class DataGroupMemberTest extends BaseMember {
     new DataAsyncService(dataGroupMember)
         .getAllPaths(
             TestUtils.getRaftNode(0, raftId), Collections.singletonList(path), 
false, handler);
-    List<String> result = pathResult.get().paths;
+    List<String> result = new ArrayList<>();
+    pathResult.get().paths.forEach(p -> result.add(p.get(0)));
     assertEquals(20, result.size());
     for (int i = 0; i < 10; i++) {
       assertTrue(result.contains(TestUtils.getTestSeries(0, i)));
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift 
b/thrift-cluster/src/main/thrift/cluster.thrift
index 1cba433..05459ae 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -184,7 +184,7 @@ struct PullSchemaResp {
 }
 
 struct SingleSeriesQueryRequest {
-  1: required string path
+  1: required list<string> path
   2: optional binary timeFilterBytes
   3: optional binary valueFilterBytes
   4: required long queryId
@@ -199,7 +199,7 @@ struct SingleSeriesQueryRequest {
 }
 
 struct MultSeriesQueryRequest {
-  1: required list<string> path
+  1: required list<list<string>> path
   2: optional binary timeFilterBytes
   3: optional binary valueFilterBytes
   4: required long queryId
@@ -263,7 +263,7 @@ struct LastQueryRequest {
 }
 
 struct GetAllPathsResult {
-  1: required list<string> paths
+  1: required list<list<string>> paths
   2: optional list<string> aliasList
 }
 

Reply via email to