This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4251
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-4251 by this push:
     new a25aad0ab4 refactor some codes
a25aad0ab4 is described below

commit a25aad0ab4d858432f2a1dae004c7e30a60ee5a3
Author: Liu Xuxin <[email protected]>
AuthorDate: Fri Sep 9 10:15:29 2022 +0800

    refactor some codes
---
 .../write/writer/MemoryControlTsFileIOWriter.java  | 113 +++++++--------------
 .../tsfile/write/TsFileIntegrityCheckingTool.java  |  30 ++++--
 2 files changed, 63 insertions(+), 80 deletions(-)

diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
index 0c171c4419..2cfa93e11b 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.tsfile.write.writer;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.MetaMarker;
-import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
@@ -62,8 +61,6 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
   protected long currentChunkMetadataSize = 0L;
   protected File chunkMetadataTempFile;
   protected LocalTsFileOutput tempOutput;
-  // it stores the start address of persisted chunk metadata for per series
-  //  protected Queue<Long> segmentForPerSeries = new ArrayDeque<>();
   protected volatile boolean hasChunkMetadataInDisk = false;
   protected String currentSeries = null;
   // record the total num of path in order to make bloom filter
@@ -71,7 +68,6 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
   Path lastSerializePath = null;
 
   public static final String CHUNK_METADATA_TEMP_FILE_PREFIX = ".cmt";
-  private static final byte VECTOR_TYPE = 1;
   private static final byte NORMAL_TYPE = 2;
 
   public MemoryControlTsFileIOWriter(File file, long maxMetadataSize) throws 
IOException {
@@ -86,6 +82,14 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
     super.endCurrentChunk();
   }
 
+  /**
+   * Check if the size of chunk metadata in memory is greater than the given 
threshold. If so, the
chunk metadata will be written to a temp file. <b>Notice! If you are 
writing an aligned device,
+   * you should make sure all data of the current writing device has been 
written before this method is
+   * called.</b> For non-aligned series, there is no such limitation.
+   *
+   * @throws IOException if an I/O error occurs while flushing chunk metadata to the temp file
+   */
   public void checkMetadataSizeAndMayFlush() throws IOException {
     // This function should be called after all data of an aligned device has 
been written
     if (currentChunkMetadataSize > maxMetadataSize) {
@@ -98,6 +102,12 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
     }
   }
 
+  /**
+   * Sort the chunk metadata in lexicographical order and by the start time 
of the chunk, then
+   * flush them to a temp file.
+   *
+   * @throws IOException if an I/O error occurs while writing the temp file
+   */
   protected void sortAndFlushChunkMetadata() throws IOException {
     // group by series
     Map<Path, List<IChunkMetadata>> chunkMetadataListMap = 
groupChunkMetadataListBySeries();
@@ -106,10 +116,11 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
     }
     hasChunkMetadataInDisk = true;
     // the file structure in temp file will be
