This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ac68072619 HDDS-9959. Propagate group remove to other datanodes during pipeline close (#5827)
ac68072619 is described below

commit ac6807261937415b3f9c00b90b065a57aa5858c4
Author: Ivan Andika <[email protected]>
AuthorDate: Thu Jan 4 20:32:44 2024 +0700

    HDDS-9959. Propagate group remove to other datanodes during pipeline close (#5827)
---
 .../common/statemachine/DatanodeStateMachine.java  |   4 +-
 .../ClosePipelineCommandHandler.java               |  45 ++++++-
 .../transport/server/ratis/XceiverServerRatis.java |  10 ++
 .../TestClosePipelineCommandHandler.java           | 144 +++++++++++++++++++++
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |  65 +---------
 5 files changed, 201 insertions(+), 67 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 51290cf80d..9677144054 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -250,8 +250,8 @@ public class DatanodeStateMachine implements Closeable {
         .addHandler(new DeleteContainerCommandHandler(
             dnConf.getContainerDeleteThreads(), clock,
             dnConf.getCommandQueueLimit(), threadNamePrefix))
-        .addHandler(
-            new ClosePipelineCommandHandler(pipelineCommandExecutorService))
+        .addHandler(new ClosePipelineCommandHandler(conf,
+            pipelineCommandExecutorService))
         .addHandler(new CreatePipelineCommandHandler(conf,
             pipelineCommandExecutorService))
         .addHandler(new SetNodeOperationalStateCommandHandler(conf,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index f332ad4f13..5242c8686d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -16,28 +16,38 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server
     .XceiverServerSpi;
+import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
 
 /**
  * Handler for close pipeline command received from SCM.
@@ -51,11 +61,23 @@ public class ClosePipelineCommandHandler implements CommandHandler {
   private final AtomicInteger queuedCount = new AtomicInteger(0);
   private long totalTime;
   private final Executor executor;
+  private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
 
   /**
    * Constructs a closePipelineCommand handler.
    */
-  public ClosePipelineCommandHandler(Executor executor) {
+  public ClosePipelineCommandHandler(ConfigurationSource conf,
+                                     Executor executor) {
+    this(RatisHelper.newRaftClient(conf), executor);
+  }
+
+  /**
+   * Constructs a closePipelineCommand handler.
+   */
+  public ClosePipelineCommandHandler(
+      BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient,
+      Executor executor) {
+    this.newRaftClient = newRaftClient;
     this.executor = executor;
   }
 
@@ -84,6 +106,27 @@ public class ClosePipelineCommandHandler implements CommandHandler {
         XceiverServerSpi server = ozoneContainer.getWriteChannel();
         if (server.isExist(pipelineIdProto)) {
           server.removeGroup(pipelineIdProto);
+          if (server instanceof XceiverServerRatis) {
+            // TODO: Refactor Ratis logic to XceiverServerRatis
+            // Propagate the group remove to the other Raft peers in the 
pipeline
+            XceiverServerRatis ratisServer = (XceiverServerRatis) server;
+            final RaftGroupId raftGroupId = 
RaftGroupId.valueOf(pipelineID.getId());
+            final Collection<RaftPeer> peers = 
ratisServer.getRaftPeersInPipeline(pipelineID);
+            final boolean shouldDeleteRatisLogDirectory = 
ratisServer.getShouldDeleteRatisLogDirectory();
+            peers.stream()
+                .filter(peer -> 
!peer.getId().equals(ratisServer.getServer().getId()))
+                .forEach(peer -> {
+                  try (RaftClient client = newRaftClient.apply(peer, 
ozoneContainer.getTlsClientConfig())) {
+                    client.getGroupManagementApi(peer.getId())
+                        .remove(raftGroupId, shouldDeleteRatisLogDirectory, 
!shouldDeleteRatisLogDirectory);
+                  } catch (GroupMismatchException ae) {
+                    // ignore silently since this means that the group has 
been closed by earlier close pipeline
+                    // command in another datanode
+                  } catch (IOException ioe) {
+                    LOG.warn("Failed to remove group {} for peer {}", 
raftGroupId, peer.getId(), ioe);
+                  }
+                });
+          }
           LOG.info("Close Pipeline {} command on datanode {}.", pipelineID,
               dn.getUuidString());
         } else {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 4688ce4b27..6d119b17b3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -83,6 +83,7 @@ import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.protocol.ClientId;
@@ -622,6 +623,10 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     return server.getDivision(id);
   }
 
+  public boolean getShouldDeleteRatisLogDirectory() {
+    return this.shouldDeleteRatisLogDirectory;
+  }
+
   private void processReply(RaftClientReply reply) throws IOException {
     // NotLeader exception is thrown only when the raft server to which the
     // request is submitted is not the leader. The request will be rejected
@@ -919,6 +924,11 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     return minIndex == null ? -1 : minIndex;
   }
 
+  public Collection<RaftPeer> getRaftPeersInPipeline(PipelineID pipelineId) 
throws IOException {
+    final RaftGroupId groupId = RaftGroupId.valueOf(pipelineId.getId());
+    return server.getDivision(groupId).getGroup().getPeers();
+  }
+
   public void notifyGroupRemove(RaftGroupId gid) {
     raftGids.remove(gid);
     // Remove any entries for group leader map
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
new file mode 100644
index 0000000000..d161f5537a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import 
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.api.GroupManagementApi;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases to verify ClosePipelineCommandHandler.
+ */
+public class TestClosePipelineCommandHandler {
+
+  private OzoneContainer ozoneContainer;
+  private StateContext stateContext;
+  private SCMConnectionManager connectionManager;
+  private RaftClient raftClient;
+  private GroupManagementApi raftClientGroupManager;
+  private OzoneConfiguration conf;
+
+  @BeforeEach
+  public void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    ozoneContainer = mock(OzoneContainer.class);
+    connectionManager = mock(SCMConnectionManager.class);
+    raftClient = mock(RaftClient.class);
+    raftClientGroupManager = mock(GroupManagementApi.class);
+    lenient().when(raftClient.getGroupManagementApi(
+        any(RaftPeerId.class))).thenReturn(raftClientGroupManager);
+  }
+
+  @Test
+  void testPipelineClose() throws IOException {
+    final List<DatanodeDetails> datanodes = getDatanodes();
+    final DatanodeDetails currentDatanode = datanodes.get(0);
+    final PipelineID pipelineID = PipelineID.randomId();
+    final SCMCommand<ClosePipelineCommandProto> command =
+        new ClosePipelineCommand(pipelineID);
+    stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf);
+
+    final boolean shouldDeleteRatisLogDirectory = true;
+    XceiverServerRatis writeChannel = mock(XceiverServerRatis.class);
+    when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel);
+    
when(writeChannel.getShouldDeleteRatisLogDirectory()).thenReturn(shouldDeleteRatisLogDirectory);
+    when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(true);
+    Collection<RaftPeer> raftPeers = datanodes.stream()
+        .map(RatisHelper::toRaftPeer)
+        .collect(Collectors.toList());
+    when(writeChannel.getServer()).thenReturn(mock(RaftServer.class));
+    
when(writeChannel.getServer().getId()).thenReturn(RatisHelper.toRaftPeerId(currentDatanode));
+    
when(writeChannel.getRaftPeersInPipeline(pipelineID)).thenReturn(raftPeers);
+
+    final ClosePipelineCommandHandler commandHandler =
+        new ClosePipelineCommandHandler((leader, tls) -> raftClient, 
MoreExecutors.directExecutor());
+    commandHandler.handle(command, ozoneContainer, stateContext, 
connectionManager);
+
+    verify(writeChannel, times(1))
+        .removeGroup(pipelineID.getProtobuf());
+
+    verify(raftClientGroupManager, times(2))
+        .remove(any(), eq(shouldDeleteRatisLogDirectory), 
eq(!shouldDeleteRatisLogDirectory));
+  }
+
+  @Test
+  void testCommandIdempotency() throws IOException {
+    final List<DatanodeDetails> datanodes = getDatanodes();
+    final DatanodeDetails currentDatanode = datanodes.get(0);
+    final PipelineID pipelineID = PipelineID.randomId();
+    final SCMCommand<ClosePipelineCommandProto> command =
+        new ClosePipelineCommand(pipelineID);
+    stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf);
+
+    XceiverServerRatis writeChannel = mock(XceiverServerRatis.class);
+    when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel);
+    // When the pipeline has been closed earlier by other datanode that 
received a close pipeline command
+    when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(false);
+
+    final ClosePipelineCommandHandler commandHandler =
+        new ClosePipelineCommandHandler(conf, MoreExecutors.directExecutor());
+    commandHandler.handle(command, ozoneContainer, stateContext, 
connectionManager);
+
+    verify(writeChannel, times(0))
+        .removeGroup(pipelineID.getProtobuf());
+
+    verify(raftClientGroupManager, times(0))
+        .remove(any(), anyBoolean(), anyBoolean());
+  }
+
+  private List<DatanodeDetails> getDatanodes() {
+    final DatanodeDetails dnOne = MockDatanodeDetails.randomDatanodeDetails();
+    final DatanodeDetails dnTwo = MockDatanodeDetails.randomDatanodeDetails();
+    final DatanodeDetails dnThree = 
MockDatanodeDetails.randomDatanodeDetails();
+    return Arrays.asList(dnOne, dnTwo, dnThree);
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 6c68b80883..04c35cc1fe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -17,29 +17,16 @@
  */
 package org.apache.hadoop.hdds.scm.pipeline;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Utility class for Ratis pipelines. Contains methods to create and destroy
- * ratis pipelines.
+ * Utility class for Ratis pipelines.
  */
 public final class RatisPipelineUtils {
 
@@ -48,56 +35,6 @@ public final class RatisPipelineUtils {
 
   private RatisPipelineUtils() {
   }
-  /**
-   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
-   * the datanodes.
-   *
-   * @param pipeline        - Pipeline to be destroyed
-   * @param ozoneConf       - Ozone configuration
-   * @param grpcTlsConfig
-   * @throws IOException
-   */
-  public static void destroyPipeline(Pipeline pipeline,
-      ConfigurationSource ozoneConf,
-      GrpcTlsConfig grpcTlsConfig) {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
-    }
-    for (DatanodeDetails dn : pipeline.getNodes()) {
-      try {
-        destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
-      } catch (IOException e) {
-        LOG.warn("Pipeline destroy failed for pipeline={} dn={} exception={}",
-            pipeline.getId(), dn, e.getMessage());
-      }
-    }
-  }
-
-  /**
-   * Sends ratis command to destroy pipeline on the given datanode.
-   *
-   * @param dn         - Datanode on which pipeline needs to be destroyed
-   * @param pipelineID - ID of pipeline to be destroyed
-   * @param ozoneConf  - Ozone configuration
-   * @param grpcTlsConfig - grpc tls configuration
-   * @throws IOException
-   */
-  static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
-      ConfigurationSource ozoneConf, GrpcTlsConfig grpcTlsConfig)
-      throws IOException {
-    final String rpcType = ozoneConf
-        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
-    final RaftPeer p = RatisHelper.toRaftPeer(dn);
-    try (RaftClient client = RatisHelper
-        .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-            retryPolicy, grpcTlsConfig, ozoneConf)) {
-      client.getGroupManagementApi(p.getId())
-          .remove(RaftGroupId.valueOf(pipelineID.getId()), true, false);
-    }
-  }
 
   /**
    * Return the list of pipelines who share the same set of datanodes


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to