This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch encoding_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a35e3ecdd386e99c1e475e61e2554e7220b76398
Author: xiangdong huang <[email protected]>
AuthorDate: Fri Mar 19 01:05:00 2021 +0800

    test parallel encoding
---
 .../engine/flush/MultiThreadMemTableFlushTask.java | 101 +++++++++++++++------
 .../db/engine/flush/MemTableFlushTaskTest.java     |  98 +++++++++++++++++++-
 2 files changed, 169 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
index 526c4f6..99788a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -50,23 +51,20 @@ public class MultiThreadMemTableFlushTask implements 
IMemTableFlushTask {
       FlushSubTaskPoolManager.getInstance();
   private static IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
   // we have multiple thread to do the encoding Task.
-  private Future<?>[] encodingTaskFutures;
-  private final Future<?> ioTaskFuture;
+  private Future<Long>[] encodingTaskFutures;
+  private final Future<Long> ioTaskFuture;
   private RestorableTsFileIOWriter writer;
 
   int threadSize =
       
IoTDBDescriptor.getInstance().getConfig().getConcurrentEncodingTasksInOneMemtable();
 
-  private final LinkedBlockingQueue<Object>[] encodingTaskQueues;
+  private LinkedBlockingQueue<Object>[] encodingTaskQueues;
   private LinkedBlockingQueue<Object>[] ioTaskQueues;
 
   private String storageGroup;
 
   private IMemTable memTable;
 
-  private volatile long memSerializeTime = 0L;
-  private volatile long ioTime = 0L;
-
   /**
    * @param memTable the memTable to flush
    * @param writer the writer where memTable will be flushed to (current 
tsfile writer or vm writer)
@@ -82,12 +80,20 @@ public class MultiThreadMemTableFlushTask implements 
IMemTableFlushTask {
     }
     this.encodingTaskQueues = new LinkedBlockingQueue[threadSize];
     ioTaskQueues = new LinkedBlockingQueue[threadSize];
-    for (int i = 0; i < threadSize; i++) {
-      ioTaskQueues[i] =
-          config.isEnableMemControl() && 
SystemInfo.getInstance().isEncodingFasterThanIo()
-              ? new 
LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
-              : new LinkedBlockingQueue<>();
-      encodingTaskQueues[i] = new LinkedBlockingQueue<>();
+    if (config.isEnableMemControl() && 
SystemInfo.getInstance().isEncodingFasterThanIo()) {
+      LOGGER.debug(
+          "Encoding is faster than IO, will limit the size of Encoding queue 
as {}",
+          config.getIoTaskQueueSizeForFlushing());
+      for (int i = 0; i < threadSize; i++) {
+        ioTaskQueues[i] = new 
LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing());
+        encodingTaskQueues[i] = new LinkedBlockingQueue<>();
+      }
+    } else {
+      LOGGER.debug("Encoding is slower than IO, will do not limit the size of 
Encoding queue");
+      for (int i = 0; i < threadSize; i++) {
+        ioTaskQueues[i] = new LinkedBlockingQueue<>();
+        encodingTaskQueues[i] = new LinkedBlockingQueue<>();
+      }
     }
 
     this.encodingTaskFutures = submitEncodingTasks();
@@ -111,8 +117,17 @@ public class MultiThreadMemTableFlushTask implements 
IMemTableFlushTask {
     long estimatedTemporaryMemSize = 0L;
     if (config.isEnableMemControl() && 
SystemInfo.getInstance().isEncodingFasterThanIo()) {
       estimatedTemporaryMemSize =
-          memTable.memSize() / memTable.getSeriesNumber() * 
config.getIoTaskQueueSizeForFlushing();
+          memTable.memSize()
+              / memTable.getSeriesNumber()
+              * threadSize
+              * config.getIoTaskQueueSizeForFlushing();
+      // memTable.memSize() / memTable.getSeriesNumber() * 
config.getIoTaskQueueSizeForFlushing();
       
SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+      LOGGER.debug(
+          "Assign {} KB memory to the flushing task. SG {}, file {}",
+          estimatedTemporaryMemSize / 1024,
+          storageGroup,
+          writer.getFile().getName());
     }
     long start = System.currentTimeMillis();
     long sortTime = 0;
@@ -145,10 +160,10 @@ public class MultiThreadMemTableFlushTask implements 
IMemTableFlushTask {
         storageGroup,
         writer.getFile().getName(),
         sortTime);
-
-    for (Future encodingTaskFuture : encodingTaskFutures) {
+    long memSerializeTime = 0;
+    for (Future<Long> encodingTaskFuture : encodingTaskFutures) {
       try {
-        encodingTaskFuture.get();
+        memSerializeTime += encodingTaskFuture.get();
       } catch (InterruptedException | ExecutionException e) {
         // any failed encoding task will rollback the whole task
         for (LinkedBlockingQueue encodingTaskQueue : encodingTaskQueues) {
@@ -161,8 +176,9 @@ public class MultiThreadMemTableFlushTask implements 
IMemTableFlushTask {
         throw e;
       }
     }
+    memSerializeTime /= threadSize;
 
-    ioTaskFuture.get();
+    long ioTime = ioTaskFuture.get();
 
     try {
       writer.writePlanIndices();
@@ -193,11 +209,14 @@ public class MultiThreadMemTableFlushTask implements 
IMemTableFlushTask {
     return futures;
   }
 
-  class EncodingTask implements Runnable {
+  class EncodingTask implements Callable<Long> {
     LinkedBlockingQueue<Object> encodingTaskQueue;
     LinkedBlockingQueue<Object> ioTaskQueue;
     int threadNumber;
 
+    long consume = 0;
+    long memSerializeTime = 0;
+
     EncodingTask(
         int threadNumber,
         LinkedBlockingQueue<Object> encodingTaskQueue,
@@ -250,17 +269,22 @@ public class MultiThreadMemTableFlushTask implements 
IMemTableFlushTask {
 
     @SuppressWarnings("squid:S135")
     @Override
-    public void run() {
+    public Long call() {
       LOGGER.debug(
           "Storage group {} memtable flushing to file {} starts to encoding 
data (Thread #{}).",
           storageGroup,
           writer.getFile().getName(),
           threadNumber);
+      long encodingQueueTakeTime = 0, ioEnqueueTime = 0;
+      int totalTask = 0;
+      long st = 0; // temporary variable
       while (true) {
 
         Object task = null;
+        st = System.currentTimeMillis();
         try {
           task = encodingTaskQueue.take();
+          encodingQueueTakeTime += System.currentTimeMillis() - st;
         } catch (InterruptedException e1) {
           LOGGER.error(
               "Storage group {}, file {}, Take task from encodingTaskQueue 
Interrupted (Thread #{})",
@@ -272,7 +296,10 @@ public class MultiThreadMemTableFlushTask implements 
IMemTableFlushTask {
         }
         if (task instanceof StartFlushGroupIOTask || task instanceof 
EndChunkGroupIoTask) {
           try {
+            st = System.currentTimeMillis();
             ioTaskQueue.put(task);
+            ioEnqueueTime += System.currentTimeMillis() - st;
+            totalTask++; // TODO
           } catch (
               @SuppressWarnings("squid:S2142")
               InterruptedException e) {
@@ -293,8 +320,12 @@ public class MultiThreadMemTableFlushTask implements 
IMemTableFlushTask {
           writeOneSeries(encodingMessage.left, seriesWriter, 
encodingMessage.right.getType());
           seriesWriter.sealCurrentPage();
           seriesWriter.clearPageWriter();
+          consume += System.currentTimeMillis() - starTime;
+          totalTask++; // TODO
           try {
+            st = System.currentTimeMillis();
             ioTaskQueue.put(seriesWriter);
+            ioEnqueueTime += System.currentTimeMillis() - st;
           } catch (InterruptedException e) {
             LOGGER.error("Put task into ioTaskQueue Interrupted");
             Thread.currentThread().interrupt();
@@ -303,29 +334,40 @@ public class MultiThreadMemTableFlushTask implements 
IMemTableFlushTask {
         }
       }
       try {
+        st = System.currentTimeMillis();
         ioTaskQueue.put(new TaskEnd());
+        ioEnqueueTime += System.currentTimeMillis() - st;
       } catch (InterruptedException e) {
         LOGGER.error("Put task into ioTaskQueue Interrupted");
         Thread.currentThread().interrupt();
       }
 
       LOGGER.debug(
-          "Storage group {}, flushing memtable {} into disk: (Thread #{}) 
Encoding data cost "
-              + "{} ms.",
+          "Storage group {}, flushing memtable {} (size {} KB) into disk: 
(Thread #{}) Taking task "
+              + "from Encoding queue time {} ms, Enqueue IO task queue takes 
{} ms, Encoding data cost "
+              + "{} ms. real consume time {} ms. total task {}",
           storageGroup,
           writer.getFile().getName(),
+          memTable.memSize() / 1024,
           threadNumber,
-          memSerializeTime);
+          encodingQueueTakeTime,
+          ioEnqueueTime,
+          memSerializeTime,
+          consume,
+          totalTask);
+      return memSerializeTime;
     }
   }
 
   @SuppressWarnings("squid:S135")
-  private Runnable ioTask =
+  private Callable ioTask =
       () -> {
+        int totalTasks = 0;
         LOGGER.debug(
             "Storage group {} memtable flushing to file {} start io.",
             storageGroup,
             writer.getFile().getName());
+        long ioTime = 0;
         int i = -1;
         // whether the IO task is writing a new ChunkGroup.
         // if true, then we can choose task from any queue.
@@ -343,6 +385,8 @@ public class MultiThreadMemTableFlushTask implements 
IMemTableFlushTask {
               while (ioMessage == null) {
                 // round robin strategy to get a task.
                 i = (i + 1) % threadSize;
+                // every Byte.SIZE threads share one byte, so task i's flag is in finished[i/Byte.SIZE].
+                // i%Byte.SIZE is the bit position of task i within finished[i/Byte.SIZE].
                 if ((finished[i / Byte.SIZE] & BIT_UTIL[i % Byte.SIZE]) == 1) {
                   // means the queue is done
                   continue;
@@ -362,13 +406,14 @@ public class MultiThreadMemTableFlushTask implements 
IMemTableFlushTask {
             if (ioMessage instanceof StartFlushGroupIOTask) {
               isNew = false;
               this.writer.startChunkGroup(((StartFlushGroupIOTask) 
ioMessage).deviceId);
+              totalTasks++; // TODO
             } else if (ioMessage instanceof TaskEnd) {
               // queue i is finished
               finished[i / Byte.SIZE] |= BIT_UTIL[i % Byte.SIZE];
               // check whether if all queues are finished.
               int j;
               for (j = 0; j < threadSize / Byte.SIZE; j++) {
-                if (finished[j] != 0XFF) {
+                if (finished[j] != (byte) 0XFF) {
                   // not finished.
                   break;
                 }
@@ -390,11 +435,13 @@ public class MultiThreadMemTableFlushTask implements 
IMemTableFlushTask {
             } else if (ioMessage instanceof IChunkWriter) {
               ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
               chunkWriter.writeToFileWriter(this.writer);
+              totalTasks++; // TODO
             } else {
               this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
               this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
               this.writer.endChunkGroup();
               isNew = true;
+              totalTasks++; // TODO
             }
           } catch (IOException e) {
             LOGGER.error(
@@ -404,10 +451,12 @@ public class MultiThreadMemTableFlushTask implements 
IMemTableFlushTask {
           ioTime += System.currentTimeMillis() - starTime;
         }
         LOGGER.debug(
-            "flushing a memtable to file {} in storage group {}, io cost {}ms",
+            "flushing a memtable to file {} in storage group {}, io cost {}ms. 
TotalTask {}",
             writer.getFile().getName(),
             storageGroup,
-            ioTime);
+            ioTime,
+            totalTasks);
+        return ioTime;
       };
 
   static class TaskEnd {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java
index 9a65600..57f893e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java
@@ -19,13 +19,28 @@
 
 package org.apache.iotdb.db.engine.flush;
 
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.LoggerContext;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import 
org.apache.iotdb.db.engine.flush.MultiThreadMemTableFlushTask.EncodingTask;
+import 
org.apache.iotdb.db.engine.flush.MultiThreadMemTableFlushTask.EndChunkGroupIoTask;
+import 
org.apache.iotdb.db.engine.flush.MultiThreadMemTableFlushTask.StartFlushGroupIOTask;
+import org.apache.iotdb.db.engine.flush.MultiThreadMemTableFlushTask.TaskEnd;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -33,6 +48,9 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 
@@ -46,8 +64,21 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.concurrent.ExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MemTableFlushTaskTest {
+
+  LoggerContext loggerContext = (LoggerContext) 
LoggerFactory.getILoggerFactory();
+
+  {
+    loggerContext
+        
.getLogger("org.apache.iotdb.db.engine.flush.MultiThreadMemTableFlushTask")
+        .setLevel(Level.valueOf("trace"));
+  }
+
+  Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTaskTest.class);
+
   String filePath = "target/tsfile.tsfile";
   IMemTable memTable;
   RestorableTsFileIOWriter writer;
@@ -107,11 +138,70 @@ public class MemTableFlushTaskTest {
     }
   }
 
-  // @Test
+  @Test
+  public void testLarge()
+      throws IllegalPathException, ExecutionException, InterruptedException, 
IOException {
+
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    config.setConcurrentEncodingTasksInOneMemtable(10);
+    config.setEnableMemControl(true);
+    config.setIoTaskQueueSizeForFlushing(2000);
+    // config.setAllocateMemoryForWrite(1024 * 1024);
+    // config.setRejectProportion(0.2);
+
+    String[] sensors = {"s1", "s2"};
+    Integer[] types = {(int) TSDataType.INT32.serialize(), (int) 
TSDataType.DOUBLE.serialize()};
+    MeasurementMNode[] nodes = {
+      new MeasurementMNode(
+          null,
+          "s1",
+          new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE, 
CompressionType.SNAPPY),
+          null),
+      new MeasurementMNode(
+          null,
+          "s2",
+          new MeasurementSchema("s2", TSDataType.DOUBLE, TSEncoding.RLE, 
CompressionType.SNAPPY),
+          null),
+    };
+    String[] devices = new String[10];
+    for (int i = 0; i < 10; i++) {
+      devices[i] = "root.sg.d" + i;
+    }
+    for (int flush = 0; flush < 100; flush++) {
+      for (int loop = 0; loop < 10; loop++) {
+        for (int i = 0; i < devices.length; i++) {
+          InsertTabletPlan plan =
+              new InsertTabletPlan(new PartialPath(devices[i]), sensors, 
Arrays.asList(types));
+          Object[] columns = new Object[2];
+          int size = 100_000;
+          columns[0] = new int[size];
+          columns[1] = new double[size];
+          long[] times = new long[size];
+          for (int j = 0; j < size; j++) {
+            ((int[]) columns[0])[j] = loop * size + j;
+            ((double[]) columns[1])[j] = loop * size + j;
+            times[j] = loop * size + j;
+          }
+          plan.setColumns(columns);
+          plan.setTimes(times);
+          plan.setMeasurements(sensors);
+          plan.setMeasurementMNodes(nodes);
+          memTable.write(plan, 0, size);
+        }
+      }
+      IMemTableFlushTask task = new MultiThreadMemTableFlushTask(memTable, 
writer, "root.sg");
+      task.syncFlushMemTable();
+      memTable.clear();
+    }
+    writer.endFile();
+    writer.close();
+  }
+
+  @Test
   public void test2() {
-    byte a = -128;
-    a &= 0XFF;
-    System.out.println(a);
+    byte[] bits = new byte[] {1, 2, 4, 8, 16, 32, 64, -128};
+    byte a1 = (byte) 0X80;
+    System.out.println(a1);
   }
 
   @Test

Reply via email to