This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch HDDS-3698-upgrade
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3698-upgrade by this push:
     new bdd00a7  HDDS-4564. Prepare client should check every OM individually 
for the prepared check based on Txn ID. (#1692)
bdd00a7 is described below

commit bdd00a7bb5b4c0b8e5c49985826c984c19dc6a8f
Author: avijayanhwx <[email protected]>
AuthorDate: Wed Dec 16 09:33:18 2020 -0800

    HDDS-4564. Prepare client should check every OM individually for the 
prepared check based on Txn ID. (#1692)
---
 .../main/java/org/apache/hadoop/ozone/OmUtils.java | 27 +++++++
 .../ozone/om/protocol/OzoneManagerProtocol.java    | 16 ++++
 ...OzoneManagerProtocolClientSideTranslatorPB.java | 12 +++
 .../java/org/apache/hadoop/ozone/TestOmUtils.java  | 31 ++++++++
 .../dist/src/main/compose/ozone-ha/test.sh         |  4 +
 .../dist/src/main/smoketest/omha/om-prepare.robot  | 44 +++++++++++
 .../hadoop/ozone/om/TestOzoneManagerPrepare.java   |  2 +-
 .../src/main/proto/OmClientProtocol.proto          | 20 +++++
 .../ozone/om/request/upgrade/OMPrepareRequest.java | 21 ++++--
 ...OzoneManagerProtocolServerSideTranslatorPB.java |  4 +-
 .../protocolPB/OzoneManagerRequestHandler.java     | 29 ++++++++
 .../hadoop/ozone/admin/om/PrepareSubCommand.java   | 87 +++++++++++++++++++++-
 12 files changed, 287 insertions(+), 10 deletions(-)

diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index fc3e61b..c0235dc 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -31,10 +31,12 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
+import java.util.Set;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.hadoop.fs.Path;
@@ -53,6 +55,8 @@ import org.apache.hadoop.security.token.SecretManager;
 
 import com.google.common.base.Joiner;
 import org.apache.commons.lang3.StringUtils;
+
+import static org.apache.hadoop.hdds.HddsUtils.getHostName;
 import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
 import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
@@ -260,6 +264,7 @@ public final class OmUtils {
     case DBUpdates:
     case ListMultipartUploads:
     case FinalizeUpgradeProgress:
+    case PrepareStatus:
       return true;
     case CreateVolume:
     case SetVolumeProperty:
@@ -699,4 +704,26 @@ public final class OmUtils {
 
     return keyName;
   }
+
+
+  /**
+   * For a given service ID, return the distinct configured OM host names.
+   * @param conf configuration
+   * @param omServiceId service id
+   * @return Set of hosts; nodes with no parseable host name are skipped.
+   */
+  public static Set<String> getOmHostsFromConfig(OzoneConfiguration conf,
+                                                 String omServiceId) {
+    Collection<String> omNodeIds = OmUtils.getOMNodeIds(conf,
+        omServiceId);
+    Set<String> omHosts = new HashSet<>();
+    for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+      String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+          omServiceId, nodeId);
+      String rpcAddrStr = OmUtils.getOmRpcAddress(conf, rpcAddrKey);
+      Optional<String> hostName = getHostName(rpcAddrStr);
+      hostName.ifPresent(omHosts::add);
+    }
+    return omHosts;
+  }
 }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 1bf9434..07e8044 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
 import org.apache.hadoop.ozone.security.OzoneDelegationTokenSelector;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
@@ -602,4 +604,18 @@ public interface OzoneManagerProtocol
       throws IOException {
     return -1;
   }
+
+  /**
+   * Check if Ozone Manager is 'prepared' at a specific Txn Id.
+   * @param txnId Txn Id the OM is expected to be prepared at
+   * @return status; the default impl reports PREPARE_NOT_STARTED, index -1
+   * @throws IOException on exception.
+   */
+  default PrepareStatusResponse getOzoneManagerPrepareStatus(long txnId)
+      throws IOException {
+    return PrepareStatusResponse.newBuilder()
+        .setCurrentTxnIndex(-1)
+        .setStatus(PrepareStatus.PREPARE_NOT_STARTED)
+        .build();
+  }
 }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index cb0edcd..222883b 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -1575,6 +1575,18 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
     return prepareResponse.getTxnID();
   }
 
