This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch feature_async_close_tsfile_handle_full_disk_situation in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 091fc2f47f1f20f9af58732a29611afaa039369b Author: xiangdong huang <[email protected]> AuthorDate: Sun Jun 30 18:26:15 2019 +0800 replace flush task runnable with a callable function; and reject future writes if there is no disk spaces any more... --- .../db/engine/filenodeV2/FileNodeManagerV2.java | 29 ++++++++- .../iotdb/db/engine/filenodeV2/FlushManager.java | 12 +++- .../filenodeV2/UnsealedTsFileProcessorV2.java | 42 ++++++------ .../db/engine/memtable/MemTableFlushTaskV2.java | 74 +++++++++++++--------- .../iotdb/db/engine/memtable/MemTablePool.java | 2 +- .../writelog/recover/TsFileRecoverPerformer.java | 2 +- .../iotdb/db/engine/memtable/MemTablePoolTest.java | 2 +- 7 files changed, 105 insertions(+), 58 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java index 233620c..ae58ee7 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java @@ -56,6 +56,11 @@ public class FileNodeManagerV2 implements IService { .getLogger(org.apache.iotdb.db.engine.filenodeV2.FileNodeManagerV2.class); private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + /* + * whether reject all writes (insert, update, delete) + */ + private boolean rejectWrite = false; + /** * a folder (system/info/ by default) that persist FileNodeProcessorStore classes. Ends with * File.separator Each FileNodeManager will have a subfolder. 
@@ -79,6 +84,14 @@ public class FileNodeManagerV2 implements IService { */ private volatile FileNodeManagerStatus fileNodeManagerStatus = FileNodeManagerStatus.NONE; + public boolean isRejectWrite() { + return rejectWrite; + } + + public void setRejectWrite(boolean rejectWrite) { + this.rejectWrite = rejectWrite; + } + private enum FileNodeManagerStatus { NONE, MERGE, CLOSE } @@ -168,7 +181,9 @@ public class FileNodeManagerV2 implements IService { * @return an int value represents the insert type, 0: failed; 1: overflow; 2: bufferwrite */ public boolean insert(InsertPlan insertPlan) throws FileNodeManagerException { - + if (rejectWrite) { + return false; + } FileNodeProcessorV2 fileNodeProcessor; try { fileNodeProcessor = getProcessor(insertPlan.getDeviceId()); @@ -200,22 +215,30 @@ public class FileNodeManagerV2 implements IService { /** * update data. */ - public void update(String deviceId, String measurementId, long startTime, long endTime, + public boolean update(String deviceId, String measurementId, long startTime, long endTime, TSDataType type, String v) { + if (rejectWrite) { + return false; + } // TODO + return false; } /** * delete data. 
*/ - public void delete(String deviceId, String measurementId, long timestamp) + public boolean delete(String deviceId, String measurementId, long timestamp) throws FileNodeManagerException { + if (rejectWrite) { + return false; + } FileNodeProcessorV2 fileNodeProcessor = getProcessor(deviceId); try { fileNodeProcessor.delete(deviceId, measurementId, timestamp); } catch (IOException e) { throw new FileNodeManagerException(e); } + return true; } private void delete(String processorName, diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java index 002f809..b878edc 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.engine.filenodeV2; import java.io.IOException; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; @@ -34,14 +35,15 @@ public class FlushManager { private FlushPoolManager flushPool = FlushPoolManager.getInstance(); - class FlushThread implements Runnable { + class FlushThread implements Callable<Boolean> { @Override - public void run() { + public Boolean call() { UnsealedTsFileProcessorV2 unsealedTsFileProcessor = unsealedTsFileProcessorQueue.poll(); long startTime = System.currentTimeMillis(); + boolean flushSuccessed = false; try { - unsealedTsFileProcessor.flushOneMemTable(); + flushSuccessed = unsealedTsFileProcessor.flushOneMemTable(); } catch (IOException e) { LOGGER.error("storage group {} flush one memtable meet error", unsealedTsFileProcessor.getStorageGroupName(), e); @@ -51,6 +53,10 @@ public class FlushManager { LOGGER.info("storage group {} flush process consume {} ms", unsealedTsFileProcessor.getStorageGroupName(), System.currentTimeMillis() - 
startTime); registerUnsealedTsFileProcessor(unsealedTsFileProcessor); + if (!flushSuccessed) { + FileNodeManagerV2.getInstance().setRejectWrite(true); + } + return flushSuccessed; } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java index 57bafc7..16d675d 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java @@ -23,13 +23,11 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Supplier; -import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.memtable.EmptyMemTable; import org.apache.iotdb.db.engine.memtable.IMemTable; @@ -41,11 +39,9 @@ import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.version.VersionController; -import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException; import org.apache.iotdb.db.qp.constant.DatetimeUtils; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.utils.QueryUtils; -import org.apache.iotdb.db.utils.datastructure.TVListAllocator; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.db.writelog.node.WriteLogNode; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -119,10 +115,10 @@ public class UnsealedTsFileProcessorV2 { 
// long start1 = System.currentTimeMillis(); if (workMemTable == null) { - // TODO change the impl of getEmptyMemTable to non-blocking - workMemTable = MemTablePool.getInstance().getEmptyMemTable(this); + // TODO change the impl of getAvailableMemTable to non-blocking + workMemTable = MemTablePool.getInstance().getAvailableMemTable(this); - // no empty memtable, return failure + // no available memtable, return failure if (workMemTable == null) { return false; } @@ -184,6 +180,9 @@ public class UnsealedTsFileProcessorV2 { public boolean shouldFlush() { + if (workMemTable == null) { + return false; + } return workMemTable.memSize() > TSFileDescriptor.getInstance().getConfig().groupSizeInByte; } @@ -314,48 +313,55 @@ public class UnsealedTsFileProcessorV2 { * Take the first MemTable from the flushingMemTables and flush it. Called by a flush thread of * the flush manager pool */ - void flushOneMemTable() throws IOException { + boolean flushOneMemTable() throws IOException { IMemTable memTableToFlush; memTableToFlush = flushingMemTables.getFirst(); LOGGER.info("storage group {} start to flush a memtable in a flush thread", storageGroupName); - // null memtable only appears when calling asyncClose() + boolean flushSuccessed = false; + //if the memtable is not an EmptyMemTable (i.e., the memtable is actually a memtable). 
if (memTableToFlush.isManagedByMemPool()) { MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, fileSchema, writer, storageGroupName, this::releaseFlushedMemTableCallback); - flushTask.flushMemTable(); + flushSuccessed = flushTask.flushMemTable(); + if (flushSuccessed) { // long start = System.currentTimeMillis(); - MemTablePool.getInstance().putBack(memTableToFlush, storageGroupName); + MemTablePool.getInstance().putBack(memTableToFlush, storageGroupName); // long elapse = System.currentTimeMillis() - start; // if (elapse > 1000) { // LOGGER.info("release a memtable cost: {}", elapse); // } - if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { - getLogNode().notifyEndFlush(); + if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { + getLogNode().notifyEndFlush(); + } } LOGGER.info("flush a memtable has finished"); } else { + // the memtable is an EmptyMemTable. it is a signal for indicating asyncClose() LOGGER.info( "release an empty memtable from flushing memtable list, which is submitted in force flush"); releaseFlushedMemTableCallback(memTableToFlush); + flushSuccessed = true; } - // for sync flush + // for notifying syncFlush() synchronized (memTableToFlush) { memTableToFlush.notify(); } if (shouldClose && flushingMemTables.isEmpty()) { - endFile(); - - // for sync close + if (flushSuccessed) { + //if !flushSuccessed, then the file may be broken, we do not seal the file. 
+ endFile(); + } + // for notifying syncClose() synchronized (flushingMemTables) { flushingMemTables.notify(); } } - + return flushSuccessed; } private void endFile() throws IOException { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java index cce9093..d37cf77 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java @@ -15,6 +15,7 @@ package org.apache.iotdb.db.engine.memtable; import java.io.IOException; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -42,7 +43,7 @@ public class MemTableFlushTaskV2 { private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte; private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager .getInstance(); - private Future ioFlushTaskFuture; + private Future<Boolean> ioFlushTaskFuture; private NativeRestorableIOWriter tsFileIoWriter; private ConcurrentLinkedQueue ioTaskQueue = new ConcurrentLinkedQueue(); @@ -72,8 +73,9 @@ public class MemTableFlushTaskV2 { /** * the function for flushing memtable. + * this is a synchronous (blocking) function. 
*/ - public void flushMemTable() { + public boolean flushMemTable() { long sortTime = 0; for (String deviceId : memTable.getMemTableMap().keySet()) { encodingTaskQueue.add(deviceId); @@ -94,14 +96,18 @@ public class MemTableFlushTaskV2 { "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.", storageGroup, memTable.getVersion(), sortTime); + Boolean success = false; try { - ioFlushTaskFuture.get(); + success = ioFlushTaskFuture.get(); } catch (InterruptedException | ExecutionException e) { LOGGER.error("Waiting for IO flush task end meets error", e); } - LOGGER.info("Storage group {} memtable {} flushing a memtable finished!", storageGroup, memTable); - flushCallBack.accept(memTable); + if (success) { + //only if the flush succeeded, we use the callback to release the memtable. + flushCallBack.accept(memTable); + } + return success; } @@ -131,11 +137,16 @@ public class MemTableFlushTaskV2 { } } else { if (task instanceof String) { + // the task indicates that a new Chunk Group begins, the value of the task is the deviceId. + //so, we just forward the task to the ioTaskQueue currDevice = (String) task; ioTaskQueue.add(task); } else if (task instanceof ChunkGroupIoTask) { + //the task indicates that all Chunks in the Chunk Group have been submitted for encoding. + //so, we just forward the task to the ioTaskQueue ioTaskQueue.add(task); } else { + //the task is for encoding and writing a Chunk into memory buffer. 
long starTime = System.currentTimeMillis(); Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task; ChunkBuffer chunkBuffer; @@ -150,6 +161,7 @@ public class MemTableFlushTaskV2 { try { writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType()); + //then we submit a task for flushing the memory buffer to the disk ioTaskQueue.add(seriesWriter); } catch (IOException e) { LOGGER.error("Storage group {} memtable {}, encoding task error.", storageGroup, @@ -171,11 +183,9 @@ public class MemTableFlushTaskV2 { }; - //TODO a better way is: for each TsFile, assign it a Executors.singleThreadPool, - // rather than per each memtable. - private Runnable IOTask = new Runnable() { + private Callable<Boolean> IOTask = new Callable<Boolean>() { @Override - public void run() { + public Boolean call() { try { long ioTime = 0; boolean returnWhenNoTask = false; @@ -197,35 +207,37 @@ public class MemTableFlushTaskV2 { } } else { long starTime = System.currentTimeMillis(); - try { - if (ioMessage instanceof String) { - tsFileIoWriter.startChunkGroup((String) ioMessage); - } else if (ioMessage instanceof IChunkWriter) { - if (IoTDBDescriptor.getInstance().getConfig().isChunkBufferPoolEnable()) {//chunk buffer pool enable - ChunkWriterImpl writer = (ChunkWriterImpl) ioMessage; - writer.writeToFileWriter(tsFileIoWriter); - ChunkBufferPool.getInstance().putBack(writer.getChunkBuffer()); - } else { - ((IChunkWriter) ioMessage).writeToFileWriter(tsFileIoWriter); - } + if (ioMessage instanceof String) { + //a new Chunk group begins + tsFileIoWriter.startChunkGroup((String) ioMessage); + } else if (ioMessage instanceof IChunkWriter) { + //writing a memory chunk buffer to the disk + if (IoTDBDescriptor.getInstance().getConfig() + .isChunkBufferPoolEnable()) {//chunk buffer pool enable + ChunkWriterImpl writer = (ChunkWriterImpl) ioMessage; + writer.writeToFileWriter(tsFileIoWriter); + 
ChunkBufferPool.getInstance().putBack(writer.getChunkBuffer()); } else { - ChunkGroupIoTask endGroupTask = (ChunkGroupIoTask) ioMessage; - tsFileIoWriter.endChunkGroup(endGroupTask.version); - endGroupTask.finished = true; + ((IChunkWriter) ioMessage).writeToFileWriter(tsFileIoWriter); } - } catch (IOException e) { - LOGGER.error("Storage group {} memtable {}, io error.", storageGroup, - memTable.getVersion(), e); - throw new RuntimeException(e); + } else { + //finishing a chunk group. + ChunkGroupIoTask endGroupTask = (ChunkGroupIoTask) ioMessage; + tsFileIoWriter.endChunkGroup(endGroupTask.version); + endGroupTask.finished = true; } ioTime += System.currentTimeMillis() - starTime; } } - LOGGER.info("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(), - storageGroup, ioTime); - } catch (RuntimeException e) { - LOGGER.error("io thread is dead", e); + LOGGER + .info("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(), + storageGroup, ioTime); + } catch (Exception e) { + LOGGER.error("flushing Storage group {} memtable version {} failed.", storageGroup, + memTable.getVersion(), e); + return false; } + return true; } }; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java index 75bb1f1..407d8b4 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java @@ -45,7 +45,7 @@ public class MemTablePool { private MemTablePool() { } - public IMemTable getEmptyMemTable(Object applier) { + public IMemTable getAvailableMemTable(Object applier) { synchronized (availableMemTables) { if (availableMemTables.isEmpty() && size < capacity) { size++; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java 
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java index b64d5e7..36890bb 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java @@ -112,7 +112,7 @@ public class TsFileRecoverPerformer { // flush logs MemTableFlushTaskV2 tableFlushTask = new MemTableFlushTaskV2(recoverMemTable, fileSchema, restorableTsFileIOWriter, logNodePrefix, (a) -> {}); - tableFlushTask.flushMemTable(); + boolean success = tableFlushTask.flushMemTable(); // close file try { diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java index 8c8396a..d315d19 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java @@ -44,7 +44,7 @@ public class MemTablePoolTest { public void testGetAndRelease() { long time = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { - IMemTable memTable = MemTablePool.getInstance().getEmptyMemTable("test case"); + IMemTable memTable = MemTablePool.getInstance().getAvailableMemTable("test case"); memTables.add(memTable); } time -= System.currentTimeMillis();
