This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch clusterVector in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b846b50ed1d028727e764ddb2c66a92516787ae0 Author: Alima777 <[email protected]> AuthorDate: Mon Oct 18 15:35:12 2021 +0800 implement raw vector query with value filter --- .../iotdb/cluster/query/LocalQueryExecutor.java | 28 +++++++++++----------- .../cluster/query/reader/ClusterReaderFactory.java | 15 ++++++++++-- .../cluster/server/member/DataGroupMemberTest.java | 8 +++---- thrift-cluster/src/main/thrift/cluster.thrift | 2 +- 4 files changed, 32 insertions(+), 21 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java index 0bba460..b217cb6 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java @@ -217,12 +217,7 @@ public class LocalQueryExecutor { request.getQueryId()); dataGroupMember.syncLeaderWithConsistencyCheck(false); - PartialPath path = null; - try { - path = new PartialPath(request.getPath()); - } catch (IllegalPathException e) { - // ignore - } + PartialPath path = getPathFromRequest(request.getPath()); TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()]; Filter timeFilter = null; Filter valueFilter = null; @@ -283,6 +278,18 @@ public class LocalQueryExecutor { } } + private PartialPath getPathFromRequest(List<String> pathString) throws QueryProcessException { + try { + if (pathString.size() == 1) { + return new PartialPath(pathString.get(0)); + } else { + return new VectorPartialPath(pathString.get(0), pathString.subList(1, pathString.size())); + } + } catch (IllegalPathException e) { + throw new QueryProcessException(e.getMessage()); + } + } + /** * Create an IBatchReader of a path, register it in the query manager to get a reader id for it * and send the id back to the requester. If the reader does not have any data, an id of -1 will @@ -545,8 +552,6 @@ public class LocalQueryExecutor { * Create an IReaderByTime of a path, register it in the query manager to get a reader id for it * and send the id back to the requester. If the reader does not have any data, an id of -1 will * be returned. - * - * @param request */ public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request) throws CheckConsistencyException, QueryProcessException, StorageEngineException { @@ -558,12 +563,7 @@ public class LocalQueryExecutor { request.getQueryId()); dataGroupMember.syncLeaderWithConsistencyCheck(false); - PartialPath path = null; - try { - path = new PartialPath(request.getPath()); - } catch (IllegalPathException e) { - // ignore - } + PartialPath path = getPathFromRequest(request.getPath()); TSDataType dataType = TSDataType.values()[request.dataTypeOrdinal]; Set<String> deviceMeasurements = request.getDeviceMeasurements(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java index ecb0f29..2ef44ff 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java @@ -748,8 +748,7 @@ public class ClusterReaderFactory { if (valueFilter != null) { request.setValueFilterBytes(SerializeUtils.serializeFilter(valueFilter)); } - System.out.println("Before RPC: " + path); - request.setPath(path.getFullPath()); + request.setPath(getPathStringList(path)); request.setHeader(partitionGroup.getHeader()); request.setQueryId(context.getQueryId()); request.setRequester(metaGroupMember.getThisNode()); @@ -760,6 +759,18 @@ public class ClusterReaderFactory { return request; } + /** If vector path, return vectorId with all subSensors. Else just return path string. */ + private List<String> getPathStringList(Path path) { + if (path instanceof VectorPartialPath) { + List<String> pathWithSubSensors = new ArrayList<>(); + pathWithSubSensors.add(path.getFullPath()); + pathWithSubSensors.addAll(((VectorPartialPath) path).getSubSensorsList()); + return pathWithSubSensors; + } else { + return Collections.singletonList(path.getFullPath()); + } + } + /** * Get GroupByExecutors the will executor the aggregations of "aggregationTypes" over "path". * First, the groups to be queried will be determined by the timeFilter. Then for group, a local diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java index a925ada..0091251 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java @@ -740,7 +740,7 @@ public class DataGroupMemberTest extends BaseMember { partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10), raftId))); dataGroupMember.setCharacter(NodeCharacter.LEADER); SingleSeriesQueryRequest request = new SingleSeriesQueryRequest(); - request.setPath(TestUtils.getTestSeries(0, 0)); + request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0))); request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal()); request.setRequester(TestUtils.getNode(1)); request.setQueryId(0); @@ -808,7 +808,7 @@ public class DataGroupMemberTest extends BaseMember { partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10), raftId))); dataGroupMember.setCharacter(NodeCharacter.LEADER); SingleSeriesQueryRequest request = new SingleSeriesQueryRequest(); - request.setPath(TestUtils.getTestSeries(0, 0)); + request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0))); request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal()); request.setRequester(TestUtils.getNode(1)); request.setQueryId(0); @@ -876,7 +876,7 @@ public class DataGroupMemberTest extends BaseMember { partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10), 0))); dataGroupMember.setCharacter(NodeCharacter.LEADER); SingleSeriesQueryRequest request = new SingleSeriesQueryRequest(); - request.setPath(TestUtils.getTestSeries(0, 0)); + request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0))); request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal()); request.setRequester(TestUtils.getNode(1)); request.setQueryId(0); @@ -944,7 +944,7 @@ public class DataGroupMemberTest extends BaseMember { partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10), 0))); dataGroupMember.setCharacter(NodeCharacter.LEADER); SingleSeriesQueryRequest request = new SingleSeriesQueryRequest(); - request.setPath(TestUtils.getTestSeries(0, 0)); + request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0))); request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal()); request.setRequester(TestUtils.getNode(10)); request.setQueryId(0); diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift index cd859fd..a7a623a 100644 --- a/thrift-cluster/src/main/thrift/cluster.thrift +++ b/thrift-cluster/src/main/thrift/cluster.thrift @@ -184,7 +184,7 @@ struct PullSchemaResp { } struct SingleSeriesQueryRequest { - 1: required string path + 1: required list<string> path 2: optional binary timeFilterBytes 3: optional binary valueFilterBytes 4: required long queryId
