hanishakoneru commented on a change in pull request #2886:
URL: https://github.com/apache/ozone/pull/2886#discussion_r766906367



##########
File path: 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OMConfiguration.java
##########
@@ -84,9 +84,21 @@ public OMConfiguration build() {
    * the new conf under current serviceId.
    */
   public Map<String, OMNodeDetails> getOmNodesInNewConf() {
-    return omNodesInNewConf.stream().collect(Collectors.toMap(
-        NodeDetails::getNodeId,
-        omNodeDetails -> omNodeDetails,
-        (nodeId, omNodeDetails) -> omNodeDetails));
+    return omNodesInNewConf.stream()

Review comment:
       omNodesInNewConf includes the decommissioned nodes, but we want only
the active nodes from getOmNodesInNewConf().
   I will update the function name and Javadoc to clarify this.

##########
File path: 
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/DecommissionOMSubcommand.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.admin.om;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
+import org.apache.hadoop.ozone.om.protocol.OMConfiguration;
+import org.apache.hadoop.ozone.om.protocolPB.OMAdminProtocolClientSideImpl;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.util.Strings;
+import picocli.CommandLine;
+
+import java.util.concurrent.Callable;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DECOMMISSIONED_NODES_KEY;
+
+/**
+ * Handler of om roles command.
+ */
[email protected](
+    name = "decommission",
+    customSynopsis = "ozone admin om decommission -id=<om-service-id> " +
+        "-nodeid=<decommission-om-node-id> " +
+        "-hostname=<decommission-om-node-address> [options]",
+    description = "Decommission an OzoneManager." +
+        "\nNote - Add the node to be decommissioned to " +
+        OZONE_OM_DECOMMISSIONED_NODES_KEY + "config in ozone-site.xml of all " 
+
+        "OzoneManagers before proceeding with decommission." +
+        "\nNOTE THAT DECOMMISSIONING AN OM MIGHT RENDER THE CLUSTER TO LOSE " +
+        "HIGH AVAILABILITY",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class DecommissionOMSubcommand implements Callable<Void> {
+
+  @CommandLine.ParentCommand
+  private OMAdmin parent;
+
+  @CommandLine.Option(names = {"-id", "--service-id"},
+      description = "OM Service ID",
+      required = true)
+  private String omServiceId;
+
+  @CommandLine.Option(names = {"-nodeid"},
+      description = "NodeID of the OM to be decommissioned.",
+      required = true)
+  private String decommNodeId;
+
+  @CommandLine.Option(names = {"-hostname", "--node-host-address"},
+      description = "Host name/address of the OM to be decommissioned.",
+      required = true)
+  private String hostname;
+
+  private InetAddress hostInetAddress;
+
+  @CommandLine.Option(
+      names = {"--force"},
+      description = "This option will skip checking whether OM configs " +
+          "have been updated with the decommissioned node added to " +
+          "ozone.om.decommissioned.nodes config in ozone-site.xml."
+  )
+  private boolean force;
+
+  private OzoneConfiguration ozoneConf;
+  private UserGroupInformation user;
+
+  @Override
+  public Void call() throws IOException {
+    ozoneConf = parent.getParent().getOzoneConf();
+    user = parent.getParent().getUser();
+
+    verifyNodeIdAndHostAddress();
+    if (!force) {
+      verifyConfigUpdatedOnAllOMs();
+    }
+
+    // Proceed with decommissioning the OM by contacting the current OM
+    // leader.
+    try (OMAdminProtocolClientSideImpl omAdminProtocolClient =
+             OMAdminProtocolClientSideImpl.createProxyForOMHA(ozoneConf, user,
+                 omServiceId)) {
+      OMNodeDetails decommNodeDetails = new OMNodeDetails.Builder()
+          .setOMNodeId(decommNodeId)
+          .setHostAddress(hostInetAddress.getHostAddress())
+          .build();
+      omAdminProtocolClient.decommission(decommNodeDetails);
+
+      System.out.println("Successfully decommissioned OM " + decommNodeId);
+    } catch (IOException e) {
+      System.out.println("Failed to decommission OM " + decommNodeId);
+      throw e;
+    }
+    return null;
+  }
+
+  /**
+   * Verify that the provided nodeId and host address correspond to the same
+   * OM in the configs.
+   */
+  private void verifyNodeIdAndHostAddress() throws IOException {
+    String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+        omServiceId, decommNodeId);
+    String rpcAddrStr = OmUtils.getOmRpcAddress(ozoneConf, rpcAddrKey);
+    if (rpcAddrStr == null || rpcAddrStr.isEmpty()) {
+      throw new IOException("There is no OM corresponding to " + decommNodeId
+          + "in the configuration.");
+    }
+
+    hostInetAddress = InetAddress.getByName(hostname);
+    InetAddress rpcAddressFromConfig = InetAddress.getByName(
+        rpcAddrStr.split(":")[0]);
+
+    if (!hostInetAddress.equals(rpcAddressFromConfig)) {
+      throw new IOException("OM " + decommNodeId + "'s host address in " +
+          "config - " + rpcAddressFromConfig.getHostAddress() + " does not " +
+          "match the provided host address " + hostInetAddress);
+    }
+  }
+
+  /**
+   * Verify that the to be decommissioned node is added to the
+   * OZONE_OM_DECOMMISSIONED_NODES_KEY.<SERVICE_ID> config in ozone-site.xml
+   * of all OMs.
+   */
+  private void verifyConfigUpdatedOnAllOMs() throws IOException {
+    String decommNodesKey = ConfUtils.addKeySuffixes(
+        OZONE_OM_DECOMMISSIONED_NODES_KEY, omServiceId);
+    Collection<String> decommNodes = ozoneConf.getTrimmedStringCollection(

Review comment:
       Yes, we would have to update the config on all OMs, at least before
decommissioning.

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
##########
@@ -330,14 +330,45 @@ public void addOMToRatisRing(OMNodeDetails newOMNode) 
throws IOException {
     }
   }
 
+  /**
+   * Remove decommissioned OM node from Ratis ring
+   */
+  public void removeOMFromRatisRing(OMNodeDetails removeOMNode)
+      throws IOException {
+    Preconditions.checkNotNull(removeOMNode);
+
+    String removeNodeId = removeOMNode.getNodeId();

Review comment:
       With the current patch, we cannot remove the leader OM. So yes, an OM can be
stopped before decommissioning. But if there are only 2 OMs in the ring and we
stop the follower in order to decommission it, the leader will not be able to
process the SetConf request because it cannot get a quorum.
   So the more general rule is that the OM being decommissioned should not be
the leader.

##########
File path: 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestAddRemoveOzoneManager.java
##########
@@ -333,4 +340,78 @@ public void testForceBootstrap() throws Exception {
     // Verify that the newly bootstrapped OM is running
     Assert.assertTrue(newOM.isRunning());
   }
+
+  /**
+   * Decommissioning Tests:
+   * 1. Stop an OM and decommission it from a 3 node cluster
+   * 2. Decommission another OM without stopping it.
+   * 3.
+   */
+  @Test
+  public void testDecommission() throws Exception {
+    setupCluster(3);
+    user = UserGroupInformation.getCurrentUser();
+
+    // Stop the 3rd OM and decommission it
+    String omNodeId3 = cluster.getOzoneManager(2).getOMNodeId();
+    cluster.stopOzoneManager(omNodeId3);
+    decommissionOM(omNodeId3);
+
+    // Decommission an OM and then stop it. Stopping OM before will lead
+    // to no quorum and there will not be a elected leader OM to process the
+    // decommission request.
+    String omNodeId2 = cluster.getOzoneManager(1).getOMNodeId();
+    decommissionOM(omNodeId2);
+    cluster.stopOzoneManager(omNodeId2);
+
+    // Verify that we can read/ write to the cluster with only 1 OM.
+    OzoneVolume volume = objectStore.getVolume(VOLUME_NAME);

Review comment:
       Once we remove 2 nodes, the Ratis ring becomes a single-node ring.

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
##########
@@ -1596,37 +1602,66 @@ public void bootstrap(OMNodeDetails newOMNode) throws 
IOException {
 
   /**
    * When OMStateMachine receives a configuration change update, it calls
-   * this function to update the peers list, if required.
-   */
-  public void updatePeerList(List<String> omNodeIds) {
-    List<String> ratisServerPeerIdsList = omRatisServer.getPeerIds();
-    for (String omNodeId : omNodeIds) {
-      // Check if the OM NodeID is already present in the peer list or its
-      // the local NodeID.
-      if (!peerNodesMap.containsKey(omNodeId) && !isCurrentNode(omNodeId)) {
+   * this function to update the peers list, if required. The configuration
+   * change could be to add or to remove an OM from the ring.
+   */
+  public void updatePeerList(List<String> newPeers) {
+    List<String> currentPeers = omRatisServer.getPeerIds();
+
+    // NodeIds present in new node list and not in current peer list are the
+    // bootstapped OMs and should be added to the peer list
+    List<String> bootstrappedOMs = new ArrayList<>();
+    bootstrappedOMs.addAll(newPeers);
+    bootstrappedOMs.removeAll(currentPeers);
+
+    // NodeIds present in current peer list but not in new node list are the
+    // decommissioned OMs and should be removed from the peer list
+    List<String> decommissionedOMs = new ArrayList<>();
+    decommissionedOMs.addAll(currentPeers);
+    decommissionedOMs.removeAll(newPeers);
+
+    // Add bootstrapped OMs to peer list
+    for (String omNodeId : bootstrappedOMs) {
+      // Check if its the local nodeId (bootstrapping OM)
+      if (isCurrentNode(omNodeId)) {
+        // For a Bootstrapping OM, none of the peers are added to it's
+        // RatisServer's peer list and it needs to be updated here after
+        // receiving the conf change notification from Ratis.
+        for (String peerNodeId : newPeers) {
+          if (peerNodeId.equals(omNodeId)) {
+            omRatisServer.addRaftPeer(omNodeDetails);
+          } else {
+            omRatisServer.addRaftPeer(peerNodesMap.get(peerNodeId));
+          }
+        }
+      } else {
+        // For other nodes, add bootstrapping OM to OM peer list (which
+        // internally adds to Ratis peer list too)
         try {
           addOMNodeToPeers(omNodeId);
         } catch (IOException e) {
           LOG.error("Fatal Error: Shutting down the system as otherwise it " +
               "could lead to OM state divergence.", e);
           exitManager.forceExit(1, e, LOG);
         }
+      }
+    }
+
+    // Remove decommissioned OMs from peer list
+    for (String omNodeId : decommissionedOMs) {
+      if (isCurrentNode(omNodeId)) {
+        // Decommissioning Node should not receive the configuration change
+        // request. Shut it down.
+        String errorMsg = "Shutting down as OM has been decommissioned.";
+        LOG.error("Fatal Error: {}", errorMsg);
+        exitManager.forceExit(1, errorMsg, LOG);
       } else {
-        // Check if the OMNodeID is present in the RatisServer's peer list
-        if (!ratisServerPeerIdsList.contains(omNodeId)) {
-          // This can happen on a bootstrapping OM. The peer information
-          // would be present in OzoneManager but OMRatisServer peer list
-          // would not have the peers list. OMRatisServer peer list of
-          // bootstrapping node should be updated after it gets the RaftConf
-          // through Ratis.
-          if (isCurrentNode(omNodeId)) {
-            // OM Ratis server has the current node also in the peer list as
-            // this is the Raft Group peers list. Hence, add the current node
-            // also to Ratis peers list if not present.
-            omRatisServer.addRaftPeer(omNodeDetails);
-          } else {
-            omRatisServer.addRaftPeer(peerNodesMap.get(omNodeId));
-          }
+        // Remove decommissioned node from peer list (which internally
+        // removed from Ratis peer list too)
+        try {
+          removeOMNodeFromPeers(omNodeId);
+        } catch (IOException e) {
+          e.printStackTrace();

Review comment:
       Yes, updated.

##########
File path: 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
##########
@@ -123,12 +175,47 @@ public OMConfiguration getOMConfiguration() throws 
IOException {
       }
       return omMedatataBuilder.build();
     } catch (ServiceException e) {
-      LOG.error("Failed to retrieve configuration of OM {}",
-          remoteOmNodeDetails.getOMPrintInfo(), e);
+      LOG.error("Failed to retrieve configuration of OM {}", omPrintInfo, e);
     }
     return null;
   }
 
+  @Override
+  public void decommission(OMNodeDetails removeOMNode) throws IOException {
+    DecommissionOMRequest decommOMRequest = DecommissionOMRequest.newBuilder()
+        .setNodeId(removeOMNode.getNodeId())
+        .setNodeAddress(removeOMNode.getHostAddress())
+        .build();
+
+    DecommissionOMResponse response;
+    try {
+      response = rpcProxy.decommission(NULL_RPC_CONTROLLER, decommOMRequest);
+    } catch (ServiceException e) {
+      OMNotLeaderException notLeaderException =
+          OMFailoverProxyProvider.getNotLeaderException(e);
+      if (notLeaderException != null) {
+        throwException(notLeaderException.getMessage());
+      }
+
+      OMLeaderNotReadyException leaderNotReadyException =
+          OMFailoverProxyProvider.getLeaderNotReadyException(e);
+      if (leaderNotReadyException != null) {
+        throwException(leaderNotReadyException.getMessage());
+      }
+      throw ProtobufHelper.getRemoteException(e);
+    }
+
+    if (!response.getSuccess()) {
+      throwException("Decommission Request to " + omPrintInfo +

Review comment:
       Done.

##########
File path: 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
##########
@@ -86,16 +101,53 @@ public OMAdminProtocolClientSideImpl(ConfigurationSource 
conf,
     OMAdminProtocolPB proxy = RPC.getProtocolProxy(
         OMAdminProtocolPB.class,
         RPC.getProtocolVersion(OMAdminProtocolPB.class),
-        remoteOmNodeDetails.getRpcAddress(), ugi, hadoopConf,
+        omNodeDetails.getRpcAddress(), ugi, hadoopConf,
         NetUtils.getDefaultSocketFactory(hadoopConf),
         (int) OmUtils.getOMClientRpcTimeOut(conf), connectionRetryPolicy)
         .getProxy();
 
     RetryPolicy retryPolicy = 
RetryPolicies.retryUpToMaximumCountWithFixedSleep(
         10, 1000, TimeUnit.MILLISECONDS);
 
-    this.rpcProxy = (OMAdminProtocolPB) RetryProxy.create(
+    OMAdminProtocolPB rpcProxy = (OMAdminProtocolPB) RetryProxy.create(
         OMAdminProtocolPB.class, proxy, retryPolicy);
+
+    return new OMAdminProtocolClientSideImpl(rpcProxy,
+        omNodeDetails.getOMPrintInfo());
+  }
+
+  /**
+   * Create OM Admin Protocol Client for contacting the OM ring (failover
+   * till the current OM leader is reached). Use for admin commands such as
+   * decommissionOM which are should reach the OM leader.
+   */
+  public static OMAdminProtocolClientSideImpl createProxyForOMHA(
+      OzoneConfiguration conf, UserGroupInformation ugi, String omServiceId)
+      throws IOException {
+
+    RPC.setProtocolEngine(OzoneConfiguration.of(conf),
+        OMAdminProtocolPB.class, ProtobufRpcEngine.class);
+
+    OMFailoverProxyProvider omFailoverProxyProvider =
+        new OMFailoverProxyProvider(conf, ugi, omServiceId,
+            OMAdminProtocolPB.class);
+
+    // Multiple the max number of retries with number of OMs to calculate the

Review comment:
       Done.

##########
File path: 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
##########
@@ -86,16 +101,53 @@ public OMAdminProtocolClientSideImpl(ConfigurationSource 
conf,
     OMAdminProtocolPB proxy = RPC.getProtocolProxy(
         OMAdminProtocolPB.class,
         RPC.getProtocolVersion(OMAdminProtocolPB.class),
-        remoteOmNodeDetails.getRpcAddress(), ugi, hadoopConf,
+        omNodeDetails.getRpcAddress(), ugi, hadoopConf,
         NetUtils.getDefaultSocketFactory(hadoopConf),
         (int) OmUtils.getOMClientRpcTimeOut(conf), connectionRetryPolicy)
         .getProxy();
 
     RetryPolicy retryPolicy = 
RetryPolicies.retryUpToMaximumCountWithFixedSleep(
         10, 1000, TimeUnit.MILLISECONDS);
 
-    this.rpcProxy = (OMAdminProtocolPB) RetryProxy.create(
+    OMAdminProtocolPB rpcProxy = (OMAdminProtocolPB) RetryProxy.create(
         OMAdminProtocolPB.class, proxy, retryPolicy);
+
+    return new OMAdminProtocolClientSideImpl(rpcProxy,
+        omNodeDetails.getOMPrintInfo());
+  }
+
+  /**
+   * Create OM Admin Protocol Client for contacting the OM ring (failover
+   * till the current OM leader is reached). Use for admin commands such as
+   * decommissionOM which are should reach the OM leader.

Review comment:
       Removed.

##########
File path: 
hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/cli/OzoneAdmin.java
##########
@@ -60,6 +65,16 @@ public OzoneConfiguration getOzoneConf() {
     return ozoneConf;
   }
 
+  public UserGroupInformation getUser() throws IOException {
+    if (user == null) {
+      user = Server.getRemoteUser();

Review comment:
       Updated to get the current user.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to