This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e6b9d5a123 HDDS-12825. ReconIncrementalContainerReportHandler is not synchronized on datanode. (#8272)
e6b9d5a123 is described below
commit e6b9d5a123ec496fe5c1b905dc2cfe74f343acdb
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Apr 14 08:11:20 2025 -0700
HDDS-12825. ReconIncrementalContainerReportHandler is not synchronized on datanode. (#8272)
---
.../container/AbstractContainerReportHandler.java | 25 ++---------
.../hdds/scm/container/ContainerReportHandler.java | 3 +-
.../IncrementalContainerReportHandler.java | 39 +++++++++-------
.../ReconIncrementalContainerReportHandler.java | 52 ++--------------------
4 files changed, 33 insertions(+), 86 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index e3c903da2d..52f4259442 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -61,7 +61,8 @@ abstract class AbstractContainerReportHandler {
protected abstract Logger getLogger();
/** @return the container in SCM and the replica from a datanode details for logging. */
- static Object getDetailsForLogging(ContainerInfo container, ContainerReplicaProto replica, DatanodeDetails datanode) {
+ protected static Object getDetailsForLogging(ContainerInfo container, ContainerReplicaProto replica,
+ DatanodeDetails datanode) {
Objects.requireNonNull(replica, "replica == null");
Objects.requireNonNull(datanode, "datanode == null");
if (container != null) {
@@ -93,21 +94,6 @@ public String toString() {
};
}
- /**
- * Process the given ContainerReplica received from specified datanode.
- *
- * @param datanodeDetails DatanodeDetails for the DN
- * @param replicaProto Protobuf representing the replicas
- * @param publisher EventPublisher instance
- */
- protected void processContainerReplica(final DatanodeDetails datanodeDetails,
- final ContainerReplicaProto replicaProto, final EventPublisher publisher)
- throws IOException, InvalidStateTransitionException {
- ContainerInfo container = getContainerManager().getContainer(
- ContainerID.valueOf(replicaProto.getContainerID()));
- processContainerReplica(
- datanodeDetails, container, replicaProto, publisher);
- }
/**
* Process the given ContainerReplica received from specified datanode.
@@ -120,18 +106,15 @@ protected void processContainerReplica(final DatanodeDetails datanodeDetails,
*/
protected void processContainerReplica(final DatanodeDetails datanodeDetails,
final ContainerInfo containerInfo,
- final ContainerReplicaProto replicaProto, final EventPublisher publisher)
+ final ContainerReplicaProto replicaProto, final EventPublisher publisher, Object detailsForLogging)
throws IOException, InvalidStateTransitionException {
- final ContainerID containerId = containerInfo.containerID();
- final Object detailsForLogging = getDetailsForLogging(containerInfo, replicaProto, datanodeDetails);
-
getLogger().debug("Processing replica {}", detailsForLogging);
// Synchronized block should be replaced by container lock,
// once we have introduced lock inside ContainerInfo.
synchronized (containerInfo) {
updateContainerStats(datanodeDetails, containerInfo, replicaProto, detailsForLogging);
if (!updateContainerState(datanodeDetails, containerInfo, replicaProto, publisher, detailsForLogging)) {
- updateContainerReplica(datanodeDetails, containerId, replicaProto);
+ updateContainerReplica(datanodeDetails, containerInfo.containerID(), replicaProto);
}
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 288c479ba2..b632b1708a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -234,8 +234,7 @@ private void processSingleReplica(final DatanodeDetails datanodeDetails,
return;
}
try {
- processContainerReplica(
- datanodeDetails, container, replicaProto, publisher);
+ processContainerReplica(datanodeDetails, container, replicaProto, publisher, detailsForLogging);
} catch (IOException | InvalidStateTransitionException e) {
getLogger().error("Failed to process {}", detailsForLogging, e);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
index 72722153b9..247e3667d9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
@@ -36,8 +36,8 @@
/**
* Handles incremental container reports from datanode.
*/
-public class IncrementalContainerReportHandler extends
- AbstractContainerReportHandler
+public class IncrementalContainerReportHandler
+ extends AbstractContainerReportHandler
implements EventHandler<IncrementalContainerReportFromDatanode> {
private static final Logger LOG = LoggerFactory.getLogger(
@@ -58,14 +58,25 @@ protected Logger getLogger() {
@Override
public void onMessage(final IncrementalContainerReportFromDatanode report,
final EventPublisher publisher) {
+ final DatanodeDetails datanode = getDatanodeDetails(report);
+ if (datanode == null) {
+ return;
+ }
+ processICR(report, publisher, datanode);
+ }
+
+ protected DatanodeDetails getDatanodeDetails(final IncrementalContainerReportFromDatanode report) {
final DatanodeDetails dnFromReport = report.getDatanodeDetails();
getLogger().debug("Processing incremental container report from datanode {}", dnFromReport);
final DatanodeDetails dd = getNodeManager().getNode(dnFromReport.getID());
if (dd == null) {
getLogger().warn("Datanode not found: {}", dnFromReport);
- return;
}
+ return dd;
+ }
+ protected void processICR(IncrementalContainerReportFromDatanode report,
+ EventPublisher publisher, DatanodeDetails dd) {
boolean success = false;
// HDDS-5249 - we must ensure that an ICR and FCR for the same datanode
// do not run at the same time or it can result in a data consistency
@@ -74,13 +85,15 @@ public void onMessage(final IncrementalContainerReportFromDatanode report,
synchronized (dd) {
for (ContainerReplicaProto replicaProto :
report.getReport().getReportList()) {
+ Object detailsForLogging = getDetailsForLogging(null, replicaProto, dd);
ContainerID id = ContainerID.valueOf(replicaProto.getContainerID());
- ContainerInfo container = null;
+ final ContainerInfo container;
try {
try {
container = getContainerManager().getContainer(id);
// Ensure we reuse the same ContainerID instance in containerInfo
id = container.containerID();
+ detailsForLogging = getDetailsForLogging(container, replicaProto, dd);
} finally {
if (replicaProto.getState() == State.DELETED) {
getNodeManager().removeContainer(dd, id);
@@ -89,27 +102,23 @@ public void onMessage(final IncrementalContainerReportFromDatanode report,
}
}
if (ContainerReportValidator.validate(container, dd, replicaProto)) {
- processContainerReplica(dd, container, replicaProto, publisher);
+ processContainerReplica(dd, container, replicaProto, publisher, detailsForLogging);
}
success = true;
} catch (ContainerNotFoundException e) {
- getLogger().warn("Container {} not found!", replicaProto.getContainerID());
+ getLogger().warn("Container not found: {}", detailsForLogging);
} catch (NodeNotFoundException ex) {
- getLogger().error("Datanode not found {}", report.getDatanodeDetails(), ex);
+ getLogger().error("{}: {}", ex, detailsForLogging);
} catch (ContainerReplicaNotFoundException e) {
- getLogger().warn("Container {} replica not found!",
- replicaProto.getContainerID());
+ getLogger().warn("Container replica not found: {}", detailsForLogging, e);
} catch (SCMException ex) {
if (ex.getResult() == SCMException.ResultCodes.SCM_NOT_LEADER) {
- getLogger().info("Failed to process {} container {}: {}",
- replicaProto.getState(), id, ex.getMessage());
+ getLogger().info("SCM_NOT_LEADER: Failed to process {}", detailsForLogging);
} else {
- getLogger().error("Exception while processing ICR for container {}",
- replicaProto.getContainerID(), ex);
+ getLogger().info("Failed to process {}", detailsForLogging, ex);
}
} catch (IOException | InvalidStateTransitionException e) {
- getLogger().error("Exception while processing ICR for container {}",
- replicaProto.getContainerID(), e);
+ getLogger().info("Failed to process {}", detailsForLogging, e);
}
}
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
index c6f10c9b5b..85fc477cb8 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
@@ -17,19 +17,13 @@
package org.apache.hadoop.ozone.recon.scm;
-import java.io.IOException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +36,7 @@ public class ReconIncrementalContainerReportHandler
private static final Logger LOG = LoggerFactory.getLogger(
ReconIncrementalContainerReportHandler.class);
- public ReconIncrementalContainerReportHandler(NodeManager nodeManager,
+ ReconIncrementalContainerReportHandler(NodeManager nodeManager,
ContainerManager containerManager, SCMContext scmContext) {
super(nodeManager, containerManager, scmContext);
}
@@ -55,16 +49,8 @@ protected Logger getLogger() {
@Override
public void onMessage(final IncrementalContainerReportFromDatanode report,
final EventPublisher publisher) {
- final DatanodeDetails dnFromReport = report.getDatanodeDetails();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing incremental container report from data node {}",
- dnFromReport);
- }
-
- final DatanodeDetails dd = getNodeManager().getNode(dnFromReport.getID());
- if (dd == null) {
- LOG.warn("Received container report from unknown datanode {}",
- dnFromReport);
+ final DatanodeDetails datanode = getDatanodeDetails(report);
+ if (datanode == null) {
return;
}
@@ -77,36 +63,6 @@ public void onMessage(final IncrementalContainerReportFromDatanode report,
LOG.error("Exception while checking and adding new container.", ioEx);
return;
}
- boolean success = true;
- for (ContainerReplicaProto replicaProto :
- report.getReport().getReportList()) {
- ContainerID id = ContainerID.valueOf(replicaProto.getContainerID());
- ContainerInfo container = null;
- try {
- try {
- container = getContainerManager().getContainer(id);
- // Ensure we reuse the same ContainerID instance in containerInfo
- id = container.containerID();
- } finally {
- if (replicaProto.getState().equals(
- ContainerReplicaProto.State.DELETED)) {
- getNodeManager().removeContainer(dd, id);
- } else {
- getNodeManager().addContainer(dd, id);
- }
- }
- processContainerReplica(dd, replicaProto, publisher);
- success = true;
- } catch (NodeNotFoundException ex) {
- success = false;
- LOG.error("Received ICR from unknown datanode {}.",
- report.getDatanodeDetails(), ex);
- } catch (IOException | InvalidStateTransitionException e) {
- success = false;
- LOG.error("Exception while processing ICR for container {}",
- replicaProto.getContainerID());
- }
- }
- containerManager.notifyContainerReportProcessing(false, success);
+ processICR(report, publisher, datanode);
}
}
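
For readers skimming the diff above: the fix routes both the SCM and the Recon incremental container report (ICR) handling through one code path that holds the datanode monitor while replicas are processed, so an ICR and a full container report for the same datanode can no longer interleave (the HDDS-5249 comment in the diff). Below is a minimal, self-contained sketch of that shape; the types and class names (Datanode, Report, BaseIcrHandler, ReconIcrHandler) are hypothetical stand-ins for illustration, not the actual Ozone classes.

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Stand-in types; illustrative only, not the real Ozone classes.
    final class Datanode {
      final String id;
      Datanode(String id) { this.id = id; }
    }

    final class Report {
      final String datanodeId;
      final List<String> replicas;
      Report(String datanodeId, List<String> replicas) {
        this.datanodeId = datanodeId;
        this.replicas = replicas;
      }
    }

    // Base handler: resolve the datanode once, then process the whole report
    // while holding the datanode monitor, so report processing for the same
    // datanode is serialized.
    class BaseIcrHandler {
      private final Map<String, Datanode> nodes = new ConcurrentHashMap<>();

      void register(Datanode dn) {
        nodes.put(dn.id, dn);
      }

      Datanode getDatanode(Report report) {
        Datanode dn = nodes.get(report.datanodeId);
        if (dn == null) {
          System.out.println("Datanode not found: " + report.datanodeId);
        }
        return dn;
      }

      void processIcr(Report report, Datanode dn) {
        // The same Datanode instance is used as the lock for every report from
        // that node, so concurrent reports for one node cannot interleave.
        synchronized (dn) {
          for (String replica : report.replicas) {
            System.out.println("Processing replica " + replica + " from " + dn.id);
          }
        }
      }

      public void onMessage(Report report) {
        Datanode dn = getDatanode(report);
        if (dn == null) {
          return;
        }
        processIcr(report, dn);
      }
    }

    // Recon-style subclass: adds its own pre-processing but reuses the shared
    // synchronized processing path instead of keeping its own unsynchronized loop.
    class ReconIcrHandler extends BaseIcrHandler {
      @Override
      public void onMessage(Report report) {
        Datanode dn = getDatanode(report);
        if (dn == null) {
          return;
        }
        // Recon-specific work (e.g. registering newly seen containers) goes here.
        processIcr(report, dn);
      }
    }

    public class IcrSyncSketch {
      public static void main(String[] args) {
        ReconIcrHandler handler = new ReconIcrHandler();
        handler.register(new Datanode("dn-1"));
        handler.onMessage(new Report("dn-1", List.of("container-100", "container-101")));
      }
    }

The design point the sketch mirrors is that the subclass no longer re-implements the per-replica loop; only the base class acquires the datanode lock, which is what the Recon handler had been skipping before this change.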
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]