This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch serial_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f94b072c31c88bea8500976b23770c241b6fb500
Author: JackieTien97 <[email protected]>
AuthorDate: Sat Jan 30 09:24:41 2021 +0800

    init
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 248 +++++----------------
 .../writelog/recover/TsFileRecoverPerformer.java   |   3 -
 2 files changed, 61 insertions(+), 190 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index c643f59..1cb13b9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -19,21 +19,15 @@
 package org.apache.iotdb.db.engine.flush;
 
 import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
-import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
-import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -44,25 +38,12 @@ import org.slf4j.LoggerFactory;
 public class MemTableFlushTask {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MemTableFlushTask.class);
-  private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = 
FlushSubTaskPoolManager
-      .getInstance();
-  private static IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
-  private final Future<?> encodingTaskFuture;
-  private final Future<?> ioTaskFuture;
-  private RestorableTsFileIOWriter writer;
-
-  private final LinkedBlockingQueue<Object> encodingTaskQueue = new 
LinkedBlockingQueue<>();
-  private final LinkedBlockingQueue<Object> ioTaskQueue = 
(config.isEnableMemControl()
-      && SystemInfo.getInstance().isEncodingFasterThanIo())
-          ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
-          : new LinkedBlockingQueue<>();
-
-  private String storageGroup;
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private final RestorableTsFileIOWriter writer;
 
-  private IMemTable memTable;
+  private final String storageGroup;
 
-  private volatile long memSerializeTime = 0L;
-  private volatile long ioTime = 0L;
+  private final IMemTable memTable;
 
   /**
    * @param memTable the memTable to flush
@@ -74,8 +55,6 @@ public class MemTableFlushTask {
     this.memTable = memTable;
     this.writer = writer;
     this.storageGroup = storageGroup;
-    this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
-    this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
     LOGGER.debug("flush task of Storage group {} memtable is created, flushing 
to file {}.",
               storageGroup, writer.getFile().getName());
   }
@@ -83,8 +62,7 @@ public class MemTableFlushTask {
   /**
    * the function for flushing memtable.
    */
-  public void syncFlushMemTable()
-      throws ExecutionException, InterruptedException {
+  public void syncFlushMemTable() throws ExecutionException, IOException {
     LOGGER.info("The memTable size of SG {} is {}, the avg series points num 
in chunk is {} ",
         storageGroup,
         memTable.memSize(),
@@ -98,10 +76,12 @@ public class MemTableFlushTask {
     }
     long start = System.currentTimeMillis();
     long sortTime = 0;
+    long encodingTime = 0;
+    long ioTime = 0;
 
     //for map do not use get(key) to iteratate
     for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : 
memTable.getMemTableMap().entrySet()) {
-      encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey()));
+      this.writer.startChunkGroup(memTableEntry.getKey());
 
       final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
       for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : 
value.entrySet()) {
@@ -109,25 +89,36 @@ public class MemTableFlushTask {
         IWritableMemChunk series = iWritableMemChunkEntry.getValue();
         MeasurementSchema desc = series.getSchema();
         TVList tvList = series.getSortedTVListForFlush();
-        sortTime += System.currentTimeMillis() - startTime;
-        encodingTaskQueue.put(new Pair<>(tvList, desc));
+        long encodingStartTime = System.currentTimeMillis();
+        sortTime += encodingStartTime - startTime;
+        IChunkWriter seriesWriter = new ChunkWriterImpl(desc);
+        writeOneSeries(tvList, seriesWriter, desc.getType());
+        seriesWriter.sealCurrentPage();
+        seriesWriter.clearPageWriter();
+        long ioStartTime = System.currentTimeMillis();
+        encodingTime += ioStartTime - encodingStartTime;
+        seriesWriter.writeToFileWriter(this.writer);
+        ioTime += System.currentTimeMillis() - ioStartTime;
       }
-
-      encodingTaskQueue.put(new EndChunkGroupIoTask());
+      long ioStartTime = System.currentTimeMillis();
+      this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
+      this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
+      this.writer.endChunkGroup();
+      ioTime += System.currentTimeMillis() - ioStartTime;
     }
