szetszwo commented on code in PR #7090:
URL: https://github.com/apache/ozone/pull/7090#discussion_r1721222047


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java:
##########
@@ -988,4 +999,39 @@ public DatanodeQueueMetrics getQueueMetrics() {
   public String getThreadNamePrefix() {
     return threadNamePrefix;
   }
+
+  static class PipelineKey {
+    private final HddsProtos.PipelineID pipelineID;
+    private final PipelineAction.Action action;
+
+    PipelineKey(HddsProtos.PipelineID pipelineID, PipelineAction.Action 
action) {

Review Comment:
   We may pass `PipelineAction` instead.
   ```java 
       PipelineKey(PipelineAction p) {
         this.pipelineID = p.getClosePipeline().getPipelineID();
         this.action = p.getAction();
       }
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java:
##########
@@ -988,4 +999,39 @@ public DatanodeQueueMetrics getQueueMetrics() {
   public String getThreadNamePrefix() {
     return threadNamePrefix;
   }
+
+  static class PipelineKey {
+    private final HddsProtos.PipelineID pipelineID;
+    private final PipelineAction.Action action;
+
+    PipelineKey(HddsProtos.PipelineID pipelineID, PipelineAction.Action 
action) {
+      this.pipelineID = pipelineID;
+      this.action = action;
+    }
+
+    public HddsProtos.PipelineID getPipelineID() {
+      return pipelineID;
+    }
+
+    public PipelineAction.Action getAction() {
+      return action;
+    }

Review Comment:
   We may have an `equalsId` method instead.
   ```java
       boolean equalsId(PipelineReport report) {
         return pipelineID.equals(report.getPipelineID());
       }
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java:
##########
@@ -112,7 +115,7 @@ public class StateContext {
   private final Map<InetSocketAddress, List<Message>>
       incrementalReportsQueue;
   private final Map<InetSocketAddress, Queue<ContainerAction>> 
containerActions;
-  private final Map<InetSocketAddress, Queue<PipelineAction>> pipelineActions;
+  private final Map<InetSocketAddress, LinkedHashMap<PipelineKey, 
PipelineAction>> pipelineActions;

Review Comment:
   Let's also add a new class for the inner map.  It is easier to see the 
synchronization.
   ```java
     static class ActionMap {
       private final LinkedHashMap<PipelineKey, PipelineAction> map = new 
LinkedHashMap<>();
   
       synchronized int size() {
         return map.size();
       }
   
       synchronized void putIfAbsent(PipelineKey key, PipelineAction 
pipelineAction) {
         map.putIfAbsent(key, pipelineAction);
       }
   
       synchronized List<PipelineAction> getActions(List<PipelineReport> 
reports, int max) {
         if (map.isEmpty()) {
           return Collections.emptyList();
         }
         final List<PipelineAction> pipelineActionList = new ArrayList<>();
         final int limit = Math.min(map.size(), max);
         final Iterator<Map.Entry<PipelineKey, PipelineAction>> i = 
map.entrySet().iterator();
         for (int count = 0; count < limit && i.hasNext(); count++) {
           final Map.Entry<PipelineKey, PipelineAction> entry = i.next();
           final PipelineAction action = entry.getValue();
   
           // Add closePipeline back to the pipelineAction queue until
           // pipeline is closed and removed from the DN.
           if (action.hasClosePipeline()) {
             if (reports.stream().noneMatch(entry.getKey()::equalsId)) {
               // pipeline is removed from the DN, this action is no longer 
needed.
               i.remove();
               continue;
             }
             // pipeline is closed but not yet removed from the DN.
           } else {
             i.remove();
           }
           pipelineActionList.add(action);
         }
         // add all
         return pipelineActionList;
       }
     }
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java:
##########
@@ -542,21 +545,23 @@ boolean isSameClosePipelineAction(PipelineAction a1, 
PipelineAction a2) {
    * @param pipelineAction PipelineAction to be added
    */
   public void addPipelineActionIfAbsent(PipelineAction pipelineAction) {
-    synchronized (pipelineActions) {
-      /**
-       * If pipelineAction queue already contains entry for the pipeline id
-       * with same action, we should just return.
-       * Note: We should not use pipelineActions.contains(pipelineAction) here
-       * as, pipelineAction has a msg string. So even if two msgs differ though
-       * action remains same on the given pipeline, it will end up adding it
-       * multiple times here.
-       */
-      for (InetSocketAddress endpoint : endpoints) {
-        final Queue<PipelineAction> actionsForEndpoint =
-            pipelineActions.get(endpoint);
-        if (actionsForEndpoint.stream().noneMatch(
+    /**
+     * If pipelineAction queue already contains entry for the pipeline id
+     * with same action, we should just return.
+     * Note: We should not use pipelineActions.contains(pipelineAction) here
+     * as, pipelineAction has a msg string. So even if two msgs differ though
+     * action remains same on the given pipeline, it will end up adding it
+     * multiple times here.
+     */
+    for (InetSocketAddress endpoint : endpoints) {
+      final Map<PipelineKey, PipelineAction> actionsForEndpoint =
+          pipelineActions.get(endpoint);
+      synchronized (actionsForEndpoint) {
+        if (actionsForEndpoint.values().stream().noneMatch(

Review Comment:
   Since it is a map, we don't need to call stream().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to