This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1dcc82a [IOTDB-1764] Support vector timeseries in raw data query in
cluster (#4182)
1dcc82a is described below
commit 1dcc82aad34bfc0820ac28f6a2e70757fef7d219
Author: Xiangwei Wei <[email protected]>
AuthorDate: Thu Oct 21 13:24:15 2021 +0800
[IOTDB-1764] Support vector timeseries in raw data query in cluster (#4182)
---
.../apache/iotdb/cluster/metadata/CMManager.java | 71 ++++++++--------------
.../iotdb/cluster/query/LocalQueryExecutor.java | 39 ++----------
.../cluster/query/reader/ClusterReaderFactory.java | 24 ++------
.../iotdb/cluster/utils/ClusterQueryUtils.java | 43 +++++++++++++
.../cluster/client/sync/SyncClientAdaptorTest.java | 14 +++--
.../cluster/server/member/DataGroupMemberTest.java | 11 ++--
thrift-cluster/src/main/thrift/cluster.thrift | 6 +-
7 files changed, 96 insertions(+), 112 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index bca7810..ecd9736 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -117,6 +117,8 @@ import java.util.stream.Collectors;
import static
org.apache.iotdb.cluster.query.ClusterPlanExecutor.LOG_FAIL_CONNECT;
import static
org.apache.iotdb.cluster.query.ClusterPlanExecutor.THREAD_POOL_SIZE;
import static
org.apache.iotdb.cluster.query.ClusterPlanExecutor.waitForThreadPool;
+import static
org.apache.iotdb.cluster.utils.ClusterQueryUtils.getAssembledPathFromRequest;
+import static
org.apache.iotdb.cluster.utils.ClusterQueryUtils.getPathStrListForRequest;
import static
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
@SuppressWarnings("java:S1135") // ignore todos
@@ -229,18 +231,16 @@ public class CMManager extends MManager {
MeasurementMNode.getMeasurementMNode(
null, measurementSchema.getMeasurementId(), measurementSchema,
null);
if (measurementSchema instanceof VectorMeasurementSchema) {
- for (int i = 0; i <
measurementSchema.getSubMeasurementsList().size(); i++) {
+ for (String subMeasurement :
measurementSchema.getSubMeasurementsList()) {
cacheMeta(
- ((VectorPartialPath) fullPath).getPathWithSubSensor(i),
measurementMNode, false);
+ new VectorPartialPath(fullPath.getDevice(), subMeasurement),
+ measurementMNode,
+ false);
}
- cacheMeta(
- new PartialPath(fullPath.getDevice(),
measurementSchema.getMeasurementId()),
- measurementMNode,
- true);
} else {
cacheMeta(fullPath, measurementMNode, true);
}
- return measurementMNode.getDataType(fullPath.getMeasurement());
+ return measurementMNode.getDataType(measurement);
} else {
throw e;
}
@@ -974,7 +974,7 @@ public class CMManager extends MManager {
throws MetadataException {
List<PartialPath> result = new ArrayList<>();
// split the paths by the data group they belong to
- Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>();
+ Map<PartitionGroup, List<String>> remoteGroupPathMap = new HashMap<>();
for (Entry<String, String> sgPathEntry : sgPathMap.entrySet()) {
String storageGroupName = sgPathEntry.getKey();
PartialPath pathUnderSG = new PartialPath(sgPathEntry.getValue());
@@ -1000,14 +1000,15 @@ public class CMManager extends MManager {
result.addAll(allTimeseriesName);
} else {
// batch the queries of the same group to reduce communication
- groupPathMap
+ remoteGroupPathMap
.computeIfAbsent(partitionGroup, p -> new ArrayList<>())
.add(pathUnderSG.getFullPath());
}
}
// query each data group separately
- for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry :
groupPathMap.entrySet()) {
+ for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry :
+ remoteGroupPathMap.entrySet()) {
PartitionGroup partitionGroup = partitionGroupPathEntry.getKey();
List<String> pathsToQuery = partitionGroupPathEntry.getValue();
result.addAll(getMatchedPaths(partitionGroup, pathsToQuery, withAlias));
@@ -1090,14 +1091,10 @@ public class CMManager extends MManager {
// need to query other nodes in the group
List<PartialPath> partialPaths = new ArrayList<>();
for (int i = 0; i < result.paths.size(); i++) {
- try {
- PartialPath partialPath = new PartialPath(result.paths.get(i));
- if (withAlias) {
- partialPath.setMeasurementAlias(result.aliasList.get(i));
- }
- partialPaths.add(partialPath);
- } catch (IllegalPathException e) {
- // ignore
+ PartialPath matchedPath =
getAssembledPathFromRequest(result.paths.get(i));
+ partialPaths.add(matchedPath);
+ if (withAlias) {
+ matchedPath.setMeasurementAlias(result.aliasList.get(i));
}
}
return partialPaths;
@@ -1298,21 +1295,6 @@ public class CMManager extends MManager {
}
/**
- * Get the local paths that match any path in "paths". The result is not
deduplicated.
- *
- * @param paths paths potentially contain wildcards
- */
- public List<String> getAllPaths(List<String> paths) throws MetadataException
{
- List<String> ret = new ArrayList<>();
- for (String path : paths) {
- getFlatMeasurementPaths(new PartialPath(path)).stream()
- .map(PartialPath::getFullPath)
- .forEach(ret::add);
- }
- return ret;
- }
-
- /**
* Get the local devices that match any path in "paths". The result is
deduplicated.
*
* @param paths paths potentially contain wildcards
@@ -1712,23 +1694,18 @@ public class CMManager extends MManager {
public GetAllPathsResult getAllPaths(List<String> paths, boolean withAlias)
throws MetadataException {
- List<String> retPaths = new ArrayList<>();
- List<String> alias = null;
- if (withAlias) {
- alias = new ArrayList<>();
- }
-
- if (withAlias) {
- for (String path : paths) {
- List<PartialPath> allTimeseriesPathWithAlias =
- super.getFlatMeasurementPathsWithAlias(new PartialPath(path), -1,
-1).left;
- for (PartialPath timeseriesPathWithAlias : allTimeseriesPathWithAlias)
{
- retPaths.add(timeseriesPathWithAlias.getFullPath());
+ List<List<String>> retPaths = new ArrayList<>();
+ List<String> alias = withAlias ? new ArrayList<>() : null;
+
+ for (String path : paths) {
+ List<PartialPath> allTimeseriesPathWithAlias =
+ super.getFlatMeasurementPathsWithAlias(new PartialPath(path), -1,
-1).left;
+ for (PartialPath timeseriesPathWithAlias : allTimeseriesPathWithAlias) {
+ retPaths.add(getPathStrListForRequest(timeseriesPathWithAlias));
+ if (withAlias) {
alias.add(timeseriesPathWithAlias.getMeasurementAlias());
}
}
- } else {
- retPaths = getAllPaths(paths);
}
GetAllPathsResult getAllPathsResult = new GetAllPathsResult();
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 7300827..880866c 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -47,7 +47,6 @@ import
org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
@@ -93,6 +92,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static
org.apache.iotdb.cluster.utils.ClusterQueryUtils.getAssembledPathFromRequest;
+
public class LocalQueryExecutor {
private static final Logger logger =
LoggerFactory.getLogger(LocalQueryExecutor.class);
@@ -217,12 +218,7 @@ public class LocalQueryExecutor {
request.getQueryId());
dataGroupMember.syncLeaderWithConsistencyCheck(false);
- PartialPath path = null;
- try {
- path = new PartialPath(request.getPath());
- } catch (IllegalPathException e) {
- // ignore
- }
+ PartialPath path = getAssembledPathFromRequest(request.getPath());
TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
Filter timeFilter = null;
Filter valueFilter = null;
@@ -301,25 +297,7 @@ public class LocalQueryExecutor {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
List<PartialPath> paths = Lists.newArrayList();
- request
- .getPath()
- .forEach(
- fullPath -> {
- try {
- if (fullPath.contains("$#$")) {
- String[] array = fullPath.split(":");
- List<String> subSensorsList = new ArrayList<>();
- for (int i = 1; i < array.length; i++) {
- subSensorsList.add(array[i]);
- }
- paths.add(new VectorPartialPath(array[0], subSensorsList));
- } else {
- paths.add(new PartialPath(fullPath));
- }
- } catch (IllegalPathException e) {
- logger.warn("Failed to create partial path, fullPath is {}.",
fullPath, e);
- }
- });
+ request.getPath().forEach(path ->
paths.add(getAssembledPathFromRequest(path)));
List<TSDataType> dataTypes = Lists.newArrayList();
request.getDataTypeOrdinal().forEach(dataType ->
dataTypes.add(TSDataType.values()[dataType]));
@@ -549,8 +527,6 @@ public class LocalQueryExecutor {
* Create an IReaderByTime of a path, register it in the query manager to
get a reader id for it
* and send the id back to the requester. If the reader does not have any
data, an id of -1 will
* be returned.
- *
- * @param request
*/
public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request)
throws CheckConsistencyException, QueryProcessException,
StorageEngineException {
@@ -562,12 +538,7 @@ public class LocalQueryExecutor {
request.getQueryId());
dataGroupMember.syncLeaderWithConsistencyCheck(false);
- PartialPath path = null;
- try {
- path = new PartialPath(request.getPath());
- } catch (IllegalPathException e) {
- // ignore
- }
+ PartialPath path = getAssembledPathFromRequest(request.getPath());
TSDataType dataType = TSDataType.values()[request.dataTypeOrdinal];
Set<String> deviceMeasurements = request.getDeviceMeasurements();
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index d692830..1de13a2 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -52,7 +52,6 @@ import
org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -91,6 +90,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import static
org.apache.iotdb.cluster.utils.ClusterQueryUtils.getPathStrListForRequest;
+
@SuppressWarnings("java:S107")
public class ClusterReaderFactory {
@@ -704,23 +705,8 @@ public class ClusterReaderFactory {
request.setValueFilterBytes(SerializeUtils.serializeFilter(valueFilter));
}
- List<String> fullPaths = Lists.newArrayList();
- paths.forEach(
- path -> {
- if (path instanceof VectorPartialPath) {
- StringBuilder builder = new StringBuilder(path.getFullPath());
- List<String> subSensorsList = ((VectorPartialPath)
path).getSubSensorsList();
- for (String subSensor : subSensorsList) {
- builder.append(":");
- builder.append(path.getFullPath());
- builder.append(".");
- builder.append(subSensor);
- }
- fullPaths.add(builder.toString());
- } else {
- fullPaths.add(path.getFullPath());
- }
- });
+ List<List<String>> fullPaths = Lists.newArrayList();
+ paths.forEach(path -> fullPaths.add(getPathStrListForRequest(path)));
List<Integer> dataTypeOrdinals = Lists.newArrayList();
dataTypes.forEach(dataType -> dataTypeOrdinals.add(dataType.ordinal()));
@@ -752,7 +738,7 @@ public class ClusterReaderFactory {
if (valueFilter != null) {
request.setValueFilterBytes(SerializeUtils.serializeFilter(valueFilter));
}
- request.setPath(path.getFullPath());
+ request.setPath(ClusterQueryUtils.getPathStrListForRequest(path));
request.setHeader(partitionGroup.getHeader());
request.setQueryId(context.getQueryId());
request.setRequester(metaGroupMember.getThisNode());
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterQueryUtils.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterQueryUtils.java
index 9b6b6c8..e6fc88f 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterQueryUtils.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterQueryUtils.java
@@ -24,13 +24,21 @@ import
org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class ClusterQueryUtils {
+ private static final Logger logger =
LoggerFactory.getLogger(ClusterQueryUtils.class);
+
private ClusterQueryUtils() {
// util class
}
@@ -64,4 +72,39 @@ public class ClusterQueryUtils {
checkPathExistence(path);
}
}
+
+ /**
+ * Generate path string list for RPC request.
+ *
+ * <p>If vector path, return its vectorId with all subSensors. Else just
return path string.
+ */
+ public static List<String> getPathStrListForRequest(Path path) {
+ if (path instanceof VectorPartialPath) {
+ List<String> pathWithSubSensors =
+ new ArrayList<>(((VectorPartialPath)
path).getSubSensorsList().size() + 1);
+ pathWithSubSensors.add(path.getFullPath());
+ pathWithSubSensors.addAll(((VectorPartialPath)
path).getSubSensorsList());
+ return pathWithSubSensors;
+ } else {
+ return Collections.singletonList(path.getFullPath());
+ }
+ }
+
+ /**
+ * Deserialize an assembled Path from path string list that's from RPC
request.
+ *
+ * <p>This method corresponds to getPathStrListForRequest().
+ */
+ public static PartialPath getAssembledPathFromRequest(List<String>
pathString) {
+ try {
+ if (pathString.size() == 1) {
+ return new PartialPath(pathString.get(0));
+ } else {
+ return new VectorPartialPath(pathString.get(0), pathString.subList(1,
pathString.size()));
+ }
+ } catch (IllegalPathException e) {
+ logger.error("Failed to create partial path, fullPath is {}.",
pathString, e);
+ return null;
+ }
+ }
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index 7c11fce..ca8c677 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -245,7 +245,11 @@ public class SyncClientAdaptorTest {
List<String> path,
boolean withAlias,
AsyncMethodCallback<GetAllPathsResult> resultHandler) {
- resultHandler.onComplete(new GetAllPathsResult(path));
+ List<List<String>> pathString = new ArrayList<>();
+ for (String s : path) {
+ pathString.add(Collections.singletonList(s));
+ }
+ resultHandler.onComplete(new GetAllPathsResult(pathString));
}
@Override
@@ -391,9 +395,11 @@ public class SyncClientAdaptorTest {
paths.subList(0, paths.size() / 2),
SyncClientAdaptor.getUnregisteredMeasurements(
dataClient, TestUtils.getRaftNode(0, 0), paths));
- assertEquals(
- paths,
- SyncClientAdaptor.getAllPaths(dataClient, TestUtils.getRaftNode(0, 0),
paths, false).paths);
+ List<String> result = new ArrayList<>();
+ SyncClientAdaptor.getAllPaths(dataClient, TestUtils.getRaftNode(0, 0),
paths, false)
+ .paths
+ .forEach(p -> result.add(p.get(0)));
+ assertEquals(paths, result);
assertEquals(
paths.size(),
(int) SyncClientAdaptor.getPathCount(dataClient,
TestUtils.getRaftNode(0, 0), paths, 0));
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index a925ada..f018bfc 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -740,7 +740,7 @@ public class DataGroupMemberTest extends BaseMember {
partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10),
raftId)));
dataGroupMember.setCharacter(NodeCharacter.LEADER);
SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
- request.setPath(TestUtils.getTestSeries(0, 0));
+ request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0)));
request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
request.setRequester(TestUtils.getNode(1));
request.setQueryId(0);
@@ -808,7 +808,7 @@ public class DataGroupMemberTest extends BaseMember {
partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10),
raftId)));
dataGroupMember.setCharacter(NodeCharacter.LEADER);
SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
- request.setPath(TestUtils.getTestSeries(0, 0));
+ request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0)));
request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
request.setRequester(TestUtils.getNode(1));
request.setQueryId(0);
@@ -876,7 +876,7 @@ public class DataGroupMemberTest extends BaseMember {
partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10), 0)));
dataGroupMember.setCharacter(NodeCharacter.LEADER);
SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
- request.setPath(TestUtils.getTestSeries(0, 0));
+ request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0)));
request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
request.setRequester(TestUtils.getNode(1));
request.setQueryId(0);
@@ -944,7 +944,7 @@ public class DataGroupMemberTest extends BaseMember {
partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10), 0)));
dataGroupMember.setCharacter(NodeCharacter.LEADER);
SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
- request.setPath(TestUtils.getTestSeries(0, 0));
+ request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0)));
request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
request.setRequester(TestUtils.getNode(10));
request.setQueryId(0);
@@ -996,7 +996,8 @@ public class DataGroupMemberTest extends BaseMember {
new DataAsyncService(dataGroupMember)
.getAllPaths(
TestUtils.getRaftNode(0, raftId), Collections.singletonList(path),
false, handler);
- List<String> result = pathResult.get().paths;
+ List<String> result = new ArrayList<>();
+ pathResult.get().paths.forEach(p -> result.add(p.get(0)));
assertEquals(20, result.size());
for (int i = 0; i < 10; i++) {
assertTrue(result.contains(TestUtils.getTestSeries(0, i)));
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift
b/thrift-cluster/src/main/thrift/cluster.thrift
index 1cba433..05459ae 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -184,7 +184,7 @@ struct PullSchemaResp {
}
struct SingleSeriesQueryRequest {
- 1: required string path
+ 1: required list<string> path
2: optional binary timeFilterBytes
3: optional binary valueFilterBytes
4: required long queryId
@@ -199,7 +199,7 @@ struct SingleSeriesQueryRequest {
}
struct MultSeriesQueryRequest {
- 1: required list<string> path
+ 1: required list<list<string>> path
2: optional binary timeFilterBytes
3: optional binary valueFilterBytes
4: required long queryId
@@ -263,7 +263,7 @@ struct LastQueryRequest {
}
struct GetAllPathsResult {
- 1: required list<string> paths
+ 1: required list<list<string>> paths
2: optional list<string> aliasList
}