This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch MetadataIndex
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/MetadataIndex by this push:
     new 0ac73c0  Match Vector with MetadataIndexTree
     new 65e13fe  Merge remote-tracking branch 'origin/MetadataIndex'
0ac73c0 is described below

commit 0ac73c0e3bd26a801ee093d65a9be252fd128ee4
Author: samperson1997 <[email protected]>
AuthorDate: Tue Apr 27 15:00:03 2021 +0800

    Match Vector with MetadataIndexTree
---
 .../file/metadata/MetadataIndexConstructor.java    |  53 +++++++--
 .../tsfile/file/metadata/MetadataIndexNode.java    |   6 +-
 .../tsfile/file/metadata/TimeseriesMetadata.java   |  11 +-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  | 121 +++++++++++++++------
 4 files changed, 144 insertions(+), 47 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
index dcba911..47fe2e3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
@@ -61,18 +61,55 @@ public class MetadataIndexConstructor {
       TimeseriesMetadata timeseriesMetadata;
       MetadataIndexNode currentIndexNode =
           new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+      int serializedTimeseriesMetadataNum = 0;
       for (int i = 0; i < entry.getValue().size(); i++) {
         timeseriesMetadata = entry.getValue().get(i);
-        // when constructing from leaf node, every "degree number of nodes" 
are related to an entry
-        if (i % config.getMaxDegreeOfIndexNode() == 0) {
-          if (currentIndexNode.isFull()) {
-            addCurrentIndexNodeToQueue(currentIndexNode, 
measurementMetadataIndexQueue, out);
-            currentIndexNode = new 
MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+        if (timeseriesMetadata.getTimeSeriesMetadataType() == (byte) 0x80) {
+          // this timeseriesMetadata is time column of a vector series
+          // calculate the number of value columns in this vector
+          int numOfValueColumns = 0;
+          for (int j = i + 1; j < entry.getValue().size(); j++) {
+            if (entry.getValue().get(j).getTimeSeriesMetadataType() == (byte) 
0x40) {
+              numOfValueColumns++;
+            } else {
+              break;
+            }
           }
-          currentIndexNode.addEntry(
-              new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), 
out.getPosition()));
+
+          // only add time column of vector into LEAF_MEASUREMENT node
+          if (currentIndexNode.isEmpty()
+              || serializedTimeseriesMetadataNum + numOfValueColumns + 1
+                  > config.getMaxDegreeOfIndexNode() * 1.5) {
+            currentIndexNode.addEntry(
+                new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), 
out.getPosition()));
+            serializedTimeseriesMetadataNum = 0;
+          }
+
+          timeseriesMetadata.serializeTo(out.wrapAsStream());
+          serializedTimeseriesMetadataNum++;
+          for (int j = 0; j < numOfValueColumns; j++) {
+            i++;
+            timeseriesMetadata = entry.getValue().get(i);
+            // value columns of vector should not be added into 
LEAF_MEASUREMENT node
+            timeseriesMetadata.serializeTo(out.wrapAsStream());
+            serializedTimeseriesMetadataNum++;
+          }
+        } else {
+          // when constructing from leaf node, every "degree number of nodes" 
are related to an
+          // entry
+          if (serializedTimeseriesMetadataNum == 0
+              || serializedTimeseriesMetadataNum >= 
config.getMaxDegreeOfIndexNode()) {
+            if (currentIndexNode.isFull()) {
+              addCurrentIndexNodeToQueue(currentIndexNode, 
measurementMetadataIndexQueue, out);
+              currentIndexNode = new 
MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+            }
+            currentIndexNode.addEntry(
+                new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), 
out.getPosition()));
+            serializedTimeseriesMetadataNum = 0;
+          }
+          timeseriesMetadata.serializeTo(out.wrapAsStream());
+          serializedTimeseriesMetadataNum++;
         }