-    // ChunkType | chunkSize | chunkBuffer
+    // chunkSize | chunkBuffer
     for (Map.Entry<Path, List<IChunkMetadata>> entry : 
chunkMetadataListMap.entrySet()) {
       Path seriesPath = entry.getKey();
       if (!seriesPath.equals(lastSerializePath)) {
+        // record the count of paths to construct the bloom filter later
         pathCount++;
       }
       List<IChunkMetadata> iChunkMetadataList = entry.getValue();
@@ -126,48 +137,7 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
   private void writeChunkMetadata(
       List<IChunkMetadata> iChunkMetadataList, Path seriesPath, 
LocalTsFileOutput output)
       throws IOException {
-    if (iChunkMetadataList.size() == 0) {
-      return;
-    }
-    writeNormalChunkMetadata(iChunkMetadataList, seriesPath, output);
-  }
-
-  private List<IChunkMetadata> packAlignedChunkMetadata(List<IChunkMetadata> 
iChunkMetadataList) {
-    IChunkMetadata currentTimeChunk = iChunkMetadataList.get(0);
-    List<IChunkMetadata> currentValueChunk = new ArrayList<>();
-    List<IChunkMetadata> alignedChunkMetadata = new ArrayList<>();
-    for (int i = 1; i < iChunkMetadataList.size(); ++i) {
-      if (iChunkMetadataList.get(i).getDataType() == TSDataType.VECTOR) {
-        alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk, 
currentValueChunk));
-        currentTimeChunk = iChunkMetadataList.get(i);
-        currentValueChunk = new ArrayList<>();
-      } else {
-        currentValueChunk.add(iChunkMetadataList.get(i));
-      }
-    }
-    if (currentValueChunk.size() > 0) {
-      alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk, 
currentValueChunk));
-    }
-    return alignedChunkMetadata;
-  }
-
-  private void writeAlignedChunkMetadata(
-      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, 
LocalTsFileOutput output)
-      throws IOException {
-    for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
-      ReadWriteIOUtils.write(VECTOR_TYPE, output);
-      PublicBAOS buffer = new PublicBAOS();
-      int size = chunkMetadata.serializeWithFullInfo(buffer, 
seriesPath.getDevice());
-      ReadWriteIOUtils.write(size, output);
-      buffer.writeTo(output);
-    }
-  }
-
-  private void writeNormalChunkMetadata(
-      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, 
LocalTsFileOutput output)
-      throws IOException {
     for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
-      ReadWriteIOUtils.write(NORMAL_TYPE, output);
       PublicBAOS buffer = new PublicBAOS();
       int size = chunkMetadata.serializeWithFullInfo(buffer, 
seriesPath.getFullPath());
       ReadWriteIOUtils.write(size, output);
@@ -177,19 +147,20 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
 
   @Override
   public void endFile() throws IOException {
-    if (hasChunkMetadataInDisk) {
-      // there is some chunk metadata already been written to the disk
-      // first we should flush the remaining chunk metadata in memory to disk
-      // then read the persisted chunk metadata from disk
-      sortAndFlushChunkMetadata();
-      tempOutput.close();
-    } else {
-      // sort the chunk metadata in memory, construct the index tree
+    if (!hasChunkMetadataInDisk) {
+      // all the chunk metadata is stored in memory
+      // sort the chunk metadata, construct the index tree
       // and just close the file
       super.endFile();
       return;
     }
 
+    // there is some chunk metadata already been written to the disk
+    // first we should flush the remaining chunk metadata in memory to disk
+    // then read the persisted chunk metadata from disk
+    sortAndFlushChunkMetadata();
+    tempOutput.close();
+
     // read in the chunk metadata, and construct the index tree
     readChunkMetadataAndConstructIndexTree();
 
@@ -237,7 +208,9 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
       // construct the index tree node for the series
       Path currentPath = null;
       if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
-        // remove the last . in the series id
+        // this series is the time column of the aligned device
+        // the full series path will be like "root.sg.d."
+        // we remove the last . in the series id here
         currentDevice = currentSeries.substring(0, currentSeries.length() - 1);
       } else {
         currentPath = new Path(currentSeries, true);
@@ -351,31 +324,30 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
       return currentPair != null || this.input.position() < endPosition;
     }
 
-    public Pair<String, IChunkMetadata> getNextSeriesNameAndChunkMetadata() 
throws IOException {
+    /**
+     * Read in the next chunk metadata, returning the series full path and the chunk metadata.
+     *
+     * @return a pair of the series full path and the deserialized chunk metadata, or null if none remain
+     * @throws IOException if reading from the temp file fails
+     */
+    protected Pair<String, IChunkMetadata> getNextSeriesNameAndChunkMetadata() 
throws IOException {
       if (input.position() >= endPosition) {
         currentPair = null;
         return null;
       }
-      byte type = readNextChunkMetadataType();
       int size = readNextChunkMetadataSize();
       ByteBuffer chunkBuffer = ByteBuffer.allocate(size);
       ReadWriteIOUtils.readAsPossible(input, chunkBuffer);
       chunkBuffer.flip();
-      if (type == NORMAL_TYPE) {
-        ChunkMetadata chunkMetadata = new ChunkMetadata();
-        String seriesPath = ChunkMetadata.deserializeWithFullInfo(chunkBuffer, 
chunkMetadata);
-        currentPair = new Pair<>(seriesPath, chunkMetadata);
-      } else {
-        AlignedChunkMetadata chunkMetadata = new AlignedChunkMetadata();
-        String devicePath =
-            AlignedChunkMetadata.deserializeWithFullInfo(chunkBuffer, 
chunkMetadata);
-        currentPair = new Pair<>(devicePath, chunkMetadata);
-      }
+      ChunkMetadata chunkMetadata = new ChunkMetadata();
+      String seriesPath = ChunkMetadata.deserializeWithFullInfo(chunkBuffer, 
chunkMetadata);
+      currentPair = new Pair<>(seriesPath, chunkMetadata);
       return currentPair;
     }
 
     public String getAllChunkMetadataForNextSeries(List<IChunkMetadata> 
iChunkMetadataList)
         throws IOException {
+      // TODO: read all the chunk metadata of a single series once instead of 
reading it iteratively
       if (currentPair == null) {
         if (!hasNextChunkMetadata()) {
           return null;
@@ -400,13 +372,6 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
       return currentPair;
     }
 
-    private byte readNextChunkMetadataType() throws IOException {
-      typeBuffer.clear();
-      ReadWriteIOUtils.readAsPossible(input, typeBuffer);
-      typeBuffer.flip();
-      return ReadWriteIOUtils.readByte(typeBuffer);
-    }
-
     private int readNextChunkMetadataSize() throws IOException {
       sizeBuffer.clear();
       ReadWriteIOUtils.readAsPossible(input, sizeBuffer);
diff --git 
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
 
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
index 43333a6e02..c97a9a0774 100644
--- 
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
+++ 
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
@@ -63,6 +63,12 @@ import java.util.Map;
 public class TsFileIntegrityCheckingTool {
   private static Logger LOG = 
LoggerFactory.getLogger(TsFileIntegrityCheckingTool.class);
 
+  /**
+   * This method checks the integrity of the file by reading it from the start to 
the end. It mainly
+   * checks the integrity of the chunks.
+   *
+   * @param filename
+   */
   public static void checkIntegrityBySequenceRead(String filename) {
     try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
       String headMagicString = reader.readHeadMagic();
@@ -141,6 +147,16 @@ public class TsFileIntegrityCheckingTool {
     }
   }
 
+  /**
+   * This method checks the integrity of the file by mimicking the process of 
a query, which reads
+   * the metadata index tree first, and gets the timeseries metadata list and 
chunk metadata list.
+   * After that, this method acquires each chunk according to its chunk 
metadata, then it deserializes
+   * the chunk, and verifies the correctness of the data.
+   *
+   * @param filename File to be checked
+   * @param originData The origin data in a map format, Device -> SeriesId -> 
List<List<Time,Val>>,
+   *     where each inner list stands for a chunk.
+   */
   public static void checkIntegrityByQuery(
       String filename,
       Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> 
originData) {
@@ -148,6 +164,7 @@ public class TsFileIntegrityCheckingTool {
       Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata =
           reader.getAllTimeseriesMetadata(true);
       Assert.assertEquals(originData.size(), allTimeseriesMetadata.size());
+      // check each series
       for (Map.Entry<String, List<TimeseriesMetadata>> entry : 
allTimeseriesMetadata.entrySet()) {
         String deviceId = entry.getKey();
         List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
@@ -163,6 +180,7 @@ public class TsFileIntegrityCheckingTool {
         if (!vectorMode) {
           // check integrity of not aligned series
           for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) 
{
+            // get its chunk metadata list, and read the chunk
             String measurementId = timeseriesMetadata.getMeasurementId();
             List<List<Pair<Long, TsPrimitiveType>>> originChunks =
                 originData.get(deviceId).get(measurementId);
@@ -173,29 +191,27 @@ public class TsFileIntegrityCheckingTool {
               Chunk chunk = reader.readMemChunk((ChunkMetadata) 
chunkMetadataList.get(i));
               ChunkReader chunkReader = new ChunkReader(chunk, null);
               List<Pair<Long, TsPrimitiveType>> originValue = 
originChunks.get(i);
+              // deserialize the chunk and verify it with origin data
               for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
                 IPointReader pointReader = 
chunkReader.nextPageData().getBatchDataIterator();
                 while (pointReader.hasNextTimeValuePair()) {
                   TimeValuePair pair = pointReader.nextTimeValuePair();
                   Assert.assertEquals(
                       originValue.get(valIdx).left.longValue(), 
pair.getTimestamp());
-                  try {
-                    Assert.assertEquals(originValue.get(valIdx++).right, 
pair.getValue());
-                  } catch (Throwable e) {
-                    System.out.println();
-                  }
+                  Assert.assertEquals(originValue.get(valIdx++).right, 
pair.getValue());
                 }
               }
             }
           }
         } else {
           // check integrity of vector type
-          // 1. check the time column
+          // get the timeseries metadata of the time column
           TimeseriesMetadata timeColumnMetadata = 
timeseriesMetadataList.get(0);
           List<IChunkMetadata> timeChunkMetadataList = 
timeColumnMetadata.getChunkMetadataList();
           
timeChunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime));
 
           for (int i = 1; i < timeseriesMetadataList.size(); ++i) {
+            // traverse each value column
             List<IChunkMetadata> valueChunkMetadataList =
                 timeseriesMetadataList.get(i).getChunkMetadataList();
             Assert.assertEquals(timeChunkMetadataList.size(), 
valueChunkMetadataList.size());
@@ -206,8 +222,10 @@ public class TsFileIntegrityCheckingTool {
                   reader.readMemChunk((ChunkMetadata) 
timeChunkMetadataList.get(chunkIdx));
               Chunk valueChunk =
                   reader.readMemChunk((ChunkMetadata) 
valueChunkMetadataList.get(chunkIdx));
+              // construct an aligned chunk reader using time chunk and value 
chunk
               IChunkReader chunkReader =
                   new AlignedChunkReader(timeChunk, 
Collections.singletonList(valueChunk), null);
+              // verify the values
               List<Pair<Long, TsPrimitiveType>> originValue = 
originDataChunks.get(chunkIdx);
               for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
                 IBatchDataIterator pointReader = 
chunkReader.nextPageData().getBatchDataIterator();

Reply via email to