This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4251
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-4251 by this push:
     new eda85f5ad2 temp
eda85f5ad2 is described below

commit eda85f5ad2cb6dbac82e920d8e445d22f60597cc
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Sep 6 15:22:26 2022 +0800

    temp
---
 .../write/writer/MemoryControlTsFileIOWriter.java  | 224 ++++++++-------------
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  16 +-
 .../writer/MemoryControlTsFileIOWriterTest.java    |  12 +-
 3 files changed, 104 insertions(+), 148 deletions(-)

diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
index e84970ced1..371f4d2ae7 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
@@ -19,9 +19,11 @@
 
 package org.apache.iotdb.tsfile.write.writer;
 
+import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
@@ -29,7 +31,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,8 +40,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -51,33 +50,42 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
   protected long currentChunkMetadataSize = 0L;
   protected File chunkMetadataTempFile;
   protected LocalTsFileOutput tempOutput;
-  protected final boolean needSort;
-  protected Queue<Long> sortedSegmentPosition = new ArrayDeque<>();
+  protected final boolean autoControl;
+  // it stores the start address of the persisted chunk metadata for each series
+  protected Queue<Long> segmentForPerSeries = new ArrayDeque<>();
+  protected String currentSeries = null;
 
   public static final String CHUNK_METADATA_TEMP_FILE_PREFIX = ".cmt";
-  private static final String SORTING_TEMP_FILE = ".scmt";
   private static final byte VECTOR_TYPE = 1;
   private static final byte NORMAL_TYPE = 2;
 
-  public MemoryControlTsFileIOWriter(File file, long maxMetadataSize, boolean 
needSort)
+  public MemoryControlTsFileIOWriter(File file, long maxMetadataSize, boolean 
autoControl)
       throws IOException {
     super(file);
     this.maxMetadataSize = maxMetadataSize;
     this.chunkMetadataTempFile = new File(file.getAbsoluteFile() + 
CHUNK_METADATA_TEMP_FILE_PREFIX);
-    this.needSort = needSort;
+    this.autoControl = autoControl;
   }
 
   @Override
   public void endCurrentChunk() {
     currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
     super.endCurrentChunk();
+    if (this.autoControl) {
+      checkMetadataSizeAndMayFlush();
+    }
+  }
+
+  public boolean checkMetadataSizeAndMayFlush() {
     if (currentChunkMetadataSize > maxMetadataSize) {
       try {
         sortAndFlushChunkMetadata();
+        return true;
       } catch (IOException e) {
         LOG.error("Meets exception when flushing metadata to temp files", e);
       }
     }
+    return false;
   }
 
   protected void sortAndFlushChunkMetadata() throws IOException {
@@ -86,10 +94,10 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
     if (tempOutput == null) {
       tempOutput = new LocalTsFileOutput(new 
FileOutputStream(chunkMetadataTempFile));
     }
-    sortedSegmentPosition.add(tempOutput.getPosition());
     // the file structure in temp file will be
     // ChunkType | chunkSize | chunkBuffer
     for (Map.Entry<Path, List<IChunkMetadata>> entry : 
chunkMetadataListMap.entrySet()) {
+      segmentForPerSeries.add(tempOutput.getPosition());
       Path seriesPath = entry.getKey();
       List<IChunkMetadata> iChunkMetadataList = entry.getValue();
       writeChunkMetadata(iChunkMetadataList, seriesPath, tempOutput);
@@ -103,27 +111,33 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
       return;
     }
     if (iChunkMetadataList.get(0).getDataType() == TSDataType.VECTOR) {
-      IChunkMetadata currentTimeChunk = iChunkMetadataList.get(0);
-      List<IChunkMetadata> currentValueChunk = new ArrayList<>();
-      List<IChunkMetadata> alignedChunkMetadata = new ArrayList<>();
-      for (int i = 1; i < iChunkMetadataList.size(); ++i) {
-        if (iChunkMetadataList.get(i).getDataType() == TSDataType.VECTOR) {
-          alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk, 
currentValueChunk));
-          currentTimeChunk = iChunkMetadataList.get(i);
-          currentValueChunk = new ArrayList<>();
-        } else {
-          currentValueChunk.add(iChunkMetadataList.get(i));
-        }
-      }
-      if (currentValueChunk.size() > 0) {
-        alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk, 
currentValueChunk));
-      }
+      // pack the TimeChunkMetadata and List<ValueChunkMetadata> into 
List<AlignedChunkMetadata>
+      List<IChunkMetadata> alignedChunkMetadata = 
packAlignedChunkMetadata(iChunkMetadataList);
       writeAlignedChunkMetadata(alignedChunkMetadata, seriesPath, output);
     } else {
       writeNormalChunkMetadata(iChunkMetadataList, seriesPath, output);
     }
   }
 
