This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_vector by this push:
     new 6ffb37c  add some interfaces in IWritableMemChunk for flushing
6ffb37c is described below

commit 6ffb37c147483ab99578fc5cc19bd761013150de
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Oct 28 11:20:07 2021 +0800

    add some interfaces in IWritableMemChunk for flushing
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 48 ++++++++--------------
 .../db/engine/memtable/IWritableMemChunk.java      |  7 +++-
 .../iotdb/db/engine/memtable/WritableMemChunk.java |  9 +++-
 3 files changed, 29 insertions(+), 35 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 7962da4..3c29135 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -18,6 +18,13 @@
  */
 package org.apache.iotdb.db.engine.flush;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
@@ -28,24 +35,12 @@ import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.db.utils.datastructure.VectorTVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
-import org.apache.iotdb.tsfile.write.chunk.VectorChunkWriterImpl;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
 /**
  * flush task to flush one memtable using a pipeline model to flush, which is sort memtable ->
  * encoding -> write to disk (io task)
@@ -118,13 +113,12 @@ public class MemTableFlushTask {
       for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
         long startTime = System.currentTimeMillis();
         IWritableMemChunk series = iWritableMemChunkEntry.getValue();
-        IMeasurementSchema desc = series.getSchema();
         /*
          * sort task (first task of flush pipeline)
          */
-        TVList tvList = series.getSortedTvListForFlush();
+        series.sortTvListForFlush();
         sortTime += System.currentTimeMillis() - startTime;
-        encodingTaskQueue.put(new Pair<>(tvList, desc));
+        encodingTaskQueue.put(series);
       }
 
       encodingTaskQueue.put(new EndChunkGroupIoTask());
@@ -290,7 +284,7 @@ public class MemTableFlushTask {
               writer.getFile().getName());
           while (true) {
 
-            Object task = null;
+            Object task;
             try {
               task = encodingTaskQueue.take();
             } catch (InterruptedException e1) {
@@ -316,15 +310,9 @@ public class MemTableFlushTask {
               break;
             } else {
               long starTime = System.currentTimeMillis();
-              Pair<TVList, IMeasurementSchema> encodingMessage =
-                  (Pair<TVList, IMeasurementSchema>) task;
-              IChunkWriter seriesWriter;
-              if (encodingMessage.left.getDataType() == TSDataType.VECTOR) {
-                seriesWriter = new VectorChunkWriterImpl(encodingMessage.right);
-              } else {
-                seriesWriter = new ChunkWriterImpl(encodingMessage.right);
-              }
-              writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
+              IWritableMemChunk writableMemChunk = (IWritableMemChunk) task;
+              IChunkWriter seriesWriter = writableMemChunk.createIChunkWrite();
+              writableMemChunk.encode(seriesWriter);
               seriesWriter.sealCurrentPage();
               seriesWriter.clearPageWriter();
               try {
@@ -374,16 +362,12 @@ public class MemTableFlushTask {
               this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
             } else if (ioMessage instanceof TaskEnd) {
               break;
-            } else if (ioMessage instanceof ChunkWriterImpl) {
-              ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
-              chunkWriter.writeToFileWriter(this.writer);
-            } else if (ioMessage instanceof VectorChunkWriterImpl) {
-              VectorChunkWriterImpl chunkWriter = (VectorChunkWriterImpl) ioMessage;
-              chunkWriter.writeToFileWriter(this.writer);
-            } else {
+            } else if (ioMessage instanceof EndChunkGroupIoTask) {
               this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
               this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
               this.writer.endChunkGroup();
+            } else {
+              ((IChunkWriter) ioMessage).writeToFileWriter(this.writer);
             }
           } catch (IOException e) {
             LOGGER.error(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index b58f3b7..d6a3862 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 import java.util.List;
@@ -101,7 +102,7 @@ public interface IWritableMemChunk {
    *
    * @return sorted tv list
    */
-  TVList getSortedTvListForFlush();
+  void sortTvListForFlush();
 
   default TVList getTVList() {
     return null;
@@ -116,4 +117,8 @@ public interface IWritableMemChunk {
 
   // For delete one column in the vector
   int delete(long lowerBound, long upperBound, int columnIndex);
+
+  IChunkWriter createIChunkWrite();
+
+  void encode(IChunkWriter chunkWriter);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 09b30e8..f7d359e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 import java.util.List;
@@ -205,9 +206,8 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
-  public synchronized TVList getSortedTvListForFlush() {
+  public synchronized void sortTvListForFlush() {
     sortTVList();
-    return list;
   }
 
   @Override
@@ -254,6 +254,11 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
+  public IChunkWriter createIChunkWrite() {
+    return null;
+  }
+
+  @Override
   public String toString() {
     int size = list.size();
     int firstIndex = 0;

Reply via email to