This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new fb43023f79 HDDS-11331. Fix Datanode unable to report for a long time (#7090)
fb43023f79 is described below

commit fb43023f79fb096dbd1714feeae4ddd411605373
Author: jianghuazhu <[email protected]>
AuthorDate: Thu Aug 22 00:08:09 2024 +0800

    HDDS-11331. Fix Datanode unable to report for a long time (#7090)
---
 .../common/statemachine/StateContext.java          | 164 +++++++++++++--------
 1 file changed, 99 insertions(+), 65 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 93a4590597..55cd57d9dc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -27,6 +27,10 @@ import java.util.Map;
 import java.util.OptionalLong;
 import java.util.Queue;
 import java.util.Set;
+import java.util.Objects;
+import java.util.LinkedHashMap;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -45,6 +49,7 @@ import java.util.stream.Collectors;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.Descriptors.Descriptor;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -53,10 +58,12 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import 
org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
 import 
org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import 
org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -112,7 +119,7 @@ public class StateContext {
   private final Map<InetSocketAddress, List<Message>>
       incrementalReportsQueue;
   private final Map<InetSocketAddress, Queue<ContainerAction>> 
containerActions;
-  private final Map<InetSocketAddress, Queue<PipelineAction>> pipelineActions;
+  private final Map<InetSocketAddress, PipelineActionMap> pipelineActions;
   private DatanodeStateMachine.DatanodeStates state;
   private boolean shutdownOnError = false;
   private boolean shutdownGracefully = false;
@@ -178,7 +185,7 @@ public class StateContext {
     pipelineReports = new AtomicReference<>();
     endpoints = new HashSet<>();
     containerActions = new HashMap<>();
-    pipelineActions = new HashMap<>();
+    pipelineActions = new ConcurrentHashMap<>();
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
     threadPoolNotAvailableCount = new AtomicLong(0);
@@ -518,47 +525,16 @@ public class StateContext {
     }
   }
 
-  /**
-   * Helper function for addPipelineActionIfAbsent that check if inputs are the
-   * same close pipeline action.
-   *
-   * Important Note: Make sure to double check for correctness before using 
this
-   * helper function for other purposes!
-   *
-   * @return true if a1 and a2 are the same close pipeline action,
-   *         false otherwise
-   */
-  boolean isSameClosePipelineAction(PipelineAction a1, PipelineAction a2) {
-    return a1.getAction() == a2.getAction()
-        && a1.hasClosePipeline()
-        && a2.hasClosePipeline()
-        && a1.getClosePipeline().getPipelineID()
-        .equals(a2.getClosePipeline().getPipelineID());
-  }
-
   /**
    * Add PipelineAction to PipelineAction queue if it's not present.
    *
    * @param pipelineAction PipelineAction to be added
    */
   public void addPipelineActionIfAbsent(PipelineAction pipelineAction) {
-    synchronized (pipelineActions) {
-      /**
-       * If pipelineAction queue already contains entry for the pipeline id
-       * with same action, we should just return.
-       * Note: We should not use pipelineActions.contains(pipelineAction) here
-       * as, pipelineAction has a msg string. So even if two msgs differ though
-       * action remains same on the given pipeline, it will end up adding it
-       * multiple times here.
-       */
-      for (InetSocketAddress endpoint : endpoints) {
-        final Queue<PipelineAction> actionsForEndpoint =
-            pipelineActions.get(endpoint);
-        if (actionsForEndpoint.stream().noneMatch(
-            action -> isSameClosePipelineAction(action, pipelineAction))) {
-          actionsForEndpoint.add(pipelineAction);
-        }
-      }
+    // Put only if the same action on the same pipeline is absent.
+    // PipelineKey holds only the pipeline id and the action type, so two
+    // actions that differ only in their message are deduplicated.
+    final PipelineKey key = new PipelineKey(pipelineAction);
+    for (InetSocketAddress endpoint : endpoints) {
+      // Skip an endpoint whose action map is not registered instead of
+      // throwing an NPE, matching getPendingPipelineAction's null handling.
+      final PipelineActionMap actionMap = pipelineActions.get(endpoint);
+      if (actionMap != null) {
+        actionMap.putIfAbsent(key, pipelineAction);
+      }
     }
   }
 
@@ -571,34 +547,17 @@ public class StateContext {
   public List<PipelineAction> getPendingPipelineAction(
       InetSocketAddress endpoint,
       int maxLimit) {
-    List<PipelineAction> pipelineActionList = new ArrayList<>();
-    List<PipelineAction> persistPipelineAction = new ArrayList<>();
-    synchronized (pipelineActions) {
-      if (!pipelineActions.isEmpty() &&
-          CollectionUtils.isNotEmpty(pipelineActions.get(endpoint))) {
-        Queue<PipelineAction> actionsForEndpoint =
-            this.pipelineActions.get(endpoint);
-        int size = actionsForEndpoint.size();
-        int limit = size > maxLimit ? maxLimit : size;
-        for (int count = 0; count < limit; count++) {
-          // Add closePipeline back to the pipelineAction queue until
-          // pipeline is closed and removed from the DN.
-          PipelineAction action = actionsForEndpoint.poll();
-          if (action.hasClosePipeline()) {
-            if (parentDatanodeStateMachine.getContainer().getPipelineReport()
-                .getPipelineReportList().stream().noneMatch(
-                    report -> action.getClosePipeline().getPipelineID()
-                        .equals(report.getPipelineID()))) {
-              continue;
-            }
-            persistPipelineAction.add(action);
-          }
-          pipelineActionList.add(action);
-        }
-        actionsForEndpoint.addAll(persistPipelineAction);
-      }
-      return pipelineActionList;
+    // Nothing is pending for an endpoint without a registered action map.
+    final PipelineActionMap actionMap = pipelineActions.get(endpoint);
+    if (actionMap == null) {
+      return Collections.emptyList();
     }
+    // The current pipeline reports decide which close actions are still
+    // relevant, so report nothing while the container is unavailable.
+    final OzoneContainer container = parentDatanodeStateMachine.getContainer();
+    if (container == null) {
+      return Collections.emptyList();
+    }
+    final List<PipelineReport> reports =
+        container.getPipelineReport().getPipelineReportList();
+    return actionMap.getActions(reports, maxLimit);
   }
 
   /**
@@ -927,7 +886,7 @@ public class StateContext {
     if (!endpoints.contains(endpoint)) {
       this.endpoints.add(endpoint);
       this.containerActions.put(endpoint, new LinkedList<>());
-      this.pipelineActions.put(endpoint, new LinkedList<>());
+      this.pipelineActions.put(endpoint, new PipelineActionMap());
       this.incrementalReportsQueue.put(endpoint, new LinkedList<>());
       Map<String, AtomicBoolean> mp = new HashMap<>();
       fullReportTypeList.forEach(e -> {
@@ -988,4 +947,79 @@ public class StateContext {
   public String getThreadNamePrefix() {
     return threadNamePrefix;
   }
+
+  /**
+   * Pending {@link PipelineAction}s of one endpoint, deduplicated by
+   * {@link PipelineKey} and iterated in insertion order. Every method is
+   * synchronized on this instance.
+   */
+  static class PipelineActionMap {
+    private final LinkedHashMap<PipelineKey, PipelineAction> map =
+        new LinkedHashMap<>();
+
+    /** @return the number of pending actions. */
+    synchronized int size() {
+      return map.size();
+    }
+
+    /** Adds the action unless an action with an equal key is pending. */
+    synchronized void putIfAbsent(PipelineKey key,
+        PipelineAction pipelineAction) {
+      map.putIfAbsent(key, pipelineAction);
+    }
+
+    /**
+     * Collects up to {@code max} pending actions to send.
+     * <p>
+     * A non-close action is removed once it has been returned. A close action
+     * whose pipeline is absent from {@code reports} is dropped and not
+     * returned. Any other close action is returned and moved to the tail, so
+     * it is resent later without starving the entries queued behind it.
+     *
+     * @param reports current pipeline reports of this datanode
+     * @param max maximum number of actions to return
+     * @return the actions to send; empty if none
+     */
+    synchronized List<PipelineAction> getActions(List<PipelineReport> reports,
+        int max) {
+      if (map.isEmpty() || max <= 0) {
+        return Collections.emptyList();
+      }
+      final List<PipelineAction> pipelineActionList = new ArrayList<>();
+      // Close actions to resend; re-inserted at the tail after the loop.
+      final Map<PipelineKey, PipelineAction> retained = new LinkedHashMap<>();
+      final Iterator<Map.Entry<PipelineKey, PipelineAction>> i =
+          map.entrySet().iterator();
+      // Only returned actions count against max; dropped ones do not.
+      while (pipelineActionList.size() < max && i.hasNext()) {
+        final Map.Entry<PipelineKey, PipelineAction> entry = i.next();
+        final PipelineKey key = entry.getKey();
+        final PipelineAction action = entry.getValue();
+        i.remove();
+        if (action.hasClosePipeline()) {
+          if (reports.stream().noneMatch(key::equalsId)) {
+            // The pipeline is removed from the DN; the action is obsolete.
+            continue;
+          }
+          // The pipeline is closed but not yet removed: keep resending.
+          retained.put(key, action);
+        }
+        pipelineActionList.add(action);
+      }
+      // The keys were just removed, so putAll appends them at the tail,
+      // rotating the queue like the previous poll()/addAll() implementation.
+      map.putAll(retained);
+      return pipelineActionList;
+    }
+  }
+
+  /**
+   * Deduplication key for a {@link PipelineAction}: the action type plus the
+   * pipeline id read from the action's close-pipeline field. The action's
+   * message is not part of the key.
+   * NOTE(review): for an action without close-pipeline info this reads the
+   * protobuf default pipeline id; confirm only CLOSE actions are keyed here.
+   */
+  static class PipelineKey {
+    private final HddsProtos.PipelineID pipelineID;
+    private final PipelineAction.Action action;
+
+    PipelineKey(PipelineAction pipelineAction) {
+      action = pipelineAction.getAction();
+      pipelineID = pipelineAction.getClosePipeline().getPipelineID();
+    }
+
+    /**
+     * @param report a pipeline report of this datanode
+     * @return whether the report refers to this key's pipeline id
+     */
+    boolean equalsId(PipelineReport report) {
+      final HddsProtos.PipelineID reportedId = report.getPipelineID();
+      return pipelineID.equals(reportedId);
+    }
+
+    /** Hashes only the pipeline id, which is consistent with equals. */
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(pipelineID);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (!(obj instanceof PipelineKey)) {
+        return false;
+      }
+      final PipelineKey other = (PipelineKey) obj;
+      return Objects.equals(pipelineID, other.pipelineID)
+          && Objects.equals(action, other.action);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to