This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch encoding_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/encoding_parallel by this push:
     new a11e694  add multiplyflush task
a11e694 is described below

commit a11e694d3f1ad0fe56f4aa34b46d3d5c1ac66cc0
Author: xiangdong huang <[email protected]>
AuthorDate: Tue Mar 16 00:51:32 2021 +0800

    add multiplyflush task
---
 .../iotdb/db/engine/flush/IMemTableFlushTask.java  |  26 ++
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |   3 +-
 .../engine/flush/MultiThreadMemTableFlushTask.java | 426 +++++++++++++++++++++
 .../db/engine/flush/MemTableFlushTaskTest.java     |  26 +-
 4 files changed, 468 insertions(+), 13 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/IMemTableFlushTask.java 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/IMemTableFlushTask.java
new file mode 100644
index 0000000..d7015c5
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/IMemTableFlushTask.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.engine.flush;

import java.util.concurrent.ExecutionException;

/**
 * A task that flushes one memtable to disk. Implementations (e.g. the single-thread and the
 * multi-thread flush tasks) encode the memtable's series and write them out through a file
 * writer; {@link #syncFlushMemTable()} blocks until the whole flush has completed.
 */
public interface IMemTableFlushTask {
  /** the function for flushing memtable. */
  void syncFlushMemTable() throws ExecutionException, InterruptedException;
}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 9397af1..27988ee 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -42,7 +42,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
-public class MemTableFlushTask {
+public class MemTableFlushTask implements IMemTableFlushTask {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MemTableFlushTask.class);
   private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER =
@@ -84,6 +84,7 @@ public class MemTableFlushTask {
   }
 
   /** the function for flushing memtable. */
+  @Override
   public void syncFlushMemTable() throws ExecutionException, 
InterruptedException {
     LOGGER.info(
         "The memTable size of SG {} is {}, the avg series points num in chunk 
is {}, total timeseries number is {}",
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
new file mode 100644
index 0000000..93b66d3
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
+import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiThreadMemTableFlushTask.class);
+  private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER =
+      FlushSubTaskPoolManager.getInstance();
+  private static IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  // we have multiple thread to do the encoding Task.
+  private Future<?>[] encodingTaskFutures;
+  private final Future<?> ioTaskFuture;
+  private RestorableTsFileIOWriter writer;
+
+  int threadSize = 10;
+
+  private final LinkedBlockingQueue<Object>[] encodingTaskQueues =
+      new LinkedBlockingQueue[threadSize];
+  private LinkedBlockingQueue<Object>[] ioTaskQueues =
+      new LinkedBlockingQueue[threadSize]; // this initialization may be 
wasted.
+
+  {
+    if (config.isEnableMemControl() && 
SystemInfo.getInstance().isEncodingFasterThanIo()) {
+      for (int i = 0; i < threadSize; i++) {
+        ioTaskQueues[i] = new 
LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing());
+      }
+    }
+  }
+
+  private String storageGroup;
+
+  private IMemTable memTable;
+
+  private volatile long memSerializeTime = 0L;
+  private volatile long ioTime = 0L;
+
+  /**
+   * @param memTable the memTable to flush
+   * @param writer the writer where memTable will be flushed to (current 
tsfile writer or vm writer)
+   * @param storageGroup current storage group
+   */
+  public MultiThreadMemTableFlushTask(
+      IMemTable memTable, RestorableTsFileIOWriter writer, String 
storageGroup) {
+    this.memTable = memTable;
+    this.writer = writer;
+    this.storageGroup = storageGroup;
+    this.encodingTaskFutures = submitEncodingTasks();
+    this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
+    LOGGER.debug(
+        "flush task of Storage group {} memtable is created, flushing to file 
{}.",
+        storageGroup,
+        writer.getFile().getName());
+  }
+
+  /** the function for flushing memtable. */
+  @Override
+  public void syncFlushMemTable() throws ExecutionException, 
InterruptedException {
+    LOGGER.info(
+        "The memTable size of SG {} is {}, the avg series points num in chunk 
is {}, total timeseries number is {}",
+        storageGroup,
+        memTable.memSize(),
+        memTable.getTotalPointsNum() / memTable.getSeriesNumber(),
+        memTable.getSeriesNumber());
+
+    long estimatedTemporaryMemSize = 0L;
+    if (config.isEnableMemControl() && 
SystemInfo.getInstance().isEncodingFasterThanIo()) {
+      estimatedTemporaryMemSize =
+          memTable.memSize() / memTable.getSeriesNumber() * 
config.getIoTaskQueueSizeForFlushing();
+      
SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+    }
+    long start = System.currentTimeMillis();
+    long sortTime = 0;
+
+    // for map do not use get(key) to iteratate
+    int i = 0;
+    for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry :
+        memTable.getMemTableMap().entrySet()) {
+      encodingTaskQueues[i].put(new 
StartFlushGroupIOTask(memTableEntry.getKey()));
+
+      final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
+      for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : 
value.entrySet()) {
+        long startTime = System.currentTimeMillis();
+        IWritableMemChunk series = iWritableMemChunkEntry.getValue();
+        MeasurementSchema desc = series.getSchema();
+        TVList tvList = series.getSortedTVListForFlush();
+        sortTime += System.currentTimeMillis() - startTime;
+        encodingTaskQueues[i].put(new Pair<>(tvList, desc));
+      }
+
+      encodingTaskQueues[i].put(new EndChunkGroupIoTask());
+      i = (i + 1) % threadSize;
+    }
+    // every encoding task queue has a taskEnd marker
+    for (LinkedBlockingQueue encodingTaskQueue : encodingTaskQueues) {
+      encodingTaskQueue.put(new TaskEnd());
+    }
+    LOGGER.debug(
+        "Storage group {} memtable flushing into file {}: data sort time cost 
{} ms.",
+        storageGroup,
+        writer.getFile().getName(),
+        sortTime);
+
+    for (Future encodingTaskFuture : encodingTaskFutures) {
+      try {
+        encodingTaskFuture.get();
+      } catch (InterruptedException | ExecutionException e) {
+        // any failed encoding task will rollback the whole task
+        for (LinkedBlockingQueue encodingTaskQueue : encodingTaskQueues) {
+          encodingTaskQueue.clear();
+        }
+        ioTaskFuture.cancel(true);
+        throw e;
+      }
+    }
+
+    ioTaskFuture.get();
+
+    try {
+      writer.writePlanIndices();
+    } catch (IOException e) {
+      throw new ExecutionException(e);
+    }
+
+    if (config.isEnableMemControl()) {
+      if (estimatedTemporaryMemSize != 0) {
+        
SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+      }
+      SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= 
memSerializeTime);
+    }
+
+    LOGGER.info(
+        "Storage group {} memtable {} flushing a memtable has finished! Time 
consumption: {}ms",
+        storageGroup,
+        memTable,
+        System.currentTimeMillis() - start);
+  }
+
+  private Future[] submitEncodingTasks() {
+    Future[] futures = new Future[threadSize];
+    for (int i = 0; i < threadSize; i++) {
+      futures[i] =
+          SUB_TASK_POOL_MANAGER.submit(new EncodingTask(i, 
encodingTaskQueues[i], ioTaskQueues[i]));
+    }
+    return futures;
+  }
+
+  class EncodingTask implements Runnable {
+    LinkedBlockingQueue<Object> encodingTaskQueue;
+    LinkedBlockingQueue<Object> ioTaskQueue;
+    int threadNumber;
+
+    EncodingTask(
+        int threadNumber,
+        LinkedBlockingQueue<Object> encodingTaskQueue,
+        LinkedBlockingQueue<Object> ioTaskQueue) {
+      this.threadNumber = threadNumber;
+      this.encodingTaskQueue = encodingTaskQueue;
+      this.ioTaskQueue = ioTaskQueue;
+    }
+
+    private void writeOneSeries(
+        TVList tvPairs, IChunkWriter seriesWriterImpl, TSDataType dataType) {
+      for (int i = 0; i < tvPairs.size(); i++) {
+        long time = tvPairs.getTime(i);
+
+        // skip duplicated data
+        if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) {
+          continue;
+        }
+
+        // store last point for SDT
+        if (i + 1 == tvPairs.size()) {
+          ((ChunkWriterImpl) seriesWriterImpl).setLastPoint(true);
+        }
+
+        switch (dataType) {
+          case BOOLEAN:
+            seriesWriterImpl.write(time, tvPairs.getBoolean(i));
+            break;
+          case INT32:
+            seriesWriterImpl.write(time, tvPairs.getInt(i));
+            break;
+          case INT64:
+            seriesWriterImpl.write(time, tvPairs.getLong(i));
+            break;
+          case FLOAT:
+            seriesWriterImpl.write(time, tvPairs.getFloat(i));
+            break;
+          case DOUBLE:
+            seriesWriterImpl.write(time, tvPairs.getDouble(i));
+            break;
+          case TEXT:
+            seriesWriterImpl.write(time, tvPairs.getBinary(i));
+            break;
+          default:
+            LOGGER.error("Storage group {} does not support data type: {}", 
storageGroup, dataType);
+            break;
+        }
+      }
+    }
+
+    @SuppressWarnings("squid:S135")
+    @Override
+    public void run() {
+      LOGGER.debug(
+          "Storage group {} memtable flushing to file {} starts to encoding 
data (Thread #{}).",
+          storageGroup,
+          writer.getFile().getName(),
+          threadNumber);
+      while (true) {
+
+        Object task = null;
+        try {
+          task = encodingTaskQueue.take();
+        } catch (InterruptedException e1) {
+          LOGGER.error(
+              "Storage group {}, file {}, Take task from encodingTaskQueue 
Interrupted (Thread #{})",
+              storageGroup,
+              writer.getFile().getName(),
+              threadNumber);
+          Thread.currentThread().interrupt();
+          break;
+        }
+        if (task instanceof StartFlushGroupIOTask || task instanceof 
EndChunkGroupIoTask) {
+          try {
+            ioTaskQueue.put(task);
+          } catch (
+              @SuppressWarnings("squid:S2142")
+              InterruptedException e) {
+            LOGGER.error(
+                "Storage group {} memtable flushing to file {}, encoding task 
is interrupted.",
+                storageGroup,
+                writer.getFile().getName(),
+                e);
+            // generally it is because the thread pool is shutdown so the task 
should be aborted
+            break;
+          }
+        } else if (task instanceof TaskEnd) {
+          break;
+        } else {
+          long starTime = System.currentTimeMillis();
+          Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, 
MeasurementSchema>) task;
+          IChunkWriter seriesWriter = new 
ChunkWriterImpl(encodingMessage.right);
+          writeOneSeries(encodingMessage.left, seriesWriter, 
encodingMessage.right.getType());
+          seriesWriter.sealCurrentPage();
+          seriesWriter.clearPageWriter();
+          try {
+            ioTaskQueue.put(seriesWriter);
+          } catch (InterruptedException e) {
+            LOGGER.error("Put task into ioTaskQueue Interrupted");
+            Thread.currentThread().interrupt();
+          }
+          memSerializeTime += System.currentTimeMillis() - starTime;
+        }
+      }
+      try {
+        ioTaskQueue.put(new TaskEnd());
+      } catch (InterruptedException e) {
+        LOGGER.error("Put task into ioTaskQueue Interrupted");
+        Thread.currentThread().interrupt();
+      }
+
+      LOGGER.debug(
+          "Storage group {}, flushing memtable {} into disk: (Thread #{}) 
Encoding data cost "
+              + "{} ms.",
+          storageGroup,
+          writer.getFile().getName(),
+          threadNumber,
+          memSerializeTime);
+    }
+  }
+
+  @SuppressWarnings("squid:S135")
+  private Runnable ioTask =
+      () -> {
+        LOGGER.debug(
+            "Storage group {} memtable flushing to file {} start io.",
+            storageGroup,
+            writer.getFile().getName());
+        int i = -1;
+        // whether the IO task is writing a new ChunkGroup.
+        // if true, then we can choose task from any queue.
+        // otherwise, we can not change the queue.
+        boolean isNew = true;
+        byte[] finished = new byte[threadSize / Byte.SIZE + 1];
+        for (int j = 0; j < finished.length; j++) {
+          finished[j] = 0;
+        }
+
+        while (true) {
+          Object ioMessage = null;
+          try {
+            if (isNew) {
+              while (ioMessage == null) {
+                // round robin strategy to get a task.
+                i = (i + 1) % threadSize;
+                if ((finished[i / Byte.SIZE] & BIT_UTIL[i % Byte.SIZE]) == 1) {
+                  // means the queue is done
+                  continue;
+                }
+                ioMessage = ioTaskQueues[i].poll(10, TimeUnit.MILLISECONDS);
+              }
+            } else {
+              ioMessage = ioTaskQueues[i].take();
+            }
+          } catch (InterruptedException e1) {
+            LOGGER.error("take task from ioTaskQueue Interrupted");
+            Thread.currentThread().interrupt();
+            break;
+          }
+          long starTime = System.currentTimeMillis();
+          try {
+            if (ioMessage instanceof StartFlushGroupIOTask) {
+              isNew = false;
+              this.writer.startChunkGroup(((StartFlushGroupIOTask) 
ioMessage).deviceId);
+            } else if (ioMessage instanceof TaskEnd) {
+              // queue i is finished
+              finished[i / Byte.SIZE] |= BIT_UTIL[i % Byte.SIZE];
+              // check whether if all queues are finished.
+              int j;
+              for (j = 0; j < threadSize / Byte.SIZE; j++) {
+                if (finished[j] != 0XFF) {
+                  // not finished.
+                  break;
+                }
+              }
+              if (j < threadSize / Byte.SIZE) {
+                continue;
+              }
+              for (j = 0; j < threadSize % Byte.SIZE; j++) {
+                if ((finished[threadSize / Byte.SIZE] & BIT_UTIL[j]) != 1) {
+                  // not finished.
+                  break;
+                }
+              }
+              if (j < threadSize % Byte.SIZE) {
+                continue;
+              }
+              // finished
+              break;
+            } else if (ioMessage instanceof IChunkWriter) {
+              ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+              chunkWriter.writeToFileWriter(this.writer);
+            } else {
+              this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
+              this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
+              this.writer.endChunkGroup();
+              isNew = true;
+            }
+          } catch (IOException e) {
+            LOGGER.error(
+                "Storage group {} memtable {}, io task meets error.", 
storageGroup, memTable, e);
+            throw new FlushRunTimeException(e);
+          }
+          ioTime += System.currentTimeMillis() - starTime;
+        }
+        LOGGER.debug(
+            "flushing a memtable to file {} in storage group {}, io cost {}ms",
+            writer.getFile().getName(),
+            storageGroup,
+            ioTime);
+      };
+
+  static class TaskEnd {
+
+    TaskEnd() {}
+  }
+
+  static class EndChunkGroupIoTask {
+
+    EndChunkGroupIoTask() {}
+  }
+
+  static class StartFlushGroupIOTask {
+
+    private final String deviceId;
+
+    StartFlushGroupIOTask(String deviceId) {
+      this.deviceId = deviceId;
+    }
+  }
+
+  private static final short[] BIT_UTIL = new short[] {1, 2, 4, 8, 16, 32, 64, 
0XFF};
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java
index fe4f2e3..387daef 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java
@@ -19,13 +19,6 @@
 
 package org.apache.iotdb.db.engine.flush;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -42,17 +35,23 @@ import 
