This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a710a5f8fe HDDS-9811. Follower SCM should not process Pipeline Action (#5712)
a710a5f8fe is described below
commit a710a5f8fe912d639a981e8e41d8a5b8a024669f
Author: Nandakumar Vadivelu <[email protected]>
AuthorDate: Fri Dec 1 19:03:32 2023 +0530
HDDS-9811. Follower SCM should not process Pipeline Action (#5712)
---
.../hdds/scm/pipeline/PipelineActionHandler.java | 60 ++++++++++----
.../scm/pipeline/TestPipelineActionHandler.java | 95 +++++++++++++++++-----
2 files changed, 118 insertions(+), 37 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index 2f1785cf17..db6d6e77e0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
@@ -78,33 +79,56 @@ public class PipelineActionHandler
final ClosePipelineInfo info = pipelineAction.getClosePipeline();
final PipelineAction.Action action = pipelineAction.getAction();
final PipelineID pid = PipelineID.getFromProtobuf(info.getPipelineID());
- try {
- LOG.info("Received pipeline action {} for {} from datanode {}. " +
- "Reason : {}", action, pid, datanode.getUuidString(),
- info.getDetailedReason());
+ final String logMsg = "Received pipeline action " + action + " for " + pid +
+ " from datanode " + datanode.getUuidString() + "." +
+ " Reason : " + info.getDetailedReason();
+
+ // We can skip processing Pipeline Action if the current SCM is not leader.
+ if (!scmContext.isLeader()) {
+ LOG.debug(logMsg);
+ LOG.debug("Cannot process Pipeline Action for pipeline {} as " +
+ "current SCM is not leader.", pid);
+ return;
+ }
+
+ LOG.info(logMsg);
+ try {
if (action == PipelineAction.Action.CLOSE) {
pipelineManager.closePipeline(pid);
} else {
- LOG.error("unknown pipeline action:{}", action);
+ LOG.error("Received unknown pipeline action {}, for pipeline {} ",
+ action, pid);
}
} catch (PipelineNotFoundException e) {
- LOG.warn("Pipeline action {} received for unknown pipeline {}, " +
- "firing close pipeline event.", action, pid);
- SCMCommand<?> command = new ClosePipelineCommand(pid);
- try {
- command.setTerm(scmContext.getTermOfLeader());
- } catch (NotLeaderException nle) {
- LOG.warn("Skip sending ClosePipelineCommand for pipeline {}," +
- " since not leader SCM.", pid);
- return;
+ closeUnknownPipeline(publisher, datanode, pid);
+ } catch (SCMException e) {
+ if (e.getResult() == SCMException.ResultCodes.SCM_NOT_LEADER) {
+ LOG.info("Cannot process Pipeline Action for pipeline {} as " +
+ "current SCM is not leader anymore.", pid);
+ } else {
+ LOG.error("Exception while processing Pipeline Action for Pipeline {}",
+ pid, e);
}
+ } catch (IOException e) {
+ LOG.error("Exception while processing Pipeline Action for Pipeline {}",
+ pid, e);
+ }
+ }
+
+ private void closeUnknownPipeline(final EventPublisher publisher,
+ final DatanodeDetails datanode,
+ final PipelineID pid) {
+ try {
+ LOG.warn("Pipeline action received for unknown Pipeline {}, " +
+ "firing close pipeline event.", pid);
+ SCMCommand<?> command = new ClosePipelineCommand(pid);
+ command.setTerm(scmContext.getTermOfLeader());
publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
new CommandForDatanode<>(datanode.getUuid(), command));
- } catch (IOException ioe) {
- LOG.error("Could not execute pipeline action={} pipeline={}",
- action, pid, ioe);
+ } catch (NotLeaderException nle) {
+ LOG.info("Cannot process Pipeline Action for pipeline {} as " +
+ "current SCM is not leader anymore.", pid);
}
}
-
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
index 791220f670..0546d87617 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
@@ -18,10 +18,11 @@
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -30,42 +31,98 @@ import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
-import java.util.UUID;
/**
* Test-cases to verify the functionality of PipelineActionHandler.
*/
public class TestPipelineActionHandler {
+
+ @Test
+ public void testPipelineActionHandlerForValidPipeline() throws IOException {
+
+ final PipelineManager manager = Mockito.mock(PipelineManager.class);
+ final EventQueue queue = Mockito.mock(EventQueue.class);
+ final PipelineActionHandler actionHandler = new PipelineActionHandler(
+ manager, SCMContext.emptyContext(), null);
+ final Pipeline pipeline = HddsTestUtils.getRandomPipeline();
+
+ actionHandler.onMessage(getPipelineActionsFromDatanode(
+ pipeline.getId()), queue);
+ Mockito.verify(manager, Mockito.times(1))
+ .closePipeline(pipeline.getId());
+ }
+
@Test
- public void testCloseActionForMissingPipeline()
+ public void testPipelineActionHandlerForValidPipelineInFollower()
throws IOException {
final PipelineManager manager = Mockito.mock(PipelineManager.class);
final EventQueue queue = Mockito.mock(EventQueue.class);
+ final SCMContext context = SCMContext.emptyContext();
+ final PipelineActionHandler actionHandler = new PipelineActionHandler(
+ manager, context, null);
+ final Pipeline pipeline = HddsTestUtils.getRandomPipeline();
+
+ context.updateLeaderAndTerm(false, 1);
+ actionHandler.onMessage(getPipelineActionsFromDatanode(
+ pipeline.getId()), queue);
+ Mockito.verify(manager, Mockito.times(0))
+ .closePipeline(pipeline.getId());
+ Mockito.verify(queue, Mockito.times(0))
+ .fireEvent(Mockito.eq(SCMEvents.DATANODE_COMMAND),
+ Mockito.any(CommandForDatanode.class));
+ }
+
+ @Test
+ public void testPipelineActionHandlerForUnknownPipeline() throws IOException {
+ final PipelineManager manager = Mockito.mock(PipelineManager.class);
+ final EventQueue queue = Mockito.mock(EventQueue.class);
+ final PipelineActionHandler actionHandler = new PipelineActionHandler(
+ manager, SCMContext.emptyContext(), null);
+ final Pipeline pipeline = HddsTestUtils.getRandomPipeline();
Mockito.doThrow(new PipelineNotFoundException())
- .when(manager).closePipeline(Mockito.any(PipelineID.class));
+ .when(manager).closePipeline(pipeline.getId());
+ actionHandler.onMessage(getPipelineActionsFromDatanode(
+ pipeline.getId()), queue);
+ Mockito.verify(queue, Mockito.times(1))
+ .fireEvent(Mockito.eq(SCMEvents.DATANODE_COMMAND),
+ Mockito.any(CommandForDatanode.class));
+ }
+
+ @Test
+ public void testPipelineActionHandlerForUnknownPipelineInFollower()
+ throws IOException {
+
+ final PipelineManager manager = Mockito.mock(PipelineManager.class);
+ final EventQueue queue = Mockito.mock(EventQueue.class);
+ final SCMContext context = SCMContext.emptyContext();
+ final PipelineActionHandler actionHandler = new PipelineActionHandler(
+ manager, context, null);
+ final Pipeline pipeline = HddsTestUtils.getRandomPipeline();
- final PipelineActionHandler actionHandler =
- new PipelineActionHandler(manager, SCMContext.emptyContext(), null);
+ context.updateLeaderAndTerm(false, 1);
+ Mockito.doThrow(new PipelineNotFoundException())
+ .when(manager).closePipeline(pipeline.getId());
+ actionHandler.onMessage(getPipelineActionsFromDatanode(
+ pipeline.getId()), queue);
+ Mockito.verify(queue, Mockito.times(0))
+ .fireEvent(Mockito.eq(SCMEvents.DATANODE_COMMAND),
+ Mockito.any(CommandForDatanode.class));
+
+ }
+ private PipelineActionsFromDatanode getPipelineActionsFromDatanode(
+ PipelineID pipelineID) {
final PipelineActionsProto actionsProto = PipelineActionsProto.newBuilder()
.addPipelineActions(PipelineAction.newBuilder()
- .setClosePipeline(ClosePipelineInfo.newBuilder()
- .setPipelineID(HddsProtos.PipelineID.newBuilder()
- .setId(UUID.randomUUID().toString()).build())
- .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED))
+ .setClosePipeline(ClosePipelineInfo.newBuilder()
+ .setPipelineID(pipelineID.getProtobuf())
+ .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED))
.setAction(PipelineAction.Action.CLOSE).build())
.build();
- final PipelineActionsFromDatanode pipelineActions =
- new PipelineActionsFromDatanode(
- MockDatanodeDetails.randomDatanodeDetails(), actionsProto);
-
- actionHandler.onMessage(pipelineActions, queue);
-
- Mockito.verify(queue, Mockito.times(1))
- .fireEvent(Mockito.any(), Mockito.any(CommandForDatanode.class));
-
+ return new PipelineActionsFromDatanode(
+ MockDatanodeDetails.randomDatanodeDetails(), actionsProto);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]