This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f6aa05  Add disabled mem control & improve memory efficiency (#149)
4f6aa05 is described below

commit 4f6aa057f9ef44c05f5101f683354dff9bd6f4f7
Author: Jiang Tian <[email protected]>
AuthorDate: Wed Apr 24 14:41:49 2019 +0800

    Add disabled mem control & improve memory efficiency (#149)
    
    * add DisabledMemController
    
    * fix method name
    
    * reuse BufferWriteProcessor and OverflowProcessor to reduce memory waste.
    
    * fix initialized isClose
    
    * fix reopen
    
    * fix closed OverflowProcessor calling switchWorkToMerge
---
 iotdb/iotdb/conf/iotdb-engine.properties           |   1 +
 .../engine/bufferwrite/BufferWriteProcessor.java   | 135 +++++++++++----------
 .../db/engine/filenode/FileNodeProcessor.java      |  88 ++++++--------
 .../db/engine/memcontrol/BasicMemController.java   |   6 +-
 .../memcontrol/DisabledMemController.java}         |  40 ++++--
 .../db/engine/overflow/io/OverflowProcessor.java   |  80 ++++++++++--
 .../db/exception/FileNodeProcessorException.java   |   4 +
 .../engine/overflow/io/OverflowProcessorTest.java  |   2 +
 8 files changed, 216 insertions(+), 140 deletions(-)

diff --git a/iotdb/iotdb/conf/iotdb-engine.properties 
b/iotdb/iotdb/conf/iotdb-engine.properties
index 5c7046f..c91cec7 100644
--- a/iotdb/iotdb/conf/iotdb-engine.properties
+++ b/iotdb/iotdb/conf/iotdb-engine.properties
@@ -146,6 +146,7 @@ mem_monitor_interval=1000
 # Decide how to control memory used by inserting data.
 # 0 is RecordMemController, which count the size of every record (tuple).
 # 1 is JVMMemController, which use JVM heap memory as threshold.
+# 2 is DisabledMemController, which does not control memory usage.
 mem_controller_type=0
 
 # When a bufferwrite's metadata size (in byte) exceed this, the bufferwrite is 
forced closed.
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 2bc394b..e770e85 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -84,6 +84,9 @@ public class BufferWriteProcessor extends Processor {
   private WriteLogNode logNode;
   private VersionController versionController;
 
+  private boolean isClosed = true;
+  private boolean isFlush = false;
+
   /**
    * constructor of BufferWriteProcessor.
    *
@@ -100,13 +103,37 @@ public class BufferWriteProcessor extends Processor {
     super(processorName);
     this.fileSchema = fileSchema;
     this.baseDir = baseDir;
-    this.fileName = fileName;
 
+    bufferwriteFlushAction = 
parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
+    bufferwriteCloseAction = 
parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
+    filenodeFlushAction = 
parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
+
+
+    reopen(fileName);
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+      try {
+        logNode = MultiFileLogNodeManager.getInstance().getNode(
+            processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX,
+            getBufferwriteRestoreFilePath(),
+            FileNodeManager.getInstance().getRestoreFilePath(processorName));
+      } catch (IOException e) {
+        throw new BufferWriteProcessorException(e);
+      }
+    }
+    this.versionController = versionController;
+
+  }
+
+  public void reopen(String fileName) throws BufferWriteProcessorException {
+    if (!isClosed) {
+      return;
+    }
+    this.fileName = fileName;
     String bDir = baseDir;
     if (bDir.length() > 0 && bDir.charAt(bDir.length() - 1) != 
File.separatorChar) {
       bDir = bDir + File.separatorChar;
     }
-    String dataDirPath = bDir + processorName;
+    String dataDirPath = bDir + getProcessorName();
     File dataDir = new File(dataDirPath);
     if (!dataDir.exists()) {
       dataDir.mkdirs();
@@ -114,29 +141,25 @@ public class BufferWriteProcessor extends Processor {
           dataDirPath);
     }
     this.insertFilePath = new File(dataDir, fileName).getPath();
-    bufferWriteRelativePath = processorName + File.separatorChar + fileName;
+    bufferWriteRelativePath = getProcessorName() + File.separatorChar + 
fileName;
     try {
-      writer = new RestorableTsFileIOWriter(processorName, insertFilePath);
+      writer = new RestorableTsFileIOWriter(getProcessorName(), 
insertFilePath);
     } catch (IOException e) {
       throw new BufferWriteProcessorException(e);
     }
+    if (workMemTable == null) {
+      workMemTable = new PrimitiveMemTable();
+    } else {
+      workMemTable.clear();
+    }
+    isClosed = false;
+    isFlush = false;
+  }
 
-    bufferwriteFlushAction = 
parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
-    bufferwriteCloseAction = 
parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
-    filenodeFlushAction = 
parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
-    workMemTable = new PrimitiveMemTable();
-
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-      try {
-        logNode = MultiFileLogNodeManager.getInstance().getNode(
-            processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX,
-            getBufferwriteRestoreFilePath(),
-            FileNodeManager.getInstance().getRestoreFilePath(processorName));
-      } catch (IOException e) {
-        throw new BufferWriteProcessorException(e);
-      }
+  public void checkOpen() throws BufferWriteProcessorException {
+    if (isClosed) {
+      throw new BufferWriteProcessorException("BufferWriteProcessor already 
closed");
     }
-    this.versionController = versionController;
   }
 
   /**
@@ -153,6 +176,7 @@ public class BufferWriteProcessor extends Processor {
   public boolean write(String deviceId, String measurementId, long timestamp, 
TSDataType dataType,
       String value)
       throws BufferWriteProcessorException {
+    checkOpen();
     TSRecord record = new TSRecord(timestamp, deviceId);
     DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, 
value);
     record.addTuple(dataPoint);
@@ -168,6 +192,7 @@ public class BufferWriteProcessor extends Processor {
    * @throws BufferWriteProcessorException if a flushing operation occurs and 
failed.
    */
   public boolean write(TSRecord tsRecord) throws BufferWriteProcessorException 
{
+    checkOpen();
     long memUsage = MemUtils.getRecordSize(tsRecord);
     BasicMemController.UsageLevel level = BasicMemController.getInstance()
         .acquireUsage(this, memUsage);
@@ -233,7 +258,9 @@ public class BufferWriteProcessor extends Processor {
    * @return corresponding chunk data and chunk metadata in memory
    */
   public Pair<ReadOnlyMemChunk, List<ChunkMetaData>> 
queryBufferWriteData(String deviceId,
-      String measurementId, TSDataType dataType, Map<String, String> props) {
+      String measurementId, TSDataType dataType, Map<String, String> props)
+      throws BufferWriteProcessorException {
+    checkOpen();
     flushQueryLock.lock();
     try {
       MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
@@ -255,10 +282,10 @@ public class BufferWriteProcessor extends Processor {
   private void switchWorkToFlush() {
     flushQueryLock.lock();
     try {
-      if (flushMemTable == null) {
-        flushMemTable = workMemTable;
-        workMemTable = new PrimitiveMemTable();
-      }
+      IMemTable temp = flushMemTable == null ? new PrimitiveMemTable() : 
flushMemTable;
+      flushMemTable = workMemTable;
+      workMemTable = temp;
+      isFlush = true;
     } finally {
       flushQueryLock.unlock();
     }
@@ -268,8 +295,8 @@ public class BufferWriteProcessor extends Processor {
     flushQueryLock.lock();
     try {
       flushMemTable.clear();
-      flushMemTable = null;
       writer.appendMetadata();
+      isFlush = false;
     } finally {
       flushQueryLock.unlock();
     }
@@ -329,6 +356,9 @@ public class BufferWriteProcessor extends Processor {
   // keyword synchronized is added in this method, so that only one flush task 
can be submitted now.
   @Override
   public synchronized Future<Boolean> flush() throws IOException {
+    if (isClosed) {
+      throw new IOException("BufferWriteProcessor closed");
+    }
     // statistic information for flush
     if (lastFlushTime > 0) {
       if (LOGGER.isInfoEnabled()) {
@@ -381,12 +411,18 @@ public class BufferWriteProcessor extends Processor {
 
   @Override
   public void close() throws BufferWriteProcessorException {
+    if (isClosed) {
+      return;
+    }
     try {
       long closeStartTime = System.currentTimeMillis();
       // flush data and wait for finishing flush
       flush().get();
       // end file
       writer.endFile(fileSchema);
+      writer = null;
+      workMemTable.clear();
+
       // update the IntervalFile for interval list
       bufferwriteCloseAction.act();
       // flush the changed information for filenode
@@ -402,6 +438,7 @@ public class BufferWriteProcessor extends Processor {
             DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
             closeEndTime - closeStartTime);
       }
+      isClosed = true;
     } catch (IOException e) {
       LOGGER.error("Close the bufferwrite processor error, the bufferwrite is 
{}.",
           getProcessorName(), e);
@@ -424,13 +461,7 @@ public class BufferWriteProcessor extends Processor {
    * @return True if flushing
    */
   public boolean isFlush() {
-    // starting a flush task has two steps: set the flushMemtable, and then 
set the flushFuture
-    // So, the following case exists: flushMemtable != null but flushFuture is 
done (because the
-    // flushFuture refers to the last finished flush.
-    // And, the following case exists,too: flushMemtable == null, but 
flushFuture is not done.
-    // (flushTask() is not finished, but switchToWork() has done)
-    // So, checking flushMemTable is more meaningful than flushFuture.isDone().
-    return  flushMemTable != null;
+    return isFlush;
   }
 
   /**
@@ -454,38 +485,6 @@ public class BufferWriteProcessor extends Processor {
     return file.length() + memoryUsage();
   }
 
-  /**
-   * Close current TsFile and open a new one for future writes. Block new 
writes and wait until
-   * current writes finish.
-   */
-  public void rollToNewFile() {
-    // TODO : [MemControl] implement this
-  }
-
-  /**
-   * Check if this TsFile has too big metadata or file. If true, close current 
file and open a new
-   * one.
-   */
-  private boolean checkSize() {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    long metaSize = getMetaSize();
-    long fileSize = getFileSize();
-    if (metaSize >= config.getBufferwriteMetaSizeThreshold()
-        || fileSize >= config.getBufferwriteFileSizeThreshold()) {
-      LOGGER.info(
-          "The bufferwrite processor {}, size({}) of the file {} reaches 
threshold {}, "
-              + "size({}) of metadata reaches threshold {}.",
-          getProcessorName(), MemUtils.bytesCntToStr(fileSize), this.fileName,
-          MemUtils.bytesCntToStr(config.getBufferwriteFileSizeThreshold()),
-          MemUtils.bytesCntToStr(metaSize),
-          MemUtils.bytesCntToStr(config.getBufferwriteFileSizeThreshold()));
-
-      rollToNewFile();
-      return true;
-    }
-    return false;
-  }
-
   public String getBaseDir() {
     return baseDir;
   }
@@ -538,7 +537,9 @@ public class BufferWriteProcessor extends Processor {
    * @param measurementId the measurementId of the timeseries to be deleted.
    * @param timestamp the upper-bound of deletion time.
    */
-  public void delete(String deviceId, String measurementId, long timestamp) {
+  public void delete(String deviceId, String measurementId, long timestamp)
+      throws BufferWriteProcessorException {
+    checkOpen();
     workMemTable.delete(deviceId, measurementId, timestamp);
     if (isFlush()) {
       // flushing MemTable cannot be directly modified since another thread is 
reading it
@@ -572,4 +573,8 @@ public class BufferWriteProcessor extends Processor {
   public String toString() {
     return "BufferWriteProcessor in " + insertFilePath;
   }
+
+  public boolean isClosed() {
+    return isClosed;
+  }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 47aca8e..d97594c 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -534,21 +534,17 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
                 + System.currentTimeMillis(),
             params, versionController, fileSchema);
       } catch (BufferWriteProcessorException e) {
-        LOGGER.error("The filenode processor {} failed to get the bufferwrite 
processor.",
-            processorName, e);
-        throw new FileNodeProcessorException(e);
+        throw new FileNodeProcessorException(String
+            .format("The filenode processor %s failed to get the bufferwrite 
processor.",
+                processorName),e);
+      }
+    } else if (bufferWriteProcessor.isClosed()){
+      try {
+        bufferWriteProcessor.reopen(insertTime + 
FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
+            + System.currentTimeMillis());
+      } catch (BufferWriteProcessorException e) {
+        throw new FileNodeProcessorException("Cannot reopen 
BufferWriteProcessor", e);
       }
-    }
-    return bufferWriteProcessor;
-  }
-
-  /**
-   * get buffer write processor.
-   */
-  public BufferWriteProcessor getBufferWriteProcessor() throws 
FileNodeProcessorException {
-    if (bufferWriteProcessor == null) {
-      LOGGER.error("The bufferwrite processor is null when get the 
bufferwriteProcessor");
-      throw new FileNodeProcessorException("The bufferwrite processor is 
null");
     }
     return bufferWriteProcessor;
   }
@@ -565,6 +561,8 @@ public class FileNodeProcessor extends Processor implements 
IStatistic {
           .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, 
flushFileNodeProcessorAction);
       overflowProcessor = new OverflowProcessor(processorName, params, 
fileSchema,
           versionController);
+    } else if (overflowProcessor.isClosed()){
+      overflowProcessor.reopen();
     }
     return overflowProcessor;
   }
@@ -573,25 +571,16 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
    * get overflow processor.
    */
   public OverflowProcessor getOverflowProcessor() {
-    if (overflowProcessor == null) {
-      LOGGER.error("The overflow processor is null when getting the 
overflowProcessor");
+    if (overflowProcessor == null || overflowProcessor.isClosed()) {
+      LOGGER.error("The overflow processor is null or closed when getting the 
overflowProcessor");
     }
     return overflowProcessor;
   }
 
   public boolean hasOverflowProcessor() {
-    return overflowProcessor != null;
+    return overflowProcessor != null && !overflowProcessor.isClosed();
   }
 
-  public void setBufferwriteProcessroToClosed() {
-
-    bufferWriteProcessor = null;
-  }
-
-  public boolean hasBufferwriteProcessor() {
-
-    return bufferWriteProcessor != null;
-  }
 
   /**
    * set last update time.
@@ -797,18 +786,13 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
         && !newFileNodes.get(newFileNodes.size() - 
1).getStartTimeMap().isEmpty()) {
       unsealedTsFile = new UnsealedTsFile();
       unsealedTsFile.setFilePath(newFileNodes.get(newFileNodes.size() - 
1).getFilePath());
-      if (bufferWriteProcessor == null) {
-        LOGGER.error(
-            "The last of tsfile {} in filenode processor {} is not closed, "
-                + "but the bufferwrite processor is null.",
-            newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), 
getProcessorName());
-        throw new FileNodeProcessorException(String.format(
-            "The last of tsfile %s in filenode processor %s is not closed, "
-                + "but the bufferwrite processor is null.",
-            newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), 
getProcessorName()));
+
+      try {
+        bufferwritedata = bufferWriteProcessor
+            .queryBufferWriteData(deviceId, measurementId, dataType, 
mSchema.getProps());
+      } catch (BufferWriteProcessorException e) {
+        throw new FileNodeProcessorException(e);
       }
-      bufferwritedata = bufferWriteProcessor
-          .queryBufferWriteData(deviceId, measurementId, dataType, 
mSchema.getProps());
 
       try {
         List<Modification> pathModifications = context.getPathModifications(
@@ -954,7 +938,7 @@ public class FileNodeProcessor extends Processor implements 
IStatistic {
     }
     lastMergeTime = System.currentTimeMillis();
 
-    if (overflowProcessor != null) {
+    if (overflowProcessor != null && !overflowProcessor.isClosed()) {
       if (overflowProcessor.getFileSize() < IoTDBDescriptor.getInstance()
           .getConfig().getOverflowFileSizeThreshold()) {
         if (LOGGER.isInfoEnabled()) {
@@ -1082,6 +1066,9 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
     emptyTsFileResource.clear();
     // attention
     try {
+      if (overflowProcessor.isClosed()) {
+        overflowProcessor.reopen();
+      }
       overflowProcessor.switchWorkToMerge();
     } catch (IOException e) {
       LOGGER.error("The filenode processor {} can't switch overflow processor 
from work to merge.",
@@ -1463,6 +1450,7 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
     // losing some modification.
     mergeDeleteLock.lock();
     QueryContext context = new QueryContext();
+
     try {
       
FileReaderManager.getInstance().increaseFileReaderReference(backupIntervalFile.getFilePath(),
           true);
@@ -1489,6 +1477,7 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
           // query one measurement in the special deviceId
           String measurementId = path.getMeasurement();
           TSDataType dataType = mManager.getSeriesType(path.getFullPath());
+
           OverflowSeriesDataSource overflowSeriesDataSource = 
overflowProcessor.queryMerge(deviceId,
               measurementId, dataType, true, context);
           Filter timeFilter = FilterFactory
@@ -1714,10 +1703,10 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
   public FileNodeFlushFuture flush() throws IOException {
     Future<Boolean> bufferWriteFlushFuture = null;
     Future<Boolean> overflowFlushFuture = null;
-    if (bufferWriteProcessor != null) {
+    if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
       bufferWriteFlushFuture = bufferWriteProcessor.flush();
     }
-    if (overflowProcessor != null) {
+    if (overflowProcessor != null && !overflowProcessor.isClosed()) {
       overflowFlushFuture = overflowProcessor.flush();
     }
     return new FileNodeFlushFuture(bufferWriteFlushFuture, 
overflowFlushFuture);
@@ -1727,7 +1716,7 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
    * Close the bufferwrite processor.
    */
   public void closeBufferWrite() throws FileNodeProcessorException {
-    if (bufferWriteProcessor == null) {
+    if (bufferWriteProcessor == null || bufferWriteProcessor.isClosed()) {
       return;
     }
     try {
@@ -1735,7 +1724,6 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
         waitForBufferWriteClose();
       }
       bufferWriteProcessor.close();
-      bufferWriteProcessor = null;
     } catch (BufferWriteProcessorException e) {
       throw new FileNodeProcessorException(e);
     }
@@ -1756,7 +1744,7 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
    * Close the overflow processor.
    */
   public void closeOverflow() throws FileNodeProcessorException {
-    if (overflowProcessor == null) {
+    if (overflowProcessor == null || overflowProcessor.isClosed()) {
       return;
     }
     try {
@@ -1764,9 +1752,7 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
         waitForOverflowClose();
       }
       overflowProcessor.close();
-      overflowProcessor.clear();
-      overflowProcessor = null;
-    } catch (OverflowProcessorException | IOException e) {
+    } catch (OverflowProcessorException e) {
       throw new FileNodeProcessorException(e);
     }
   }
@@ -1897,7 +1883,7 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
       // delete data in memory
       OverflowProcessor ofProcessor = getOverflowProcessor(getProcessorName());
       ofProcessor.delete(deviceId, measurementId, timestamp, version, 
updatedModFiles);
-      if (bufferWriteProcessor != null) {
+      if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
         bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
       }
     } catch (Exception e) {
@@ -1945,8 +1931,12 @@ public class FileNodeProcessor extends Processor 
implements IStatistic {
       }
       throw e;
     }
-    if (bufferWriteProcessor != null) {
-      bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+    if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
+      try {
+        bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+      } catch (BufferWriteProcessorException e) {
+        throw new IOException(e);
+      }
     }
   }
 
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
index 503afa3..e490044 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
@@ -52,8 +52,10 @@ public abstract class BasicMemController implements IService 
{
       case JVM:
         return JVMMemController.getInstance();
       case RECORD:
-      default:
         return RecordMemController.getInstance();
+      case DISABLED:
+      default:
+        return DisabledMemController.getInstance();
     }
   }
 
@@ -175,7 +177,7 @@ public abstract class BasicMemController implements 
IService {
   public abstract void releaseUsage(Object user, long freeSize);
 
   public enum ControllerType {
-    RECORD, JVM
+    RECORD, JVM, DISABLED
   }
 
   public enum UsageLevel {
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/DisabledMemController.java
similarity index 52%
copy from 
iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
copy to 
iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/DisabledMemController.java
index 213d3c0..225005c 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/DisabledMemController.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,25 +16,41 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.exception;
+package org.apache.iotdb.db.engine.memcontrol;
 
-public class FileNodeProcessorException extends ProcessorException {
+import org.apache.iotdb.db.conf.IoTDBConfig;
 
-  private static final long serialVersionUID = 7373978140952977661L;
+/**
+ * DisabledMemController is used when the overhead of memory control is too 
high.
+ */
+public class DisabledMemController extends BasicMemController {
+
+  DisabledMemController(IoTDBConfig config) {
+    super(config);
+  }
 
-  public FileNodeProcessorException() {
-    super();
+  @Override
+  public long getTotalUsage() {
+    return 0;
   }
 
-  public FileNodeProcessorException(PathErrorException pathExcp) {
-    super(pathExcp.getMessage());
+  @Override
+  public UsageLevel getCurrLevel() {
+    return UsageLevel.SAFE;
   }
 
-  public FileNodeProcessorException(String msg) {
-    super(msg);
+  @Override
+  public void clear() {
+
+  }
+
+  @Override
+  public UsageLevel acquireUsage(Object user, long usage) {
+    return UsageLevel.SAFE;
   }
 
-  public FileNodeProcessorException(Throwable throwable) {
-    super(throwable.getMessage());
+  @Override
+  public void releaseUsage(Object user, long freeSize) {
+
   }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 2e7302f..656932b 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -96,6 +96,9 @@ public class OverflowProcessor extends Processor {
   private WriteLogNode logNode;
   private VersionController versionController;
 
+  private boolean isClosed = true;
+  private boolean isFlush = false;
+
   public OverflowProcessor(String processorName, Map<String, Action> 
parameters,
       FileSchema fileSchema, VersionController versionController)
       throws IOException {
@@ -108,18 +111,13 @@ public class OverflowProcessor extends Processor {
       overflowDirPath = overflowDirPath + File.separatorChar;
     }
     this.parentPath = overflowDirPath + processorName;
-    File processorDataDir = new File(parentPath);
-    if (!processorDataDir.exists()) {
-      processorDataDir.mkdirs();
-    }
-    // recover file
-    recovery(processorDataDir);
-    // memory
-    workSupport = new OverflowMemtable();
+
     overflowFlushAction = 
parameters.get(FileNodeConstants.OVERFLOW_FLUSH_ACTION);
     filenodeFlushAction = parameters
         .get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
 
+    reopen();
+
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       logNode = MultiFileLogNodeManager.getInstance().getNode(
           processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX,
@@ -128,6 +126,33 @@ public class OverflowProcessor extends Processor {
     }
   }
 
+  public void reopen() throws IOException {
+    if (!isClosed) {
+      return;
+    }
+    // recover file
+    File processorDataDir = new File(parentPath);
+    if (!processorDataDir.exists()) {
+      processorDataDir.mkdirs();
+    }
+    recovery(processorDataDir);
+
+    // memory
+    if (workSupport == null) {
+      workSupport = new OverflowMemtable();
+    } else {
+      workSupport.clear();
+    }
+    isClosed = false;
+    isFlush = false;
+  }
+  public void checkOpen() throws OverflowProcessorException {
+    if (isClosed) {
+      throw new OverflowProcessorException("OverflowProcessor already closed");
+    }
+  }
+
+
   private void recovery(File parentFile) throws IOException {
     String[] subFilePaths = clearFile(parentFile.list());
     if (subFilePaths.length == 0) {
@@ -173,6 +198,11 @@ public class OverflowProcessor extends Processor {
    * insert one time-series record
    */
   public void insert(TSRecord tsRecord) throws IOException {
+    try {
+      checkOpen();
+    } catch (OverflowProcessorException e) {
+      throw new IOException(e);
+    }
     // memory control
     long memUage = MemUtils.getRecordSize(tsRecord);
     UsageLevel usageLevel = 
BasicMemController.getInstance().acquireUsage(this, memUage);
@@ -261,6 +291,11 @@ public class OverflowProcessor extends Processor {
    */
   public void delete(String deviceId, String measurementId, long timestamp, 
long version,
       List<ModificationFile> updatedModFiles) throws IOException {
+    try {
+      checkOpen();
+    } catch (OverflowProcessorException e) {
+      throw new IOException(e);
+    }
     workResource.delete(deviceId, measurementId, timestamp, version, 
updatedModFiles);
     workSupport.delete(deviceId, measurementId, timestamp, false);
     if (isFlush()) {
@@ -278,6 +313,11 @@ public class OverflowProcessor extends Processor {
   public OverflowSeriesDataSource query(String deviceId, String measurementId,
       TSDataType dataType, Map<String, String> props, QueryContext context)
       throws IOException {
+    try {
+      checkOpen();
+    } catch (OverflowProcessorException e) {
+      throw new IOException(e);
+    }
     queryFlushLock.lock();
     try {
       // query insert data in memory and unseqTsFiles
@@ -400,8 +440,10 @@ public class OverflowProcessor extends Processor {
   private void switchWorkToFlush() {
     queryFlushLock.lock();
     try {
+      OverflowMemtable temp = flushSupport == null ? new OverflowMemtable() : 
flushSupport;
       flushSupport = workSupport;
-      workSupport = new OverflowMemtable();
+      workSupport = temp;
+      isFlush = true;
     } finally {
       queryFlushLock.unlock();
     }
@@ -412,7 +454,7 @@ public class OverflowProcessor extends Processor {
     try {
       flushSupport.clear();
       workResource.appendMetadatas();
-      flushSupport = null;
+      isFlush = false;
     } finally {
       queryFlushLock.unlock();
     }
@@ -444,8 +486,7 @@ public class OverflowProcessor extends Processor {
   }
 
   public boolean isFlush() {
-    //see BufferWriteProcess.isFlush()
-    return  flushSupport != null;
+    return isFlush;
   }
 
   private boolean flushTask(String displayMessage) {
@@ -546,6 +587,9 @@ public class OverflowProcessor extends Processor {
 
   @Override
   public void close() throws OverflowProcessorException {
+    if (isClosed) {
+      return;
+    }
     LOGGER.info("The overflow processor {} starts close operation.", 
getProcessorName());
     long closeStartTime = System.currentTimeMillis();
     // flush data
@@ -570,14 +614,22 @@ public class OverflowProcessor extends Processor {
           DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
           closeEndTime - closeStartTime);
     }
+    try {
+      clear();
+    } catch (IOException e) {
+      throw new OverflowProcessorException(e);
+    }
+    isClosed = true;
   }
 
   public void clear() throws IOException {
     if (workResource != null) {
       workResource.close();
+      workResource = null;
     }
     if (mergeResource != null) {
       mergeResource.close();
+      mergeResource = null;
     }
   }
 
@@ -705,4 +757,8 @@ public class OverflowProcessor extends Processor {
   public String toString() {
     return "OverflowProcessor in " + parentPath;
   }
+
+  public boolean isClosed() {
+    return isClosed;
+  }
 }
\ No newline at end of file
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
index 213d3c0..d3cf362 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
@@ -37,4 +37,8 @@ public class FileNodeProcessorException extends 
ProcessorException {
   public FileNodeProcessorException(Throwable throwable) {
     super(throwable.getMessage());
   }
+
+  public FileNodeProcessorException(String msg, Throwable e) {
+    super(msg, e);
+  }
 }
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
index faa7a9a..9877f76 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
@@ -116,6 +116,7 @@ public class OverflowProcessorTest {
     }
     // flush synchronously
     processor.close();
+    processor.reopen();
     overflowSeriesDataSource = processor
         .query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
             OverflowTestUtils.dataType1, Collections.emptyMap(), context);
@@ -153,6 +154,7 @@ public class OverflowProcessorTest {
     processor.close();
     QueryContext context = new QueryContext();
     // test query
+    processor.reopen();
     OverflowSeriesDataSource overflowSeriesDataSource = 
processor.query(OverflowTestUtils.deviceId1,
         OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2, 
Collections.emptyMap(),
         context);

Reply via email to