org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+
 public class MemTableFlushTaskTest {
   String filePath = "target/tsfile.tsfile";
   IMemTable memTable;
   RestorableTsFileIOWriter writer;
-  String[] devices = {"root.sg.d1", "root.sg.d2"};
+  String[] devices = {"root.sg.d1", "root.sg.d2", "root.sg.d3", "root.sg.d4", 
"root.sg.d5"};
   String[] sensors = {"s1", "s2", "s3"};
   Integer[] types = {
     (int) TSDataType.INT32.serialize(),
@@ -68,7 +67,7 @@ public class MemTableFlushTaskTest {
   @Before
   public void setUp() throws IllegalPathException, IOException {
     memTable = new PrimitiveMemTable();
-    for (int i = 0; i < 2; i++) {
+    for (int i = 0; i < devices.length; i++) {
       InsertTabletPlan plan =
           new InsertTabletPlan(new PartialPath(devices[i]), sensors, 
Arrays.asList(types));
       Object[] columns = new Object[3];
@@ -110,7 +109,7 @@ public class MemTableFlushTaskTest {
 
   @Test
   public void test() throws ExecutionException, InterruptedException, 
IOException {
-    MemTableFlushTask task = new MemTableFlushTask(memTable, writer, 
"root.sg");
+    IMemTableFlushTask task = new MultiThreadMemTableFlushTask(memTable, 
writer, "root.sg");
     task.syncFlushMemTable();
     System.out.println("end file.....");
     writer.endFile();
@@ -129,7 +128,10 @@ public class MemTableFlushTaskTest {
                   new Path("root.sg.d1.s3", true),
                   new Path("root.sg.d2.s1", true),
                   new Path("root.sg.d2.s2", true),
-                  new Path("root.sg.d2.s3", true)),
+                  new Path("root.sg.d2.s3", true),
+                  new Path("root.sg.d4.s1", true),
+                  new Path("root.sg.d4.s2", true),
+                  new Path("root.sg.d4.s3", true)),
               null);
       QueryDataSet set = reader.query(expression);
       int time = 1;

Reply via email to