This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch serial_test in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f94b072c31c88bea8500976b23770c241b6fb500 Author: JackieTien97 <[email protected]> AuthorDate: Sat Jan 30 09:24:41 2021 +0800 init --- .../iotdb/db/engine/flush/MemTableFlushTask.java | 248 +++++---------------- .../writelog/recover/TsFileRecoverPerformer.java | 3 - 2 files changed, 61 insertions(+), 190 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index c643f59..1cb13b9 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -19,21 +19,15 @@ package org.apache.iotdb.db.engine.flush; import java.io.IOException; -import java.util.concurrent.LinkedBlockingQueue; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager; import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.IWritableMemChunk; -import org.apache.iotdb.db.exception.runtime.FlushRunTimeException; -import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.db.rescon.SystemInfo; +import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -44,25 +38,12 @@ import org.slf4j.LoggerFactory; public class MemTableFlushTask { private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class); - private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = FlushSubTaskPoolManager - .getInstance(); - private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - private final Future<?> encodingTaskFuture; - private final Future<?> ioTaskFuture; - private RestorableTsFileIOWriter writer; - - private final LinkedBlockingQueue<Object> encodingTaskQueue = new LinkedBlockingQueue<>(); - private final LinkedBlockingQueue<Object> ioTaskQueue = (config.isEnableMemControl() - && SystemInfo.getInstance().isEncodingFasterThanIo()) - ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing()) - : new LinkedBlockingQueue<>(); - - private String storageGroup; + private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private final RestorableTsFileIOWriter writer; - private IMemTable memTable; + private final String storageGroup; - private volatile long memSerializeTime = 0L; - private volatile long ioTime = 0L; + private final IMemTable memTable; /** * @param memTable the memTable to flush @@ -74,8 +55,6 @@ public class MemTableFlushTask { this.memTable = memTable; this.writer = writer; this.storageGroup = storageGroup; - this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask); - this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask); LOGGER.debug("flush task of Storage group {} memtable is created, flushing to file {}.", storageGroup, writer.getFile().getName()); } @@ -83,8 +62,7 @@ public class MemTableFlushTask { /** * the function for flushing memtable. */ - public void syncFlushMemTable() - throws ExecutionException, InterruptedException { + public void syncFlushMemTable() throws ExecutionException, IOException { LOGGER.info("The memTable size of SG {} is {}, the avg series points num in chunk is {} ", storageGroup, memTable.memSize(), @@ -98,10 +76,12 @@ public class MemTableFlushTask { } long start = System.currentTimeMillis(); long sortTime = 0; + long encodingTime = 0; + long ioTime = 0; //for map do not use get(key) to iteratate for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap().entrySet()) { - encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey())); + this.writer.startChunkGroup(memTableEntry.getKey()); final Map<String, IWritableMemChunk> value = memTableEntry.getValue(); for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) { @@ -109,25 +89,36 @@ public class MemTableFlushTask { IWritableMemChunk series = iWritableMemChunkEntry.getValue(); MeasurementSchema desc = series.getSchema(); TVList tvList = series.getSortedTVListForFlush(); - sortTime += System.currentTimeMillis() - startTime; - encodingTaskQueue.put(new Pair<>(tvList, desc)); + long encodingStartTime = System.currentTimeMillis(); + sortTime += encodingStartTime - startTime; + IChunkWriter seriesWriter = new ChunkWriterImpl(desc); + writeOneSeries(tvList, seriesWriter, desc.getType()); + seriesWriter.sealCurrentPage(); + seriesWriter.clearPageWriter(); + long ioStartTime = System.currentTimeMillis(); + encodingTime += ioStartTime - encodingStartTime; + seriesWriter.writeToFileWriter(this.writer); + ioTime += System.currentTimeMillis() - ioStartTime; } - - encodingTaskQueue.put(new EndChunkGroupIoTask()); + long ioStartTime = System.currentTimeMillis(); + this.writer.setMinPlanIndex(memTable.getMinPlanIndex()); + this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex()); + this.writer.endChunkGroup(); + ioTime += System.currentTimeMillis() - ioStartTime; } - encodingTaskQueue.put(new TaskEnd()); - LOGGER.debug( + + LOGGER.info( "Storage group {} memtable flushing into file {}: data sort time cost {} ms.", storageGroup, writer.getFile().getName(), sortTime); - try { - encodingTaskFuture.get(); - } catch (InterruptedException | ExecutionException e) { - ioTaskFuture.cancel(true); - throw e; - } + LOGGER.info( + "Storage group {} memtable flushing into file {}: data encoding time cost {} ms.", + storageGroup, writer.getFile().getName(), encodingTime); + + LOGGER.info( + "Storage group {} memtable flushing into file {}: disk io time cost {} ms.", + storageGroup, writer.getFile().getName(), ioTime); - ioTaskFuture.get(); try { writer.writePlanIndices(); @@ -139,7 +130,7 @@ public class MemTableFlushTask { if (estimatedTemporaryMemSize != 0) { SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize); } - SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= memSerializeTime); + SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= encodingTime); } LOGGER.info( @@ -147,157 +138,40 @@ public class MemTableFlushTask { storageGroup, memTable, System.currentTimeMillis() - start); } - private Runnable encodingTask = new Runnable() { - private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl, - TSDataType dataType) { - for (int i = 0; i < tvPairs.size(); i++) { - long time = tvPairs.getTime(i); + private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl, + TSDataType dataType) { + for (int i = 0; i < tvPairs.size(); i++) { + long time = tvPairs.getTime(i); - // skip duplicated data - if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) { - continue; - } - - switch (dataType) { - case BOOLEAN: - seriesWriterImpl.write(time, tvPairs.getBoolean(i)); - break; - case INT32: - seriesWriterImpl.write(time, tvPairs.getInt(i)); - break; - case INT64: - seriesWriterImpl.write(time, tvPairs.getLong(i)); - break; - case FLOAT: - seriesWriterImpl.write(time, tvPairs.getFloat(i)); - break; - case DOUBLE: - seriesWriterImpl.write(time, tvPairs.getDouble(i)); - break; - case TEXT: - seriesWriterImpl.write(time, tvPairs.getBinary(i)); - break; - default: - LOGGER.error("Storage group {} does not support data type: {}", storageGroup, - dataType); - break; - } + // skip duplicated data + if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) { + continue; } - } - @SuppressWarnings("squid:S135") - @Override - public void run() { - LOGGER.debug("Storage group {} memtable flushing to file {} starts to encoding data.", - storageGroup, writer.getFile().getName()); - while (true) { - - Object task = null; - try { - task = encodingTaskQueue.take(); - } catch (InterruptedException e1) { - LOGGER.error("Take task into ioTaskQueue Interrupted"); - Thread.currentThread().interrupt(); + switch (dataType) { + case BOOLEAN: + seriesWriterImpl.write(time, tvPairs.getBoolean(i)); break; - } - if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) { - try { - ioTaskQueue.put(task); - } catch (@SuppressWarnings("squid:S2142") InterruptedException e) { - LOGGER.error("Storage group {} memtable flushing to file {}, encoding task is interrupted.", - storageGroup, writer.getFile().getName(), e); - // generally it is because the thread pool is shutdown so the task should be aborted - break; - } - } else if (task instanceof TaskEnd) { + case INT32: + seriesWriterImpl.write(time, tvPairs.getInt(i)); break; - } else { - long starTime = System.currentTimeMillis(); - Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task; - IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right); - writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType()); - seriesWriter.sealCurrentPage(); - seriesWriter.clearPageWriter(); - try { - ioTaskQueue.put(seriesWriter); - } catch (InterruptedException e) { - LOGGER.error("Put task into ioTaskQueue Interrupted"); - Thread.currentThread().interrupt(); - } - memSerializeTime += System.currentTimeMillis() - starTime; - } - } - try { - ioTaskQueue.put(new TaskEnd()); - } catch (InterruptedException e) { - LOGGER.error("Put task into ioTaskQueue Interrupted"); - Thread.currentThread().interrupt(); - } - - LOGGER.debug("Storage group {}, flushing memtable {} into disk: Encoding data cost " - + "{} ms.", - storageGroup, writer.getFile().getName(), memSerializeTime); - } - }; - - @SuppressWarnings("squid:S135") - private Runnable ioTask = () -> { - LOGGER.debug("Storage group {} memtable flushing to file {} start io.", - storageGroup, writer.getFile().getName()); - while (true) { - Object ioMessage = null; - try { - ioMessage = ioTaskQueue.take(); - } catch (InterruptedException e1) { - LOGGER.error("take task from ioTaskQueue Interrupted"); - Thread.currentThread().interrupt(); - break; - } - long starTime = System.currentTimeMillis(); - try { - if (ioMessage instanceof StartFlushGroupIOTask) { - this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId); - } else if (ioMessage instanceof TaskEnd) { + case INT64: + seriesWriterImpl.write(time, tvPairs.getLong(i)); + break; + case FLOAT: + seriesWriterImpl.write(time, tvPairs.getFloat(i)); + break; + case DOUBLE: + seriesWriterImpl.write(time, tvPairs.getDouble(i)); + break; + case TEXT: + seriesWriterImpl.write(time, tvPairs.getBinary(i)); + break; + default: + LOGGER.error("Storage group {} does not support data type: {}", storageGroup, + dataType); break; - } else if (ioMessage instanceof IChunkWriter) { - ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage; - chunkWriter.writeToFileWriter(this.writer); - } else { - this.writer.setMinPlanIndex(memTable.getMinPlanIndex()); - this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex()); - this.writer.endChunkGroup(); - } - } catch (IOException e) { - LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup, - memTable, e); - throw new FlushRunTimeException(e); } - ioTime += System.currentTimeMillis() - starTime; - } - LOGGER.debug("flushing a memtable to file {} in storage group {}, io cost {}ms", - writer.getFile().getName(), storageGroup, ioTime); - }; - - static class TaskEnd { - - TaskEnd() { - - } - } - - static class EndChunkGroupIoTask { - - EndChunkGroupIoTask() { - - } - } - - static class StartFlushGroupIOTask { - - private final String deviceId; - - StartFlushGroupIOTask(String deviceId) { - this.deviceId = deviceId; } } } diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java index b2f2bcb..e46ddd1 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java @@ -222,9 +222,6 @@ public class TsFileRecoverPerformer { // into it } catch (IOException | ExecutionException e) { throw new StorageGroupProcessorException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new StorageGroupProcessorException(e); } }
