This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a4fe367436 [IOTDB-4643] Add Unknown_DataNode_Detector (#7585)
a4fe367436 is described below

commit a4fe3674362400a8cbff8943935ae4d1395e06ca
Author: Weihao Li <[email protected]>
AuthorDate: Mon Oct 17 10:00:05 2022 +0800

    [IOTDB-4643] Add Unknown_DataNode_Detector (#7585)
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    | 11 ++++
 .../statemachine/PartitionRegionStateMachine.java  |  2 +
 .../iotdb/confignode/manager/ConfigManager.java    |  6 ++
 .../apache/iotdb/confignode/manager/IManager.java  |  3 +
 .../iotdb/confignode/manager/TriggerManager.java   |  6 ++
 .../iotdb/confignode/manager/node/NodeManager.java | 75 ++++++++++++++++++++++
 6 files changed, 103 insertions(+)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 7f2fb656f5..f2b0c91bff 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -135,6 +135,9 @@ public class ConfigNodeConfig {
   /** The heartbeat interval in milliseconds */
   private long heartbeatInterval = 1000;
 
+  /** The unknown DataNode detect interval in milliseconds */
+  private long unknownDataNodeDetectInterval = heartbeatInterval;
+
   /** The routing policy of read/write requests */
   private String routingPolicy = RouteBalancer.LEADER_POLICY;
 
@@ -475,6 +478,14 @@ public class ConfigNodeConfig {
     this.heartbeatInterval = heartbeatInterval;
   }
 
+  public long getUnknownDataNodeDetectInterval() {
+    return unknownDataNodeDetectInterval;
+  }
+
+  public void setUnknownDataNodeDetectInterval(long 
unknownDataNodeDetectInterval) {
+    this.unknownDataNodeDetectInterval = unknownDataNodeDetectInterval;
+  }
+
   public String getRoutingPolicy() {
     return routingPolicy;
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index dae61189d3..bcaba4fec8 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -158,6 +158,7 @@ public class PartitionRegionStateMachine
       configManager.getProcedureManager().shiftExecutor(true);
       configManager.getLoadManager().startLoadBalancingService();
       configManager.getNodeManager().startHeartbeatService();
+      configManager.getNodeManager().startUnknownDataNodeDetector();
       configManager.getPartitionManager().startRegionCleaner();
     } else {
       LOGGER.info(
@@ -168,6 +169,7 @@ public class PartitionRegionStateMachine
       configManager.getProcedureManager().shiftExecutor(false);
       configManager.getLoadManager().stopLoadBalancingService();
       configManager.getNodeManager().stopHeartbeatService();
+      configManager.getNodeManager().stopUnknownDataNodeDetector();
       configManager.getPartitionManager().stopRegionCleaner();
     }
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 793297de22..a398b6f64c 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1188,4 +1188,10 @@ public class ConfigManager implements IManager {
     }
     return null;
   }
+
+  public TSStatus transfer(List<TDataNodeLocation> newUnknownDataList) {
+    LOGGER.info("start Transfer of {}", newUnknownDataList);
+    // transfer trigger
+    return triggerManager.transferTrigger(newUnknownDataList);
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 4d6aad5e57..6330eabdeb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.confignode.manager;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -469,4 +470,6 @@ public interface IManager {
   TGetSeriesSlotListResp getSeriesSlotList(GetSeriesSlotListPlan plan);
 
   TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req);
+
+  TSStatus transfer(List<TDataNodeLocation> newUnknownDataList);
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
index 49e7a42512..acce3d6fc8 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 
 public class TriggerManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TriggerManager.class);
@@ -130,4 +131,9 @@ public class TriggerManager {
           Collections.emptyList());
     }
   }
+
+  public TSStatus transferTrigger(List<TDataNodeLocation> newUnknownDataList) {
+    // TODO implement
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 9e2558838d..8ada16691d 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -80,9 +80,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -99,6 +101,8 @@ public class NodeManager {
 
   private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
   public static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatInterval();
+  private static final long UNKNOWN_DATANODE_DETECT_INTERVAL =
+      CONF.getUnknownDataNodeDetectInterval();
 
   public static final TEndPoint CURRENT_NODE =
       new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort());
@@ -121,11 +125,19 @@ public class NodeManager {
   private final ScheduledExecutorService heartBeatExecutor =
       
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(LoadManager.class.getSimpleName());
 
+  /** Unknown DataNode Detector */
+  private Future<?> currentUnknownDataNodeDetectFuture;
+
+  private final ScheduledExecutorService unknownDataNodeDetectExecutor =
+      
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Unknown-DataNode-Detector");
+  private final Set<TDataNodeLocation> oldUnknownNodes;
+
   public NodeManager(IManager configManager, NodeInfo nodeInfo) {
     this.configManager = configManager;
     this.nodeInfo = nodeInfo;
     this.removeConfigNodeLock = new ReentrantLock();
     this.nodeCacheMap = new ConcurrentHashMap<>();
+    this.oldUnknownNodes = new HashSet<>();
   }
 
   private void setGlobalConfig(DataNodeRegisterResp dataSet) {
@@ -650,6 +662,69 @@ public class NodeManager {
     return nodeCacheMap;
   }
 
+  /** Start unknownDataNodeDetector */
+  public void startUnknownDataNodeDetector() {
+    synchronized (scheduleMonitor) {
+      if (currentUnknownDataNodeDetectFuture == null) {
+        currentUnknownDataNodeDetectFuture =
+            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+                unknownDataNodeDetectExecutor,
+                this::detectTask,
+                0,
+                UNKNOWN_DATANODE_DETECT_INTERVAL,
+                TimeUnit.MILLISECONDS);
+        LOGGER.info("Unknown-DataNode-Detector is started successfully.");
+      }
+    }
+  }
+
+  /**
+   * The detectTask is executed periodically to find the newest Unknown DataNodes
+   *
+   * <p>1. If a DataNode remains Unknown, we shouldn't repeatedly activate 
Transfer of this Node.
+   *
+   * <p>2.The selected DataNodes may not truly need to transfer, so you should 
ensure safety of the
+   * Data when implementing the transfer method in Manager.
+   */
+  private void detectTask() {
+    List<TDataNodeLocation> newUnknownNodes = new ArrayList<>();
+
+    getRegisteredDataNodes()
+        .forEach(
+            DataNodeConfiguration -> {
+              TDataNodeLocation dataNodeLocation = 
DataNodeConfiguration.getLocation();
+              BaseNodeCache newestNodeInformation = 
nodeCacheMap.get(dataNodeLocation.dataNodeId);
+              if (newestNodeInformation != null) {
+                if (newestNodeInformation.getNodeStatus() == 
NodeStatus.Running) {
+                  oldUnknownNodes.remove(dataNodeLocation);
+                } else if (!oldUnknownNodes.contains(dataNodeLocation)
+                    && newestNodeInformation.getNodeStatus() == 
NodeStatus.Unknown) {
+                  newUnknownNodes.add(dataNodeLocation);
+                }
+              }
+            });
+
+    if (!newUnknownNodes.isEmpty()) {
+      TSStatus transferResult = configManager.transfer(newUnknownNodes);
+      if (transferResult.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        oldUnknownNodes.addAll(newUnknownNodes);
+      } else {
+        LOGGER.warn("Fail to transfer because {}, will retry", 
transferResult.getMessage());
+      }
+    }
+  }
+
+  /** Stop unknownDataNodeDetector */
+  public void stopUnknownDataNodeDetector() {
+    synchronized (scheduleMonitor) {
+      if (currentUnknownDataNodeDetectFuture != null) {
+        currentUnknownDataNodeDetectFuture.cancel(false);
+        currentUnknownDataNodeDetectFuture = null;
+        LOGGER.info("Unknown-DataNode-Detector is stopped successfully.");
+      }
+    }
+  }
+
   public void removeNodeCache(int nodeId) {
     nodeCacheMap.remove(nodeId);
   }

Reply via email to