This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ad993b8 [IOTDB-1083] There is no need to get write lock each time when we try to get or create TsFileProcessor (#2353)
ad993b8 is described below
commit ad993b8d6ddb5aa375385a0e278ce9701c1c2ae5
Author: Qi Yu <[email protected]>
AuthorDate: Fri Jan 29 15:06:44 2021 +0800
[IOTDB-1083] There is no need to get write lock each time when we try to get or create TsFileProcessor (#2353)
* [IOTDB-1083] There is no need to get write lock each time when we try to get or create TsFileProcessor
---
.../engine/storagegroup/StorageGroupProcessor.java | 96 ++++++++++------------
1 file changed, 42 insertions(+), 54 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 6ee1ee1..6f8d77f 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -969,7 +969,6 @@ public class StorageGroupProcessor {
long timePartitionId)
throws WriteProcessException {
TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, sequence);
-
if (tsFileProcessor == null) {
return;
}
@@ -1065,58 +1064,52 @@ public class StorageGroupProcessor {
boolean sequence)
throws IOException, DiskSpaceInsufficientException {
- TsFileProcessor res;
- // we have to ensure only one thread can change
workSequenceTsFileProcessors
- writeLock();
- try {
- res = tsFileProcessorTreeMap.get(timeRangeId);
- if (res == null) {
- // we have to remove oldest processor to control the num of the
memtables
- // TODO: use a method to control the number of memtables
- if (tsFileProcessorTreeMap.size()
- >=
IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition()) {
- Map.Entry<Long, TsFileProcessor> processorEntry =
tsFileProcessorTreeMap.firstEntry();
- logger.info(
- "will close a {} TsFile because too many active partitions ({} >
{}) in the storage group {},",
- sequence, tsFileProcessorTreeMap.size(),
-
IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition(),
- logicalStorageGroupName + "-" + virtualStorageGroupId);
- asyncCloseOneTsFileProcessor(sequence, processorEntry.getValue());
- }
+ TsFileProcessor res = tsFileProcessorTreeMap.get(timeRangeId);
- // build new processor
- TsFileProcessor newProcessor = createTsFileProcessor(sequence,
timeRangeId);
- tsFileProcessorTreeMap.put(timeRangeId, newProcessor);
- tsFileManagement.add(newProcessor.getTsFileResource(), sequence);
- res = newProcessor;
+ if (null == res) {
+ // we have to remove oldest processor to control the num of the memtables
+ // TODO: use a method to control the number of memtables
+ if (tsFileProcessorTreeMap.size()
+ >=
IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition()) {
+ Map.Entry<Long, TsFileProcessor> processorEntry =
tsFileProcessorTreeMap.firstEntry();
+ logger.info(
+ "will close a {} TsFile because too many active partitions ({} >
{}) in the storage group {},",
+ sequence, tsFileProcessorTreeMap.size(),
+
IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition(),
+ logicalStorageGroupName);
+ asyncCloseOneTsFileProcessor(sequence, processorEntry.getValue());
}
- } finally {
- // unlock in finally
- writeUnlock();
+ // build new processor
+ res = newTsFileProcessor(sequence, timeRangeId);
+ tsFileProcessorTreeMap.put(timeRangeId, res);
+ tsFileManagement.add(res.getTsFileResource(), sequence);
+
}
return res;
}
- private TsFileProcessor createTsFileProcessor(boolean sequence, long
timePartitionId)
+ private TsFileProcessor newTsFileProcessor(boolean sequence, long
timePartitionId)
throws IOException, DiskSpaceInsufficientException {
- String baseDir;
- if (sequence) {
- baseDir = DirectoryManager.getInstance().getNextFolderForSequenceFile();
- } else {
- baseDir =
DirectoryManager.getInstance().getNextFolderForUnSequenceFile();
- }
+ DirectoryManager directoryManager = DirectoryManager.getInstance();
+ String baseDir = sequence ? directoryManager.getNextFolderForSequenceFile()
+ : directoryManager.getNextFolderForUnSequenceFile();
fsFactory.getFile(baseDir + File.separator + logicalStorageGroupName,
virtualStorageGroupId)
.mkdirs();
- String filePath =
- baseDir + File.separator + logicalStorageGroupName + File.separator +
virtualStorageGroupId
- + File.separator + timePartitionId
- + File.separator
- + getNewTsFileName(timePartitionId);
+ String filePath = baseDir + File.separator + logicalStorageGroupName +
File.separator + virtualStorageGroupId
+ + File.separator + timePartitionId
+ + File.separator
+ + getNewTsFileName(timePartitionId);
+ return getTsFileProcessor(sequence, filePath, timePartitionId);
+ }
+
+ private TsFileProcessor getTsFileProcessor(boolean sequence,
+ String filePath,
+ long timePartitionId) throws
IOException {
TsFileProcessor tsFileProcessor;
if (sequence) {
tsFileProcessor = new TsFileProcessor(
@@ -1124,31 +1117,26 @@ public class StorageGroupProcessor {
fsFactory.getFileWithParent(filePath), storageGroupInfo,
this::closeUnsealedTsFileProcessorCallBack,
this::updateLatestFlushTimeCallback, true,
deviceNumInLastClosedTsFile);
- if (enableMemControl) {
- TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(storageGroupInfo);
- tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
- this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
- tsFileProcessorInfo.addTSPMemCost(tsFileProcessor
- .getTsFileResource().calculateRamSize());
- }
} else {
tsFileProcessor = new TsFileProcessor(
logicalStorageGroupName + File.separator + virtualStorageGroupId,
fsFactory.getFileWithParent(filePath), storageGroupInfo,
this::closeUnsealedTsFileProcessorCallBack,
this::unsequenceFlushCallback, false, deviceNumInLastClosedTsFile);
- if (enableMemControl) {
- TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(storageGroupInfo);
- tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
- this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
- tsFileProcessorInfo.addTSPMemCost(tsFileProcessor
- .getTsFileResource().calculateRamSize());
- }
}
+
+ if (enableMemControl) {
+ TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(storageGroupInfo);
+ tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
+ this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
+ tsFileProcessorInfo.addTSPMemCost(tsFileProcessor
+ .getTsFileResource().calculateRamSize());
+ }
+
tsFileProcessor.addCloseFileListeners(customCloseFileListeners);
tsFileProcessor.addFlushListeners(customFlushListeners);
-
tsFileProcessor.setTimeRangeId(timePartitionId);
+
return tsFileProcessor;
}