This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4f6aa05 Add disabled mem control & improve memory efficiency (#149)
4f6aa05 is described below
commit 4f6aa057f9ef44c05f5101f683354dff9bd6f4f7
Author: Jiang Tian <[email protected]>
AuthorDate: Wed Apr 24 14:41:49 2019 +0800
Add disabled mem control & improve memory efficiency (#149)
* add DisabledMemController
* fix method name
* reuse BufferWriteProcessor and OverflowProcessor to reduce memory waste.
* fix initialized isClose
* fix reopen
* fix closed Overflowprocessor calling switchWorkToMerge
---
iotdb/iotdb/conf/iotdb-engine.properties | 1 +
.../engine/bufferwrite/BufferWriteProcessor.java | 135 +++++++++++----------
.../db/engine/filenode/FileNodeProcessor.java | 88 ++++++--------
.../db/engine/memcontrol/BasicMemController.java | 6 +-
.../memcontrol/DisabledMemController.java} | 40 ++++--
.../db/engine/overflow/io/OverflowProcessor.java | 80 ++++++++++--
.../db/exception/FileNodeProcessorException.java | 4 +
.../engine/overflow/io/OverflowProcessorTest.java | 2 +
8 files changed, 216 insertions(+), 140 deletions(-)
diff --git a/iotdb/iotdb/conf/iotdb-engine.properties
b/iotdb/iotdb/conf/iotdb-engine.properties
index 5c7046f..c91cec7 100644
--- a/iotdb/iotdb/conf/iotdb-engine.properties
+++ b/iotdb/iotdb/conf/iotdb-engine.properties
@@ -146,6 +146,7 @@ mem_monitor_interval=1000
# Decide how to control memory used by inserting data.
# 0 is RecordMemController, which count the size of every record (tuple).
# 1 is JVMMemController, which use JVM heap memory as threshold.
+# 2 is DisabledMemController, which does not control memory usage.
mem_controller_type=0
# When a bufferwrite's metadata size (in byte) exceed this, the bufferwrite is
forced closed.
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 2bc394b..e770e85 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -84,6 +84,9 @@ public class BufferWriteProcessor extends Processor {
private WriteLogNode logNode;
private VersionController versionController;
+ private boolean isClosed = true;
+ private boolean isFlush = false;
+
/**
* constructor of BufferWriteProcessor.
*
@@ -100,13 +103,37 @@ public class BufferWriteProcessor extends Processor {
super(processorName);
this.fileSchema = fileSchema;
this.baseDir = baseDir;
- this.fileName = fileName;
+ bufferwriteFlushAction =
parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
+ bufferwriteCloseAction =
parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
+ filenodeFlushAction =
parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
+
+
+ reopen(fileName);
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+ try {
+ logNode = MultiFileLogNodeManager.getInstance().getNode(
+ processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX,
+ getBufferwriteRestoreFilePath(),
+ FileNodeManager.getInstance().getRestoreFilePath(processorName));
+ } catch (IOException e) {
+ throw new BufferWriteProcessorException(e);
+ }
+ }
+ this.versionController = versionController;
+
+ }
+
+ public void reopen(String fileName) throws BufferWriteProcessorException {
+ if (!isClosed) {
+ return;
+ }
+ this.fileName = fileName;
String bDir = baseDir;
if (bDir.length() > 0 && bDir.charAt(bDir.length() - 1) !=
File.separatorChar) {
bDir = bDir + File.separatorChar;
}
- String dataDirPath = bDir + processorName;
+ String dataDirPath = bDir + getProcessorName();
File dataDir = new File(dataDirPath);
if (!dataDir.exists()) {
dataDir.mkdirs();
@@ -114,29 +141,25 @@ public class BufferWriteProcessor extends Processor {
dataDirPath);
}
this.insertFilePath = new File(dataDir, fileName).getPath();
- bufferWriteRelativePath = processorName + File.separatorChar + fileName;
+ bufferWriteRelativePath = getProcessorName() + File.separatorChar +
fileName;
try {
- writer = new RestorableTsFileIOWriter(processorName, insertFilePath);
+ writer = new RestorableTsFileIOWriter(getProcessorName(),
insertFilePath);
} catch (IOException e) {
throw new BufferWriteProcessorException(e);
}
+ if (workMemTable == null) {
+ workMemTable = new PrimitiveMemTable();
+ } else {
+ workMemTable.clear();
+ }
+ isClosed = false;
+ isFlush = false;
+ }
- bufferwriteFlushAction =
parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
- bufferwriteCloseAction =
parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
- filenodeFlushAction =
parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
- workMemTable = new PrimitiveMemTable();
-
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- try {
- logNode = MultiFileLogNodeManager.getInstance().getNode(
- processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX,
- getBufferwriteRestoreFilePath(),
- FileNodeManager.getInstance().getRestoreFilePath(processorName));
- } catch (IOException e) {
- throw new BufferWriteProcessorException(e);
- }
+ public void checkOpen() throws BufferWriteProcessorException {
+ if (isClosed) {
+ throw new BufferWriteProcessorException("BufferWriteProcessor already
closed");
}
- this.versionController = versionController;
}
/**
@@ -153,6 +176,7 @@ public class BufferWriteProcessor extends Processor {
public boolean write(String deviceId, String measurementId, long timestamp,
TSDataType dataType,
String value)
throws BufferWriteProcessorException {
+ checkOpen();
TSRecord record = new TSRecord(timestamp, deviceId);
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId,
value);
record.addTuple(dataPoint);
@@ -168,6 +192,7 @@ public class BufferWriteProcessor extends Processor {
* @throws BufferWriteProcessorException if a flushing operation occurs and
failed.
*/
public boolean write(TSRecord tsRecord) throws BufferWriteProcessorException
{
+ checkOpen();
long memUsage = MemUtils.getRecordSize(tsRecord);
BasicMemController.UsageLevel level = BasicMemController.getInstance()
.acquireUsage(this, memUsage);
@@ -233,7 +258,9 @@ public class BufferWriteProcessor extends Processor {
* @return corresponding chunk data and chunk metadata in memory
*/
public Pair<ReadOnlyMemChunk, List<ChunkMetaData>>
queryBufferWriteData(String deviceId,
- String measurementId, TSDataType dataType, Map<String, String> props) {
+ String measurementId, TSDataType dataType, Map<String, String> props)
+ throws BufferWriteProcessorException {
+ checkOpen();
flushQueryLock.lock();
try {
MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
@@ -255,10 +282,10 @@ public class BufferWriteProcessor extends Processor {
private void switchWorkToFlush() {
flushQueryLock.lock();
try {
- if (flushMemTable == null) {
- flushMemTable = workMemTable;
- workMemTable = new PrimitiveMemTable();
- }
+ IMemTable temp = flushMemTable == null ? new PrimitiveMemTable() :
flushMemTable;
+ flushMemTable = workMemTable;
+ workMemTable = temp;
+ isFlush = true;
} finally {
flushQueryLock.unlock();
}
@@ -268,8 +295,8 @@ public class BufferWriteProcessor extends Processor {
flushQueryLock.lock();
try {
flushMemTable.clear();
- flushMemTable = null;
writer.appendMetadata();
+ isFlush = false;
} finally {
flushQueryLock.unlock();
}
@@ -329,6 +356,9 @@ public class BufferWriteProcessor extends Processor {
// keyword synchronized is added in this method, so that only one flush task
can be submitted now.
@Override
public synchronized Future<Boolean> flush() throws IOException {
+ if (isClosed) {
+ throw new IOException("BufferWriteProcessor closed");
+ }
// statistic information for flush
if (lastFlushTime > 0) {
if (LOGGER.isInfoEnabled()) {
@@ -381,12 +411,18 @@ public class BufferWriteProcessor extends Processor {
@Override
public void close() throws BufferWriteProcessorException {
+ if (isClosed) {
+ return;
+ }
try {
long closeStartTime = System.currentTimeMillis();
// flush data and wait for finishing flush
flush().get();
// end file
writer.endFile(fileSchema);
+ writer = null;
+ workMemTable.clear();
+
// update the IntervalFile for interval list
bufferwriteCloseAction.act();
// flush the changed information for filenode
@@ -402,6 +438,7 @@ public class BufferWriteProcessor extends Processor {
DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
closeEndTime - closeStartTime);
}
+ isClosed = true;
} catch (IOException e) {
LOGGER.error("Close the bufferwrite processor error, the bufferwrite is
{}.",
getProcessorName(), e);
@@ -424,13 +461,7 @@ public class BufferWriteProcessor extends Processor {
* @return True if flushing
*/
public boolean isFlush() {
- // starting a flush task has two steps: set the flushMemtable, and then
set the flushFuture
- // So, the following case exists: flushMemtable != null but flushFuture is
done (because the
- // flushFuture refers to the last finished flush.
- // And, the following case exists,too: flushMemtable == null, but
flushFuture is not done.
- // (flushTask() is not finished, but switchToWork() has done)
- // So, checking flushMemTable is more meaningful than flushFuture.isDone().
- return flushMemTable != null;
+ return isFlush;
}
/**
@@ -454,38 +485,6 @@ public class BufferWriteProcessor extends Processor {
return file.length() + memoryUsage();
}
- /**
- * Close current TsFile and open a new one for future writes. Block new
writes and wait until
- * current writes finish.
- */
- public void rollToNewFile() {
- // TODO : [MemControl] implement this
- }
-
- /**
- * Check if this TsFile has too big metadata or file. If true, close current
file and open a new
- * one.
- */
- private boolean checkSize() {
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- long metaSize = getMetaSize();
- long fileSize = getFileSize();
- if (metaSize >= config.getBufferwriteMetaSizeThreshold()
- || fileSize >= config.getBufferwriteFileSizeThreshold()) {
- LOGGER.info(
- "The bufferwrite processor {}, size({}) of the file {} reaches
threshold {}, "
- + "size({}) of metadata reaches threshold {}.",
- getProcessorName(), MemUtils.bytesCntToStr(fileSize), this.fileName,
- MemUtils.bytesCntToStr(config.getBufferwriteFileSizeThreshold()),
- MemUtils.bytesCntToStr(metaSize),
- MemUtils.bytesCntToStr(config.getBufferwriteFileSizeThreshold()));
-
- rollToNewFile();
- return true;
- }
- return false;
- }
-
public String getBaseDir() {
return baseDir;
}
@@ -538,7 +537,9 @@ public class BufferWriteProcessor extends Processor {
* @param measurementId the measurementId of the timeseries to be deleted.
* @param timestamp the upper-bound of deletion time.
*/
- public void delete(String deviceId, String measurementId, long timestamp) {
+ public void delete(String deviceId, String measurementId, long timestamp)
+ throws BufferWriteProcessorException {
+ checkOpen();
workMemTable.delete(deviceId, measurementId, timestamp);
if (isFlush()) {
// flushing MemTable cannot be directly modified since another thread is
reading it
@@ -572,4 +573,8 @@ public class BufferWriteProcessor extends Processor {
public String toString() {
return "BufferWriteProcessor in " + insertFilePath;
}
+
+ public boolean isClosed() {
+ return isClosed;
+ }
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 47aca8e..d97594c 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -534,21 +534,17 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
+ System.currentTimeMillis(),
params, versionController, fileSchema);
} catch (BufferWriteProcessorException e) {
- LOGGER.error("The filenode processor {} failed to get the bufferwrite
processor.",
- processorName, e);
- throw new FileNodeProcessorException(e);
+ throw new FileNodeProcessorException(String
+ .format("The filenode processor %s failed to get the bufferwrite
processor.",
+ processorName),e);
+ }
+ } else if (bufferWriteProcessor.isClosed()){
+ try {
+ bufferWriteProcessor.reopen(insertTime +
FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
+ + System.currentTimeMillis());
+ } catch (BufferWriteProcessorException e) {
+ throw new FileNodeProcessorException("Cannot reopen
BufferWriteProcessor", e);
}
- }
- return bufferWriteProcessor;
- }
-
- /**
- * get buffer write processor.
- */
- public BufferWriteProcessor getBufferWriteProcessor() throws
FileNodeProcessorException {
- if (bufferWriteProcessor == null) {
- LOGGER.error("The bufferwrite processor is null when get the
bufferwriteProcessor");
- throw new FileNodeProcessorException("The bufferwrite processor is
null");
}
return bufferWriteProcessor;
}
@@ -565,6 +561,8 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION,
flushFileNodeProcessorAction);
overflowProcessor = new OverflowProcessor(processorName, params,
fileSchema,
versionController);
+ } else if (overflowProcessor.isClosed()){
+ overflowProcessor.reopen();
}
return overflowProcessor;
}
@@ -573,25 +571,16 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
* get overflow processor.
*/
public OverflowProcessor getOverflowProcessor() {
- if (overflowProcessor == null) {
- LOGGER.error("The overflow processor is null when getting the
overflowProcessor");
+ if (overflowProcessor == null || overflowProcessor.isClosed()) {
+ LOGGER.error("The overflow processor is null or closed when getting the
overflowProcessor");
}
return overflowProcessor;
}
public boolean hasOverflowProcessor() {
- return overflowProcessor != null;
+ return overflowProcessor != null && !overflowProcessor.isClosed();
}
- public void setBufferwriteProcessroToClosed() {
-
- bufferWriteProcessor = null;
- }
-
- public boolean hasBufferwriteProcessor() {
-
- return bufferWriteProcessor != null;
- }
/**
* set last update time.
@@ -797,18 +786,13 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
&& !newFileNodes.get(newFileNodes.size() -
1).getStartTimeMap().isEmpty()) {
unsealedTsFile = new UnsealedTsFile();
unsealedTsFile.setFilePath(newFileNodes.get(newFileNodes.size() -
1).getFilePath());
- if (bufferWriteProcessor == null) {
- LOGGER.error(
- "The last of tsfile {} in filenode processor {} is not closed, "
- + "but the bufferwrite processor is null.",
- newFileNodes.get(newFileNodes.size() - 1).getRelativePath(),
getProcessorName());
- throw new FileNodeProcessorException(String.format(
- "The last of tsfile %s in filenode processor %s is not closed, "
- + "but the bufferwrite processor is null.",
- newFileNodes.get(newFileNodes.size() - 1).getRelativePath(),
getProcessorName()));
+
+ try {
+ bufferwritedata = bufferWriteProcessor
+ .queryBufferWriteData(deviceId, measurementId, dataType,
mSchema.getProps());
+ } catch (BufferWriteProcessorException e) {
+ throw new FileNodeProcessorException(e);
}
- bufferwritedata = bufferWriteProcessor
- .queryBufferWriteData(deviceId, measurementId, dataType,
mSchema.getProps());
try {
List<Modification> pathModifications = context.getPathModifications(
@@ -954,7 +938,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
}
lastMergeTime = System.currentTimeMillis();
- if (overflowProcessor != null) {
+ if (overflowProcessor != null && !overflowProcessor.isClosed()) {
if (overflowProcessor.getFileSize() < IoTDBDescriptor.getInstance()
.getConfig().getOverflowFileSizeThreshold()) {
if (LOGGER.isInfoEnabled()) {
@@ -1082,6 +1066,9 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
emptyTsFileResource.clear();
// attention
try {
+ if (overflowProcessor.isClosed()) {
+ overflowProcessor.reopen();
+ }
overflowProcessor.switchWorkToMerge();
} catch (IOException e) {
LOGGER.error("The filenode processor {} can't switch overflow processor
from work to merge.",
@@ -1463,6 +1450,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
// losing some modification.
mergeDeleteLock.lock();
QueryContext context = new QueryContext();
+
try {
FileReaderManager.getInstance().increaseFileReaderReference(backupIntervalFile.getFilePath(),
true);
@@ -1489,6 +1477,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
// query one measurement in the special deviceId
String measurementId = path.getMeasurement();
TSDataType dataType = mManager.getSeriesType(path.getFullPath());
+
OverflowSeriesDataSource overflowSeriesDataSource =
overflowProcessor.queryMerge(deviceId,
measurementId, dataType, true, context);
Filter timeFilter = FilterFactory
@@ -1714,10 +1703,10 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
public FileNodeFlushFuture flush() throws IOException {
Future<Boolean> bufferWriteFlushFuture = null;
Future<Boolean> overflowFlushFuture = null;
- if (bufferWriteProcessor != null) {
+ if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
bufferWriteFlushFuture = bufferWriteProcessor.flush();
}
- if (overflowProcessor != null) {
+ if (overflowProcessor != null && !overflowProcessor.isClosed()) {
overflowFlushFuture = overflowProcessor.flush();
}
return new FileNodeFlushFuture(bufferWriteFlushFuture,
overflowFlushFuture);
@@ -1727,7 +1716,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
* Close the bufferwrite processor.
*/
public void closeBufferWrite() throws FileNodeProcessorException {
- if (bufferWriteProcessor == null) {
+ if (bufferWriteProcessor == null || bufferWriteProcessor.isClosed()) {
return;
}
try {
@@ -1735,7 +1724,6 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
waitForBufferWriteClose();
}
bufferWriteProcessor.close();
- bufferWriteProcessor = null;
} catch (BufferWriteProcessorException e) {
throw new FileNodeProcessorException(e);
}
@@ -1756,7 +1744,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
* Close the overflow processor.
*/
public void closeOverflow() throws FileNodeProcessorException {
- if (overflowProcessor == null) {
+ if (overflowProcessor == null || overflowProcessor.isClosed()) {
return;
}
try {
@@ -1764,9 +1752,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
waitForOverflowClose();
}
overflowProcessor.close();
- overflowProcessor.clear();
- overflowProcessor = null;
- } catch (OverflowProcessorException | IOException e) {
+ } catch (OverflowProcessorException e) {
throw new FileNodeProcessorException(e);
}
}
@@ -1897,7 +1883,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
// delete data in memory
OverflowProcessor ofProcessor = getOverflowProcessor(getProcessorName());
ofProcessor.delete(deviceId, measurementId, timestamp, version,
updatedModFiles);
- if (bufferWriteProcessor != null) {
+ if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
}
} catch (Exception e) {
@@ -1945,8 +1931,12 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
throw e;
}
- if (bufferWriteProcessor != null) {
- bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+ if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
+ try {
+ bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+ } catch (BufferWriteProcessorException e) {
+ throw new IOException(e);
+ }
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
index 503afa3..e490044 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
@@ -52,8 +52,10 @@ public abstract class BasicMemController implements IService
{
case JVM:
return JVMMemController.getInstance();
case RECORD:
- default:
return RecordMemController.getInstance();
+ case DISABLED:
+ default:
+ return DisabledMemController.getInstance();
}
}
@@ -175,7 +177,7 @@ public abstract class BasicMemController implements
IService {
public abstract void releaseUsage(Object user, long freeSize);
public enum ControllerType {
- RECORD, JVM
+ RECORD, JVM, DISABLED
}
public enum UsageLevel {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/DisabledMemController.java
similarity index 52%
copy from
iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
copy to
iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/DisabledMemController.java
index 213d3c0..225005c 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/DisabledMemController.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,25 +16,41 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.exception;
+package org.apache.iotdb.db.engine.memcontrol;
-public class FileNodeProcessorException extends ProcessorException {
+import org.apache.iotdb.db.conf.IoTDBConfig;
- private static final long serialVersionUID = 7373978140952977661L;
+/**
+ * DisabledMemController is used when the overhead of memory control is too
high.
+ */
+public class DisabledMemController extends BasicMemController {
+
+ DisabledMemController(IoTDBConfig config) {
+ super(config);
+ }
- public FileNodeProcessorException() {
- super();
+ @Override
+ public long getTotalUsage() {
+ return 0;
}
- public FileNodeProcessorException(PathErrorException pathExcp) {
- super(pathExcp.getMessage());
+ @Override
+ public UsageLevel getCurrLevel() {
+ return UsageLevel.SAFE;
}
- public FileNodeProcessorException(String msg) {
- super(msg);
+ @Override
+ public void clear() {
+
+ }
+
+ @Override
+ public UsageLevel acquireUsage(Object user, long usage) {
+ return UsageLevel.SAFE;
}
- public FileNodeProcessorException(Throwable throwable) {
- super(throwable.getMessage());
+ @Override
+ public void releaseUsage(Object user, long freeSize) {
+
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 2e7302f..656932b 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -96,6 +96,9 @@ public class OverflowProcessor extends Processor {
private WriteLogNode logNode;
private VersionController versionController;
+ private boolean isClosed = true;
+ private boolean isFlush = false;
+
public OverflowProcessor(String processorName, Map<String, Action>
parameters,
FileSchema fileSchema, VersionController versionController)
throws IOException {
@@ -108,18 +111,13 @@ public class OverflowProcessor extends Processor {
overflowDirPath = overflowDirPath + File.separatorChar;
}
this.parentPath = overflowDirPath + processorName;
- File processorDataDir = new File(parentPath);
- if (!processorDataDir.exists()) {
- processorDataDir.mkdirs();
- }
- // recover file
- recovery(processorDataDir);
- // memory
- workSupport = new OverflowMemtable();
+
overflowFlushAction =
parameters.get(FileNodeConstants.OVERFLOW_FLUSH_ACTION);
filenodeFlushAction = parameters
.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
+ reopen();
+
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
logNode = MultiFileLogNodeManager.getInstance().getNode(
processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX,
@@ -128,6 +126,33 @@ public class OverflowProcessor extends Processor {
}
}
+ public void reopen() throws IOException {
+ if (!isClosed) {
+ return;
+ }
+ // recover file
+ File processorDataDir = new File(parentPath);
+ if (!processorDataDir.exists()) {
+ processorDataDir.mkdirs();
+ }
+ recovery(processorDataDir);
+
+ // memory
+ if (workSupport == null) {
+ workSupport = new OverflowMemtable();
+ } else {
+ workSupport.clear();
+ }
+ isClosed = false;
+ isFlush = false;
+ }
+ public void checkOpen() throws OverflowProcessorException {
+ if (isClosed) {
+ throw new OverflowProcessorException("OverflowProcessor already closed");
+ }
+ }
+
+
private void recovery(File parentFile) throws IOException {
String[] subFilePaths = clearFile(parentFile.list());
if (subFilePaths.length == 0) {
@@ -173,6 +198,11 @@ public class OverflowProcessor extends Processor {
* insert one time-series record
*/
public void insert(TSRecord tsRecord) throws IOException {
+ try {
+ checkOpen();
+ } catch (OverflowProcessorException e) {
+ throw new IOException(e);
+ }
// memory control
long memUage = MemUtils.getRecordSize(tsRecord);
UsageLevel usageLevel =
BasicMemController.getInstance().acquireUsage(this, memUage);
@@ -261,6 +291,11 @@ public class OverflowProcessor extends Processor {
*/
public void delete(String deviceId, String measurementId, long timestamp,
long version,
List<ModificationFile> updatedModFiles) throws IOException {
+ try {
+ checkOpen();
+ } catch (OverflowProcessorException e) {
+ throw new IOException(e);
+ }
workResource.delete(deviceId, measurementId, timestamp, version,
updatedModFiles);
workSupport.delete(deviceId, measurementId, timestamp, false);
if (isFlush()) {
@@ -278,6 +313,11 @@ public class OverflowProcessor extends Processor {
public OverflowSeriesDataSource query(String deviceId, String measurementId,
TSDataType dataType, Map<String, String> props, QueryContext context)
throws IOException {
+ try {
+ checkOpen();
+ } catch (OverflowProcessorException e) {
+ throw new IOException(e);
+ }
queryFlushLock.lock();
try {
// query insert data in memory and unseqTsFiles
@@ -400,8 +440,10 @@ public class OverflowProcessor extends Processor {
private void switchWorkToFlush() {
queryFlushLock.lock();
try {
+ OverflowMemtable temp = flushSupport == null ? new OverflowMemtable() :
flushSupport;
flushSupport = workSupport;
- workSupport = new OverflowMemtable();
+ workSupport = temp;
+ isFlush = true;
} finally {
queryFlushLock.unlock();
}
@@ -412,7 +454,7 @@ public class OverflowProcessor extends Processor {
try {
flushSupport.clear();
workResource.appendMetadatas();
- flushSupport = null;
+ isFlush = false;
} finally {
queryFlushLock.unlock();
}
@@ -444,8 +486,7 @@ public class OverflowProcessor extends Processor {
}
public boolean isFlush() {
- //see BufferWriteProcess.isFlush()
- return flushSupport != null;
+ return isFlush;
}
private boolean flushTask(String displayMessage) {
@@ -546,6 +587,9 @@ public class OverflowProcessor extends Processor {
@Override
public void close() throws OverflowProcessorException {
+ if (isClosed) {
+ return;
+ }
LOGGER.info("The overflow processor {} starts close operation.",
getProcessorName());
long closeStartTime = System.currentTimeMillis();
// flush data
@@ -570,14 +614,22 @@ public class OverflowProcessor extends Processor {
DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
closeEndTime - closeStartTime);
}
+ try {
+ clear();
+ } catch (IOException e) {
+ throw new OverflowProcessorException(e);
+ }
+ isClosed = true;
}
public void clear() throws IOException {
if (workResource != null) {
workResource.close();
+ workResource = null;
}
if (mergeResource != null) {
mergeResource.close();
+ mergeResource = null;
}
}
@@ -705,4 +757,8 @@ public class OverflowProcessor extends Processor {
public String toString() {
return "OverflowProcessor in " + parentPath;
}
+
+ public boolean isClosed() {
+ return isClosed;
+ }
}
\ No newline at end of file
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
index 213d3c0..d3cf362 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
@@ -37,4 +37,8 @@ public class FileNodeProcessorException extends
ProcessorException {
public FileNodeProcessorException(Throwable throwable) {
super(throwable.getMessage());
}
+
+ public FileNodeProcessorException(String msg, Throwable e) {
+ super(msg, e);
+ }
}
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
index faa7a9a..9877f76 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
@@ -116,6 +116,7 @@ public class OverflowProcessorTest {
}
// flush synchronously
processor.close();
+ processor.reopen();
overflowSeriesDataSource = processor
.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
OverflowTestUtils.dataType1, Collections.emptyMap(), context);
@@ -153,6 +154,7 @@ public class OverflowProcessorTest {
processor.close();
QueryContext context = new QueryContext();
// test query
+ processor.reopen();
OverflowSeriesDataSource overflowSeriesDataSource =
processor.query(OverflowTestUtils.deviceId1,
OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2,
Collections.emptyMap(),
context);