+  private List<IChunkMetadata> packAlignedChunkMetadata(List<IChunkMetadata> 
iChunkMetadataList) {
+    IChunkMetadata currentTimeChunk = iChunkMetadataList.get(0);
+    List<IChunkMetadata> currentValueChunk = new ArrayList<>();
+    List<IChunkMetadata> alignedChunkMetadata = new ArrayList<>();
+    for (int i = 1; i < iChunkMetadataList.size(); ++i) {
+      if (iChunkMetadataList.get(i).getDataType() == TSDataType.VECTOR) {
+        alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk, 
currentValueChunk));
+        currentTimeChunk = iChunkMetadataList.get(i);
+        currentValueChunk = new ArrayList<>();
+      } else {
+        currentValueChunk.add(iChunkMetadataList.get(i));
+      }
+    }
+    if (currentValueChunk.size() > 0) {
+      alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk, 
currentValueChunk));
+    }
+    return alignedChunkMetadata;
+  }
+
   private void writeAlignedChunkMetadata(
       List<IChunkMetadata> iChunkMetadataList, Path seriesPath, 
LocalTsFileOutput output)
       throws IOException {
@@ -150,112 +164,60 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
 
   @Override
   public void endFile() throws IOException {
-    if (this.sortedSegmentPosition.size() > 0) {
+    if (this.segmentForPerSeries.size() > 0) {
       // there is some chunk metadata already been written to the disk
+      // first we should flush the remaining chunk metadata in memory to disk
+      // then read the persisted chunk metadata from disk
       sortAndFlushChunkMetadata();
       tempOutput.close();
     } else {
-      // sort the chunk metadata in memory, and just close the file
+      // sort the chunk metadata in memory, construct the index tree
+      // and just close the file
       tempOutput.close();
       super.endFile();
       return;
     }
 
-    if (needSort) {
-      externalSort();
-    }
-
-    //         super.endFile();
+    // read in the chunk metadata, and construct the index tree
+    readChunkMetadataAndConstructIndexTree();
   }
 
-  protected void externalSort() throws IOException {
-    ChunkMetadataComparator comparator = new ChunkMetadataComparator();
-    int totalSegmentCount = this.sortedSegmentPosition.size();
-    File currentInFile = this.chunkMetadataTempFile;
-    File currentOutFile = new File(this.file.getAbsolutePath() + 
SORTING_TEMP_FILE);
-    LocalTsFileInput inputForWindow1 = null;
-    LocalTsFileInput inputForWindow2 = null;
-    LocalTsFileOutput output = null;
-    while (totalSegmentCount > 1) {
-      try {
-        inputForWindow1 = new LocalTsFileInput(currentInFile.toPath());
-        inputForWindow2 = new LocalTsFileInput(currentInFile.toPath());
-        output = new LocalTsFileOutput(new FileOutputStream(currentOutFile));
-        totalSegmentCount = 0;
-        Queue<Long> newSortedSegmentPosition = new ArrayDeque<>();
-        while (sortedSegmentPosition.size() > 0) {
-          long startPositionForWindow1 = sortedSegmentPosition.poll();
-          if (sortedSegmentPosition.size() == 0) {
-            // Just leave it alone, and record the position
-            newSortedSegmentPosition.add(startPositionForWindow1);
-            continue;
-          }
-          long startPositionForWindow2 = sortedSegmentPosition.poll();
-          ChunkMetadataExternalSortWindow firstWindow =
-              new ChunkMetadataExternalSortWindow(
-                  startPositionForWindow1, startPositionForWindow2, 
inputForWindow1);
-          ChunkMetadataExternalSortWindow secondWindow =
-              new ChunkMetadataExternalSortWindow(
-                  startPositionForWindow2,
-                  sortedSegmentPosition.size() > 0
-                      ? sortedSegmentPosition.element()
-                      : this.chunkMetadataTempFile.length(),
-                  inputForWindow2);
-          firstWindow.getNextSeriesNameAndChunkMetadata();
-          secondWindow.getNextSeriesNameAndChunkMetadata();
-          newSortedSegmentPosition.add(output.getPosition());
-          while (firstWindow.hasNextChunkMetadata() && 
secondWindow.hasNextChunkMetadata()) {
-            Pair<String, IChunkMetadata> pairOfFirstWindow =
-                firstWindow.getCurrentSeriesNameAndChunkMetadata();
-            Pair<String, IChunkMetadata> pairOfSecondWindow =
-                secondWindow.getCurrentSeriesNameAndChunkMetadata();
-            Pair<String, IChunkMetadata> pairToWritten = null;
-            if (comparator.compare(pairOfFirstWindow, pairOfSecondWindow) < 0) 
{
-              pairToWritten = pairOfFirstWindow;
-              if (firstWindow.hasNextChunkMetadata()) {
-                firstWindow.getNextSeriesNameAndChunkMetadata();
-              }
-            } else {
-              pairToWritten = pairOfSecondWindow;
-              if (secondWindow.hasNextChunkMetadata()) {
-                secondWindow.getNextSeriesNameAndChunkMetadata();
-              }
-            }
-            // serialize the chunk to the output
-            if (pairToWritten.right instanceof AlignedChunkMetadata) {
-              writeAlignedChunkMetadata(
-                  Collections.singletonList(pairToWritten.right),
-                  new Path(pairToWritten.left),
-                  output);
-            } else {
-              writeNormalChunkMetadata(
-                  Collections.singletonList(pairToWritten.right),
-                  new Path(pairToWritten.left),
-                  output);
-            }
-          }
-        }
+  private void readChunkMetadataAndConstructIndexTree() throws IOException {
+    tempOutput.close();
+    long metaOffset = out.getPosition();
 
-        output.close();
-        inputForWindow1.close();
-        inputForWindow2.close();
-        FileUtils.delete(currentInFile);
-        currentOutFile.renameTo(currentInFile);
-        File tempFile = currentOutFile;
-        currentOutFile = currentInFile;
-        currentInFile = tempFile;
-      } finally {
-        if (inputForWindow1 != null) {
-          inputForWindow1.close();
-        }
-        if (inputForWindow2 != null) {
-          inputForWindow2.close();
-        }
-        if (output != null) {
-          output.close();
-        }
-      }
+    // serialize the SEPARATOR of MetaData
+    ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
+    ChunkMetadataReadIterator iterator =
+        new ChunkMetadataReadIterator(
+            0,
+            chunkMetadataTempFile.length(),
+            new LocalTsFileInput(chunkMetadataTempFile.toPath()));
+    while (iterator.hasNextChunkMetadata()) {
+      // 1. read in all chunk metadata of one series
+      // 2. construct the timeseries metadata for this series
+      // 3. construct the index tree node for the series
+      // 4. serialize the timeseries metadata to file
+      TimeseriesMetadata timeseriesMetadata = readTimeseriesMetadata(iterator);
+    }
+  }
+
+  private TimeseriesMetadata readTimeseriesMetadata(ChunkMetadataReadIterator 
iterator)
+      throws IOException {
+    Pair<String, IChunkMetadata> currentPair = iterator.getCurrentPair();
+    if (currentPair == null) {
+      currentPair = iterator.getNextSeriesNameAndChunkMetadata();
+    }
+    if (!currentPair.left.equals(currentSeries)) {
+      // come to a new series
+      currentSeries = currentPair.left;
+    }
+    List<IChunkMetadata> iChunkMetadataList = new ArrayList<>();
+    while (currentPair != null && currentPair.left.equals(currentSeries)) {
+      iChunkMetadataList.add(currentPair.right);
+      currentPair = iterator.getNextSeriesNameAndChunkMetadata();
     }
+    return super.constructOneTimeseriesMetadata(new Path(currentSeries), 
iChunkMetadataList, false);
   }
 
   @Override
@@ -266,32 +228,16 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
     }
   }
 
-  protected static class ChunkMetadataComparator
-      implements Comparator<Pair<String, IChunkMetadata>> {
-
-    @Override
-    public int compare(Pair<String, IChunkMetadata> o1, Pair<String, 
IChunkMetadata> o2) {
-      String seriesNameOfO1 = o1.left;
-      String seriesNameOfO2 = o2.left;
-      int lexicographicalOrder = seriesNameOfO1.compareTo(seriesNameOfO2);
-      if (lexicographicalOrder != 0) {
-        return lexicographicalOrder;
-      } else {
-        return Long.compare(o1.right.getStartTime(), o2.right.getStartTime());
-      }
-    }
-  }
-
-  protected class ChunkMetadataExternalSortWindow {
+  protected class ChunkMetadataReadIterator {
 
     final LocalTsFileInput input;
     final long startPosition;
     final long endPosition;
     final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
     final ByteBuffer typeBuffer = ByteBuffer.allocate(1);
-    Pair<String, IChunkMetadata> currentPair = null;
+    private Pair<String, IChunkMetadata> currentPair = null;
 
-    ChunkMetadataExternalSortWindow(long startPosition, long endPosition, 
LocalTsFileInput input)
+    ChunkMetadataReadIterator(long startPosition, long endPosition, 
LocalTsFileInput input)
         throws IOException {
       this.startPosition = startPosition;
       this.endPosition = endPosition;
@@ -304,6 +250,10 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
     }
 
     public Pair<String, IChunkMetadata> getNextSeriesNameAndChunkMetadata() 
throws IOException {
+      if (input.position() >= endPosition) {
+        currentPair = null;
+        return null;
+      }
       byte type = readNextChunkMetadataType();
       int size = readNextChunkMetadataSize();
       ByteBuffer chunkBuffer = ByteBuffer.allocate(size);
@@ -322,7 +272,7 @@ public class MemoryControlTsFileIOWriter extends 
TsFileIOWriter {
       return currentPair;
     }
 
-    public Pair<String, IChunkMetadata> getCurrentSeriesNameAndChunkMetadata() 
{
+    public Pair<String, IChunkMetadata> getCurrentPair() {
       return currentPair;
     }
 
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 59a8ec236d..c34ab1df63 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -327,7 +327,7 @@ public class TsFileIOWriter implements AutoCloseable {
     // create device -> TimeseriesMetaDataList Map
     for (Map.Entry<Path, List<IChunkMetadata>> entry : 
chunkMetadataListMap.entrySet()) {
       // for ordinary path
-      flushOneChunkMetadata(entry.getKey(), entry.getValue());
+      constructOneTimeseriesMetadata(entry.getKey(), entry.getValue(), true);
     }
 
     // construct TsFileMetadata and return
@@ -339,8 +339,11 @@ public class TsFileIOWriter implements AutoCloseable {
    *
    * @param path Path of chunk
    * @param chunkMetadataList List of chunkMetadata about path(previous param)
+   * @param needRecordInMap whether to record the timeseries metadata in 
deviceTimeseriesMetadataMap
+   * @return the constructed TimeseriesMetadata
    */
-  private void flushOneChunkMetadata(Path path, List<IChunkMetadata> 
chunkMetadataList)
+  protected TimeseriesMetadata constructOneTimeseriesMetadata(
+      Path path, List<IChunkMetadata> chunkMetadataList, boolean 
needRecordInMap)
       throws IOException {
     // create TimeseriesMetaData
     PublicBAOS publicBAOS = new PublicBAOS();
@@ -367,9 +370,12 @@ public class TsFileIOWriter implements AutoCloseable {
             dataType,
             seriesStatistics,
             publicBAOS);
-    deviceTimeseriesMetadataMap
-        .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
-        .add(timeseriesMetadata);
+    if (needRecordInMap) {
+      deviceTimeseriesMetadataMap
+          .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
+          .add(timeseriesMetadata);
+    }
+    return timeseriesMetadata;
   }
 
   /**
diff --git 
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
 
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
index 25bd192974..392a699222 100644
--- 
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
+++ 
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
@@ -101,9 +101,9 @@ public class MemoryControlTsFileIOWriterTest extends 
MemoryControlTsFileIOWriter
       writer.sortAndFlushChunkMetadata();
       writer.tempOutput.flush();
 
-      ChunkMetadataExternalSortWindow window =
+      ChunkMetadataReadIterator window =
           writer
-          .new ChunkMetadataExternalSortWindow(
+          .new ChunkMetadataReadIterator(
               0,
               writer.chunkMetadataTempFile.length(),
               new LocalTsFileInput(writer.chunkMetadataTempFile.toPath()));
@@ -157,9 +157,9 @@ public class MemoryControlTsFileIOWriterTest extends 
MemoryControlTsFileIOWriter
             new AlignedChunkMetadata(currentTimeChunkMetadata, 
currentValueChunkMetadata));
       }
 
-      ChunkMetadataExternalSortWindow window =
+      ChunkMetadataReadIterator window =
           writer
-          .new ChunkMetadataExternalSortWindow(
+          .new ChunkMetadataReadIterator(
               0,
               writer.chunkMetadataTempFile.length(),
               new LocalTsFileInput(writer.chunkMetadataTempFile.toPath()));
@@ -226,9 +226,9 @@ public class MemoryControlTsFileIOWriterTest extends 
MemoryControlTsFileIOWriter
       writer.sortAndFlushChunkMetadata();
       writer.tempOutput.flush();
 
-      ChunkMetadataExternalSortWindow window =
+      ChunkMetadataReadIterator window =
           writer
-          .new ChunkMetadataExternalSortWindow(
+          .new ChunkMetadataReadIterator(
               0,
               writer.chunkMetadataTempFile.length(),
               new LocalTsFileInput(writer.chunkMetadataTempFile.toPath()));

Reply via email to