This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch two_stage_pipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b79643ee16de2ced1faa29cb70493dff95e3362f
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jan 27 12:37:39 2021 +0800

    init
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 145 +++++++--------------
 1 file changed, 48 insertions(+), 97 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index b64fd8f..cb72070 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
 import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -43,17 +42,14 @@ public class MemTableFlushTask {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MemTableFlushTask.class);
   private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = 
FlushSubTaskPoolManager
       .getInstance();
-  private final Future<?> encodingTaskFuture;
   private final Future<?> ioTaskFuture;
   private RestorableTsFileIOWriter writer;
 
   private final ConcurrentLinkedQueue<Object> ioTaskQueue = new 
ConcurrentLinkedQueue<>();
-  private final ConcurrentLinkedQueue<Object> encodingTaskQueue = new 
ConcurrentLinkedQueue<>();
   private String storageGroup;
 
   private IMemTable memTable;
 
-  private volatile boolean noMoreEncodingTask = false;
   private volatile boolean noMoreIOTask = false;
 
   /**
@@ -66,7 +62,6 @@ public class MemTableFlushTask {
     this.memTable = memTable;
     this.writer = writer;
     this.storageGroup = storageGroup;
-    this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
     this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
     LOGGER.debug("flush task of Storage group {} memtable {} is created ",
         storageGroup, memTable.getVersion());
@@ -83,10 +78,11 @@ public class MemTableFlushTask {
         memTable.getTotalPointsNum() / memTable.getSeriesNumber());
     long start = System.currentTimeMillis();
     long sortTime = 0;
+    long encodingTime = 0;
 
     //for map do not use get(key) to iteratate
     for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : 
memTable.getMemTableMap().entrySet()) {
-      encodingTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey()));
+      ioTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey()));
 
       final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
       for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : 
value.entrySet()) {
@@ -94,26 +90,26 @@ public class MemTableFlushTask {
         IWritableMemChunk series = iWritableMemChunkEntry.getValue();
         MeasurementSchema desc = series.getSchema();
         TVList tvList = series.getSortedTVListForFlush();
-        sortTime += System.currentTimeMillis() - startTime;
-        encodingTaskQueue.add(new Pair<>(tvList, desc));
+        long encodingStartTime = System.currentTimeMillis();
+        sortTime += encodingStartTime - startTime;
+        IChunkWriter seriesWriter = new ChunkWriterImpl(desc);
+        writeOneSeries(tvList, seriesWriter, desc.getType());
+        seriesWriter.sealCurrentPage();
+        seriesWriter.clearPageWriter();
+        encodingTime += (System.currentTimeMillis() - encodingStartTime);
+        ioTaskQueue.add(seriesWriter);
       }
 
-      encodingTaskQueue.add(new EndChunkGroupIoTask());
+      ioTaskQueue.add(new EndChunkGroupIoTask());
     }
 
-    noMoreEncodingTask = true;
+    noMoreIOTask = true;
     LOGGER.info(
         "Storage group {} memtable {}, flushing into disk: data sort time cost 
{} ms.",
         storageGroup, memTable.getVersion(), sortTime);
-
-    try {
-      encodingTaskFuture.get();
-    } catch (InterruptedException | ExecutionException e) {
-      // avoid ioTask waiting forever
-      noMoreIOTask = true;
-      ioTaskFuture.cancel(true);
-      throw e;
-    }
+    LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding 
data cost "
+            + "{} ms.",
+        storageGroup, memTable.getVersion(), encodingTime);
 
     ioTaskFuture.get();
 
@@ -127,91 +123,46 @@ public class MemTableFlushTask {
     LOGGER.info(
         "Storage group {} memtable {} flushing a memtable has finished! Time 
consumption: {}ms",
         storageGroup, memTable, System.currentTimeMillis() - start);
-  }
 
-  private Runnable encodingTask = new Runnable() {
-    private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
-        TSDataType dataType) {
-      for (int i = 0; i < tvPairs.size(); i++) {
-        long time = tvPairs.getTime(i);
 
-        // skip duplicated data
-        if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) {
-          continue;
-        }
+  }
 
-        switch (dataType) {
-          case BOOLEAN:
-            seriesWriterImpl.write(time, tvPairs.getBoolean(i));
-            break;
-          case INT32:
-            seriesWriterImpl.write(time, tvPairs.getInt(i));
-            break;
-          case INT64:
-            seriesWriterImpl.write(time, tvPairs.getLong(i));
-            break;
-          case FLOAT:
-            seriesWriterImpl.write(time, tvPairs.getFloat(i));
-            break;
-          case DOUBLE:
-            seriesWriterImpl.write(time, tvPairs.getDouble(i));
-            break;
-          case TEXT:
-            seriesWriterImpl.write(time, tvPairs.getBinary(i));
-            break;
-          default:
-            LOGGER.error("Storage group {} does not support data type: {}", 
storageGroup,
-                dataType);
-            break;
-        }
+  private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
+      TSDataType dataType) {
+    for (int i = 0; i < tvPairs.size(); i++) {
+      long time = tvPairs.getTime(i);
+
+      // skip duplicated data
+      if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) {
+        continue;
       }
-    }
 
-    @SuppressWarnings("squid:S135")
-    @Override
-    public void run() {
-      long memSerializeTime = 0;
-      boolean noMoreMessages = false;
-      LOGGER.debug("Storage group {} memtable {}, starts to encoding data.", 
storageGroup,
-          memTable.getVersion());
-      while (true) {
-        if (noMoreEncodingTask) {
-          noMoreMessages = true;
-        }
-        Object task = encodingTaskQueue.poll();
-        if (task == null) {
-          if (noMoreMessages) {
-            break;
-          }
-          try {
-            TimeUnit.MILLISECONDS.sleep(10);
-          } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
-            LOGGER.error("Storage group {} memtable {}, encoding task is 
interrupted.",
-                storageGroup, memTable.getVersion(), e);
-            // generally it is because the thread pool is shutdown so the task 
should be aborted
-            break;
-          }
-        } else {
-          if (task instanceof StartFlushGroupIOTask || task instanceof 
EndChunkGroupIoTask) {
-            ioTaskQueue.add(task);
-          } else {
-            Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, 
MeasurementSchema>) task;
-            long starTime = System.currentTimeMillis();
-            IChunkWriter seriesWriter = new 
ChunkWriterImpl(encodingMessage.right);
-            writeOneSeries(encodingMessage.left, seriesWriter, 
encodingMessage.right.getType());
-            seriesWriter.sealCurrentPage();
-            seriesWriter.clearPageWriter();
-            memSerializeTime += (System.currentTimeMillis() - starTime);
-            ioTaskQueue.add(seriesWriter);
-          }
-        }
+      switch (dataType) {
+        case BOOLEAN:
+          seriesWriterImpl.write(time, tvPairs.getBoolean(i));
+          break;
+        case INT32:
+          seriesWriterImpl.write(time, tvPairs.getInt(i));
+          break;
+        case INT64:
+          seriesWriterImpl.write(time, tvPairs.getLong(i));
+          break;
+        case FLOAT:
+          seriesWriterImpl.write(time, tvPairs.getFloat(i));
+          break;
+        case DOUBLE:
+          seriesWriterImpl.write(time, tvPairs.getDouble(i));
+          break;
+        case TEXT:
+          seriesWriterImpl.write(time, tvPairs.getBinary(i));
+          break;
+        default:
+          LOGGER.error("Storage group {} does not support data type: {}", 
storageGroup,
+              dataType);
+          break;
       }
-      noMoreIOTask = true;
-      LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding 
data cost "
-              + "{} ms.",
-          storageGroup, memTable.getVersion(), memSerializeTime);
     }
-  };
+  }
 
   @SuppressWarnings("squid:S135")
   private Runnable ioTask = () -> {

Reply via email to