This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch virtual_partition_2_merge
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/virtual_partition_2_merge by 
this push:
     new d79f309  refactor
d79f309 is described below

commit d79f309ffa1f837e9d853cd7c0a955bd6cda4c6e
Author: 151250176 <[email protected]>
AuthorDate: Tue Dec 22 15:47:50 2020 +0800

    refactor
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 105 ++++++++++++---------
 1 file changed, 62 insertions(+), 43 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java 
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 82cc286..864f75e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -398,7 +398,8 @@ public class StorageEngine implements IService {
   private StorageGroupProcessor getStorageGroupProcessorByPath(PartialPath 
storageGroupPath,
       StorageGroupMNode storageGroupMNode)
       throws StorageGroupProcessorException, StorageEngineException {
-    VirtualStorageGroupManager virtualStorageGroupManager = 
processorMap.get(storageGroupMNode.getPartialPath());
+    VirtualStorageGroupManager virtualStorageGroupManager = processorMap
+        .get(storageGroupMNode.getPartialPath());
     if (virtualStorageGroupManager == null) {
       // if finish recover
       if (isAllSgReady.get()) {
@@ -413,7 +414,8 @@ public class StorageEngine implements IService {
       } else {
         // not finished recover, refuse the request
         throw new StorageEngineException(
-            "the sg " + storageGroupMNode.getPartialPath() + " may not ready 
now, please wait and retry later",
+            "the sg " + storageGroupMNode.getPartialPath()
+                + " may not ready now, please wait and retry later",
             TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
       }
     }
@@ -423,17 +425,17 @@ public class StorageEngine implements IService {
   public StorageGroupProcessor buildNewStorageGroupProcessor(PartialPath 
storageGroupPath,
       StorageGroupMNode storageGroupMNode, String storageGroupName)
       throws StorageGroupProcessorException {
-          StorageGroupProcessor processor;
-          logger.info("construct a processor instance, the storage group is 
{}, Thread is {}",
-              storageGroupPath, Thread.currentThread().getId());
-          processor = new StorageGroupProcessor(systemDir + File.separator + 
storageGroupPath,
-              storageGroupName,
-              fileFlushPolicy, storageGroupMNode.getFullPath());
-          processor.setDataTTL(storageGroupMNode.getDataTTL());
-          processor.setCustomFlushListeners(customFlushListeners);
-          processor.setCustomCloseFileListeners(customCloseFileListeners);
-          return processor;
-        }
+    StorageGroupProcessor processor;
+    logger.info("construct a processor instance, the storage group is {}, 
Thread is {}",
+        storageGroupPath, Thread.currentThread().getId());
+    processor = new StorageGroupProcessor(systemDir + File.separator + 
storageGroupPath,
+        storageGroupName,
+        fileFlushPolicy, storageGroupMNode.getFullPath());
+    processor.setDataTTL(storageGroupMNode.getDataTTL());
+    processor.setCustomFlushListeners(customFlushListeners);
+    processor.setCustomCloseFileListeners(customCloseFileListeners);
+    return processor;
+  }
 
   private void waitAllSgReady(PartialPath storageGroupPath) throws 
StorageEngineException {
     if (isAllSgReady.get()) {
@@ -537,12 +539,13 @@ public class StorageEngine implements IService {
 
   public void closeStorageGroupProcessor(PartialPath storageGroupPath, boolean 
isSeq,
       boolean isSync) {
-    if(!processorMap.containsKey(storageGroupPath)){
+    if (!processorMap.containsKey(storageGroupPath)) {
       return;
     }
 
     VirtualStorageGroupManager virtualStorageGroupManager = 
processorMap.get(storageGroupPath);
-    for (StorageGroupProcessor processor : 
virtualStorageGroupManager.getAllVirutalStorageGroupProcessor()) {
+    for (StorageGroupProcessor processor : virtualStorageGroupManager
+        .getAllVirutalStorageGroupProcessor()) {
       if (processor == null) {
         continue;
       }
@@ -593,12 +596,13 @@ public class StorageEngine implements IService {
       boolean isSeq,
       boolean isSync)
       throws StorageGroupNotSetException {
-    if(!processorMap.containsKey(storageGroupPath)){
+    if (!processorMap.containsKey(storageGroupPath)) {
       throw new StorageGroupNotSetException(storageGroupPath.getFullPath());
     }
 
     VirtualStorageGroupManager virtualStorageGroupManager = 
processorMap.get(storageGroupPath);
-    for (StorageGroupProcessor processor : 
virtualStorageGroupManager.getAllVirutalStorageGroupProcessor()) {
+    for (StorageGroupProcessor processor : virtualStorageGroupManager
+        .getAllVirutalStorageGroupProcessor()) {
       if (processor != null) {
         logger
             .info("async closing sg processor is called for closing {}, seq = 
{}, partitionId = {}",
@@ -640,14 +644,14 @@ public class StorageEngine implements IService {
       List<PartialPath> sgPaths = 
IoTDB.metaManager.searchAllRelatedStorageGroups(path);
       for (PartialPath storageGroupPath : sgPaths) {
         // storage group has no data
-        if(!processorMap.containsKey(storageGroupPath)){
+        if (!processorMap.containsKey(storageGroupPath)) {
           continue;
         }
 
         PartialPath newPath = path.alterPrefixPath(storageGroupPath);
         for (StorageGroupProcessor storageGroupProcessor : 
processorMap.get(storageGroupPath)
             .getAllVirutalStorageGroupProcessor()) {
-          if(storageGroupProcessor != null){
+          if (storageGroupProcessor != null) {
             storageGroupProcessor.delete(newPath, startTime, endTime, 
planIndex);
           }
         }
@@ -666,14 +670,14 @@ public class StorageEngine implements IService {
       List<PartialPath> sgPaths = 
IoTDB.metaManager.searchAllRelatedStorageGroups(path);
       for (PartialPath storageGroupPath : sgPaths) {
         // storage group has no data
-        if(!processorMap.containsKey(storageGroupPath)){
+        if (!processorMap.containsKey(storageGroupPath)) {
           continue;
         }
 
         PartialPath newPath = path.alterPrefixPath(storageGroupPath);
         for (StorageGroupProcessor storageGroupProcessor : 
processorMap.get(storageGroupPath)
             .getAllVirutalStorageGroupProcessor()) {
-          if(storageGroupProcessor != null){
+          if (storageGroupProcessor != null) {
             storageGroupProcessor.delete(newPath, Long.MIN_VALUE, 
Long.MAX_VALUE, planIndex);
           }
         }
@@ -705,7 +709,8 @@ public class StorageEngine implements IService {
   public int countUpgradeFiles() {
     int totalUpgradeFileNum = 0;
     for (VirtualStorageGroupManager virtualStorageGroupManager : 
processorMap.values()) {
-      for (StorageGroupProcessor storageGroupProcessor : 
virtualStorageGroupManager.getAllVirutalStorageGroupProcessor()) {
+      for (StorageGroupProcessor storageGroupProcessor : 
virtualStorageGroupManager
+          .getAllVirutalStorageGroupProcessor()) {
         if (storageGroupProcessor != null) {
           totalUpgradeFileNum += storageGroupProcessor.countUpgradeFiles();
         }
@@ -725,7 +730,8 @@ public class StorageEngine implements IService {
           "Current system mode is read only, does not support file upgrade");
     }
     for (VirtualStorageGroupManager virtualStorageGroupManager : 
processorMap.values()) {
-      for (StorageGroupProcessor storageGroupProcessor : 
virtualStorageGroupManager.getAllVirutalStorageGroupProcessor()) {
+      for (StorageGroupProcessor storageGroupProcessor : 
virtualStorageGroupManager
+          .getAllVirutalStorageGroupProcessor()) {
         if (storageGroupProcessor != null) {
           storageGroupProcessor.upgrade();
         }
@@ -744,7 +750,8 @@ public class StorageEngine implements IService {
     }
 
     for (VirtualStorageGroupManager virtualStorageGroupManager : 
processorMap.values()) {
-      for (StorageGroupProcessor storageGroupProcessor : 
virtualStorageGroupManager.getAllVirutalStorageGroupProcessor()) {
+      for (StorageGroupProcessor storageGroupProcessor : 
virtualStorageGroupManager
+          .getAllVirutalStorageGroupProcessor()) {
         if (storageGroupProcessor != null) {
           storageGroupProcessor.merge(fullMerge);
         }
@@ -787,25 +794,27 @@ public class StorageEngine implements IService {
 
   public void setTTL(PartialPath storageGroup, long dataTTL) throws 
StorageEngineException {
     // storage group has no data
-    if(!processorMap.containsKey(storageGroup)){
+    if (!processorMap.containsKey(storageGroup)) {
       return;
     }
 
-    for(StorageGroupProcessor storageGroupProcessor : 
processorMap.get(storageGroup).getAllVirutalStorageGroupProcessor()){
-      if(storageGroupProcessor != null){
+    for (StorageGroupProcessor storageGroupProcessor : 
processorMap.get(storageGroup)
+        .getAllVirutalStorageGroupProcessor()) {
+      if (storageGroupProcessor != null) {
         storageGroupProcessor.setDataTTL(dataTTL);
       }
     }
   }
 
   public void deleteStorageGroup(PartialPath storageGroupPath) {
-    if(!processorMap.containsKey(storageGroupPath)){
+    if (!processorMap.containsKey(storageGroupPath)) {
       return;
     }
 
     deleteAllDataFilesInOneStorageGroup(storageGroupPath);
     VirtualStorageGroupManager virtualStorageGroupManager = 
processorMap.remove(storageGroupPath);
-    for (StorageGroupProcessor processor : 
virtualStorageGroupManager.getAllVirutalStorageGroupProcessor()) {
+    for (StorageGroupProcessor processor : virtualStorageGroupManager
+        .getAllVirutalStorageGroupProcessor()) {
       if (processor != null) {
         processor.deleteFolder(systemDir + File.pathSeparator + 
storageGroupPath);
       }
@@ -814,7 +823,8 @@ public class StorageEngine implements IService {
 
   public void loadNewTsFileForSync(TsFileResource newTsFileResource)
       throws StorageEngineException, LoadFileException, IllegalPathException {
-    getProcessorDirectly(new 
PartialPath(newTsFileResource.getTsFile().getParentFile().getParentFile().getParentFile().getName()))
+    getProcessorDirectly(new PartialPath(
+        
newTsFileResource.getTsFile().getParentFile().getParentFile().getParentFile().getName()))
         .loadNewTsFileForSync(newTsFileResource);
   }
 
@@ -865,8 +875,9 @@ public class StorageEngine implements IService {
   public Map<PartialPath, Map<Long, List<TsFileResource>>> 
getAllClosedStorageGroupTsFile() {
     Map<PartialPath, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
     for (Entry<PartialPath, VirtualStorageGroupManager> entry : 
processorMap.entrySet()) {
-      for(StorageGroupProcessor storageGroupProcessor : 
entry.getValue().getAllVirutalStorageGroupProcessor()){
-        if(storageGroupProcessor != null){
+      for (StorageGroupProcessor storageGroupProcessor : entry.getValue()
+          .getAllVirutalStorageGroupProcessor()) {
+        if (storageGroupProcessor != null) {
           List<TsFileResource> allResources = 
storageGroupProcessor.getSequenceFileTreeSet();
           allResources.addAll(storageGroupProcessor.getUnSequenceFileList());
           for (TsFileResource sequenceFile : allResources) {
@@ -876,7 +887,8 @@ public class StorageEngine implements IService {
             long partitionNum = sequenceFile.getTimePartition();
             Map<Long, List<TsFileResource>> storageGroupFiles = 
ret.computeIfAbsent(entry.getKey()
                 , n -> new HashMap<>());
-            storageGroupFiles.computeIfAbsent(partitionNum, n -> new 
ArrayList<>()).add(sequenceFile);
+            storageGroupFiles.computeIfAbsent(partitionNum, n -> new 
ArrayList<>())
+                .add(sequenceFile);
           }
         }
       }
@@ -891,12 +903,14 @@ public class StorageEngine implements IService {
   public boolean isFileAlreadyExist(TsFileResource tsFileResource, PartialPath 
storageGroup,
       long partitionNum) {
     VirtualStorageGroupManager virtualStorageGroupManager = 
processorMap.get(storageGroup);
-    if(virtualStorageGroupManager == null){
+    if (virtualStorageGroupManager == null) {
       return false;
     }
 
-    for(StorageGroupProcessor storageGroupProcessor : 
virtualStorageGroupManager.getAllVirutalStorageGroupProcessor()){
-      if(storageGroupProcessor != null && 
storageGroupProcessor.isFileAlreadyExist(tsFileResource, partitionNum)){
+    for (StorageGroupProcessor storageGroupProcessor : 
virtualStorageGroupManager
+        .getAllVirutalStorageGroupProcessor()) {
+      if (storageGroupProcessor != null && storageGroupProcessor
+          .isFileAlreadyExist(tsFileResource, partitionNum)) {
         return true;
       }
     }
@@ -913,8 +927,9 @@ public class StorageEngine implements IService {
   public void setPartitionVersionToMax(PartialPath storageGroup, long 
partitionId,
       long newMaxVersion)
       throws StorageEngineException {
-    for(StorageGroupProcessor storageGroupProcessor : 
processorMap.get(storageGroup).getAllVirutalStorageGroupProcessor()){
-      if(storageGroupProcessor != null){
+    for (StorageGroupProcessor storageGroupProcessor : 
processorMap.get(storageGroup)
+        .getAllVirutalStorageGroupProcessor()) {
+      if (storageGroupProcessor != null) {
         storageGroupProcessor.setPartitionFileVersionToMax(partitionId, 
newMaxVersion);
       }
     }
@@ -922,8 +937,9 @@ public class StorageEngine implements IService {
 
   public void removePartitions(PartialPath storageGroupPath, 
TimePartitionFilter filter)
       throws StorageEngineException {
-    for(StorageGroupProcessor storageGroupProcessor : 
processorMap.get(storageGroupPath).getAllVirutalStorageGroupProcessor()){
-      if(storageGroupProcessor != null){
+    for (StorageGroupProcessor storageGroupProcessor : 
processorMap.get(storageGroupPath)
+        .getAllVirutalStorageGroupProcessor()) {
+      if (storageGroupProcessor != null) {
         storageGroupProcessor.removePartitions(filter);
       }
     }
@@ -942,15 +958,18 @@ public class StorageEngine implements IService {
   public Map<String, List<Pair<Long, Boolean>>> 
getWorkingStorageGroupPartitions() {
     Map<String, List<Pair<Long, Boolean>>> res = new ConcurrentHashMap<>();
     for (Entry<PartialPath, VirtualStorageGroupManager> entry : 
processorMap.entrySet()) {
-      for(StorageGroupProcessor storageGroupProcessor : 
entry.getValue().getAllVirutalStorageGroupProcessor()) {
+      for (StorageGroupProcessor storageGroupProcessor : entry.getValue()
+          .getAllVirutalStorageGroupProcessor()) {
         if (storageGroupProcessor != null) {
           List<Pair<Long, Boolean>> partitionIdList = new ArrayList<>();
-          for (TsFileProcessor tsFileProcessor : 
storageGroupProcessor.getWorkSequenceTsFileProcessors()) {
+          for (TsFileProcessor tsFileProcessor : storageGroupProcessor
+              .getWorkSequenceTsFileProcessors()) {
             Pair<Long, Boolean> tmpPair = new 
Pair<>(tsFileProcessor.getTimeRangeId(), true);
             partitionIdList.add(tmpPair);
           }
 
-          for (TsFileProcessor tsFileProcessor : 
storageGroupProcessor.getWorkUnsequenceTsFileProcessors()) {
+          for (TsFileProcessor tsFileProcessor : storageGroupProcessor
+              .getWorkUnsequenceTsFileProcessors()) {
             Pair<Long, Boolean> tmpPair = new 
Pair<>(tsFileProcessor.getTimeRangeId(), false);
             partitionIdList.add(tmpPair);
           }

Reply via email to