This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch 
feature_async_close_tsfile_handle_full_disk_situation
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 091fc2f47f1f20f9af58732a29611afaa039369b
Author: xiangdong huang <[email protected]>
AuthorDate: Sun Jun 30 18:26:15 2019 +0800

    replace flush task runnable with a callable function; and reject future 
writes if there is no disk spaces any more...
---
 .../db/engine/filenodeV2/FileNodeManagerV2.java    | 29 ++++++++-
 .../iotdb/db/engine/filenodeV2/FlushManager.java   | 12 +++-
 .../filenodeV2/UnsealedTsFileProcessorV2.java      | 42 ++++++------
 .../db/engine/memtable/MemTableFlushTaskV2.java    | 74 +++++++++++++---------
 .../iotdb/db/engine/memtable/MemTablePool.java     |  2 +-
 .../writelog/recover/TsFileRecoverPerformer.java   |  2 +-
 .../iotdb/db/engine/memtable/MemTablePoolTest.java |  2 +-
 7 files changed, 105 insertions(+), 58 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index 233620c..ae58ee7 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -56,6 +56,11 @@ public class FileNodeManagerV2 implements IService {
       
.getLogger(org.apache.iotdb.db.engine.filenodeV2.FileNodeManagerV2.class);
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
 
+  /*
+   * whether reject all writes (insert, update, delete)
+   */
+  private boolean rejectWrite = false;
+
   /**
    * a folder (system/info/ by default) that persist FileNodeProcessorStore 
classes. Ends with
    * File.separator Each FileNodeManager will have a subfolder.
@@ -79,6 +84,14 @@ public class FileNodeManagerV2 implements IService {
    */
   private volatile FileNodeManagerStatus fileNodeManagerStatus = 
FileNodeManagerStatus.NONE;
 
+  public boolean isRejectWrite() {
+    return rejectWrite;
+  }
+
+  public void setRejectWrite(boolean rejectWrite) {
+    this.rejectWrite = rejectWrite;
+  }
+
   private enum FileNodeManagerStatus {
     NONE, MERGE, CLOSE
   }
@@ -168,7 +181,9 @@ public class FileNodeManagerV2 implements IService {
    * @return an int value represents the insert type, 0: failed; 1: overflow; 
2: bufferwrite
    */
   public boolean insert(InsertPlan insertPlan) throws FileNodeManagerException 
{
-
+    if (rejectWrite) {
+      return false;
+    }
     FileNodeProcessorV2 fileNodeProcessor;
     try {
       fileNodeProcessor = getProcessor(insertPlan.getDeviceId());
@@ -200,22 +215,30 @@ public class FileNodeManagerV2 implements IService {
   /**
    * update data.
    */
-  public void update(String deviceId, String measurementId, long startTime, 
long endTime,
+  public boolean update(String deviceId, String measurementId, long startTime, 
long endTime,
       TSDataType type, String v) {
+    if (rejectWrite) {
+      return false;
+    }
     // TODO
+    return false;
   }
 
   /**
    * delete data.
    */
-  public void delete(String deviceId, String measurementId, long timestamp)
+  public boolean delete(String deviceId, String measurementId, long timestamp)
       throws FileNodeManagerException {
+    if (rejectWrite) {
+      return false;
+    }
     FileNodeProcessorV2 fileNodeProcessor = getProcessor(deviceId);
     try {
       fileNodeProcessor.delete(deviceId, measurementId, timestamp);
     } catch (IOException e) {
       throw new FileNodeManagerException(e);
     }
+    return true;
   }
 
   private void delete(String processorName,
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
index 002f809..b878edc 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.io.IOException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
@@ -34,14 +35,15 @@ public class FlushManager {
 
   private FlushPoolManager flushPool = FlushPoolManager.getInstance();
 
-  class FlushThread implements Runnable {
+  class FlushThread implements Callable<Boolean> {
 
     @Override
-    public void run() {
+    public Boolean call() {
       UnsealedTsFileProcessorV2 unsealedTsFileProcessor = 
unsealedTsFileProcessorQueue.poll();
       long startTime = System.currentTimeMillis();
+      boolean flushSuccessed = false;
       try {
-        unsealedTsFileProcessor.flushOneMemTable();
+        flushSuccessed = unsealedTsFileProcessor.flushOneMemTable();
       } catch (IOException e) {
         LOGGER.error("storage group {} flush one memtable meet error",
             unsealedTsFileProcessor.getStorageGroupName(), e);
@@ -51,6 +53,10 @@ public class FlushManager {
       LOGGER.info("storage group {} flush process consume {} ms",
           unsealedTsFileProcessor.getStorageGroupName(), 
System.currentTimeMillis() - startTime);
       registerUnsealedTsFileProcessor(unsealedTsFileProcessor);
+      if (!flushSuccessed) {
+        FileNodeManagerV2.getInstance().setRejectWrite(true);
+      }
+      return flushSuccessed;
     }
   }
 
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 57bafc7..16d675d 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -23,13 +23,11 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.memtable.EmptyMemTable;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -41,11 +39,9 @@ import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.utils.QueryUtils;
-import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -119,10 +115,10 @@ public class UnsealedTsFileProcessorV2 {
 
 //    long start1 = System.currentTimeMillis();
     if (workMemTable == null) {
-      // TODO change the impl of getEmptyMemTable to non-blocking
-      workMemTable = MemTablePool.getInstance().getEmptyMemTable(this);
+      // TODO change the impl of getAvailableMemTable to non-blocking
+      workMemTable = MemTablePool.getInstance().getAvailableMemTable(this);
 
-      // no empty memtable, return failure
+      // no available memtable, return failure
       if (workMemTable == null) {
         return false;
       }
@@ -184,6 +180,9 @@ public class UnsealedTsFileProcessorV2 {
 
 
   public boolean shouldFlush() {
+    if (workMemTable == null) {
+      return false;
+    }
     return workMemTable.memSize() > 
TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
   }
 
@@ -314,48 +313,55 @@ public class UnsealedTsFileProcessorV2 {
    * Take the first MemTable from the flushingMemTables and flush it. Called 
by a flush thread of
    * the flush manager pool
    */
-  void flushOneMemTable() throws IOException {
+  boolean flushOneMemTable() throws IOException {
     IMemTable memTableToFlush;
     memTableToFlush = flushingMemTables.getFirst();
 
     LOGGER.info("storage group {} start to flush a memtable in a flush 
thread", storageGroupName);
 
-    // null memtable only appears when calling asyncClose()
+    boolean flushSuccessed = false;
+    //if the memtable is not an EmptyMemTable (i.e., the memtable is actually 
a memtable).
     if (memTableToFlush.isManagedByMemPool()) {
       MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, 
fileSchema, writer,
           storageGroupName,
           this::releaseFlushedMemTableCallback);
-      flushTask.flushMemTable();
+      flushSuccessed = flushTask.flushMemTable();
+      if (flushSuccessed) {
 //      long start = System.currentTimeMillis();
-      MemTablePool.getInstance().putBack(memTableToFlush, storageGroupName);
+        MemTablePool.getInstance().putBack(memTableToFlush, storageGroupName);
 //      long elapse = System.currentTimeMillis() - start;
 //      if (elapse > 1000) {
 //        LOGGER.info("release a memtable cost: {}", elapse);
 //      }
-      if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-        getLogNode().notifyEndFlush();
+        if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+          getLogNode().notifyEndFlush();
+        }
       }
       LOGGER.info("flush a memtable has finished");
     } else {
+      // the memtable is an EmptyMemTable. it is a signal for indicating 
asyncClose()
       LOGGER.info(
           "release an empty memtable from flushing memtable list, which is 
submitted in force flush");
       releaseFlushedMemTableCallback(memTableToFlush);
+      flushSuccessed = true;
     }
 
-    // for sync flush
+    // for notifying syncFlush()
     synchronized (memTableToFlush) {
       memTableToFlush.notify();
     }
 
     if (shouldClose && flushingMemTables.isEmpty()) {
-      endFile();
-
-      // for sync close
+      if (flushSuccessed) {
+        //if !flushSuccessed, then the file may be broken, we do not seal the 
file.
+        endFile();
+      }
+      // for notifying syncClose()
       synchronized (flushingMemTables) {
         flushingMemTables.notify();
       }
     }
-
+    return flushSuccessed;
   }
 
   private void endFile() throws IOException {
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index cce9093..d37cf77 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -15,6 +15,7 @@
 package org.apache.iotdb.db.engine.memtable;
 
 import java.io.IOException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -42,7 +43,7 @@ public class MemTableFlushTaskV2 {
   private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
   private static final FlushSubTaskPoolManager subTaskPoolManager = 
FlushSubTaskPoolManager
       .getInstance();
-  private Future ioFlushTaskFuture;
+  private Future<Boolean> ioFlushTaskFuture;
   private NativeRestorableIOWriter tsFileIoWriter;
 
   private ConcurrentLinkedQueue ioTaskQueue = new ConcurrentLinkedQueue();
@@ -72,8 +73,9 @@ public class MemTableFlushTaskV2 {
 
   /**
    * the function for flushing memtable.
+   * this is a synchronized function.
    */
-  public void flushMemTable() {
+  public boolean flushMemTable() {
     long sortTime = 0;
     for (String deviceId : memTable.getMemTableMap().keySet()) {
       encodingTaskQueue.add(deviceId);
@@ -94,14 +96,18 @@ public class MemTableFlushTaskV2 {
         "Storage group {} memtable {}, flushing into disk: data sort time cost 
{} ms.",
         storageGroup, memTable.getVersion(), sortTime);
 
+    Boolean success = false;
     try {
-      ioFlushTaskFuture.get();
+      success = ioFlushTaskFuture.get();
     } catch (InterruptedException | ExecutionException e) {
       LOGGER.error("Waiting for IO flush task end meets error", e);
     }
-
     LOGGER.info("Storage group {} memtable {} flushing a memtable finished!", 
storageGroup, memTable);
-    flushCallBack.accept(memTable);
+    if (success) {
+      //only if successed, we use the callback to release the memtable.
+      flushCallBack.accept(memTable);
+    }
+    return success;
   }
 
 
@@ -131,11 +137,16 @@ public class MemTableFlushTaskV2 {
             }
           } else {
             if (task instanceof String) {
+              // the task indicates that a new Chunk Group begins, the value 
of the task is the deviceId.
+              //so, we just forward the task to the ioTaskQueue
               currDevice = (String) task;
               ioTaskQueue.add(task);
             } else if (task instanceof ChunkGroupIoTask) {
+              //the task indicates that all Chunks in the Chunk Group haven 
been submitted for encoding.
+              //so, we just forward the task to the  ioTaskQueue
               ioTaskQueue.add(task);
             } else {
+              //the task is for encoding and writing a Chunk into memory 
buffer.
               long starTime = System.currentTimeMillis();
               Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, 
MeasurementSchema>) task;
               ChunkBuffer chunkBuffer;
@@ -150,6 +161,7 @@ public class MemTableFlushTaskV2 {
               try {
                 writeOneSeries(encodingMessage.left, seriesWriter,
                     encodingMessage.right.getType());
+                //then we submit a task for flushing the memory buffer to the 
disk
                 ioTaskQueue.add(seriesWriter);
               } catch (IOException e) {
                 LOGGER.error("Storage group {} memtable {}, encoding task 
error.", storageGroup,
@@ -171,11 +183,9 @@ public class MemTableFlushTaskV2 {
   };
 
 
-  //TODO a better way is: for each TsFile, assign it a 
Executors.singleThreadPool,
-  // rather than per each memtable.
-  private Runnable IOTask = new Runnable() {
+  private Callable<Boolean> IOTask = new Callable<Boolean>() {
     @Override
-    public void run() {
+    public Boolean call() {
       try {
         long ioTime = 0;
         boolean returnWhenNoTask = false;
@@ -197,35 +207,37 @@ public class MemTableFlushTaskV2 {
             }
           } else {
             long starTime = System.currentTimeMillis();
-            try {
-              if (ioMessage instanceof String) {
-                tsFileIoWriter.startChunkGroup((String) ioMessage);
-              } else if (ioMessage instanceof IChunkWriter) {
-                if 
(IoTDBDescriptor.getInstance().getConfig().isChunkBufferPoolEnable()) {//chunk 
buffer pool enable
-                  ChunkWriterImpl writer = (ChunkWriterImpl) ioMessage;
-                  writer.writeToFileWriter(tsFileIoWriter);
-                  
ChunkBufferPool.getInstance().putBack(writer.getChunkBuffer());
-                } else {
-                  ((IChunkWriter) ioMessage).writeToFileWriter(tsFileIoWriter);
-                }
+            if (ioMessage instanceof String) {
+              //a new Chunk group begins
+              tsFileIoWriter.startChunkGroup((String) ioMessage);
+            } else if (ioMessage instanceof IChunkWriter) {
+              //writing a memory chunk buffer to the disk
+              if (IoTDBDescriptor.getInstance().getConfig()
+                  .isChunkBufferPoolEnable()) {//chunk buffer pool enable
+                ChunkWriterImpl writer = (ChunkWriterImpl) ioMessage;
+                writer.writeToFileWriter(tsFileIoWriter);
+                ChunkBufferPool.getInstance().putBack(writer.getChunkBuffer());
               } else {
-                ChunkGroupIoTask endGroupTask = (ChunkGroupIoTask) ioMessage;
-                tsFileIoWriter.endChunkGroup(endGroupTask.version);
-                endGroupTask.finished = true;
+                ((IChunkWriter) ioMessage).writeToFileWriter(tsFileIoWriter);
               }
-            } catch (IOException e) {
-              LOGGER.error("Storage group {} memtable {}, io error.", 
storageGroup,
-                  memTable.getVersion(), e);
-              throw new RuntimeException(e);
+            } else {
+              //finishing a chunk group.
+              ChunkGroupIoTask endGroupTask = (ChunkGroupIoTask) ioMessage;
+              tsFileIoWriter.endChunkGroup(endGroupTask.version);
+              endGroupTask.finished = true;
             }
             ioTime += System.currentTimeMillis() - starTime;
           }
         }
-        LOGGER.info("flushing a memtable {} in storage group {}, io cost 
{}ms", memTable.getVersion(),
-            storageGroup, ioTime);
-      } catch (RuntimeException e) {
-        LOGGER.error("io thread is dead", e);
+        LOGGER
+            .info("flushing a memtable {} in storage group {}, io cost {}ms", 
memTable.getVersion(),
+                storageGroup, ioTime);
+      } catch (Exception e) {
+        LOGGER.error("flushing Storage group {} memtable version {} failed.", 
storageGroup,
+            memTable.getVersion(), e);
+        return false;
       }
+      return true;
     }
   };
 
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index 75bb1f1..407d8b4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -45,7 +45,7 @@ public class MemTablePool {
   private MemTablePool() {
   }
 
-  public IMemTable getEmptyMemTable(Object applier) {
+  public IMemTable getAvailableMemTable(Object applier) {
     synchronized (availableMemTables) {
       if (availableMemTables.isEmpty() && size < capacity) {
         size++;
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index b64d5e7..36890bb 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -112,7 +112,7 @@ public class TsFileRecoverPerformer {
     // flush logs
     MemTableFlushTaskV2 tableFlushTask = new 
MemTableFlushTaskV2(recoverMemTable, fileSchema, restorableTsFileIOWriter,
         logNodePrefix, (a) -> {});
-    tableFlushTask.flushMemTable();
+    boolean success = tableFlushTask.flushMemTable();
 
     // close file
     try {
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
index 8c8396a..d315d19 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
@@ -44,7 +44,7 @@ public class MemTablePoolTest {
   public void testGetAndRelease() {
     long time = System.currentTimeMillis();
     for (int i = 0; i < 10; i++) {
-      IMemTable memTable = MemTablePool.getInstance().getEmptyMemTable("test 
case");
+      IMemTable memTable = 
MemTablePool.getInstance().getAvailableMemTable("test case");
       memTables.add(memTable);
     }
     time -= System.currentTimeMillis();

Reply via email to