This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch virtual_partition_2_merge
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/virtual_partition_2_merge by
this push:
new d79f309 refactor
d79f309 is described below
commit d79f309ffa1f837e9d853cd7c0a955bd6cda4c6e
Author: 151250176 <[email protected]>
AuthorDate: Tue Dec 22 15:47:50 2020 +0800
refactor
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 105 ++++++++++++---------
1 file changed, 62 insertions(+), 43 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 82cc286..864f75e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -398,7 +398,8 @@ public class StorageEngine implements IService {
private StorageGroupProcessor getStorageGroupProcessorByPath(PartialPath
storageGroupPath,
StorageGroupMNode storageGroupMNode)
throws StorageGroupProcessorException, StorageEngineException {
- VirtualStorageGroupManager virtualStorageGroupManager =
processorMap.get(storageGroupMNode.getPartialPath());
+ VirtualStorageGroupManager virtualStorageGroupManager = processorMap
+ .get(storageGroupMNode.getPartialPath());
if (virtualStorageGroupManager == null) {
// if finish recover
if (isAllSgReady.get()) {
@@ -413,7 +414,8 @@ public class StorageEngine implements IService {
} else {
// not finished recover, refuse the request
throw new StorageEngineException(
- "the sg " + storageGroupMNode.getPartialPath() + " may not ready
now, please wait and retry later",
+ "the sg " + storageGroupMNode.getPartialPath()
+ + " may not ready now, please wait and retry later",
TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
}
}
@@ -423,17 +425,17 @@ public class StorageEngine implements IService {
public StorageGroupProcessor buildNewStorageGroupProcessor(PartialPath
storageGroupPath,
StorageGroupMNode storageGroupMNode, String storageGroupName)
throws StorageGroupProcessorException {
- StorageGroupProcessor processor;
- logger.info("construct a processor instance, the storage group is
{}, Thread is {}",
- storageGroupPath, Thread.currentThread().getId());
- processor = new StorageGroupProcessor(systemDir + File.separator +
storageGroupPath,
- storageGroupName,
- fileFlushPolicy, storageGroupMNode.getFullPath());
- processor.setDataTTL(storageGroupMNode.getDataTTL());
- processor.setCustomFlushListeners(customFlushListeners);
- processor.setCustomCloseFileListeners(customCloseFileListeners);
- return processor;
- }
+ StorageGroupProcessor processor;
+ logger.info("construct a processor instance, the storage group is {},
Thread is {}",
+ storageGroupPath, Thread.currentThread().getId());
+ processor = new StorageGroupProcessor(systemDir + File.separator +
storageGroupPath,
+ storageGroupName,
+ fileFlushPolicy, storageGroupMNode.getFullPath());
+ processor.setDataTTL(storageGroupMNode.getDataTTL());
+ processor.setCustomFlushListeners(customFlushListeners);
+ processor.setCustomCloseFileListeners(customCloseFileListeners);
+ return processor;
+ }
private void waitAllSgReady(PartialPath storageGroupPath) throws
StorageEngineException {
if (isAllSgReady.get()) {
@@ -537,12 +539,13 @@ public class StorageEngine implements IService {
public void closeStorageGroupProcessor(PartialPath storageGroupPath, boolean
isSeq,
boolean isSync) {
- if(!processorMap.containsKey(storageGroupPath)){
+ if (!processorMap.containsKey(storageGroupPath)) {
return;
}
VirtualStorageGroupManager virtualStorageGroupManager =
processorMap.get(storageGroupPath);
- for (StorageGroupProcessor processor :
virtualStorageGroupManager.getAllVirutalStorageGroupProcessor()) {
+ for (StorageGroupProcessor processor : virtualStorageGroupManager
+ .getAllVirutalStorageGroupProcessor()) {
if (processor == null) {
continue;
}
@@ -593,12 +596,13 @@ public class StorageEngine implements IService {
boolean isSeq,
boolean isSync)
throws StorageGroupNotSetException {
- if(!processorMap.containsKey(storageGroupPath)){
+ if (!processorMap.containsKey(storageGroupPath)) {
throw new StorageGroupNotSetException(storageGroupPath.getFullPath());
}
VirtualStorageGroupManager virtualStorageGroupManager =
processorMap.get(storageGroupPath);
- for (StorageGroupProcessor processor :
virtualStorageGroupManager.getAllVirutalStorageGroupProcessor()) {
+ for (StorageGroupProcessor processor : virtualStorageGroupManager
+ .getAllVirutalStorageGroupProcessor()) {
if (processor != null) {
logger
.info("async closing sg processor is called for closing {}, seq =
{}, partitionId = {}",
@@ -640,14 +644,14 @@ public class StorageEngine implements IService {
List<PartialPath> sgPaths =
IoTDB.metaManager.searchAllRelatedStorageGroups(path);
for (PartialPath storageGroupPath : sgPaths) {
// storage group has no data
- if(!processorMap.containsKey(storageGroupPath)){
+ if (!processorMap.containsKey(storageGroupPath)) {
continue;
}
PartialPath newPath = path.alterPrefixPath(storageGroupPath);
for (StorageGroupProcessor storageGroupProcessor :
processorMap.get(storageGroupPath)
.getAllVirutalStorageGroupProcessor()) {
- if(storageGroupProcessor != null){
+ if (storageGroupProcessor != null) {
storageGroupProcessor.delete(newPath, startTime, endTime,
planIndex);
}
}
@@ -666,14 +670,14 @@ public class StorageEngine implements IService {
List<PartialPath> sgPaths =
IoTDB.metaManager.searchAllRelatedStorageGroups(path);
for (PartialPath storageGroupPath : sgPaths) {
// storage group has no data
- if(!processorMap.containsKey(storageGroupPath)){
+ if (!processorMap.containsKey(storageGroupPath)) {
continue;
}
PartialPath newPath = path.alterPrefixPath(storageGroupPath);
for (StorageGroupProcessor storageGroupProcessor :
processorMap.get(storageGroupPath)
.getAllVirutalStorageGroupProcessor()) {
- if(storageGroupProcessor != null){
+ if (storageGroupProcessor != null) {
storageGroupProcessor.delete(newPath, Long.MIN_VALUE,
Long.MAX_VALUE, planIndex);
}
}
@@ -705,7 +709,8 @@ public class StorageEngine implements IService {
public int countUpgradeFiles() {
int totalUpgradeFileNum = 0;
for (VirtualStorageGroupManager virtualStorageGroupManager :
processorMap.values()) {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupManager.getAllVirutalStorageGroupProcessor()) {
+ for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupManager
+ .getAllVirutalStorageGroupProcessor()) {
if (storageGroupProcessor != null) {
totalUpgradeFileNum += storageGroupProcessor.countUpgradeFiles();
}
@@ -725,7 +730,8 @@ public class StorageEngine implements IService {
"Current system mode is read only, does not support file upgrade");
}
for (VirtualStorageGroupManager virtualStorageGroupManager :
processorMap.values()) {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupManager.getAllVirutalStorageGroupProcessor()) {
+ for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupManager
+ .getAllVirutalStorageGroupProcessor()) {
if (storageGroupProcessor != null) {
storageGroupProcessor.upgrade();
}
@@ -744,7 +750,8 @@ public class StorageEngine implements IService {
}
for (VirtualStorageGroupManager virtualStorageGroupManager :
processorMap.values()) {
- for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupManager.getAllVirutalStorageGroupProcessor()) {
+ for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupManager
+ .getAllVirutalStorageGroupProcessor()) {
if (storageGroupProcessor != null) {
storageGroupProcessor.merge(fullMerge);
}
@@ -787,25 +794,27 @@ public class StorageEngine implements IService {
public void setTTL(PartialPath storageGroup, long dataTTL) throws
StorageEngineException {
// storage group has no data
- if(!processorMap.containsKey(storageGroup)){
+ if (!processorMap.containsKey(storageGroup)) {
return;
}
- for(StorageGroupProcessor storageGroupProcessor :
processorMap.get(storageGroup).getAllVirutalStorageGroupProcessor()){
- if(storageGroupProcessor != null){
+ for (StorageGroupProcessor storageGroupProcessor :
processorMap.get(storageGroup)
+ .getAllVirutalStorageGroupProcessor()) {
+ if (storageGroupProcessor != null) {
storageGroupProcessor.setDataTTL(dataTTL);
}
}
}
public void deleteStorageGroup(PartialPath storageGroupPath) {
- if(!processorMap.containsKey(storageGroupPath)){
+ if (!processorMap.containsKey(storageGroupPath)) {
return;
}
deleteAllDataFilesInOneStorageGroup(storageGroupPath);
VirtualStorageGroupManager virtualStorageGroupManager =
processorMap.remove(storageGroupPath);
- for (StorageGroupProcessor processor :
virtualStorageGroupManager.getAllVirutalStorageGroupProcessor()) {
+ for (StorageGroupProcessor processor : virtualStorageGroupManager
+ .getAllVirutalStorageGroupProcessor()) {
if (processor != null) {
processor.deleteFolder(systemDir + File.pathSeparator +
storageGroupPath);
}
@@ -814,7 +823,8 @@ public class StorageEngine implements IService {
public void loadNewTsFileForSync(TsFileResource newTsFileResource)
throws StorageEngineException, LoadFileException, IllegalPathException {
- getProcessorDirectly(new
PartialPath(newTsFileResource.getTsFile().getParentFile().getParentFile().getParentFile().getName()))
+ getProcessorDirectly(new PartialPath(
+
newTsFileResource.getTsFile().getParentFile().getParentFile().getParentFile().getName()))
.loadNewTsFileForSync(newTsFileResource);
}
@@ -865,8 +875,9 @@ public class StorageEngine implements IService {
public Map<PartialPath, Map<Long, List<TsFileResource>>>
getAllClosedStorageGroupTsFile() {
Map<PartialPath, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
for (Entry<PartialPath, VirtualStorageGroupManager> entry :
processorMap.entrySet()) {
- for(StorageGroupProcessor storageGroupProcessor :
entry.getValue().getAllVirutalStorageGroupProcessor()){
- if(storageGroupProcessor != null){
+ for (StorageGroupProcessor storageGroupProcessor : entry.getValue()
+ .getAllVirutalStorageGroupProcessor()) {
+ if (storageGroupProcessor != null) {
List<TsFileResource> allResources =
storageGroupProcessor.getSequenceFileTreeSet();
allResources.addAll(storageGroupProcessor.getUnSequenceFileList());
for (TsFileResource sequenceFile : allResources) {
@@ -876,7 +887,8 @@ public class StorageEngine implements IService {
long partitionNum = sequenceFile.getTimePartition();
Map<Long, List<TsFileResource>> storageGroupFiles =
ret.computeIfAbsent(entry.getKey()
, n -> new HashMap<>());
- storageGroupFiles.computeIfAbsent(partitionNum, n -> new
ArrayList<>()).add(sequenceFile);
+ storageGroupFiles.computeIfAbsent(partitionNum, n -> new
ArrayList<>())
+ .add(sequenceFile);
}
}
}
@@ -891,12 +903,14 @@ public class StorageEngine implements IService {
public boolean isFileAlreadyExist(TsFileResource tsFileResource, PartialPath
storageGroup,
long partitionNum) {
VirtualStorageGroupManager virtualStorageGroupManager =
processorMap.get(storageGroup);
- if(virtualStorageGroupManager == null){
+ if (virtualStorageGroupManager == null) {
return false;
}
- for(StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupManager.getAllVirutalStorageGroupProcessor()){
- if(storageGroupProcessor != null &&
storageGroupProcessor.isFileAlreadyExist(tsFileResource, partitionNum)){
+ for (StorageGroupProcessor storageGroupProcessor :
virtualStorageGroupManager
+ .getAllVirutalStorageGroupProcessor()) {
+ if (storageGroupProcessor != null && storageGroupProcessor
+ .isFileAlreadyExist(tsFileResource, partitionNum)) {
return true;
}
}
@@ -913,8 +927,9 @@ public class StorageEngine implements IService {
public void setPartitionVersionToMax(PartialPath storageGroup, long
partitionId,
long newMaxVersion)
throws StorageEngineException {
- for(StorageGroupProcessor storageGroupProcessor :
processorMap.get(storageGroup).getAllVirutalStorageGroupProcessor()){
- if(storageGroupProcessor != null){
+ for (StorageGroupProcessor storageGroupProcessor :
processorMap.get(storageGroup)
+ .getAllVirutalStorageGroupProcessor()) {
+ if (storageGroupProcessor != null) {
storageGroupProcessor.setPartitionFileVersionToMax(partitionId,
newMaxVersion);
}
}
@@ -922,8 +937,9 @@ public class StorageEngine implements IService {
public void removePartitions(PartialPath storageGroupPath,
TimePartitionFilter filter)
throws StorageEngineException {
- for(StorageGroupProcessor storageGroupProcessor :
processorMap.get(storageGroupPath).getAllVirutalStorageGroupProcessor()){
- if(storageGroupProcessor != null){
+ for (StorageGroupProcessor storageGroupProcessor :
processorMap.get(storageGroupPath)
+ .getAllVirutalStorageGroupProcessor()) {
+ if (storageGroupProcessor != null) {
storageGroupProcessor.removePartitions(filter);
}
}
@@ -942,15 +958,18 @@ public class StorageEngine implements IService {
public Map<String, List<Pair<Long, Boolean>>>
getWorkingStorageGroupPartitions() {
Map<String, List<Pair<Long, Boolean>>> res = new ConcurrentHashMap<>();
for (Entry<PartialPath, VirtualStorageGroupManager> entry :
processorMap.entrySet()) {
- for(StorageGroupProcessor storageGroupProcessor :
entry.getValue().getAllVirutalStorageGroupProcessor()) {
+ for (StorageGroupProcessor storageGroupProcessor : entry.getValue()
+ .getAllVirutalStorageGroupProcessor()) {
if (storageGroupProcessor != null) {
List<Pair<Long, Boolean>> partitionIdList = new ArrayList<>();
- for (TsFileProcessor tsFileProcessor :
storageGroupProcessor.getWorkSequenceTsFileProcessors()) {
+ for (TsFileProcessor tsFileProcessor : storageGroupProcessor
+ .getWorkSequenceTsFileProcessors()) {
Pair<Long, Boolean> tmpPair = new
Pair<>(tsFileProcessor.getTimeRangeId(), true);
partitionIdList.add(tmpPair);
}
- for (TsFileProcessor tsFileProcessor :
storageGroupProcessor.getWorkUnsequenceTsFileProcessors()) {
+ for (TsFileProcessor tsFileProcessor : storageGroupProcessor
+ .getWorkUnsequenceTsFileProcessors()) {
Pair<Long, Boolean> tmpPair = new
Pair<>(tsFileProcessor.getTimeRangeId(), false);
partitionIdList.add(tmpPair);
}