This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e2bb2ddc57 [IOTDB-3490] Eliminate LocalConfigNode Usage in New Cluster 
(#6281)
e2bb2ddc57 is described below

commit e2bb2ddc57320a280608725c71d87b5b9d928be1
Author: Marcos_Zyk <[email protected]>
AuthorDate: Wed Jun 15 10:09:38 2022 +0800

    [IOTDB-3490] Eliminate LocalConfigNode Usage in New Cluster (#6281)
---
 .../iotdb/db/localconfignode/LocalConfigNode.java  |  10 +-
 .../db/metadata/schemaregion/SchemaEngine.java     | 139 ++++++++++++++++++---
 .../java/org/apache/iotdb/db/service/DataNode.java |   7 +-
 3 files changed, 132 insertions(+), 24 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java 
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index 456fc8252b..9dad431bd1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -59,7 +59,6 @@ import 
org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateExceptio
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
-import org.apache.iotdb.db.metadata.rescon.SchemaResourceManager;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.storagegroup.IStorageGroupSchemaManager;
@@ -170,17 +169,18 @@ public class LocalConfigNode {
     }
 
     try {
-      SchemaResourceManager.initSchemaResource();
 
       templateManager.init();
       storageGroupSchemaManager.init();
 
-      Map<PartialPath, List<SchemaRegionId>> recoveredLocalSchemaRegionInfo = 
schemaEngine.init();
+      Map<PartialPath, List<SchemaRegionId>> recoveredLocalSchemaRegionInfo =
+          schemaEngine.initForLocalConfigNode();
       schemaPartitionTable.init(recoveredLocalSchemaRegionInfo);
 
       if (config.getSyncMlogPeriodInMs() != 0) {
         timedForceMLogThread =
-            
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("timedForceMLogThread");
+            IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+                "LocalConfigNode-TimedForceMLog-Thread");
         ScheduledExecutorUtil.unsafelyScheduleAtFixedRate(
             timedForceMLogThread,
             this::forceMlog,
@@ -209,7 +209,6 @@ public class LocalConfigNode {
     }
 
     try {
-      SchemaResourceManager.clearSchemaResource();
 
       if (timedForceMLogThread != null) {
         timedForceMLogThread.shutdown();
@@ -237,7 +236,6 @@ public class LocalConfigNode {
 
     storageGroupSchemaManager.forceLog();
     templateManager.forceLog();
-    schemaEngine.forceMlog();
   }
 
   // endregion
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index d4632f1a7d..3b850584b8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -20,14 +20,19 @@
 package org.apache.iotdb.db.metadata.schemaregion;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
-import org.apache.iotdb.db.metadata.storagegroup.IStorageGroupSchemaManager;
-import org.apache.iotdb.db.metadata.storagegroup.StorageGroupSchemaManager;
+import org.apache.iotdb.db.metadata.mtree.MTreeAboveSG;
+import org.apache.iotdb.db.metadata.rescon.SchemaResourceManager;
 import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 
@@ -45,19 +50,22 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 // manage all the schemaRegion in this dataNode
 public class SchemaEngine {
 
-  private static IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = 
LoggerFactory.getLogger(SchemaEngine.class);
+
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
 
-  private final IStorageGroupSchemaManager localStorageGroupSchemaManager =
-      StorageGroupSchemaManager.getInstance();
+  private MTreeAboveSG sharedPrefixTree;
 
   private Map<SchemaRegionId, ISchemaRegion> schemaRegionMap;
   private SchemaEngineMode schemaRegionStoredMode;
 
-  private static final Logger logger = 
LoggerFactory.getLogger(SchemaEngine.class);
+  private ScheduledExecutorService timedForceMLogThread;
 
   public void write(SchemaRegionId schemaRegionId, PlanNode planNode) {
     planNode.accept(new SchemaExecutionVisitor(), 
schemaRegionMap.get(schemaRegionId));
@@ -76,12 +84,40 @@ public class SchemaEngine {
     return SchemaEngineManagerHolder.INSTANCE;
   }
 
-  public Map<PartialPath, List<SchemaRegionId>> init() throws 
MetadataException {
-    schemaRegionMap = new ConcurrentHashMap<>();
+  public void init() {
+    try {
+      initForLocalConfigNode();
+    } catch (MetadataException e) {
+      e.printStackTrace();
+      logger.error("Error occurred during SchemaEngine initialization.", e);
+    }
+  }
+
+  public Map<PartialPath, List<SchemaRegionId>> initForLocalConfigNode() 
throws MetadataException {
+
     schemaRegionStoredMode = 
SchemaEngineMode.valueOf(config.getSchemaEngineMode());
     logger.info("used schema engine mode: {}.", schemaRegionStoredMode);
 
-    return initSchemaRegion();
+    SchemaResourceManager.initSchemaResource();
+
+    schemaRegionMap = new ConcurrentHashMap<>();
+    sharedPrefixTree = new MTreeAboveSG();
+
+    Map<PartialPath, List<SchemaRegionId>> schemaRegionInfo = 
initSchemaRegion();
+
+    if (config.getSyncMlogPeriodInMs() != 0) {
+      timedForceMLogThread =
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+              "SchemaEngine-TimedForceMLog-Thread");
+      ScheduledExecutorUtil.unsafelyScheduleAtFixedRate(
+          timedForceMLogThread,
+          this::forceMlog,
+          config.getSyncMlogPeriodInMs(),
+          config.getSyncMlogPeriodInMs(),
+          TimeUnit.MILLISECONDS);
+    }
+
+    return schemaRegionInfo;
   }
 
   /**
@@ -91,13 +127,32 @@ public class SchemaEngine {
   private Map<PartialPath, List<SchemaRegionId>> initSchemaRegion() throws 
MetadataException {
     Map<PartialPath, List<SchemaRegionId>> partitionTable = new HashMap<>();
 
+    File schemaDir = new File(config.getSchemaDir());
+    File[] sgDirList = schemaDir.listFiles();
+
+    if (sgDirList == null) {
+      return partitionTable;
+    }
+
     // recover SchemaRegion concurrently
     ExecutorService schemaRegionRecoverPools =
         IoTDBThreadPoolFactory.newFixedThreadPool(
             Runtime.getRuntime().availableProcessors(), 
"SchemaRegion-recover-task");
     List<Future<ISchemaRegion>> futures = new ArrayList<>();
 
-    for (PartialPath storageGroup : 
localStorageGroupSchemaManager.getAllStorageGroupPaths()) {
+    for (File file : sgDirList) {
+      if (!file.isDirectory()) {
+        continue;
+      }
+
+      PartialPath storageGroup;
+      try {
+        storageGroup = new PartialPath(file.getName());
+      } catch (IllegalPathException illegalPathException) {
+        // not a legal sg dir
+        continue;
+      }
+
       List<SchemaRegionId> schemaRegionIdList = new ArrayList<>();
       partitionTable.put(storageGroup, schemaRegionIdList);
 
@@ -149,6 +204,13 @@ public class SchemaEngine {
   }
 
   public void clear() {
+    SchemaResourceManager.clearSchemaResource();
+
+    if (timedForceMLogThread != null) {
+      timedForceMLogThread.shutdown();
+      timedForceMLogThread = null;
+    }
+
     if (schemaRegionMap != null) {
       for (ISchemaRegion schemaRegion : schemaRegionMap.values()) {
         schemaRegion.clear();
@@ -156,6 +218,11 @@ public class SchemaEngine {
       schemaRegionMap.clear();
       schemaRegionMap = null;
     }
+
+    if (sharedPrefixTree != null) {
+      sharedPrefixTree.clear();
+      sharedPrefixTree = null;
+    }
   }
 
   public ISchemaRegion getSchemaRegion(SchemaRegionId regionId) {
@@ -213,9 +280,8 @@ public class SchemaEngine {
 
   private ISchemaRegion createSchemaRegionWithoutExistenceCheck(
       PartialPath storageGroup, SchemaRegionId schemaRegionId) throws 
MetadataException {
-    ISchemaRegion schemaRegion = null;
-    IStorageGroupMNode storageGroupMNode =
-        
this.localStorageGroupSchemaManager.ensureStorageGroupByStorageGroupPath(storageGroup);
+    ISchemaRegion schemaRegion;
+    IStorageGroupMNode storageGroupMNode = 
ensureStorageGroupByStorageGroupPath(storageGroup);
     switch (this.schemaRegionStoredMode) {
       case Memory:
         schemaRegion = new SchemaRegionMemoryImpl(storageGroup, 
schemaRegionId, storageGroupMNode);
@@ -238,8 +304,51 @@ public class SchemaEngine {
     return schemaRegion;
   }
 
-  public void deleteSchemaRegion(SchemaRegionId schemaRegionId) throws 
MetadataException {
-    schemaRegionMap.get(schemaRegionId).deleteSchemaRegion();
+  private IStorageGroupMNode ensureStorageGroupByStorageGroupPath(PartialPath 
storageGroup)
+      throws MetadataException {
+    try {
+      return 
sharedPrefixTree.getStorageGroupNodeByStorageGroupPath(storageGroup);
+    } catch (StorageGroupNotSetException e) {
+      try {
+        sharedPrefixTree.setStorageGroup(storageGroup);
+      } catch (StorageGroupAlreadySetException 
storageGroupAlreadySetException) {
+        // do nothing
+        // concurrent timeseries creation may result concurrent 
ensureStorageGroup
+        // it's ok that the storageGroup has already been set
+
+        if (storageGroupAlreadySetException.isHasChild()) {
+          // if setStorageGroup failure is because of child, the deviceNode 
should not be created.
+          // Timeseries can't be created under a deviceNode without 
storageGroup.
+          throw storageGroupAlreadySetException;
+        }
+      }
+
+      return 
sharedPrefixTree.getStorageGroupNodeByStorageGroupPath(storageGroup);
+    }
+  }
+
+  public synchronized void deleteSchemaRegion(SchemaRegionId schemaRegionId)
+      throws MetadataException {
+    ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
+    schemaRegion.deleteSchemaRegion();
     schemaRegionMap.remove(schemaRegionId);
+
+    // check whether the sg dir is empty
+    File sgDir = new File(config.getSchemaDir(), 
schemaRegion.getStorageGroupFullPath());
+    File[] regionDirList =
+        sgDir.listFiles(
+            (dir, name) -> {
+              try {
+                Integer.parseInt(name);
+                return true;
+              } catch (NumberFormatException e) {
+                return false;
+              }
+            });
+    // remove the empty sg dir
+    if (regionDirList == null || regionDirList.length == 0) {
+      FileUtils.deleteDirectory(sgDir);
+      sharedPrefixTree.deleteStorageGroup(new 
PartialPath(schemaRegion.getStorageGroupFullPath()));
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 1e8d500d15..95c9d4015c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockService;
 import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
 import org.apache.iotdb.db.protocol.rest.RestService;
@@ -276,7 +277,7 @@ public class DataNode implements DataNodeMBean {
     registerManager.register(MetricsService.getInstance());
 
     logger.info("recover the schema...");
-    initConfigManager();
+    initSchemaEngine();
     registerManager.register(new JMXService());
     registerManager.register(FlushManager.getInstance());
     registerManager.register(CacheHitRatioMonitor.getInstance());
@@ -399,9 +400,9 @@ public class DataNode implements DataNodeMBean {
                 + File.separator));
   }
 
-  private void initConfigManager() {
+  private void initSchemaEngine() {
     long time = System.currentTimeMillis();
-    IoTDB.configManager.init();
+    SchemaEngine.getInstance().init();
     long end = System.currentTimeMillis() - time;
     logger.info("spend {}ms to recover schema.", end);
     logger.info(

Reply via email to