-        timeseriesMetadata.serializeTo(out.wrapAsStream());
       }
       addCurrentIndexNodeToQueue(currentIndexNode, 
measurementMetadataIndexQueue, out);
       deviceMetadataIndexMap.put(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
index 427e9ee..9099817 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
@@ -75,7 +75,11 @@ public class MetadataIndexNode {
   }
 
   boolean isFull() {
-    return children.size() == config.getMaxDegreeOfIndexNode();
+    return children.size() >= config.getMaxDegreeOfIndexNode();
+  }
+
+  boolean isEmpty() {
+    return children.size() == 0;
   }
 
   MetadataIndexEntry peek() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
index df285dc..60e57af 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
@@ -39,9 +39,14 @@ public class TimeseriesMetadata implements Accountable, ITimeSeriesMetadata {
   private long startOffsetOfChunkMetaDataList;
   /**
    * 0 means this time series has only one chunk, no need to save the 
statistic again in chunk
-   * metadata 1 means this time series has more than one chunk, should save 
the statistic again in
-   * chunk metadata if the 8th bit is 1, it means it is the time column of a 
vector series if the
-   * 7th bit is 1, it means it is the value column of a vector series
+   * metadata;
+   *
+   * <p>1 means this time series has more than one chunk, should save the 
statistic again in chunk
+   * metadata;
+   *
+   * <p>if the 8th bit is 1, it means it is the time column of a vector series;
+   *
+   * <p>if the 7th bit is 1, it means it is the value column of a vector series
    */
   private byte timeSeriesMetadataType;
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 41ed225..578d2ab 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -241,15 +241,45 @@ public class TsFileIOWriter {
     ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
 
     // group ChunkMetadata by series
+    // only contains ordinary path and time column of vector series
     Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
+
+    // time column -> ChunkMetadataList TreeMap of value columns in vector
+    Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap = new 
HashMap<>();
+
     for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
-      for (IChunkMetadata chunkMetadata : 
chunkGroupMetadata.getChunkMetadataList()) {
-        Path series = new Path(chunkGroupMetadata.getDevice(), 
chunkMetadata.getMeasurementUid());
-        chunkMetadataListMap.computeIfAbsent(series, k -> new 
ArrayList<>()).add(chunkMetadata);
+      List<ChunkMetadata> chunkMetadataList = 
chunkGroupMetadata.getChunkMetadataList();
+      int idx = 0;
+      while (idx < chunkMetadataList.size()) {
+        IChunkMetadata chunkMetadata = chunkMetadataList.get(idx);
+        if (chunkMetadata.getMask() == 0) {
+          Path series = new Path(chunkGroupMetadata.getDevice(), 
chunkMetadata.getMeasurementUid());
+          chunkMetadataListMap.computeIfAbsent(series, k -> new 
ArrayList<>()).add(chunkMetadata);
+          idx++;
+        } else if (chunkMetadata.getMask() == (byte) 0x80) {
+          // time column of a vector series
+          Path series = new Path(chunkGroupMetadata.getDevice(), 
chunkMetadata.getMeasurementUid());
+          chunkMetadataListMap.computeIfAbsent(series, k -> new 
ArrayList<>()).add(chunkMetadata);
+          idx++;
+          Map<Path, List<IChunkMetadata>> chunkMetadataListMapInVector = new 
TreeMap<>();
+
+          // value columns of a vector series
+          while (idx < chunkMetadataList.size()
+              && chunkMetadataList.get(idx).getMask() == (byte) 0x40) {
+            chunkMetadata = chunkMetadataList.get(idx);
+            Path vectorSeries =
+                new Path(chunkGroupMetadata.getDevice(), 
chunkMetadata.getMeasurementUid());
+            chunkMetadataListMapInVector
+                .computeIfAbsent(vectorSeries, k -> new ArrayList<>())
+                .add(chunkMetadata);
+            idx++;
+          }
+          vectorToPathsMap.put(series, chunkMetadataListMapInVector);
+        }
       }
     }
 
-    MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap);
+    MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap, 
vectorToPathsMap);
     TsFileMetadata tsFileMetaData = new TsFileMetadata();
     tsFileMetaData.setMetadataIndex(metadataIndex);
     tsFileMetaData.setMetaOffset(metaOffset);
@@ -290,49 +320,70 @@ public class TsFileIOWriter {
    *
    * @return MetadataIndexEntry list in TsFileMetadata
    */
-  private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>> 
chunkMetadataListMap)
+  private MetadataIndexNode flushMetadataIndex(
+      Map<Path, List<IChunkMetadata>> chunkMetadataListMap,
+      Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap)
       throws IOException {
 
     // convert ChunkMetadataList to this field
     deviceTimeseriesMetadataMap = new LinkedHashMap<>();
     // create device -> TimeseriesMetaDataList Map
     for (Map.Entry<Path, List<IChunkMetadata>> entry : 
chunkMetadataListMap.entrySet()) {
-      Path path = entry.getKey();
-      String device = path.getDevice();
-
-      // create TimeseriesMetaData
-      PublicBAOS publicBAOS = new PublicBAOS();
-      TSDataType dataType = entry.getValue().get(entry.getValue().size() - 
1).getDataType();
-      Statistics seriesStatistics = Statistics.getStatsByType(dataType);
-
-      int chunkMetadataListLength = 0;
-      boolean serializeStatistic = (entry.getValue().size() > 1);
-      // flush chunkMetadataList one by one
-      for (IChunkMetadata chunkMetadata : entry.getValue()) {
-        if (!chunkMetadata.getDataType().equals(dataType)) {
-          continue;
-        }
-        chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, 
serializeStatistic);
-        seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
-      }
-      TimeseriesMetadata timeseriesMetadata =
-          new TimeseriesMetadata(
-              (byte)
-                  ((serializeStatistic ? (byte) 1 : (byte) 0) | 
entry.getValue().get(0).getMask()),
-              chunkMetadataListLength,
-              path.getMeasurement(),
-              dataType,
-              seriesStatistics,
-              publicBAOS);
-      deviceTimeseriesMetadataMap
-          .computeIfAbsent(device, k -> new ArrayList<>())
-          .add(timeseriesMetadata);
+      // for ordinary path
+      flushOneChunkMetadata(entry.getKey(), entry.getValue(), 
vectorToPathsMap);
     }
 
     // construct TsFileMetadata and return
     return 
MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, 
out);
   }
 
