This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e2bb2ddc57 [IOTDB-3490] Eliminate LocalConfigNode Usage in New Cluster (#6281)
e2bb2ddc57 is described below
commit e2bb2ddc57320a280608725c71d87b5b9d928be1
Author: Marcos_Zyk <[email protected]>
AuthorDate: Wed Jun 15 10:09:38 2022 +0800
[IOTDB-3490] Eliminate LocalConfigNode Usage in New Cluster (#6281)
---
.../iotdb/db/localconfignode/LocalConfigNode.java | 10 +-
.../db/metadata/schemaregion/SchemaEngine.java | 139 ++++++++++++++++++---
.../java/org/apache/iotdb/db/service/DataNode.java | 7 +-
3 files changed, 132 insertions(+), 24 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index 456fc8252b..9dad431bd1 100644
--- a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -59,7 +59,6 @@ import org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateExceptio
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
-import org.apache.iotdb.db.metadata.rescon.SchemaResourceManager;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.storagegroup.IStorageGroupSchemaManager;
@@ -170,17 +169,18 @@ public class LocalConfigNode {
}
try {
- SchemaResourceManager.initSchemaResource();
templateManager.init();
storageGroupSchemaManager.init();
- Map<PartialPath, List<SchemaRegionId>> recoveredLocalSchemaRegionInfo = schemaEngine.init();
+ Map<PartialPath, List<SchemaRegionId>> recoveredLocalSchemaRegionInfo =
+ schemaEngine.initForLocalConfigNode();
schemaPartitionTable.init(recoveredLocalSchemaRegionInfo);
if (config.getSyncMlogPeriodInMs() != 0) {
timedForceMLogThread =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("timedForceMLogThread");
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ "LocalConfigNode-TimedForceMLog-Thread");
ScheduledExecutorUtil.unsafelyScheduleAtFixedRate(
timedForceMLogThread,
this::forceMlog,
@@ -209,7 +209,6 @@ public class LocalConfigNode {
}
try {
- SchemaResourceManager.clearSchemaResource();
if (timedForceMLogThread != null) {
timedForceMLogThread.shutdown();
@@ -237,7 +236,6 @@ public class LocalConfigNode {
storageGroupSchemaManager.forceLog();
templateManager.forceLog();
- schemaEngine.forceMlog();
}
// endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index d4632f1a7d..3b850584b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -20,14 +20,19 @@
package org.apache.iotdb.db.metadata.schemaregion;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
-import org.apache.iotdb.db.metadata.storagegroup.IStorageGroupSchemaManager;
-import org.apache.iotdb.db.metadata.storagegroup.StorageGroupSchemaManager;
+import org.apache.iotdb.db.metadata.mtree.MTreeAboveSG;
+import org.apache.iotdb.db.metadata.rescon.SchemaResourceManager;
import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -45,19 +50,22 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
// manage all the schemaRegion in this dataNode
public class SchemaEngine {
- private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static final Logger logger = LoggerFactory.getLogger(SchemaEngine.class);
+
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private final IStorageGroupSchemaManager localStorageGroupSchemaManager =
- StorageGroupSchemaManager.getInstance();
+ private MTreeAboveSG sharedPrefixTree;
private Map<SchemaRegionId, ISchemaRegion> schemaRegionMap;
private SchemaEngineMode schemaRegionStoredMode;
- private static final Logger logger = LoggerFactory.getLogger(SchemaEngine.class);
+ private ScheduledExecutorService timedForceMLogThread;
public void write(SchemaRegionId schemaRegionId, PlanNode planNode) {
planNode.accept(new SchemaExecutionVisitor(),
schemaRegionMap.get(schemaRegionId));
@@ -76,12 +84,40 @@ public class SchemaEngine {
return SchemaEngineManagerHolder.INSTANCE;
}
- public Map<PartialPath, List<SchemaRegionId>> init() throws MetadataException {
- schemaRegionMap = new ConcurrentHashMap<>();
+ public void init() {
+ try {
+ initForLocalConfigNode();
+ } catch (MetadataException e) {
+ e.printStackTrace();
+ logger.error("Error occurred during SchemaEngine initialization.", e);
+ }
+ }
+
+ public Map<PartialPath, List<SchemaRegionId>> initForLocalConfigNode() throws MetadataException {
+
schemaRegionStoredMode =
SchemaEngineMode.valueOf(config.getSchemaEngineMode());
logger.info("used schema engine mode: {}.", schemaRegionStoredMode);
- return initSchemaRegion();
+ SchemaResourceManager.initSchemaResource();
+
+ schemaRegionMap = new ConcurrentHashMap<>();
+ sharedPrefixTree = new MTreeAboveSG();
+
+ Map<PartialPath, List<SchemaRegionId>> schemaRegionInfo = initSchemaRegion();
+
+ if (config.getSyncMlogPeriodInMs() != 0) {
+ timedForceMLogThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ "SchemaEngine-TimedForceMLog-Thread");
+ ScheduledExecutorUtil.unsafelyScheduleAtFixedRate(
+ timedForceMLogThread,
+ this::forceMlog,
+ config.getSyncMlogPeriodInMs(),
+ config.getSyncMlogPeriodInMs(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ return schemaRegionInfo;
}
/**
@@ -91,13 +127,32 @@ public class SchemaEngine {
private Map<PartialPath, List<SchemaRegionId>> initSchemaRegion() throws
MetadataException {
Map<PartialPath, List<SchemaRegionId>> partitionTable = new HashMap<>();
+ File schemaDir = new File(config.getSchemaDir());
+ File[] sgDirList = schemaDir.listFiles();
+
+ if (sgDirList == null) {
+ return partitionTable;
+ }
+
// recover SchemaRegion concurrently
ExecutorService schemaRegionRecoverPools =
IoTDBThreadPoolFactory.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
"SchemaRegion-recover-task");
List<Future<ISchemaRegion>> futures = new ArrayList<>();
- for (PartialPath storageGroup : localStorageGroupSchemaManager.getAllStorageGroupPaths()) {
+ for (File file : sgDirList) {
+ if (!file.isDirectory()) {
+ continue;
+ }
+
+ PartialPath storageGroup;
+ try {
+ storageGroup = new PartialPath(file.getName());
+ } catch (IllegalPathException illegalPathException) {
+ // not a legal sg dir
+ continue;
+ }
+
List<SchemaRegionId> schemaRegionIdList = new ArrayList<>();
partitionTable.put(storageGroup, schemaRegionIdList);
@@ -149,6 +204,13 @@ public class SchemaEngine {
}
public void clear() {
+ SchemaResourceManager.clearSchemaResource();
+
+ if (timedForceMLogThread != null) {
+ timedForceMLogThread.shutdown();
+ timedForceMLogThread = null;
+ }
+
if (schemaRegionMap != null) {
for (ISchemaRegion schemaRegion : schemaRegionMap.values()) {
schemaRegion.clear();
@@ -156,6 +218,11 @@ public class SchemaEngine {
schemaRegionMap.clear();
schemaRegionMap = null;
}
+
+ if (sharedPrefixTree != null) {
+ sharedPrefixTree.clear();
+ sharedPrefixTree = null;
+ }
}
public ISchemaRegion getSchemaRegion(SchemaRegionId regionId) {
@@ -213,9 +280,8 @@ public class SchemaEngine {
private ISchemaRegion createSchemaRegionWithoutExistenceCheck(
PartialPath storageGroup, SchemaRegionId schemaRegionId) throws
MetadataException {
- ISchemaRegion schemaRegion = null;
- IStorageGroupMNode storageGroupMNode =
- this.localStorageGroupSchemaManager.ensureStorageGroupByStorageGroupPath(storageGroup);
+ ISchemaRegion schemaRegion;
+ IStorageGroupMNode storageGroupMNode = ensureStorageGroupByStorageGroupPath(storageGroup);
switch (this.schemaRegionStoredMode) {
case Memory:
schemaRegion = new SchemaRegionMemoryImpl(storageGroup,
schemaRegionId, storageGroupMNode);
@@ -238,8 +304,51 @@ public class SchemaEngine {
return schemaRegion;
}
- public void deleteSchemaRegion(SchemaRegionId schemaRegionId) throws MetadataException {
- schemaRegionMap.get(schemaRegionId).deleteSchemaRegion();
+ private IStorageGroupMNode ensureStorageGroupByStorageGroupPath(PartialPath storageGroup)
+ throws MetadataException {
+ try {
+ return sharedPrefixTree.getStorageGroupNodeByStorageGroupPath(storageGroup);
+ } catch (StorageGroupNotSetException e) {
+ try {
+ sharedPrefixTree.setStorageGroup(storageGroup);
+ } catch (StorageGroupAlreadySetException storageGroupAlreadySetException) {
+ // do nothing
+ // concurrent timeseries creation may result concurrent ensureStorageGroup
+ // it's ok that the storageGroup has already been set
+
+ if (storageGroupAlreadySetException.isHasChild()) {
+ // if setStorageGroup failure is because of child, the deviceNode should not be created.
+ // Timeseries can't be created under a deviceNode without storageGroup.
+ throw storageGroupAlreadySetException;
+ }
+ }
+
+ return sharedPrefixTree.getStorageGroupNodeByStorageGroupPath(storageGroup);
+ }
+ }
+
+ public synchronized void deleteSchemaRegion(SchemaRegionId schemaRegionId)
+ throws MetadataException {
+ ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
+ schemaRegion.deleteSchemaRegion();
schemaRegionMap.remove(schemaRegionId);
+
+ // check whether the sg dir is empty
+ File sgDir = new File(config.getSchemaDir(), schemaRegion.getStorageGroupFullPath());
+ File[] regionDirList =
+ sgDir.listFiles(
+ (dir, name) -> {
+ try {
+ Integer.parseInt(name);
+ return true;
+ } catch (NumberFormatException e) {
+ return false;
+ }
+ });
+ // remove the empty sg dir
+ if (regionDirList == null || regionDirList.length == 0) {
+ FileUtils.deleteDirectory(sgDir);
+ sharedPrefixTree.deleteStorageGroup(new PartialPath(schemaRegion.getStorageGroupFullPath()));
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 1e8d500d15..95c9d4015c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockService;
import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
import org.apache.iotdb.db.protocol.rest.RestService;
@@ -276,7 +277,7 @@ public class DataNode implements DataNodeMBean {
registerManager.register(MetricsService.getInstance());
logger.info("recover the schema...");
- initConfigManager();
+ initSchemaEngine();
registerManager.register(new JMXService());
registerManager.register(FlushManager.getInstance());
registerManager.register(CacheHitRatioMonitor.getInstance());
@@ -399,9 +400,9 @@ public class DataNode implements DataNodeMBean {
+ File.separator));
}
- private void initConfigManager() {
+ private void initSchemaEngine() {
long time = System.currentTimeMillis();
- IoTDB.configManager.init();
+ SchemaEngine.getInstance().init();
long end = System.currentTimeMillis() - time;
logger.info("spend {}ms to recover schema.", end);
logger.info(