-    encodingTaskQueue.put(new TaskEnd());
-    LOGGER.debug(
+
+    LOGGER.info(
         "Storage group {} memtable flushing into file {}: data sort time cost 
{} ms.",
         storageGroup, writer.getFile().getName(), sortTime);
 
-    try {
-      encodingTaskFuture.get();
-    } catch (InterruptedException | ExecutionException e) {
-      ioTaskFuture.cancel(true);
-      throw e;
-    }
+    LOGGER.info(
+        "Storage group {} memtable flushing into file {}: data encoding time 
cost {} ms.",
+        storageGroup, writer.getFile().getName(), encodingTime);
+
+    LOGGER.info(
+        "Storage group {} memtable flushing into file {}: disk io time cost {} 
ms.",
+        storageGroup, writer.getFile().getName(), ioTime);
 
-    ioTaskFuture.get();
 
     try {
       writer.writePlanIndices();
@@ -139,7 +130,7 @@ public class MemTableFlushTask {
       if (estimatedTemporaryMemSize != 0) {
         
SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
       }
-      SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= 
memSerializeTime);
+      SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= encodingTime);
     }
 
     LOGGER.info(
@@ -147,157 +138,40 @@ public class MemTableFlushTask {
         storageGroup, memTable, System.currentTimeMillis() - start);
   }
 
-  private Runnable encodingTask = new Runnable() {
-    private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
-        TSDataType dataType) {
-      for (int i = 0; i < tvPairs.size(); i++) {
-        long time = tvPairs.getTime(i);
+  private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
+      TSDataType dataType) {
+    for (int i = 0; i < tvPairs.size(); i++) {
+      long time = tvPairs.getTime(i);
 
-        // skip duplicated data
-        if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) {
-          continue;
-        }
-
-        switch (dataType) {
-          case BOOLEAN:
-            seriesWriterImpl.write(time, tvPairs.getBoolean(i));
-            break;
-          case INT32:
-            seriesWriterImpl.write(time, tvPairs.getInt(i));
-            break;
-          case INT64:
-            seriesWriterImpl.write(time, tvPairs.getLong(i));
-            break;
-          case FLOAT:
-            seriesWriterImpl.write(time, tvPairs.getFloat(i));
-            break;
-          case DOUBLE:
-            seriesWriterImpl.write(time, tvPairs.getDouble(i));
-            break;
-          case TEXT:
-            seriesWriterImpl.write(time, tvPairs.getBinary(i));
-            break;
-          default:
-            LOGGER.error("Storage group {} does not support data type: {}", 
storageGroup,
-                dataType);
-            break;
-        }
+      // skip duplicated data
+      if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) {
+        continue;
       }
-    }
 
