This is an automated email from the ASF dual-hosted git repository.
wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new acce8c4631 [IOTDB-3781] Reinforce the regionCleaner task's startup
logic (#6804)
acce8c4631 is described below
commit acce8c46312869f4f8b9312e1dfa35ca2b802380
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Jul 28 09:10:32 2022 +0800
[IOTDB-3781] Reinforce the regionCleaner task's startup logic (#6804)
[IOTDB-3781] Reinforce the regionCleaner task's startup logic
---
.../statemachine/PartitionRegionStateMachine.java | 6 +-
.../iotdb/confignode/manager/PartitionManager.java | 79 +++++++++++++++-------
.../iotdb/confignode/manager/ProcedureManager.java | 2 +
.../iotdb/confignode/manager/load/LoadManager.java | 4 ++
4 files changed, 64 insertions(+), 27 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 1afcde2f4c..4259e5df71 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -139,12 +139,16 @@ public class PartitionRegionStateMachine implements
IStateMachine, IStateMachine
@Override
public void notifyLeaderChanged(ConsensusGroupId groupId, TEndPoint
newLeader) {
if (currentNode.equals(newLeader)) {
- LOGGER.info("Current node {} is Leader, start procedure manager.",
newLeader);
+ LOGGER.info("Current node {} becomes Leader", newLeader);
configManager.getProcedureManager().shiftExecutor(true);
configManager.getLoadManager().start();
+ configManager.getPartitionManager().startRegionCleaner();
} else {
+ LOGGER.info(
+ "Current node {} is not longer the leader, the new leader is {}",
currentNode, newLeader);
configManager.getProcedureManager().shiftExecutor(false);
configManager.getLoadManager().stop();
+ configManager.getPartitionManager().stopRegionCleaner();
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index bd95b0e155..0ba379af5f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -62,6 +62,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -73,23 +74,22 @@ public class PartitionManager {
private final IManager configManager;
private final PartitionInfo partitionInfo;
- private static final int REGION_CLEANER_WORK_INTERVAL = 300;
- private static final int REGION_CLEANER_WORK_INITIAL_DELAY = 10;
private SeriesPartitionExecutor executor;
+
+ /** Region cleaner */
+ // Monitor for leadership change
+ private final Object scheduleMonitor = new Object();
+ // Try to delete Regions every 10s
+ private static final int REGION_CLEANER_WORK_INTERVAL = 10;
private final ScheduledExecutorService regionCleaner;
+ private Future<?> currentRegionCleanerFuture;
public PartitionManager(IManager configManager, PartitionInfo partitionInfo)
{
this.configManager = configManager;
this.partitionInfo = partitionInfo;
this.regionCleaner =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("IoTDB-Region-Cleaner");
- ScheduledExecutorUtil.safelyScheduleAtFixedRate(
- regionCleaner,
- this::clearDeletedRegions,
- REGION_CLEANER_WORK_INITIAL_DELAY,
- REGION_CLEANER_WORK_INTERVAL,
- TimeUnit.SECONDS);
setSeriesPartitionExecutor();
}
@@ -397,24 +397,6 @@ public class PartitionManager {
getConsensusManager().write(preDeleteStorageGroupPlan);
}
- /**
- * Called by {@link PartitionManager#regionCleaner} Delete regions of
logical deleted storage
- * groups periodically.
- */
- public void clearDeletedRegions() {
- if (getConsensusManager().isLeader()) {
- final Set<TRegionReplicaSet> deletedRegionSet =
partitionInfo.getDeletedRegionSet();
- if (!deletedRegionSet.isEmpty()) {
- LOGGER.info(
- "DELETE REGIONS {} START",
- deletedRegionSet.stream()
- .map(TRegionReplicaSet::getRegionId)
- .collect(Collectors.toList()));
- SyncDataNodeClientPool.getInstance().deleteRegions(deletedRegionSet);
- }
- }
- }
-
public void addMetrics() {
partitionInfo.addMetrics();
}
@@ -453,6 +435,51 @@ public class PartitionManager {
return partitionInfo.getRegionStorageGroup(regionId);
}
+ /**
+ * Called by {@link PartitionManager#regionCleaner}. Deletes the regions of logically deleted
+ * storage groups periodically.
+ */
+ public void clearDeletedRegions() {
+ if (getConsensusManager().isLeader()) {
+ final Set<TRegionReplicaSet> deletedRegionSet =
partitionInfo.getDeletedRegionSet();
+ if (!deletedRegionSet.isEmpty()) {
+ LOGGER.info(
+ "DELETE REGIONS {} START",
+ deletedRegionSet.stream()
+ .map(TRegionReplicaSet::getRegionId)
+ .collect(Collectors.toList()));
+ SyncDataNodeClientPool.getInstance().deleteRegions(deletedRegionSet);
+ }
+ }
+ }
+
+ public void startRegionCleaner() {
+ synchronized (scheduleMonitor) {
+ if (currentRegionCleanerFuture == null) {
+ /* Start the RegionCleaner service */
+ currentRegionCleanerFuture =
+ ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+ regionCleaner,
+ this::clearDeletedRegions,
+ 0,
+ REGION_CLEANER_WORK_INTERVAL,
+ TimeUnit.SECONDS);
+ LOGGER.info("RegionCleaner is started successfully.");
+ }
+ }
+ }
+
+ public void stopRegionCleaner() {
+ synchronized (scheduleMonitor) {
+ if (currentRegionCleanerFuture != null) {
+ /* Stop the RegionCleaner service */
+ currentRegionCleanerFuture.cancel(false);
+ currentRegionCleanerFuture = null;
+ LOGGER.info("RegionCleaner is stopped successfully.");
+ }
+ }
+ }
+
public ScheduledExecutorService getRegionCleaner() {
return regionCleaner;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 7ed3527ab3..564a921f7e 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -83,6 +83,7 @@ public class ProcedureManager {
CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(),
CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL());
store.start();
+ LOGGER.info("ProcedureManager is started successfully.");
}
} else {
if (executor.isRunning()) {
@@ -90,6 +91,7 @@ public class ProcedureManager {
if (!executor.isRunning()) {
executor.join();
store.stop();
+ LOGGER.info("ProcedureManager is stopped successfully.");
}
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 61babc277a..ad55d0cbf2 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -225,6 +225,7 @@ public class LoadManager {
0,
heartbeatInterval,
TimeUnit.MILLISECONDS);
+ LOGGER.info("Heartbeat service is started successfully.");
}
/* Start the load balancing service */
@@ -236,6 +237,7 @@ public class LoadManager {
0,
heartbeatInterval,
TimeUnit.MILLISECONDS);
+ LOGGER.info("LoadBalancing service is started successfully.");
}
}
}
@@ -247,8 +249,10 @@ public class LoadManager {
if (currentHeartbeatFuture != null) {
currentHeartbeatFuture.cancel(false);
currentHeartbeatFuture = null;
+ LOGGER.info("Heartbeat service is stopped successfully.");
currentLoadBalancingFuture.cancel(false);
currentLoadBalancingFuture = null;
+ LOGGER.info("LoadBalancing service is stopped successfully.");
}
}
}