This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e6b9d5a123 HDDS-12825. ReconIncrementalContainerReportHandler is not synchronized on datanode. (#8272)
e6b9d5a123 is described below

commit e6b9d5a123ec496fe5c1b905dc2cfe74f343acdb
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Apr 14 08:11:20 2025 -0700

    HDDS-12825. ReconIncrementalContainerReportHandler is not synchronized on datanode. (#8272)
---
 .../container/AbstractContainerReportHandler.java  | 25 ++---------
 .../hdds/scm/container/ContainerReportHandler.java |  3 +-
 .../IncrementalContainerReportHandler.java         | 39 +++++++++-------
 .../ReconIncrementalContainerReportHandler.java    | 52 ++--------------------
 4 files changed, 33 insertions(+), 86 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index e3c903da2d..52f4259442 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -61,7 +61,8 @@ abstract class AbstractContainerReportHandler {
   protected abstract Logger getLogger();
 
   /** @return the container in SCM and the replica from a datanode details for 
logging. */
-  static Object getDetailsForLogging(ContainerInfo container, 
ContainerReplicaProto replica, DatanodeDetails datanode) {
+  protected static Object getDetailsForLogging(ContainerInfo container, 
ContainerReplicaProto replica,
+      DatanodeDetails datanode) {
     Objects.requireNonNull(replica, "replica == null");
     Objects.requireNonNull(datanode, "datanode == null");
     if (container != null) {
@@ -93,21 +94,6 @@ public String toString() {
     };
   }
 
-  /**
-   * Process the given ContainerReplica received from specified datanode.
-   *
-   * @param datanodeDetails DatanodeDetails for the DN
-   * @param replicaProto Protobuf representing the replicas
-   * @param publisher EventPublisher instance
-   */
-  protected void processContainerReplica(final DatanodeDetails datanodeDetails,
-      final ContainerReplicaProto replicaProto, final EventPublisher publisher)
-      throws IOException, InvalidStateTransitionException {
-    ContainerInfo container = getContainerManager().getContainer(
-        ContainerID.valueOf(replicaProto.getContainerID()));
-    processContainerReplica(
-        datanodeDetails, container, replicaProto, publisher);
-  }
 
   /**
    * Process the given ContainerReplica received from specified datanode.
@@ -120,18 +106,15 @@ protected void processContainerReplica(final 
DatanodeDetails datanodeDetails,
    */
   protected void processContainerReplica(final DatanodeDetails datanodeDetails,
       final ContainerInfo containerInfo,
-      final ContainerReplicaProto replicaProto, final EventPublisher publisher)
+      final ContainerReplicaProto replicaProto, final EventPublisher 
publisher, Object detailsForLogging)
       throws IOException, InvalidStateTransitionException {
-    final ContainerID containerId = containerInfo.containerID();
-    final Object detailsForLogging = getDetailsForLogging(containerInfo, 
replicaProto, datanodeDetails);
-
     getLogger().debug("Processing replica {}", detailsForLogging);
     // Synchronized block should be replaced by container lock,
     // once we have introduced lock inside ContainerInfo.
     synchronized (containerInfo) {
       updateContainerStats(datanodeDetails, containerInfo, replicaProto, 
detailsForLogging);
       if (!updateContainerState(datanodeDetails, containerInfo, replicaProto, 
publisher, detailsForLogging)) {
-        updateContainerReplica(datanodeDetails, containerId, replicaProto);
+        updateContainerReplica(datanodeDetails, containerInfo.containerID(), 
replicaProto);
       }
     }
   }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 288c479ba2..b632b1708a 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -234,8 +234,7 @@ private void processSingleReplica(final DatanodeDetails 
datanodeDetails,
       return;
     }
     try {
-      processContainerReplica(
-          datanodeDetails, container, replicaProto, publisher);
+      processContainerReplica(datanodeDetails, container, replicaProto, 
publisher, detailsForLogging);
     } catch (IOException | InvalidStateTransitionException e) {
       getLogger().error("Failed to process {}", detailsForLogging, e);
     }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
index 72722153b9..247e3667d9 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
@@ -36,8 +36,8 @@
 /**
  * Handles incremental container reports from datanode.
  */
-public class IncrementalContainerReportHandler extends
-    AbstractContainerReportHandler
+public class IncrementalContainerReportHandler
+    extends AbstractContainerReportHandler
     implements EventHandler<IncrementalContainerReportFromDatanode> {
 
   private static final Logger LOG = LoggerFactory.getLogger(
@@ -58,14 +58,25 @@ protected Logger getLogger() {
   @Override
   public void onMessage(final IncrementalContainerReportFromDatanode report,
                         final EventPublisher publisher) {
+    final DatanodeDetails datanode = getDatanodeDetails(report);
+    if (datanode == null) {
+      return;
+    }
+    processICR(report, publisher, datanode);
+  }
+
+  protected DatanodeDetails getDatanodeDetails(final 
IncrementalContainerReportFromDatanode report) {
     final DatanodeDetails dnFromReport = report.getDatanodeDetails();
     getLogger().debug("Processing incremental container report from datanode 
{}", dnFromReport);
     final DatanodeDetails dd = getNodeManager().getNode(dnFromReport.getID());
     if (dd == null) {
       getLogger().warn("Datanode not found: {}", dnFromReport);
-      return;
     }
+    return dd;
+  }
 
+  protected void processICR(IncrementalContainerReportFromDatanode report,
+      EventPublisher publisher, DatanodeDetails dd) {
     boolean success = false;
     // HDDS-5249 - we must ensure that an ICR and FCR for the same datanode
     // do not run at the same time or it can result in a data consistency
@@ -74,13 +85,15 @@ public void onMessage(final 
IncrementalContainerReportFromDatanode report,
     synchronized (dd) {
       for (ContainerReplicaProto replicaProto :
           report.getReport().getReportList()) {
+        Object detailsForLogging = getDetailsForLogging(null, replicaProto, 
dd);
         ContainerID id = ContainerID.valueOf(replicaProto.getContainerID());
-        ContainerInfo container = null;
+        final ContainerInfo container;
         try {
           try {
             container = getContainerManager().getContainer(id);
             // Ensure we reuse the same ContainerID instance in containerInfo
             id = container.containerID();
+            detailsForLogging = getDetailsForLogging(container, replicaProto, 
dd);
           } finally {
             if (replicaProto.getState() == State.DELETED) {
               getNodeManager().removeContainer(dd, id);
@@ -89,27 +102,23 @@ public void onMessage(final 
IncrementalContainerReportFromDatanode report,
             }
           }
           if (ContainerReportValidator.validate(container, dd, replicaProto)) {
-            processContainerReplica(dd, container, replicaProto, publisher);
+            processContainerReplica(dd, container, replicaProto, publisher, 
detailsForLogging);
           }
           success = true;
         } catch (ContainerNotFoundException e) {
-          getLogger().warn("Container {} not found!", 
replicaProto.getContainerID());
+          getLogger().warn("Container not found: {}", detailsForLogging);
         } catch (NodeNotFoundException ex) {
-          getLogger().error("Datanode not found {}", 
report.getDatanodeDetails(), ex);
+          getLogger().error("{}: {}", ex, detailsForLogging);
         } catch (ContainerReplicaNotFoundException e) {
-          getLogger().warn("Container {} replica not found!",
-              replicaProto.getContainerID());
+          getLogger().warn("Container replica not found: {}", 
detailsForLogging, e);
         } catch (SCMException ex) {
           if (ex.getResult() == SCMException.ResultCodes.SCM_NOT_LEADER) {
-            getLogger().info("Failed to process {} container {}: {}",
-                replicaProto.getState(), id, ex.getMessage());
+            getLogger().info("SCM_NOT_LEADER: Failed to process {}", 
detailsForLogging);
           } else {
-            getLogger().error("Exception while processing ICR for container 
{}",
-                replicaProto.getContainerID(), ex);
+            getLogger().info("Failed to process {}", detailsForLogging, ex);
           }
         } catch (IOException | InvalidStateTransitionException e) {
-          getLogger().error("Exception while processing ICR for container {}",
-              replicaProto.getContainerID(), e);
+          getLogger().info("Failed to process {}", detailsForLogging, e);
         }
       }
     }
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
index c6f10c9b5b..85fc477cb8 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
@@ -17,19 +17,13 @@
 
 package org.apache.hadoop.ozone.recon.scm;
 
-import java.io.IOException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import 
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +36,7 @@ public class ReconIncrementalContainerReportHandler
   private static final Logger LOG = LoggerFactory.getLogger(
       ReconIncrementalContainerReportHandler.class);
 
-  public ReconIncrementalContainerReportHandler(NodeManager nodeManager,
+  ReconIncrementalContainerReportHandler(NodeManager nodeManager,
              ContainerManager containerManager, SCMContext scmContext) {
     super(nodeManager, containerManager, scmContext);
   }
@@ -55,16 +49,8 @@ protected Logger getLogger() {
   @Override
   public void onMessage(final IncrementalContainerReportFromDatanode report,
                         final EventPublisher publisher) {
-    final DatanodeDetails dnFromReport = report.getDatanodeDetails();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing incremental container report from data node {}",
-          dnFromReport);
-    }
-
-    final DatanodeDetails dd = getNodeManager().getNode(dnFromReport.getID());
-    if (dd == null) {
-      LOG.warn("Received container report from unknown datanode {}",
-          dnFromReport);
+    final DatanodeDetails datanode = getDatanodeDetails(report);
+    if (datanode == null) {
       return;
     }
 
@@ -77,36 +63,6 @@ public void onMessage(final 
IncrementalContainerReportFromDatanode report,
       LOG.error("Exception while checking and adding new container.", ioEx);
       return;
     }
-    boolean success = true;
-    for (ContainerReplicaProto replicaProto :
-        report.getReport().getReportList()) {
-      ContainerID id = ContainerID.valueOf(replicaProto.getContainerID());
-      ContainerInfo container = null;
-      try {
-        try {
-          container = getContainerManager().getContainer(id);
-          // Ensure we reuse the same ContainerID instance in containerInfo
-          id = container.containerID();
-        } finally {
-          if (replicaProto.getState().equals(
-              ContainerReplicaProto.State.DELETED)) {
-            getNodeManager().removeContainer(dd, id);
-          } else {
-            getNodeManager().addContainer(dd, id);
-          }
-        }
-        processContainerReplica(dd, replicaProto, publisher);
-        success = true;
-      } catch (NodeNotFoundException ex) {
-        success = false;
-        LOG.error("Received ICR from unknown datanode {}.",
-            report.getDatanodeDetails(), ex);
-      } catch (IOException | InvalidStateTransitionException e) {
-        success = false;
-        LOG.error("Exception while processing ICR for container {}",
-            replicaProto.getContainerID());
-      }
-    }
-    containerManager.notifyContainerReportProcessing(false, success);
+    processICR(report, publisher, datanode);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to