+  @Override
+  public PrepareStatusResponse getOzoneManagerPrepareStatus(long txnId)
+      throws IOException {
+    // Ask the OM whether it is prepared at the given transaction id.
+    OMRequest statusRequest = createOMRequest(Type.PrepareStatus)
+        .setPrepareStatusRequest(
+            PrepareStatusRequest.newBuilder().setTxnID(txnId).build())
+        .build();
+    return handleError(submitRequest(statusRequest))
+        .getPrepareStatusResponse();
+  }
+
   @VisibleForTesting
   public OmTransport getTransport() {
     return transport;
diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
index 7cb9819..e4e0d76 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
@@ -23,11 +23,15 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 
+import static org.apache.hadoop.ozone.OmUtils.getOmHostsFromConfig;
 import static org.apache.hadoop.ozone.OmUtils.getOzoneManagerServiceId;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_INTERNAL_SERVICE_ID;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -153,5 +157,32 @@ public class TestOmUtils {
   public void checkMaxTransactionID() {
     Assert.assertEquals((long) (Math.pow(2, 54) - 2), OmUtils.MAX_TRXN_ID);
   }
+
+  @Test
+  public void testGetOmHostsFromConfig() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    String serviceId = "myOmId";
+
+    conf.set(OZONE_OM_NODES_KEY  + "." + serviceId, "omA,omB,omC");
+    conf.set(OZONE_OM_ADDRESS_KEY + "." + serviceId + ".omA", "omA-host:9861");
+    conf.set(OZONE_OM_ADDRESS_KEY + "." + serviceId + ".omB", "omB-host:9861");
+    conf.set(OZONE_OM_ADDRESS_KEY + "." + serviceId + ".omC", "omC-host:9861");
+
+    String serviceId2 = "myOmId2";
+    conf.set(OZONE_OM_NODES_KEY  + "." + serviceId2, "om1");
+    conf.set(OZONE_OM_ADDRESS_KEY + "." + serviceId2 + ".om1", "om1-host");
+
+    Set<String> hosts = getOmHostsFromConfig(conf, serviceId);
+    Assert.assertEquals(3, hosts.size());
+    Assert.assertTrue(hosts.contains("omA-host"));
+    Assert.assertTrue(hosts.contains("omB-host"));
+    Assert.assertTrue(hosts.contains("omC-host"));
+
+    hosts = getOmHostsFromConfig(conf, serviceId2);
+    Assert.assertEquals(1, hosts.size());
+    Assert.assertTrue(hosts.contains("om1-host"));
+
+    Assert.assertTrue(getOmHostsFromConfig(conf, "newId").isEmpty());
+  }
 }
 
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha/test.sh 
b/hadoop-ozone/dist/src/main/compose/ozone-ha/test.sh
index a14aa9c..7e024ed 100755
--- a/hadoop-ozone/dist/src/main/compose/ozone-ha/test.sh
+++ b/hadoop-ozone/dist/src/main/compose/ozone-ha/test.sh
@@ -28,6 +28,10 @@ start_docker_env
 
 execute_robot_test scm basic/ozone-shell-single.robot
 
+# prepare test should be the last test to run, until a cancel prepare test is
+# added. (TODO)
+execute_robot_test scm omha/om-prepare.robot
+
 stop_docker_env
 
 generate_report