+  private void flushOneChunkMetadata(
+      Path path,
+      List<IChunkMetadata> chunkMetadataList,
+      Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap)
+      throws IOException {
+    // create TimeseriesMetaData
+    PublicBAOS publicBAOS = new PublicBAOS();
+    TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 
1).getDataType();
+    Statistics seriesStatistics = Statistics.getStatsByType(dataType);
+
+    int chunkMetadataListLength = 0;
+    boolean serializeStatistic = (chunkMetadataList.size() > 1);
+    // flush chunkMetadataList one by one
+    for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+      if (!chunkMetadata.getDataType().equals(dataType)) {
+        continue;
+      }
+      chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, 
serializeStatistic);
+      seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
+    }
+
+    TimeseriesMetadata timeseriesMetadata =
+        new TimeseriesMetadata(
+            (byte)
+                ((serializeStatistic ? (byte) 1 : (byte) 0) | 
chunkMetadataList.get(0).getMask()),
+            chunkMetadataListLength,
+            path.getMeasurement(),
+            dataType,
+            seriesStatistics,
+            publicBAOS);
+    deviceTimeseriesMetadataMap
+        .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
+        .add(timeseriesMetadata);
+
+    // for VECTOR
+    for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+      // chunkMetadata is time column of a vector series
+      if (chunkMetadata.getMask() == (byte) 0x80) {
+        Map<Path, List<IChunkMetadata>> vectorMap = vectorToPathsMap.get(path);
+
+        for (Map.Entry<Path, List<IChunkMetadata>> entry : 
vectorMap.entrySet()) {
+          flushOneChunkMetadata(entry.getKey(), entry.getValue(), 
vectorToPathsMap);
+        }
+      }
+    }
+  }
+
   /**
    * get the length of normal OutputStream.
    *

Reply via email to