-    @SuppressWarnings("squid:S135")
-    @Override
-    public void run() {
-      LOGGER.debug("Storage group {} memtable flushing to file {} starts to 
encoding data.",
-          storageGroup, writer.getFile().getName());
-      while (true) {
-
-        Object task = null;
-        try {
-          task = encodingTaskQueue.take();
-        } catch (InterruptedException e1) {
-          LOGGER.error("Take task into ioTaskQueue Interrupted");
-          Thread.currentThread().interrupt();
+      switch (dataType) {
+        case BOOLEAN:
+          seriesWriterImpl.write(time, tvPairs.getBoolean(i));
           break;
-        }
-        if (task instanceof StartFlushGroupIOTask || task instanceof 
EndChunkGroupIoTask) {
-          try {
-            ioTaskQueue.put(task);
-          } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
-            LOGGER.error("Storage group {} memtable flushing to file {}, 
encoding task is interrupted.",
-                storageGroup, writer.getFile().getName(), e);
-            // generally it is because the thread pool is shutdown so the task 
should be aborted
-            break;
-          }
-        } else if (task instanceof TaskEnd) {
+        case INT32:
+          seriesWriterImpl.write(time, tvPairs.getInt(i));
           break;
-        } else {
-          long starTime = System.currentTimeMillis();
-          Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, 
MeasurementSchema>) task;
-          IChunkWriter seriesWriter = new 
ChunkWriterImpl(encodingMessage.right);
-          writeOneSeries(encodingMessage.left, seriesWriter, 
encodingMessage.right.getType());
-          seriesWriter.sealCurrentPage();
-          seriesWriter.clearPageWriter();
-          try {
-            ioTaskQueue.put(seriesWriter);
-          } catch (InterruptedException e) {
-            LOGGER.error("Put task into ioTaskQueue Interrupted");
-            Thread.currentThread().interrupt();
-          }
-          memSerializeTime += System.currentTimeMillis() - starTime;
-        }
-      }
-      try {
-        ioTaskQueue.put(new TaskEnd());
-      } catch (InterruptedException e) {
-        LOGGER.error("Put task into ioTaskQueue Interrupted");
-        Thread.currentThread().interrupt();
-      }
-      
-      LOGGER.debug("Storage group {}, flushing memtable {} into disk: Encoding 
data cost "
-          + "{} ms.",
-          storageGroup, writer.getFile().getName(), memSerializeTime);
-    }
-  };
-
-  @SuppressWarnings("squid:S135")
-  private Runnable ioTask = () -> {
-    LOGGER.debug("Storage group {} memtable flushing to file {} start io.",
-        storageGroup, writer.getFile().getName());
-    while (true) {
-      Object ioMessage = null;
-      try {
-        ioMessage = ioTaskQueue.take();
-      } catch (InterruptedException e1) {
-        LOGGER.error("take task from ioTaskQueue Interrupted");
-        Thread.currentThread().interrupt();
-        break;
-      }
-      long starTime = System.currentTimeMillis();
-      try {
-        if (ioMessage instanceof StartFlushGroupIOTask) {
-          this.writer.startChunkGroup(((StartFlushGroupIOTask) 
ioMessage).deviceId);
-        } else if (ioMessage instanceof TaskEnd) {
+        case INT64:
+          seriesWriterImpl.write(time, tvPairs.getLong(i));
+          break;
+        case FLOAT:
+          seriesWriterImpl.write(time, tvPairs.getFloat(i));
+          break;
+        case DOUBLE:
+          seriesWriterImpl.write(time, tvPairs.getDouble(i));
+          break;
+        case TEXT:
+          seriesWriterImpl.write(time, tvPairs.getBinary(i));
+          break;
+        default:
+          LOGGER.error("Storage group {} does not support data type: {}", 
storageGroup,
+              dataType);
           break;
-        } else if (ioMessage instanceof IChunkWriter) {
-          ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
-          chunkWriter.writeToFileWriter(this.writer);
-        } else {
-          this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
-          this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
-          this.writer.endChunkGroup();
-        }
-      } catch (IOException e) {
-        LOGGER.error("Storage group {} memtable {}, io task meets error.", 
storageGroup,
-            memTable, e);
-        throw new FlushRunTimeException(e);
       }
-      ioTime += System.currentTimeMillis() - starTime;
-    }
-    LOGGER.debug("flushing a memtable to file {} in storage group {}, io cost 
{}ms",
-            writer.getFile().getName(), storageGroup, ioTime);
-  };
-
-  static class TaskEnd {
-
-    TaskEnd() {
-
-    }
-  }
-
-  static class EndChunkGroupIoTask {
-
-    EndChunkGroupIoTask() {
-
-    }
-  }
-
-  static class StartFlushGroupIOTask {
-
-    private final String deviceId;
-
-    StartFlushGroupIOTask(String deviceId) {
-      this.deviceId = deviceId;
     }
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
 
b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index b2f2bcb..e46ddd1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -222,9 +222,6 @@ public class TsFileRecoverPerformer {
       // into it
     } catch (IOException | ExecutionException e) {
       throw new StorageGroupProcessorException(e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new StorageGroupProcessorException(e);
     }
   }
 

Reply via email to