This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a710a5f8fe HDDS-9811. Follower SCM should not process Pipeline Action 
(#5712)
a710a5f8fe is described below

commit a710a5f8fe912d639a981e8e41d8a5b8a024669f
Author: Nandakumar Vadivelu <[email protected]>
AuthorDate: Fri Dec 1 19:03:32 2023 +0530

    HDDS-9811. Follower SCM should not process Pipeline Action (#5712)
---
 .../hdds/scm/pipeline/PipelineActionHandler.java   | 60 ++++++++++----
 .../scm/pipeline/TestPipelineActionHandler.java    | 95 +++++++++++++++++-----
 2 files changed, 118 insertions(+), 37 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index 2f1785cf17..db6d6e77e0 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
 
@@ -78,33 +79,56 @@ public class PipelineActionHandler
     final ClosePipelineInfo info = pipelineAction.getClosePipeline();
     final PipelineAction.Action action = pipelineAction.getAction();
     final PipelineID pid = PipelineID.getFromProtobuf(info.getPipelineID());
-    try {
-      LOG.info("Received pipeline action {} for {} from datanode {}. " +
-          "Reason : {}", action, pid, datanode.getUuidString(),
-          info.getDetailedReason());
 
+    final String logMsg = "Received pipeline action " + action + " for " + pid +
+        " from datanode " + datanode.getUuidString() + "." +
+        " Reason : " + info.getDetailedReason();
+
+    // We can skip processing Pipeline Action if the current SCM is not leader.
+    if (!scmContext.isLeader()) {
+      LOG.debug(logMsg);
+      LOG.debug("Cannot process Pipeline Action for pipeline {} as " +
+          "current SCM is not leader.", pid);
+      return;
+    }
+
+    LOG.info(logMsg);
+    try {
       if (action == PipelineAction.Action.CLOSE) {
         pipelineManager.closePipeline(pid);
       } else {
-        LOG.error("unknown pipeline action:{}", action);
+        LOG.error("Received unknown pipeline action {}, for pipeline {} ",
+            action, pid);
       }
     } catch (PipelineNotFoundException e) {
-      LOG.warn("Pipeline action {} received for unknown pipeline {}, " +
-          "firing close pipeline event.", action, pid);
-      SCMCommand<?> command = new ClosePipelineCommand(pid);
-      try {
-        command.setTerm(scmContext.getTermOfLeader());
-      } catch (NotLeaderException nle) {
-        LOG.warn("Skip sending ClosePipelineCommand for pipeline {}," +
-            " since not leader SCM.", pid);
-        return;
+      closeUnknownPipeline(publisher, datanode, pid);
+    }  catch (SCMException e) {
+      if (e.getResult() == SCMException.ResultCodes.SCM_NOT_LEADER) {
+        LOG.info("Cannot process Pipeline Action for pipeline {} as " +
+            "current SCM is not leader anymore.", pid);
+      } else {
+        LOG.error("Exception while processing Pipeline Action for Pipeline {}",
+            pid, e);
       }
+    } catch (IOException e) {
+      LOG.error("Exception while processing Pipeline Action for Pipeline {}",
+          pid, e);
+    }
+  }
+
+  private void closeUnknownPipeline(final EventPublisher publisher,
+                                    final DatanodeDetails datanode,
+                                    final PipelineID pid) {
+    try {
+      LOG.warn("Pipeline action received for unknown Pipeline {}, " +
+          "firing close pipeline event.", pid);
+      SCMCommand<?> command = new ClosePipelineCommand(pid);
+      command.setTerm(scmContext.getTermOfLeader());
       publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
           new CommandForDatanode<>(datanode.getUuid(), command));
-    } catch (IOException ioe) {
-      LOG.error("Could not execute pipeline action={} pipeline={}",
-          action, pid, ioe);
+    } catch (NotLeaderException nle) {
+      LOG.info("Cannot process Pipeline Action for pipeline {} as " +
+          "current SCM is not leader anymore.", pid);
     }
   }
-
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
index 791220f670..0546d87617 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
@@ -18,10 +18,11 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -30,42 +31,98 @@ import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
-import java.util.UUID;
 
 /**
  * Test-cases to verify the functionality of PipelineActionHandler.
  */
 public class TestPipelineActionHandler {
 
+
+  @Test
+  public void testPipelineActionHandlerForValidPipeline() throws IOException {
+
+    final PipelineManager manager = Mockito.mock(PipelineManager.class);
+    final EventQueue queue = Mockito.mock(EventQueue.class);
+    final PipelineActionHandler actionHandler = new PipelineActionHandler(
+        manager, SCMContext.emptyContext(), null);
+    final Pipeline pipeline = HddsTestUtils.getRandomPipeline();
+
+    actionHandler.onMessage(getPipelineActionsFromDatanode(
+        pipeline.getId()), queue);
+    Mockito.verify(manager, Mockito.times(1))
+        .closePipeline(pipeline.getId());
+  }
+
   @Test
-  public void testCloseActionForMissingPipeline()
+  public void testPipelineActionHandlerForValidPipelineInFollower()
       throws IOException {
     final PipelineManager manager = Mockito.mock(PipelineManager.class);
     final EventQueue queue = Mockito.mock(EventQueue.class);
+    final SCMContext context = SCMContext.emptyContext();
+    final PipelineActionHandler actionHandler = new PipelineActionHandler(
+        manager, context, null);
+    final Pipeline pipeline = HddsTestUtils.getRandomPipeline();
+
+    context.updateLeaderAndTerm(false, 1);
+    actionHandler.onMessage(getPipelineActionsFromDatanode(
+        pipeline.getId()), queue);
+    Mockito.verify(manager, Mockito.times(0))
+        .closePipeline(pipeline.getId());
+    Mockito.verify(queue, Mockito.times(0))
+        .fireEvent(Mockito.eq(SCMEvents.DATANODE_COMMAND),
+            Mockito.any(CommandForDatanode.class));
+  }
+
+  @Test
+  public void testPipelineActionHandlerForUnknownPipeline() throws IOException {
+    final PipelineManager manager = Mockito.mock(PipelineManager.class);
+    final EventQueue queue = Mockito.mock(EventQueue.class);
+    final PipelineActionHandler actionHandler = new PipelineActionHandler(
+        manager, SCMContext.emptyContext(), null);
+    final Pipeline pipeline = HddsTestUtils.getRandomPipeline();
 
     Mockito.doThrow(new PipelineNotFoundException())
-        .when(manager).closePipeline(Mockito.any(PipelineID.class));
+        .when(manager).closePipeline(pipeline.getId());
+    actionHandler.onMessage(getPipelineActionsFromDatanode(
+        pipeline.getId()), queue);
+    Mockito.verify(queue, Mockito.times(1))
+        .fireEvent(Mockito.eq(SCMEvents.DATANODE_COMMAND),
+            Mockito.any(CommandForDatanode.class));
+  }
+
+  @Test
+  public void testPipelineActionHandlerForUnknownPipelineInFollower()
+      throws IOException {
+
+    final PipelineManager manager = Mockito.mock(PipelineManager.class);
+    final EventQueue queue = Mockito.mock(EventQueue.class);
+    final SCMContext context = SCMContext.emptyContext();
+    final PipelineActionHandler actionHandler = new PipelineActionHandler(
+        manager, context, null);
+    final Pipeline pipeline = HddsTestUtils.getRandomPipeline();
 
-    final PipelineActionHandler actionHandler =
-        new PipelineActionHandler(manager, SCMContext.emptyContext(), null);
+    context.updateLeaderAndTerm(false, 1);
+    Mockito.doThrow(new PipelineNotFoundException())
+        .when(manager).closePipeline(pipeline.getId());
+    actionHandler.onMessage(getPipelineActionsFromDatanode(
+        pipeline.getId()), queue);
+    Mockito.verify(queue, Mockito.times(0))
+        .fireEvent(Mockito.eq(SCMEvents.DATANODE_COMMAND),
+            Mockito.any(CommandForDatanode.class));
+
+  }
 
+  private PipelineActionsFromDatanode getPipelineActionsFromDatanode(
+      PipelineID pipelineID) {
     final PipelineActionsProto actionsProto = PipelineActionsProto.newBuilder()
         .addPipelineActions(PipelineAction.newBuilder()
-        .setClosePipeline(ClosePipelineInfo.newBuilder()
-            .setPipelineID(HddsProtos.PipelineID.newBuilder()
-                .setId(UUID.randomUUID().toString()).build())
-            .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED))
+            .setClosePipeline(ClosePipelineInfo.newBuilder()
+                .setPipelineID(pipelineID.getProtobuf())
+                .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED))
             .setAction(PipelineAction.Action.CLOSE).build())
         .build();
-    final PipelineActionsFromDatanode pipelineActions =
-        new PipelineActionsFromDatanode(
-            MockDatanodeDetails.randomDatanodeDetails(), actionsProto);
-
-    actionHandler.onMessage(pipelineActions, queue);
-
-    Mockito.verify(queue, Mockito.times(1))
-        .fireEvent(Mockito.any(), Mockito.any(CommandForDatanode.class));
-
+    return new PipelineActionsFromDatanode(
+        MockDatanodeDetails.randomDatanodeDetails(), actionsProto);
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to