diff --git a/hadoop-ozone/dist/src/main/smoketest/omha/om-prepare.robot 
b/hadoop-ozone/dist/src/main/smoketest/omha/om-prepare.robot
new file mode 100644
index 0000000..9426d3e
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/smoketest/omha/om-prepare.robot
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+*** Settings ***
+Documentation       Smoke test to prepare all OMs of an HA cluster and verify existing data is still readable.
+Library             OperatingSystem
+Library             String
+Library             BuiltIn
+Resource            ../commonlib.robot
+Test Timeout        5 minutes
+Test Setup          Run Keyword if    '${SECURITY_ENABLED}' == 'true'    Kinit test user     testuser     testuser.keytab
+Suite Setup         Create Specific OM data for prepare
+
+*** Keywords ***
+Create Specific OM data for prepare
+    # Freon data to make sure there is a reasonable number of transactions in the system.
+    Execute             ozone freon rk --replication-type=RATIS --num-of-volumes 1 --num-of-buckets 1 --num-of-keys 100
+    ${random} =         Generate Random String  5  [NUMBERS]
+    Set Suite Variable  ${volume_name}  ${random}-volume-for-prepare
+    Set Suite Variable  ${bucket_name}  ${random}-bucket-for-prepare
+    Execute             ozone sh volume create /${volume_name}
+    Execute             ozone sh bucket create /${volume_name}/${bucket_name}
+    Execute             ozone sh key put /${volume_name}/${bucket_name}/prepare-key /opt/hadoop/NOTICE.txt
+
+*** Test Cases ***
+Prepare Ozone Manager
+    ${result} =        Execute      ozone admin om prepare -id=omservice
+                       Should contain   ${result}   OM Preparation successful!
+
+Checks if the expected data is present in OM
+    ${result} =         Execute             ozone sh key info /${volume_name}/${bucket_name}/prepare-key
+                        Should contain      ${result}       \"name\" : \"prepare-key\"
\ No newline at end of file
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
index 20aff56..e5104a8 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
@@ -52,7 +52,7 @@ public class TestOzoneManagerPrepare extends 
TestOzoneManagerHA {
 
   private final String keyPrefix = "key";
   private final int timeoutMillis = 30000;
-  private final static Long PREPARE_FLUSH_WAIT_TIMEOUT_SECONDS = 300L;
+  private final static Long PREPARE_FLUSH_WAIT_TIMEOUT_SECONDS = 120L;
   private final static Long PREPARE_FLUSH_INTERVAL_SECONDS = 5L;
 
   /**
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index d0df304..1d29d2a 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -75,6 +75,7 @@ enum Type {
   FinalizeUpgrade = 54;
   FinalizeUpgradeProgress = 55;
   Prepare = 56;
+  PrepareStatus = 57;
 
   GetDelegationToken = 61;
   RenewDelegationToken = 62;
@@ -147,6 +148,7 @@ message OMRequest {
   optional FinalizeUpgradeRequest           finalizeUpgradeRequest         = 
54;
   optional FinalizeUpgradeProgressRequest   finalizeUpgradeProgressRequest = 
55;
   optional PrepareRequest                   prepareRequest                 = 
56;
+  optional PrepareStatusRequest             prepareStatusRequest           = 
57;
 
   optional hadoop.common.GetDelegationTokenRequestProto 
getDelegationTokenRequest = 61;
   optional hadoop.common.RenewDelegationTokenRequestProto 
renewDelegationTokenRequest= 62;
@@ -223,6 +225,7 @@ message OMResponse {
   optional FinalizeUpgradeResponse           finalizeUpgradeResponse       = 
54;
   optional FinalizeUpgradeProgressResponse finalizeUpgradeProgressResponse = 
55;
   optional PrepareResponse                 prepareResponse                 = 
56;
+  optional PrepareStatusResponse           prepareStatusResponse           = 
57;
 
   optional GetDelegationTokenResponseProto getDelegationTokenResponse = 61;
   optional RenewDelegationTokenResponseProto renewDelegationTokenResponse = 62;
@@ -1078,6 +1081,23 @@ message PrepareResponse {
     required uint64 txnID = 1;
 }
 
+message PrepareStatusRequest {
+    required uint64 txnID = 1; // txn id the OM is checked against
+}
+
+message PrepareStatusResponse {
+    enum PrepareStatus {
+        // TODO
+        // HDDS-4569 may introduce new states here, e.g. marker file found
+        // but with a different txn id. Add them as needed.
+        PREPARE_NOT_STARTED = 1;
+        PREPARE_IN_PROGRESS = 2;
+        PREPARE_COMPLETED = 3;
+    }
+    required PrepareStatus status = 1;
+    optional uint64 currentTxnIndex = 2; // txn index reported by the OM
+}
+
 message ServicePort {
     enum Type {
         RPC = 1;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
index 84d7122..87b7657 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
@@ -116,14 +116,19 @@ public class OMPrepareRequest extends OMClientRequest {
 
       // TODO: Create marker file with txn index.
 
-      LOG.info("OM prepared at log index {}. Returning response {}",
+      LOG.info("OM {} prepared at log index {}. Returning response {}",
+          ozoneManager.getOMNodeId(),
           ozoneManager.getRatisSnapshotIndex(), omResponse);
     } catch (OMException e) {
+      LOG.error("Prepare Request Apply failed in {}. ",
+          ozoneManager.getOMNodeId(), e);
       response = new OMPrepareResponse(
           createErrorOMResponse(responseBuilder, e));
     } catch (InterruptedException | IOException e) {
       // Set error code so that prepare failure does not cause the OM to
       // terminate.
+      LOG.error("Prepare Request Apply failed in {}. ",
+          ozoneManager.getOMNodeId(), e);
       response = new OMPrepareResponse(
           createErrorOMResponse(responseBuilder, new OMException(e,
               OMException.ResultCodes.PREPARE_FAILED)));
@@ -195,11 +200,15 @@ public class OMPrepareRequest extends OMClientRequest {
     RaftLog raftLog = impl.getState().getLog();
     long raftLogIndex = raftLog.getLastEntryTermIndex().getIndex();
 
-    // Ensure that Ratis's in memory snapshot index is the same as the index
-    // of its last log entry.
-    if (snapshotIndex != raftLogIndex) {
-      throw new IOException("Snapshot index " + snapshotIndex + " does not " +
-          "match last log index " + raftLogIndex);
+    // We can have a case where the log has a meta transaction after the
+    // prepare request or another prepare request. If there is another
+    // prepare request, this one will end up purging that request.
+    // This means that an OM cannot support 2 prepare requests in the
+    // transaction pipeline (un-applied) at the same time.
+    if (raftLogIndex > snapshotIndex) {
+      LOG.warn("Snapshot index {} does not " +
+          "match last log index {}.", snapshotIndex, raftLogIndex);
+      snapshotIndex = raftLogIndex;
     }
 
     CompletableFuture<Long> purgeFuture =
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index a1ae676..9d77e50 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone.protocolPB;
 
 import static 
org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils.getRequest;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.PrepareStatus;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -185,7 +186,8 @@ public class OzoneManagerProtocolServerSideTranslatorPB 
implements
   private OMResponse submitReadRequestToOM(OMRequest request)
       throws ServiceException {
     // Check if this OM is the leader.
-    if (omRatisServer.isLeader()) {
+    if (omRatisServer.isLeader() ||
+        request.getCmdType().equals(PrepareStatus)) {
       return handler.handleReadRequest(request);
     } else {
       throw createNotLeaderException();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index c2f9822..e0e7abb 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -74,6 +74,9 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Multipa
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadListPartsResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
@@ -95,6 +98,8 @@ import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadInfo;
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartInfo;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus.PREPARE_COMPLETED;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus.PREPARE_IN_PROGRESS;
 
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
 import org.slf4j.Logger;
@@ -214,6 +219,11 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
         responseBuilder
             .setFinalizeUpgradeProgressResponse(upgradeProgressResponse);
         break;
+      case PrepareStatus:
+        PrepareStatusResponse prepareStatusResponse =
+            getPrepareStatus(request.getPrepareStatusRequest());
+        responseBuilder.setPrepareStatusResponse(prepareStatusResponse);
+        break;
       default:
         responseBuilder.setSuccess(false);
         responseBuilder.setMessage("Unrecognized Command Type: " + cmdType);
@@ -623,6 +633,25 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
         .build();
   }
 
+  private PrepareStatusResponse getPrepareStatus(PrepareStatusRequest request)
+      throws IOException {
+    // This OM counts as prepared once its Ratis snapshot index equals the
+    // requested txn id. TODO After HDDS-4569, when there is a global
+    // "prepared" state in OM, return PREPARE_NOT_STARTED where appropriate.
+    long txnID = request.getTxnID();
+    long ratisSnapshotIndex = impl.getRatisSnapshotIndex();
+    final PrepareStatus prepareStatus;
+    if (ratisSnapshotIndex == txnID) {
+      prepareStatus = PREPARE_COMPLETED;
+    } else {
+      LOG.info("Last Txn Id = {}, PrepareStatusRequest Txn Id = {}",
+          ratisSnapshotIndex, txnID);
+      prepareStatus = PREPARE_IN_PROGRESS;
+    }
+    return PrepareStatusResponse.newBuilder().setStatus(prepareStatus)
+        .setCurrentTxnIndex(ratisSnapshotIndex).build();
+  }
+
   protected OzoneManager getOzoneManager() {
     return impl;
   }
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/PrepareSubCommand.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/PrepareSubCommand.java
index 9bc0737..e8e9dfb 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/PrepareSubCommand.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/PrepareSubCommand.java
@@ -17,10 +17,21 @@
 
 package org.apache.hadoop.ozone.admin.om;
 
+import static org.apache.hadoop.ozone.OmUtils.getOmHostsFromConfig;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus.PREPARE_COMPLETED;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hdds.cli.HddsVersionProvider;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
 
 import picocli.CommandLine;
 
@@ -53,7 +64,7 @@ public class PrepareSubCommand implements Callable<Void> {
       names = {"-tawt", "--transaction-apply-wait-timeout"},
       description = "Max time in SECONDS to wait for all transactions before" +
           "the prepare request to be applied to the OM DB.",
-      defaultValue = "300",
+      defaultValue = "120",
       hidden = true
   )
   private long txnApplyWaitTimeSeconds;
@@ -67,13 +78,85 @@ public class PrepareSubCommand implements Callable<Void> {
   )
   private long txnApplyCheckIntervalSeconds;
 
+  @CommandLine.Option(
+      names = {"-pct", "--prepare-check-interval"},
+      description = "Time in SECONDS to wait between successive checks for OM" +
+          " preparation.",
+      defaultValue = "10",
+      hidden = true
+  )
+  private long prepareCheckInterval;
+
+  @CommandLine.Option(
+      names = {"-pt", "--prepare-timeout"},
+      description = "Max time in SECONDS to wait for all OMs to be prepared",
+      defaultValue = "300",
+      hidden = true
+  )
+  private long prepareTimeOut;
+
   @Override
   public Void call() throws Exception {
     OzoneManagerProtocol client = parent.createOmClient(omServiceId);
     long prepareTxnId = client.prepareOzoneManager(txnApplyWaitTimeSeconds,
         txnApplyCheckIntervalSeconds);
     System.out.println("Ozone Manager Prepare Request successfully returned " +
-        "with Txn Id " + prepareTxnId);
+        "with Transaction Id : [" + prepareTxnId + "].");
+    // Track each configured OM host; none is considered prepared yet.
+    Map<String, Boolean> omPreparedStatusMap = new HashMap<>();
+    Set<String> omHosts = getOmHostsFromConfig(
+        parent.getParent().getOzoneConf(), omServiceId);
+    omHosts.forEach(h -> omPreparedStatusMap.put(h, false));
+    Duration pTimeout = Duration.of(prepareTimeOut, ChronoUnit.SECONDS);
+    Duration pInterval = Duration.of(prepareCheckInterval, ChronoUnit.SECONDS);
+    // Poll OMs one by one; an OM that throws IOException is retried later.
+    System.out.println();
+    System.out.println("Checking individual OM instances for prepare request " +
+        "completion...");
+    long endTime = System.currentTimeMillis() + pTimeout.toMillis();
+    int expectedNumPreparedOms = omPreparedStatusMap.size();
+    int currentNumPreparedOms = 0;
+    while (System.currentTimeMillis() < endTime &&
+        currentNumPreparedOms < expectedNumPreparedOms) {
+      for (Map.Entry<String, Boolean> e : omPreparedStatusMap.entrySet()) {
+        if (!e.getValue()) {
+          String omHost = e.getKey();
+          try (OzoneManagerProtocol singleOmClient =
+                    parent.createOmClient(omServiceId, omHost, false)) {
+            PrepareStatusResponse response =
+                singleOmClient.getOzoneManagerPrepareStatus(prepareTxnId);
+            PrepareStatus status = response.getStatus();
+            System.out.println("OM : [" + omHost + "], Prepare " +
+                "Status : [" + status.name() + "], Current Transaction Id : [" +
+                response.getCurrentTxnIndex() + "]");
+            if (status == PREPARE_COMPLETED) {
+              e.setValue(true);
+              currentNumPreparedOms++;
+            }
+          } catch (IOException ioEx) {
+            System.out.println("Exception while checking preparation " +
+                "completeness for [" + omHost +
+                "], Error : [" + ioEx.getMessage() + "]");
+          }
+        }
+      }
+      if (currentNumPreparedOms < expectedNumPreparedOms) {
+        System.out.println("Waiting for " + prepareCheckInterval +
+            " seconds before retrying...");
+        Thread.sleep(pInterval.toMillis());
+      }
+    }
+    if (currentNumPreparedOms < expectedNumPreparedOms) {
+      throw new Exception("OM Preparation failed since all OMs are not " +
+          "prepared yet.");
+    } else {
+      System.out.println();
+      System.out.println("OM Preparation successful! ");
+      System.out.println("No new write requests will be allowed until " +
+          "preparation is cancelled or upgrade/downgrade is done.");
+    }
+
     return null;
   }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to