This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch two_stage_pipeline in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b79643ee16de2ced1faa29cb70493dff95e3362f Author: JackieTien97 <[email protected]> AuthorDate: Wed Jan 27 12:37:39 2021 +0800 init --- .../iotdb/db/engine/flush/MemTableFlushTask.java | 145 +++++++-------------- 1 file changed, 48 insertions(+), 97 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index b64fd8f..cb72070 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -30,7 +30,6 @@ import org.apache.iotdb.db.engine.memtable.IWritableMemChunk; import org.apache.iotdb.db.exception.runtime.FlushRunTimeException; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -43,17 +42,14 @@ public class MemTableFlushTask { private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class); private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = FlushSubTaskPoolManager .getInstance(); - private final Future<?> encodingTaskFuture; private final Future<?> ioTaskFuture; private RestorableTsFileIOWriter writer; private final ConcurrentLinkedQueue<Object> ioTaskQueue = new ConcurrentLinkedQueue<>(); - private final ConcurrentLinkedQueue<Object> encodingTaskQueue = new ConcurrentLinkedQueue<>(); private String storageGroup; private IMemTable memTable; - private volatile boolean noMoreEncodingTask = false; private volatile boolean noMoreIOTask = false; /** @@ -66,7 +62,6 @@ public class MemTableFlushTask { this.memTable = memTable; this.writer = writer; this.storageGroup = storageGroup; - this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask); this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask); LOGGER.debug("flush task of Storage group {} memtable {} is created ", storageGroup, memTable.getVersion()); @@ -83,10 +78,11 @@ public class MemTableFlushTask { memTable.getTotalPointsNum() / memTable.getSeriesNumber()); long start = System.currentTimeMillis(); long sortTime = 0; + long encodingTime = 0; //for map do not use get(key) to iteratate for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap().entrySet()) { - encodingTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey())); + ioTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey())); final Map<String, IWritableMemChunk> value = memTableEntry.getValue(); for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) { @@ -94,26 +90,26 @@ public class MemTableFlushTask { IWritableMemChunk series = iWritableMemChunkEntry.getValue(); MeasurementSchema desc = series.getSchema(); TVList tvList = series.getSortedTVListForFlush(); - sortTime += System.currentTimeMillis() - startTime; - encodingTaskQueue.add(new Pair<>(tvList, desc)); + long encodingStartTime = System.currentTimeMillis(); + sortTime += encodingStartTime - startTime; + IChunkWriter seriesWriter = new ChunkWriterImpl(desc); + writeOneSeries(tvList, seriesWriter, desc.getType()); + seriesWriter.sealCurrentPage(); + seriesWriter.clearPageWriter(); + encodingTime += (System.currentTimeMillis() - encodingStartTime); + ioTaskQueue.add(seriesWriter); } - encodingTaskQueue.add(new EndChunkGroupIoTask()); + ioTaskQueue.add(new EndChunkGroupIoTask()); } - noMoreEncodingTask = true; + noMoreIOTask = true; LOGGER.info( "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.", storageGroup, memTable.getVersion(), sortTime); - - try { - encodingTaskFuture.get(); - } catch (InterruptedException | ExecutionException e) { - // avoid ioTask waiting forever - noMoreIOTask = true; - ioTaskFuture.cancel(true); - throw e; - } + LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding data cost " + + "{} ms.", + storageGroup, memTable.getVersion(), encodingTime); ioTaskFuture.get(); @@ -127,91 +123,46 @@ public class MemTableFlushTask { LOGGER.info( "Storage group {} memtable {} flushing a memtable has finished! Time consumption: {}ms", storageGroup, memTable, System.currentTimeMillis() - start); - } - private Runnable encodingTask = new Runnable() { - private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl, - TSDataType dataType) { - for (int i = 0; i < tvPairs.size(); i++) { - long time = tvPairs.getTime(i); - // skip duplicated data - if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) { - continue; - } + } - switch (dataType) { - case BOOLEAN: - seriesWriterImpl.write(time, tvPairs.getBoolean(i)); - break; - case INT32: - seriesWriterImpl.write(time, tvPairs.getInt(i)); - break; - case INT64: - seriesWriterImpl.write(time, tvPairs.getLong(i)); - break; - case FLOAT: - seriesWriterImpl.write(time, tvPairs.getFloat(i)); - break; - case DOUBLE: - seriesWriterImpl.write(time, tvPairs.getDouble(i)); - break; - case TEXT: - seriesWriterImpl.write(time, tvPairs.getBinary(i)); - break; - default: - LOGGER.error("Storage group {} does not support data type: {}", storageGroup, - dataType); - break; - } + private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl, + TSDataType dataType) { + for (int i = 0; i < tvPairs.size(); i++) { + long time = tvPairs.getTime(i); + + // skip duplicated data + if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) { + continue; } - } - @SuppressWarnings("squid:S135") - @Override - public void run() { - long memSerializeTime = 0; - boolean noMoreMessages = false; - LOGGER.debug("Storage group {} memtable {}, starts to encoding data.", storageGroup, - memTable.getVersion()); - while (true) { - if (noMoreEncodingTask) { - noMoreMessages = true; - } - Object task = encodingTaskQueue.poll(); - if (task == null) { - if (noMoreMessages) { - break; - } - try { - TimeUnit.MILLISECONDS.sleep(10); - } catch (@SuppressWarnings("squid:S2142") InterruptedException e) { - LOGGER.error("Storage group {} memtable {}, encoding task is interrupted.", - storageGroup, memTable.getVersion(), e); - // generally it is because the thread pool is shutdown so the task should be aborted - break; - } - } else { - if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) { - ioTaskQueue.add(task); - } else { - Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task; - long starTime = System.currentTimeMillis(); - IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right); - writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType()); - seriesWriter.sealCurrentPage(); - seriesWriter.clearPageWriter(); - memSerializeTime += (System.currentTimeMillis() - starTime); - ioTaskQueue.add(seriesWriter); - } - } + switch (dataType) { + case BOOLEAN: + seriesWriterImpl.write(time, tvPairs.getBoolean(i)); + break; + case INT32: + seriesWriterImpl.write(time, tvPairs.getInt(i)); + break; + case INT64: + seriesWriterImpl.write(time, tvPairs.getLong(i)); + break; + case FLOAT: + seriesWriterImpl.write(time, tvPairs.getFloat(i)); + break; + case DOUBLE: + seriesWriterImpl.write(time, tvPairs.getDouble(i)); + break; + case TEXT: + seriesWriterImpl.write(time, tvPairs.getBinary(i)); + break; + default: + LOGGER.error("Storage group {} does not support data type: {}", storageGroup, + dataType); + break; } - noMoreIOTask = true; - LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding data cost " - + "{} ms.", - storageGroup, memTable.getVersion(), memSerializeTime); } - }; + } @SuppressWarnings("squid:S135") private Runnable ioTask = () -> {
