This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch encoding_parallel in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a35e3ecdd386e99c1e475e61e2554e7220b76398 Author: xiangdong huang <[email protected]> AuthorDate: Fri Mar 19 01:05:00 2021 +0800 test parallel encoding --- .../engine/flush/MultiThreadMemTableFlushTask.java | 101 +++++++++++++++------ .../db/engine/flush/MemTableFlushTaskTest.java | 98 +++++++++++++++++++- 2 files changed, 169 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java index 526c4f6..99788a5 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -50,23 +51,20 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask { FlushSubTaskPoolManager.getInstance(); private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); // we have multiple thread to do the encoding Task. - private Future<?>[] encodingTaskFutures; - private final Future<?> ioTaskFuture; + private Future<Long>[] encodingTaskFutures; + private final Future<Long> ioTaskFuture; private RestorableTsFileIOWriter writer; int threadSize = IoTDBDescriptor.getInstance().getConfig().getConcurrentEncodingTasksInOneMemtable(); - private final LinkedBlockingQueue<Object>[] encodingTaskQueues; + private LinkedBlockingQueue<Object>[] encodingTaskQueues; private LinkedBlockingQueue<Object>[] ioTaskQueues; private String storageGroup; private IMemTable memTable; - private volatile long memSerializeTime = 0L; - private volatile long ioTime = 0L; - /** * @param memTable the memTable to flush * @param writer the writer where memTable will be flushed to (current tsfile writer or vm writer) @@ -82,12 +80,20 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask { } this.encodingTaskQueues = new LinkedBlockingQueue[threadSize]; ioTaskQueues = new LinkedBlockingQueue[threadSize]; - for (int i = 0; i < threadSize; i++) { - ioTaskQueues[i] = - config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo() - ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing()) - : new LinkedBlockingQueue<>(); - encodingTaskQueues[i] = new LinkedBlockingQueue<>(); + if (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) { + LOGGER.debug( + "Encoding is faster than IO, will limit the size of Encoding queue as {}", + config.getIoTaskQueueSizeForFlushing()); + for (int i = 0; i < threadSize; i++) { + ioTaskQueues[i] = new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing()); + encodingTaskQueues[i] = new LinkedBlockingQueue<>(); + } + } else { + LOGGER.debug("Encoding is slower than IO, will do not limit the size of Encoding queue"); + for (int i = 0; i < threadSize; i++) { + ioTaskQueues[i] = new LinkedBlockingQueue<>(); + encodingTaskQueues[i] = new LinkedBlockingQueue<>(); + } } this.encodingTaskFutures = submitEncodingTasks(); @@ -111,8 +117,17 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask { long estimatedTemporaryMemSize = 0L; if (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) { estimatedTemporaryMemSize = - memTable.memSize() / memTable.getSeriesNumber() * config.getIoTaskQueueSizeForFlushing(); + memTable.memSize() + / memTable.getSeriesNumber() + * threadSize + * config.getIoTaskQueueSizeForFlushing(); + // memTable.memSize() / memTable.getSeriesNumber() * config.getIoTaskQueueSizeForFlushing(); SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize); + LOGGER.debug( + "Assign {} KB memory to the flushing task. SG {}, file {}", + estimatedTemporaryMemSize / 1024, + storageGroup, + writer.getFile().getName()); } long start = System.currentTimeMillis(); long sortTime = 0; @@ -145,10 +160,10 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask { storageGroup, writer.getFile().getName(), sortTime); - - for (Future encodingTaskFuture : encodingTaskFutures) { + long memSerializeTime = 0; + for (Future<Long> encodingTaskFuture : encodingTaskFutures) { try { - encodingTaskFuture.get(); + memSerializeTime += encodingTaskFuture.get(); } catch (InterruptedException | ExecutionException e) { // any failed encoding task will rollback the whole task for (LinkedBlockingQueue encodingTaskQueue : encodingTaskQueues) { @@ -161,8 +176,9 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask { throw e; } } + memSerializeTime /= threadSize; - ioTaskFuture.get(); + long ioTime = ioTaskFuture.get(); try { writer.writePlanIndices(); @@ -193,11 +209,14 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask { return futures; } - class EncodingTask implements Runnable { + class EncodingTask implements Callable<Long> { LinkedBlockingQueue<Object> encodingTaskQueue; LinkedBlockingQueue<Object> ioTaskQueue; int threadNumber; + long consume = 0; + long memSerializeTime = 0; + EncodingTask( int threadNumber, LinkedBlockingQueue<Object> encodingTaskQueue, @@ -250,17 +269,22 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask { @SuppressWarnings("squid:S135") @Override - public void run() { + public Long call() { LOGGER.debug( "Storage group {} memtable flushing to file {} starts to encoding data (Thread #{}).", storageGroup, writer.getFile().getName(), threadNumber); + long encodingQueueTakeTime = 0, ioEnqueueTime = 0; + int totalTask = 0; + long st = 0; // temporary vairable while (true) { Object task = null; + st = System.currentTimeMillis(); try { task = encodingTaskQueue.take(); + encodingQueueTakeTime += System.currentTimeMillis() - st; } catch (InterruptedException e1) { LOGGER.error( "Storage group {}, file {}, Take task from encodingTaskQueue Interrupted (Thread #{})", @@ -272,7 +296,10 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask { } if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) { try { + st = System.currentTimeMillis(); ioTaskQueue.put(task); + ioEnqueueTime += System.currentTimeMillis() - st; + totalTask++; // TODO } catch ( @SuppressWarnings("squid:S2142") InterruptedException e) { @@ -293,8 +320,12 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask { writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType()); seriesWriter.sealCurrentPage(); seriesWriter.clearPageWriter(); + consume += System.currentTimeMillis() - starTime; + totalTask++; // TODO try { + st = System.currentTimeMillis(); ioTaskQueue.put(seriesWriter); + ioEnqueueTime += System.currentTimeMillis() - st; } catch (InterruptedException e) { LOGGER.error("Put task into ioTaskQueue Interrupted"); Thread.currentThread().interrupt(); @@ -303,29 +334,40 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask { } } try { + st = System.currentTimeMillis(); ioTaskQueue.put(new TaskEnd()); + ioEnqueueTime += System.currentTimeMillis() - st; } catch (InterruptedException e) { LOGGER.error("Put task into ioTaskQueue Interrupted"); Thread.currentThread().interrupt(); } LOGGER.debug( - "Storage group {}, flushing memtable {} into disk: (Thread #{}) Encoding data cost " - + "{} ms.", + "Storage group {}, flushing memtable {} (size {} KB) into disk: (Thread #{}) Taking task " + + "from Encoding queue time {} ms, Enqueue IO task queue takes {} ms, Encoding data cost " + + "{} ms. real consume time {} ms. total task {}", storageGroup, writer.getFile().getName(), + memTable.memSize() / 1024, threadNumber, - memSerializeTime); + encodingQueueTakeTime, + ioEnqueueTime, + memSerializeTime, + consume, + totalTask); + return memSerializeTime; } } @SuppressWarnings("squid:S135") - private Runnable ioTask = + private Callable ioTask = () -> { + int totalTasks = 0; LOGGER.debug( "Storage group {} memtable flushing to file {} start io.", storageGroup, writer.getFile().getName()); + long ioTime = 0; int i = -1; // whether the IO task is writing a new ChunkGroup. // if true, then we can choose task from any queue. @@ -343,6 +385,8 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask { while (ioMessage == null) { // round robin strategy to get a task. i = (i + 1) % threadSize; + // each Byte.SIZE thread takes one byte. So task i is in finished[i/Byte.SIZE]. + // i%Byte.SIZE is the position that i in fisnished[i/Byte.SIZE] if ((finished[i / Byte.SIZE] & BIT_UTIL[i % Byte.SIZE]) == 1) { // means the queue is done continue; @@ -362,13 +406,14 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask { if (ioMessage instanceof StartFlushGroupIOTask) { isNew = false; this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId); + totalTasks++; // TODO } else if (ioMessage instanceof TaskEnd) { // queue i is finished finished[i / Byte.SIZE] |= BIT_UTIL[i % Byte.SIZE]; // check whether if all queues are finished. int j; for (j = 0; j < threadSize / Byte.SIZE; j++) { - if (finished[j] != 0XFF) { + if (finished[j] != (byte) 0XFF) { // not finished. break; } @@ -390,11 +435,13 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask { } else if (ioMessage instanceof IChunkWriter) { ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage; chunkWriter.writeToFileWriter(this.writer); + totalTasks++; // TODO } else { this.writer.setMinPlanIndex(memTable.getMinPlanIndex()); this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex()); this.writer.endChunkGroup(); isNew = true; + totalTasks++; // TODO } } catch (IOException e) { LOGGER.error( @@ -404,10 +451,12 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask { ioTime += System.currentTimeMillis() - starTime; } LOGGER.debug( - "flushing a memtable to file {} in storage group {}, io cost {}ms", + "flushing a memtable to file {} in storage group {}, io cost {}ms. TotalTask {}", writer.getFile().getName(), storageGroup, - ioTime); + ioTime, + totalTasks); + return ioTime; }; static class TaskEnd { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java index 9a65600..57f893e 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java @@ -19,13 +19,28 @@ package org.apache.iotdb.db.engine.flush; +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.LoggerContext; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.Callable; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.flush.MultiThreadMemTableFlushTask.EncodingTask; +import org.apache.iotdb.db.engine.flush.MultiThreadMemTableFlushTask.EndChunkGroupIoTask; +import org.apache.iotdb.db.engine.flush.MultiThreadMemTableFlushTask.StartFlushGroupIOTask; +import org.apache.iotdb.db.engine.flush.MultiThreadMemTableFlushTask.TaskEnd; import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; +import org.apache.iotdb.db.utils.datastructure.TVList; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.ReadOnlyTsFile; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Path; @@ -33,6 +48,9 @@ import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.read.expression.QueryExpression; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; @@ -46,8 +64,21 @@ import java.io.IOException; import java.nio.file.Files; import java.util.Arrays; import java.util.concurrent.ExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MemTableFlushTaskTest { + + LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); + + { + loggerContext + .getLogger("org.apache.iotdb.db.engine.flush.MultiThreadMemTableFlushTask") + .setLevel(Level.valueOf("trace")); + } + + Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTaskTest.class); + String filePath = "target/tsfile.tsfile"; IMemTable memTable; RestorableTsFileIOWriter writer; @@ -107,11 +138,70 @@ public class MemTableFlushTaskTest { } } - // @Test + @Test + public void testLarge() + throws IllegalPathException, ExecutionException, InterruptedException, IOException { + + IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + config.setConcurrentEncodingTasksInOneMemtable(10); + config.setEnableMemControl(true); + config.setIoTaskQueueSizeForFlushing(2000); + // config.setAllocateMemoryForWrite(1024 * 1024); + // config.setRejectProportion(0.2); + + String[] sensors = {"s1", "s2"}; + Integer[] types = {(int) TSDataType.INT32.serialize(), (int) TSDataType.DOUBLE.serialize()}; + MeasurementMNode[] nodes = { + new MeasurementMNode( + null, + "s1", + new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY), + null), + new MeasurementMNode( + null, + "s2", + new MeasurementSchema("s2", TSDataType.DOUBLE, TSEncoding.RLE, CompressionType.SNAPPY), + null), + }; + String[] devices = new String[10]; + for (int i = 0; i < 10; i++) { + devices[i] = "root.sg.d" + i; + } + for (int flush = 0; flush < 100; flush++) { + for (int loop = 0; loop < 10; loop++) { + for (int i = 0; i < devices.length; i++) { + InsertTabletPlan plan = + new InsertTabletPlan(new PartialPath(devices[i]), sensors, Arrays.asList(types)); + Object[] columns = new Object[2]; + int size = 100_000; + columns[0] = new int[size]; + columns[1] = new double[size]; + long[] times = new long[size]; + for (int j = 0; j < size; j++) { + ((int[]) columns[0])[j] = loop * size + j; + ((double[]) columns[1])[j] = loop * size + j; + times[j] = loop * size + j; + } + plan.setColumns(columns); + plan.setTimes(times); + plan.setMeasurements(sensors); + plan.setMeasurementMNodes(nodes); + memTable.write(plan, 0, size); + } + } + IMemTableFlushTask task = new MultiThreadMemTableFlushTask(memTable, writer, "root.sg"); + task.syncFlushMemTable(); + memTable.clear(); + } + writer.endFile(); + writer.close(); + } + + @Test public void test2() { - byte a = -128; - a &= 0XFF; - System.out.println(a); + byte[] bits = new byte[] {1, 2, 4, 8, 16, 32, 64, -128}; + byte a1 = (byte) 0X80; + System.out.println(a1); } @Test
