This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch clusterVector in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 334209f5c4954023e14d1c644266619a18cc36af Author: Alima777 <[email protected]> AuthorDate: Tue Oct 12 10:53:47 2021 +0800 Fix cluster vector meta error --- .../apache/iotdb/cluster/metadata/CMManager.java | 41 +++++++++++++--------- .../cluster/client/sync/SyncClientAdaptorTest.java | 4 ++- .../iotdb/AlignedTimeseriesSessionExample.java | 25 +++++-------- thrift-cluster/src/main/thrift/cluster.thrift | 1 + 4 files changed, 36 insertions(+), 35 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java index f28308d..c5034a2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java @@ -196,6 +196,7 @@ public class CMManager extends MManager { } String measurement = fullPath.getMeasurement(); + System.out.println("FullPath: " + fullPath.getExactFullPath()); if (fullPath instanceof VectorPartialPath) { if (((VectorPartialPath) fullPath).getSubSensorsList().size() != 1) { return TSDataType.VECTOR; @@ -203,6 +204,7 @@ public class CMManager extends MManager { measurement = ((VectorPartialPath) fullPath).getSubSensor(0); } } + System.out.println("Measurement: " + measurement); // try remote cache first try { @@ -229,18 +231,17 @@ public class CMManager extends MManager { MeasurementMNode.getMeasurementMNode( null, measurementSchema.getMeasurementId(), measurementSchema, null); if (measurementSchema instanceof VectorMeasurementSchema) { - for (int i = 0; i < measurementSchema.getSubMeasurementsList().size(); i++) { + for (String subMeasurement : measurementSchema.getSubMeasurementsList()) { cacheMeta( - ((VectorPartialPath) fullPath).getPathWithSubSensor(i), measurementMNode, false); + new VectorPartialPath(fullPath.getDevice(), subMeasurement), + measurementMNode, + false); } - cacheMeta( - new PartialPath(fullPath.getDevice(), measurementSchema.getMeasurementId()), - measurementMNode, - true); } else { cacheMeta(fullPath, measurementMNode, true); } - return measurementMNode.getDataType(fullPath.getMeasurement()); + System.out.println(measurementSchema); + return measurementMNode.getDataType(measurement); } else { throw e; } @@ -974,7 +975,7 @@ public class CMManager extends MManager { throws MetadataException { List<PartialPath> result = new ArrayList<>(); // split the paths by the data group they belong to - Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>(); + Map<PartitionGroup, List<String>> remoteGroupPathMap = new HashMap<>(); for (Entry<String, String> sgPathEntry : sgPathMap.entrySet()) { String storageGroupName = sgPathEntry.getKey(); PartialPath pathUnderSG = new PartialPath(sgPathEntry.getValue()); @@ -1000,14 +1001,15 @@ public class CMManager extends MManager { result.addAll(allTimeseriesName); } else { // batch the queries of the same group to reduce communication - groupPathMap + remoteGroupPathMap .computeIfAbsent(partitionGroup, p -> new ArrayList<>()) .add(pathUnderSG.getFullPath()); } } // query each data group separately - for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry : groupPathMap.entrySet()) { + for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry : + remoteGroupPathMap.entrySet()) { PartitionGroup partitionGroup = partitionGroupPathEntry.getKey(); List<String> pathsToQuery = partitionGroupPathEntry.getValue(); result.addAll(getMatchedPaths(partitionGroup, pathsToQuery, withAlias)); @@ -1091,11 +1093,15 @@ public class CMManager extends MManager { List<PartialPath> partialPaths = new ArrayList<>(); for (int i = 0; i < result.paths.size(); i++) { try { - PartialPath partialPath = new PartialPath(result.paths.get(i)); + PartialPath matchedPath = new PartialPath(result.paths.get(i)); + if (result.isVectorPath.get(i)) { + matchedPath = + new VectorPartialPath(matchedPath.getDevice(), matchedPath.getMeasurement()); + } if (withAlias) { - partialPath.setMeasurementAlias(result.aliasList.get(i)); + matchedPath.setMeasurementAlias(result.aliasList.get(i)); } - partialPaths.add(partialPath); + partialPaths.add(matchedPath); } catch (IllegalPathException e) { // ignore } @@ -1713,17 +1719,17 @@ public class CMManager extends MManager { public GetAllPathsResult getAllPaths(List<String> paths, boolean withAlias) throws MetadataException { List<String> retPaths = new ArrayList<>(); + List<Boolean> isVectorPath = new ArrayList<>(); List<String> alias = null; - if (withAlias) { - alias = new ArrayList<>(); - } if (withAlias) { + alias = new ArrayList<>(); for (String path : paths) { List<PartialPath> allTimeseriesPathWithAlias = super.getAllTimeseriesPathWithAlias(new PartialPath(path), -1, -1).left; for (PartialPath timeseriesPathWithAlias : allTimeseriesPathWithAlias) { - retPaths.add(timeseriesPathWithAlias.getFullPath()); + retPaths.add(timeseriesPathWithAlias.getExactFullPath()); + isVectorPath.add(timeseriesPathWithAlias instanceof VectorPartialPath); alias.add(timeseriesPathWithAlias.getMeasurementAlias()); } } @@ -1733,6 +1739,7 @@ public class CMManager extends MManager { GetAllPathsResult getAllPathsResult = new GetAllPathsResult(); getAllPathsResult.setPaths(retPaths); + getAllPathsResult.setIsVectorPath(isVectorPath); getAllPathsResult.setAliasList(alias); return getAllPathsResult; } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java index 7c11fce..0871eff 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java @@ -245,7 +245,9 @@ public class SyncClientAdaptorTest { List<String> path, boolean withAlias, AsyncMethodCallback<GetAllPathsResult> resultHandler) { - resultHandler.onComplete(new GetAllPathsResult(path)); + List<Boolean> isVectorPaths = new ArrayList<>(path.size()); + Collections.fill(isVectorPaths, false); + resultHandler.onComplete(new GetAllPathsResult(path, isVectorPaths)); } @Override diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java index bbfd4c7..598868f 100644 --- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java @@ -61,26 +61,17 @@ public class AlignedTimeseriesSessionExample { createTemplate(); createAlignedTimeseries(); - insertAlignedRecord(); - insertAlignedRecords(); - insertAlignedRecordsOfOneDevices(); - - insertAlignedStringRecord(); - insertAlignedStringRecords(); - insertTabletWithAlignedTimeseriesMethod1(); insertTabletWithAlignedTimeseriesMethod2(); - insertNullableTabletWithAlignedTimeseries(); - insertTabletsWithAlignedTimeseries(); - - selectTest(); - selectWithValueFilterTest(); - selectWithGroupByTest(); - selectWithLastTest(); - - selectWithAggregationTest(); - selectWithAlignByDeviceTest(); + // selectTest(); + // selectWithValueFilterTest(); + // selectWithGroupByTest(); + // selectWithLastTest(); + // + // selectWithAggregationTest(); + // + // selectWithAlignByDeviceTest(); session.close(); } diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift index 61ae467..755b806 100644 --- a/thrift-cluster/src/main/thrift/cluster.thrift +++ b/thrift-cluster/src/main/thrift/cluster.thrift @@ -265,6 +265,7 @@ struct LastQueryRequest { struct GetAllPathsResult { 1: required list<string> paths 2: optional list<string> aliasList + 3: required list<bool> isVectorPath }
