This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_vector by this push:
     new fc5ce31  refactor the interaction with MManager while executing AlignByDevicePlan (#4359)
fc5ce31 is described below

commit fc5ce315498f4cf3906cb07b5b30d847365b7ac4
Author: liuminghui233 <[email protected]>
AuthorDate: Thu Nov 11 19:15:27 2021 +0800

    refactor the interaction with MManager while executing AlignByDevicePlan (#4359)
---
 .../db/query/dataset/AlignByDeviceDataSet.java     | 47 +++++-----------------
 1 file changed, 10 insertions(+), 37 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index d2ee07a..0486dfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.query.dataset;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
@@ -42,13 +41,10 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -135,13 +131,14 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     while (deviceIterator.hasNext()) {
       currentDevice = deviceIterator.next();
       // get all measurements of current device
-      Set<String> measurementOfGivenDevice = getMeasurementsUnderGivenDevice(currentDevice);
+      Map<String, MeasurementPath> measurementToPathMap =
+          getMeasurementsUnderGivenDevice(currentDevice);
+      Set<String> measurementOfGivenDevice = measurementToPathMap.keySet();
 
       // extract paths and aggregations queried from all measurements
       // executeColumns is for calculating rowRecord
       executeColumns = new ArrayList<>();
       List<PartialPath> executePaths = new ArrayList<>();
-      List<TSDataType> tsDataTypes = new ArrayList<>();
       List<String> executeAggregations = new ArrayList<>();
       for (Entry<String, MeasurementInfo> entry : measurementInfoMap.entrySet()) {
         if (entry.getValue().getMeasurementType() != MeasurementType.Exist) {
@@ -157,8 +154,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
         }
         if (measurementOfGivenDevice.contains(measurement)) {
           executeColumns.add(column);
-          executePaths.add(transformPath(currentDevice, measurement));
-          tsDataTypes.add(measurementInfoMap.get(column).getMeasurementDataType());
+          executePaths.add(measurementToPathMap.get(measurement));
         }
       }
 
@@ -220,41 +216,18 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     return false;
   }
 
-  /**
-   * Get all measurements under given device. For a vectorMeasurementSchema, we return its
-   * measurementId + all subMeasurement. e.g. schema: vector1[s1, s2], return ["vector1.s1",
-   * "vector1.s2"].
-   */
-  protected Set<String> getMeasurementsUnderGivenDevice(PartialPath device) throws IOException {
+  /** Get all measurements under given device. */
+  protected Map<String, MeasurementPath> getMeasurementsUnderGivenDevice(PartialPath device)
+      throws IOException {
     try {
-      Set<String> res = new HashSet<>();
       // TODO: Implement this method in Cluster MManager
+      Map<String, MeasurementPath> measurementToPathMap = new HashMap<>();
       List<MeasurementPath> measurementPaths =
           IoTDB.metaManager.getAllMeasurementByDevicePath(device);
       for (MeasurementPath measurementPath : measurementPaths) {
-        res.add(measurementPath.getMeasurement());
-      }
-
-      return res;
-    } catch (MetadataException e) {
-      throw new IOException("Cannot get node from " + device, e);
-    }
-  }
-
-  /**
-   * Attention. For a vectorPath(root.sg.d1.vector1.s1), device is root.sg.d1, measurement is
-   * "vector1.s1".
-   */
-  private PartialPath transformPath(PartialPath device, String measurement) throws IOException {
-    try {
-      PartialPath fullPath = new PartialPath(device.getFullPath(), measurement);
-      IMeasurementSchema schema = IoTDB.metaManager.getSeriesSchema(fullPath);
-      if (schema instanceof UnaryMeasurementSchema) {
-        return new MeasurementPath(device.getFullPath(), measurement, schema);
-      } else {
-        String vectorPath = fullPath.getDevice();
-        return new AlignedPath(vectorPath, fullPath.getMeasurement());
+        measurementToPathMap.put(measurementPath.getMeasurement(), measurementPath);
       }
+      return measurementToPathMap;
     } catch (MetadataException e) {
       throw new IOException("Cannot get node from " + device, e);
     }

Reply via email to