This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new fa8494cd1d HDDS-5331. Recon: Trigger PipelineSyncTask when DN becomes stale and ContainerHealthTask when DN becomes dead. (#4011)
fa8494cd1d is described below

commit fa8494cd1d31f501985f6b62d7b2f3673ac3f705
Author: devmadhuu <[email protected]>
AuthorDate: Thu Dec 22 02:25:11 2022 +0530

    HDDS-5331. Recon: Trigger PipelineSyncTask when DN becomes stale and ContainerHealthTask when DN becomes dead. (#4011)
---
 .../ozone/recon/fsck/ContainerHealthTask.java      | 49 +++++++++++-------
 .../hadoop/ozone/recon/scm/PipelineSyncTask.java   | 33 ++++++++----
 .../ozone/recon/scm/ReconDeadNodeHandler.java      | 12 ++++-
 .../ozone/recon/scm/ReconStaleNodeHandler.java     | 58 ++++++++++++++++++++++
 .../scm/ReconStorageContainerManagerFacade.java    | 35 +++++++------
 5 files changed, 143 insertions(+), 44 deletions(-)

diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
index 4badac672e..59ca2ffd1a 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
@@ -24,6 +24,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
@@ -56,6 +58,8 @@ public class ContainerHealthTask extends ReconScmTask {
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerHealthTask.class);
 
+  private ReadWriteLock lock = new ReentrantReadWriteLock(true);
+
   private StorageContainerServiceProvider scmClient;
   private ContainerManager containerManager;
   private ContainerHealthSchemaManager containerHealthSchemaManager;
@@ -79,26 +83,11 @@ public class ContainerHealthTask extends ReconScmTask {
   }
 
   @Override
-  public synchronized void run() {
+  public void run() {
     try {
       while (canRun()) {
-        wait(interval);
-        long start = Time.monotonicNow();
-        long currentTime = System.currentTimeMillis();
-        long existingCount = processExistingDBRecords(currentTime);
-        LOG.info("Container Health task thread took {} milliseconds to" +
-                " process {} existing database records.",
-            Time.monotonicNow() - start, existingCount);
-        start = Time.monotonicNow();
-        final List<ContainerInfo> containers = containerManager.getContainers();
-        containers.stream()
-            .filter(c -> !processedContainers.contains(c))
-            .forEach(c -> processContainer(c, currentTime));
-        recordSingleRunCompletion();
-        LOG.info("Container Health task thread took {} milliseconds for" +
-                " processing {} containers.", Time.monotonicNow() - start,
-            containers.size());
-        processedContainers.clear();
+        triggerContainerHealthCheck();
+        Thread.sleep(interval);
       }
     } catch (Throwable t) {
       LOG.error("Exception in Missing Container task Thread.", t);
@@ -108,6 +97,30 @@ public class ContainerHealthTask extends ReconScmTask {
     }
   }
 
+  public void triggerContainerHealthCheck() {
+    lock.writeLock().lock();
+    try {
+      long start = Time.monotonicNow();
+      long currentTime = System.currentTimeMillis();
+      long existingCount = processExistingDBRecords(currentTime);
+      LOG.info("Container Health task thread took {} milliseconds to" +
+              " process {} existing database records.",
+          Time.monotonicNow() - start, existingCount);
+      start = Time.monotonicNow();
+      final List<ContainerInfo> containers = containerManager.getContainers();
+      containers.stream()
+          .filter(c -> !processedContainers.contains(c))
+          .forEach(c -> processContainer(c, currentTime));
+      recordSingleRunCompletion();
+      LOG.info("Container Health task thread took {} milliseconds for" +
+              " processing {} containers.", Time.monotonicNow() - start,
+          containers.size());
+      processedContainers.clear();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
   private ContainerHealthStatus setCurrentContainer(long recordId)
       throws ContainerNotFoundException {
     ContainerInfo container =
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/PipelineSyncTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/PipelineSyncTask.java
index 43bad418b7..b802efcf48 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/PipelineSyncTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/PipelineSyncTask.java
@@ -22,6 +22,9 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -49,6 +52,8 @@ public class PipelineSyncTask extends ReconScmTask {
   private StorageContainerServiceProvider scmClient;
   private ReconPipelineManager reconPipelineManager;
   private ReconNodeManager nodeManager;
+
+  private ReadWriteLock lock = new ReentrantReadWriteLock(true);
   private final long interval;
 
   public PipelineSyncTask(ReconPipelineManager pipelineManager,
@@ -64,17 +69,11 @@ public class PipelineSyncTask extends ReconScmTask {
   }
 
   @Override
-  protected synchronized void run() {
+  public void run() {
     try {
       while (canRun()) {
-        long start = Time.monotonicNow();
-        List<Pipeline> pipelinesFromScm = scmClient.getPipelines();
-        reconPipelineManager.initializePipelines(pipelinesFromScm);
-        syncOperationalStateOnDeadNodes();
-        LOG.info("Pipeline sync Thread took {} milliseconds.",
-            Time.monotonicNow() - start);
-        recordSingleRunCompletion();
-        wait(interval);
+        triggerPipelineSyncTask();
+        Thread.sleep(interval);
       }
     } catch (Throwable t) {
       LOG.error("Exception in Pipeline sync Thread.", t);
@@ -84,6 +83,22 @@ public class PipelineSyncTask extends ReconScmTask {
     }
   }
 
+  public void triggerPipelineSyncTask()
+      throws IOException, TimeoutException, NodeNotFoundException {
+    lock.writeLock().lock();
+    try {
+      long start = Time.monotonicNow();
+      List<Pipeline> pipelinesFromScm = scmClient.getPipelines();
+      reconPipelineManager.initializePipelines(pipelinesFromScm);
+      syncOperationalStateOnDeadNodes();
+      LOG.info("Pipeline sync Thread took {} milliseconds.",
+          Time.monotonicNow() - start);
+      recordSingleRunCompletion();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
   /**
    * For every dead node in Recon, update Operational state with that on SCM
    * if different.
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDeadNodeHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDeadNodeHandler.java
index bc1b43a8a4..b64244a124 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDeadNodeHandler.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDeadNodeHandler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.node.DeadNodeHandler;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.recon.fsck.ContainerHealthTask;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,15 +41,20 @@ public class ReconDeadNodeHandler extends DeadNodeHandler {
   private static final Logger LOG =
       LoggerFactory.getLogger(ReconDeadNodeHandler.class);
 
-
   private StorageContainerServiceProvider scmClient;
+  private ContainerHealthTask containerHealthTask;
+  private PipelineSyncTask pipelineSyncTask;
 
   public ReconDeadNodeHandler(NodeManager nodeManager,
                               PipelineManager pipelineManager,
                               ContainerManager containerManager,
-                              StorageContainerServiceProvider scmClient) {
+                              StorageContainerServiceProvider scmClient,
+                              ContainerHealthTask containerHealthTask,
+                              PipelineSyncTask pipelineSyncTask) {
     super(nodeManager, pipelineManager, containerManager);
     this.scmClient = scmClient;
+    this.containerHealthTask = containerHealthTask;
+    this.pipelineSyncTask = pipelineSyncTask;
   }
 
   @Override
@@ -71,6 +77,8 @@ public class ReconDeadNodeHandler extends DeadNodeHandler {
         LOG.warn("Node {} has reached DEAD state, but SCM does not have " +
             "information about it.", datanodeDetails);
       }
+      containerHealthTask.triggerContainerHealthCheck();
+      pipelineSyncTask.triggerPipelineSyncTask();
     } catch (Exception ioEx) {
       LOG.error("Error trying to verify Node operational state from SCM.",
           ioEx);
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStaleNodeHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStaleNodeHandler.java
new file mode 100644
index 0000000000..998f063924
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStaleNodeHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.scm;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Recon's handling of Stale node.
+ */
+public class ReconStaleNodeHandler extends StaleNodeHandler {
+
+  private static final Logger LOG =
+        LoggerFactory.getLogger(ReconStaleNodeHandler.class);
+  private PipelineSyncTask pipelineSyncTask;
+
+  public ReconStaleNodeHandler(NodeManager nodeManager,
+                                 PipelineManager pipelineManager,
+                                 OzoneConfiguration conf,
+                                 PipelineSyncTask pipelineSyncTask) {
+    super(nodeManager, pipelineManager, conf);
+    this.pipelineSyncTask = pipelineSyncTask;
+  }
+
+  @Override
+  public void onMessage(final DatanodeDetails datanodeDetails,
+                          final EventPublisher publisher) {
+    super.onMessage(datanodeDetails, publisher);
+    try {
+      pipelineSyncTask.triggerPipelineSyncTask();
+    } catch (Exception exp) {
+      LOG.error("Error trying to trigger pipeline sync task..",
+          exp);
+    }
+  }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index 51499a0d6c..b93e04cbdd 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -197,10 +197,26 @@ public class ReconStorageContainerManagerFacade
     PipelineActionHandler pipelineActionHandler =
         new PipelineActionHandler(pipelineManager, scmContext, conf);
 
+    ReconTaskConfig reconTaskConfig = conf.getObject(ReconTaskConfig.class);
+    PipelineSyncTask pipelineSyncTask = new PipelineSyncTask(
+        pipelineManager,
+        nodeManager,
+        scmServiceProvider,
+        reconTaskStatusDao,
+        reconTaskConfig);
+    ContainerHealthTask containerHealthTask = new ContainerHealthTask(
+        containerManager,
+        scmServiceProvider,
+        reconTaskStatusDao, containerHealthSchemaManager,
+        containerPlacementPolicy,
+        reconTaskConfig);
+
     StaleNodeHandler staleNodeHandler =
-        new StaleNodeHandler(nodeManager, pipelineManager, conf);
+        new ReconStaleNodeHandler(nodeManager, pipelineManager,
+            conf, pipelineSyncTask);
     DeadNodeHandler deadNodeHandler = new ReconDeadNodeHandler(nodeManager,
-        pipelineManager, containerManager, scmServiceProvider);
+        pipelineManager, containerManager,
+        scmServiceProvider, containerHealthTask, pipelineSyncTask);
 
     ContainerReportHandler containerReportHandler =
         new ReconContainerReportHandler(nodeManager, containerManager);
@@ -268,19 +284,8 @@ public class ReconStorageContainerManagerFacade
     eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
     eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);
 
-    ReconTaskConfig reconTaskConfig = conf.getObject(ReconTaskConfig.class);
-    reconScmTasks.add(new PipelineSyncTask(
-        pipelineManager,
-        nodeManager,
-        scmServiceProvider,
-        reconTaskStatusDao,
-        reconTaskConfig));
-    reconScmTasks.add(new ContainerHealthTask(
-        containerManager,
-        scmServiceProvider,
-        reconTaskStatusDao, containerHealthSchemaManager,
-        containerPlacementPolicy,
-        reconTaskConfig));
+    reconScmTasks.add(pipelineSyncTask);
+    reconScmTasks.add(containerHealthTask);
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to