This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch MetadataIndex
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/MetadataIndex by this push:
new 0ac73c0 Match Vector with MetadataIndexTree
new 65e13fe Merge remote-tracking branch 'origin/MetadataIndex'
0ac73c0 is described below
commit 0ac73c0e3bd26a801ee093d65a9be252fd128ee4
Author: samperson1997 <[email protected]>
AuthorDate: Tue Apr 27 15:00:03 2021 +0800
Match Vector with MetadataIndexTree
---
.../file/metadata/MetadataIndexConstructor.java | 53 +++++++--
.../tsfile/file/metadata/MetadataIndexNode.java | 6 +-
.../tsfile/file/metadata/TimeseriesMetadata.java | 11 +-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 121 +++++++++++++++------
4 files changed, 144 insertions(+), 47 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
index dcba911..47fe2e3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
@@ -61,18 +61,55 @@ public class MetadataIndexConstructor {
TimeseriesMetadata timeseriesMetadata;
MetadataIndexNode currentIndexNode =
new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ int serializedTimeseriesMetadataNum = 0;
for (int i = 0; i < entry.getValue().size(); i++) {
timeseriesMetadata = entry.getValue().get(i);
- // when constructing from leaf node, every "degree number of nodes" are related to an entry
- if (i % config.getMaxDegreeOfIndexNode() == 0) {
- if (currentIndexNode.isFull()) {
- addCurrentIndexNodeToQueue(currentIndexNode,
measurementMetadataIndexQueue, out);
- currentIndexNode = new
MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ if (timeseriesMetadata.getTimeSeriesMetadataType() == (byte) 0x80) {
+ // this timeseriesMetadata is time column of a vector series
+ // calculate the number of value columns in this vector
+ int numOfValueColumns = 0;
+ for (int j = i + 1; j < entry.getValue().size(); j++) {
+ if (entry.getValue().get(j).getTimeSeriesMetadataType() == (byte)
0x40) {
+ numOfValueColumns++;
+ } else {
+ break;
+ }
}
- currentIndexNode.addEntry(
- new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(),
out.getPosition()));
+
+ // only add time column of vector into LEAF_MEASUREMENT node
+ if (currentIndexNode.isEmpty()
+ || serializedTimeseriesMetadataNum + numOfValueColumns + 1
+ > config.getMaxDegreeOfIndexNode() * 1.5) {
+ currentIndexNode.addEntry(
+ new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(),
out.getPosition()));
+ serializedTimeseriesMetadataNum = 0;
+ }
+
+ timeseriesMetadata.serializeTo(out.wrapAsStream());
+ serializedTimeseriesMetadataNum++;
+ for (int j = 0; j < numOfValueColumns; j++) {
+ i++;
+ timeseriesMetadata = entry.getValue().get(i);
+ // value columns of vector should not be added into LEAF_MEASUREMENT node
+ timeseriesMetadata.serializeTo(out.wrapAsStream());
+ serializedTimeseriesMetadataNum++;
+ }
+ } else {
+ // when constructing from leaf node, every "degree number of nodes" are related to an
+ // entry
+ if (serializedTimeseriesMetadataNum == 0
+ || serializedTimeseriesMetadataNum >=
config.getMaxDegreeOfIndexNode()) {
+ if (currentIndexNode.isFull()) {
+ addCurrentIndexNodeToQueue(currentIndexNode,
measurementMetadataIndexQueue, out);
+ currentIndexNode = new
MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ }
+ currentIndexNode.addEntry(
+ new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(),
out.getPosition()));
+ serializedTimeseriesMetadataNum = 0;
+ }
+ timeseriesMetadata.serializeTo(out.wrapAsStream());
+ serializedTimeseriesMetadataNum++;
}
- timeseriesMetadata.serializeTo(out.wrapAsStream());
}
addCurrentIndexNodeToQueue(currentIndexNode,
measurementMetadataIndexQueue, out);
deviceMetadataIndexMap.put(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
index 427e9ee..9099817 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
@@ -75,7 +75,11 @@ public class MetadataIndexNode {
}
boolean isFull() {
- return children.size() == config.getMaxDegreeOfIndexNode();
+ return children.size() >= config.getMaxDegreeOfIndexNode();
+ }
+
+ boolean isEmpty() {
+ return children.size() == 0;
}
MetadataIndexEntry peek() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
index df285dc..60e57af 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
@@ -39,9 +39,14 @@ public class TimeseriesMetadata implements Accountable, ITimeSeriesMetadata {
private long startOffsetOfChunkMetaDataList;
/**
* 0 means this time series has only one chunk, no need to save the statistic again in chunk
- * metadata 1 means this time series has more than one chunk, should save the statistic again in
- * chunk metadata if the 8th bit is 1, it means it is the time column of a vector series if the
- * 7th bit is 1, it means it is the value column of a vector series
+ * metadata;
+ *
+ * <p>1 means this time series has more than one chunk, should save the statistic again in chunk
+ * metadata;
+ *
+ * <p>if the 8th bit is 1, it means it is the time column of a vector series;
+ *
+ * <p>if the 7th bit is 1, it means it is the value column of a vector series
*/
private byte timeSeriesMetadataType;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 41ed225..578d2ab 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -241,15 +241,45 @@ public class TsFileIOWriter {
ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
// group ChunkMetadata by series
+ // only contains ordinary path and time column of vector series
Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
+
+ // time column -> ChunkMetadataList TreeMap of value columns in vector
+ Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap = new
HashMap<>();
+
for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
- for (IChunkMetadata chunkMetadata :
chunkGroupMetadata.getChunkMetadataList()) {
- Path series = new Path(chunkGroupMetadata.getDevice(),
chunkMetadata.getMeasurementUid());
- chunkMetadataListMap.computeIfAbsent(series, k -> new
ArrayList<>()).add(chunkMetadata);
+ List<ChunkMetadata> chunkMetadataList =
chunkGroupMetadata.getChunkMetadataList();
+ int idx = 0;
+ while (idx < chunkMetadataList.size()) {
+ IChunkMetadata chunkMetadata = chunkMetadataList.get(idx);
+ if (chunkMetadata.getMask() == 0) {
+ Path series = new Path(chunkGroupMetadata.getDevice(),
chunkMetadata.getMeasurementUid());
+ chunkMetadataListMap.computeIfAbsent(series, k -> new
ArrayList<>()).add(chunkMetadata);
+ idx++;
+ } else if (chunkMetadata.getMask() == (byte) 0x80) {
+ // time column of a vector series
+ Path series = new Path(chunkGroupMetadata.getDevice(),
chunkMetadata.getMeasurementUid());
+ chunkMetadataListMap.computeIfAbsent(series, k -> new
ArrayList<>()).add(chunkMetadata);
+ idx++;
+ Map<Path, List<IChunkMetadata>> chunkMetadataListMapInVector = new
TreeMap<>();
+
+ // value columns of a vector series
+ while (idx < chunkMetadataList.size()
+ && chunkMetadataList.get(idx).getMask() == (byte) 0x40) {
+ chunkMetadata = chunkMetadataList.get(idx);
+ Path vectorSeries =
+ new Path(chunkGroupMetadata.getDevice(),
chunkMetadata.getMeasurementUid());
+ chunkMetadataListMapInVector
+ .computeIfAbsent(vectorSeries, k -> new ArrayList<>())
+ .add(chunkMetadata);
+ idx++;
+ }
+ vectorToPathsMap.put(series, chunkMetadataListMapInVector);
+ }
}
}
- MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap);
+ MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap,
vectorToPathsMap);
TsFileMetadata tsFileMetaData = new TsFileMetadata();
tsFileMetaData.setMetadataIndex(metadataIndex);
tsFileMetaData.setMetaOffset(metaOffset);
@@ -290,49 +320,70 @@ public class TsFileIOWriter {
*
* @return MetadataIndexEntry list in TsFileMetadata
*/
- private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>>
chunkMetadataListMap)
+ private MetadataIndexNode flushMetadataIndex(
+ Map<Path, List<IChunkMetadata>> chunkMetadataListMap,
+ Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap)
throws IOException {
// convert ChunkMetadataList to this field
deviceTimeseriesMetadataMap = new LinkedHashMap<>();
// create device -> TimeseriesMetaDataList Map
for (Map.Entry<Path, List<IChunkMetadata>> entry :
chunkMetadataListMap.entrySet()) {
- Path path = entry.getKey();
- String device = path.getDevice();
-
- // create TimeseriesMetaData
- PublicBAOS publicBAOS = new PublicBAOS();
- TSDataType dataType = entry.getValue().get(entry.getValue().size() -
1).getDataType();
- Statistics seriesStatistics = Statistics.getStatsByType(dataType);
-
- int chunkMetadataListLength = 0;
- boolean serializeStatistic = (entry.getValue().size() > 1);
- // flush chunkMetadataList one by one
- for (IChunkMetadata chunkMetadata : entry.getValue()) {
- if (!chunkMetadata.getDataType().equals(dataType)) {
- continue;
- }
- chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS,
serializeStatistic);
- seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
- }
- TimeseriesMetadata timeseriesMetadata =
- new TimeseriesMetadata(
- (byte)
- ((serializeStatistic ? (byte) 1 : (byte) 0) |
entry.getValue().get(0).getMask()),
- chunkMetadataListLength,
- path.getMeasurement(),
- dataType,
- seriesStatistics,
- publicBAOS);
- deviceTimeseriesMetadataMap
- .computeIfAbsent(device, k -> new ArrayList<>())
- .add(timeseriesMetadata);
+ // for ordinary path
+ flushOneChunkMetadata(entry.getKey(), entry.getValue(),
vectorToPathsMap);
}
// construct TsFileMetadata and return
return
MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap,
out);
}
+ private void flushOneChunkMetadata(
+ Path path,
+ List<IChunkMetadata> chunkMetadataList,
+ Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap)
+ throws IOException {
+ // create TimeseriesMetaData
+ PublicBAOS publicBAOS = new PublicBAOS();
+ TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() -
1).getDataType();
+ Statistics seriesStatistics = Statistics.getStatsByType(dataType);
+
+ int chunkMetadataListLength = 0;
+ boolean serializeStatistic = (chunkMetadataList.size() > 1);
+ // flush chunkMetadataList one by one
+ for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+ if (!chunkMetadata.getDataType().equals(dataType)) {
+ continue;
+ }
+ chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS,
serializeStatistic);
+ seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
+ }
+
+ TimeseriesMetadata timeseriesMetadata =
+ new TimeseriesMetadata(
+ (byte)
+ ((serializeStatistic ? (byte) 1 : (byte) 0) |
chunkMetadataList.get(0).getMask()),
+ chunkMetadataListLength,
+ path.getMeasurement(),
+ dataType,
+ seriesStatistics,
+ publicBAOS);
+ deviceTimeseriesMetadataMap
+ .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
+ .add(timeseriesMetadata);
+
+ // for VECTOR
+ for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+ // chunkMetadata is time column of a vector series
+ if (chunkMetadata.getMask() == (byte) 0x80) {
+ Map<Path, List<IChunkMetadata>> vectorMap = vectorToPathsMap.get(path);
+
+ for (Map.Entry<Path, List<IChunkMetadata>> entry :
vectorMap.entrySet()) {
+ flushOneChunkMetadata(entry.getKey(), entry.getValue(),
vectorToPathsMap);
+ }
+ }
+ }
+ }
+
/**
* get the length of normal OutputStream.
*