This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a4fe367436 [IOTDB-4643] Add Unknown_DataNode_Detector (#7585)
a4fe367436 is described below
commit a4fe3674362400a8cbff8943935ae4d1395e06ca
Author: Weihao Li <[email protected]>
AuthorDate: Mon Oct 17 10:00:05 2022 +0800
[IOTDB-4643] Add Unknown_DataNode_Detector (#7585)
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 11 ++++
.../statemachine/PartitionRegionStateMachine.java | 2 +
.../iotdb/confignode/manager/ConfigManager.java | 6 ++
.../apache/iotdb/confignode/manager/IManager.java | 3 +
.../iotdb/confignode/manager/TriggerManager.java | 6 ++
.../iotdb/confignode/manager/node/NodeManager.java | 75 ++++++++++++++++++++++
6 files changed, 103 insertions(+)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 7f2fb656f5..f2b0c91bff 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -135,6 +135,9 @@ public class ConfigNodeConfig {
/** The heartbeat interval in milliseconds */
private long heartbeatInterval = 1000;
+ /** The unknown DataNode detect interval in milliseconds */
+ private long unknownDataNodeDetectInterval = heartbeatInterval;
+
/** The routing policy of read/write requests */
private String routingPolicy = RouteBalancer.LEADER_POLICY;
@@ -475,6 +478,14 @@ public class ConfigNodeConfig {
this.heartbeatInterval = heartbeatInterval;
}
+ public long getUnknownDataNodeDetectInterval() {
+ return unknownDataNodeDetectInterval;
+ }
+
+ public void setUnknownDataNodeDetectInterval(long
unknownDataNodeDetectInterval) {
+ this.unknownDataNodeDetectInterval = unknownDataNodeDetectInterval;
+ }
+
public String getRoutingPolicy() {
return routingPolicy;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index dae61189d3..bcaba4fec8 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -158,6 +158,7 @@ public class PartitionRegionStateMachine
configManager.getProcedureManager().shiftExecutor(true);
configManager.getLoadManager().startLoadBalancingService();
configManager.getNodeManager().startHeartbeatService();
+ configManager.getNodeManager().startUnknownDataNodeDetector();
configManager.getPartitionManager().startRegionCleaner();
} else {
LOGGER.info(
@@ -168,6 +169,7 @@ public class PartitionRegionStateMachine
configManager.getProcedureManager().shiftExecutor(false);
configManager.getLoadManager().stopLoadBalancingService();
configManager.getNodeManager().stopHeartbeatService();
+ configManager.getNodeManager().stopUnknownDataNodeDetector();
configManager.getPartitionManager().stopRegionCleaner();
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 793297de22..a398b6f64c 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1188,4 +1188,10 @@ public class ConfigManager implements IManager {
}
return null;
}
+
+ public TSStatus transfer(List<TDataNodeLocation> newUnknownDataList) {
+ LOGGER.info("start Transfer of {}", newUnknownDataList);
+ // Only triggers are transferred for now: delegate to TriggerManager and return its status
+ return triggerManager.transferTrigger(newUnknownDataList);
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 4d6aad5e57..6330eabdeb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
@@ -469,4 +470,6 @@ public interface IManager {
TGetSeriesSlotListResp getSeriesSlotList(GetSeriesSlotListPlan plan);
TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req);
+
+ TSStatus transfer(List<TDataNodeLocation> newUnknownDataList);
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
index 49e7a42512..acce3d6fc8 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
public class TriggerManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(TriggerManager.class);
@@ -130,4 +131,9 @@ public class TriggerManager {
Collections.emptyList());
}
}
+
+ public TSStatus transferTrigger(List<TDataNodeLocation> newUnknownDataList) {
+ // TODO implement: placeholder that ignores newUnknownDataList and always reports SUCCESS_STATUS
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 9e2558838d..8ada16691d 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -80,9 +80,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -99,6 +101,8 @@ public class NodeManager {
private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
public static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatInterval();
+ private static final long UNKNOWN_DATANODE_DETECT_INTERVAL =
+ CONF.getUnknownDataNodeDetectInterval();
public static final TEndPoint CURRENT_NODE =
new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort());
@@ -121,11 +125,19 @@ public class NodeManager {
private final ScheduledExecutorService heartBeatExecutor =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(LoadManager.class.getSimpleName());
+ /** Unknown DataNode Detector */
+ private Future<?> currentUnknownDataNodeDetectFuture;
+
+ private final ScheduledExecutorService unknownDataNodeDetectExecutor =
+
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Unknown-DataNode-Detector");
+ private final Set<TDataNodeLocation> oldUnknownNodes;
+
public NodeManager(IManager configManager, NodeInfo nodeInfo) {
this.configManager = configManager;
this.nodeInfo = nodeInfo;
this.removeConfigNodeLock = new ReentrantLock();
this.nodeCacheMap = new ConcurrentHashMap<>();
+ this.oldUnknownNodes = new HashSet<>();
}
private void setGlobalConfig(DataNodeRegisterResp dataSet) {
@@ -650,6 +662,69 @@ public class NodeManager {
return nodeCacheMap;
}
+ /** Start the Unknown-DataNode-Detector; does nothing if it has already been scheduled */
+ public void startUnknownDataNodeDetector() {
+ synchronized (scheduleMonitor) {
+ if (currentUnknownDataNodeDetectFuture == null) {
+ currentUnknownDataNodeDetectFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ unknownDataNodeDetectExecutor,
+ this::detectTask,
+ 0,
+ UNKNOWN_DATANODE_DETECT_INTERVAL,
+ TimeUnit.MILLISECONDS);
+ LOGGER.info("Unknown-DataNode-Detector is started successfully.");
+ }
+ }
+ }
+
+ /**
+ * Periodically executed task that finds the DataNodes which have newly become Unknown
+ *
+ * <p>1. If a DataNode stays Unknown, Transfer is not activated for it again once a
+ Transfer has succeeded: it is kept in oldUnknownNodes until it is Running again.
+ *
+ * <p>2. The selected DataNodes may not truly need a Transfer, so the transfer method
+ implemented in the Manager must
+ * ensure the safety of the data.
+ */
+ private void detectTask() {
+ List<TDataNodeLocation> newUnknownNodes = new ArrayList<>();
+
+ getRegisteredDataNodes()
+ .forEach(
+ DataNodeConfiguration -> {
+ TDataNodeLocation dataNodeLocation =
DataNodeConfiguration.getLocation();
+ BaseNodeCache newestNodeInformation =
nodeCacheMap.get(dataNodeLocation.dataNodeId);
+ if (newestNodeInformation != null) {
+ if (newestNodeInformation.getNodeStatus() ==
NodeStatus.Running) {
+ oldUnknownNodes.remove(dataNodeLocation);
+ } else if (!oldUnknownNodes.contains(dataNodeLocation)
+ && newestNodeInformation.getNodeStatus() ==
NodeStatus.Unknown) {
+ newUnknownNodes.add(dataNodeLocation);
+ }
+ }
+ });
+
+ if (!newUnknownNodes.isEmpty()) {
+ TSStatus transferResult = configManager.transfer(newUnknownNodes);
+ if (transferResult.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ oldUnknownNodes.addAll(newUnknownNodes);
+ } else {
+ LOGGER.warn("Fail to transfer because {}, will retry",
transferResult.getMessage());
+ }
+ }
+ }
+
+ /** Stop the Unknown-DataNode-Detector; does nothing if it is not currently scheduled */
+ public void stopUnknownDataNodeDetector() {
+ synchronized (scheduleMonitor) {
+ if (currentUnknownDataNodeDetectFuture != null) {
+ currentUnknownDataNodeDetectFuture.cancel(false);
+ currentUnknownDataNodeDetectFuture = null;
+ LOGGER.info("Unknown-DataNode-Detector is stopped successfully.");
+ }
+ }
+ }
+
public void removeNodeCache(int nodeId) {
nodeCacheMap.remove(nodeId);
}