This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch HDDS-3698-upgrade
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3698-upgrade by this push:
new bdd00a7 HDDS-4564. Prepare client should check every OM individually
for the prepared check based on Txn ID. (#1692)
bdd00a7 is described below
commit bdd00a7bb5b4c0b8e5c49985826c984c19dc6a8f
Author: avijayanhwx <[email protected]>
AuthorDate: Wed Dec 16 09:33:18 2020 -0800
HDDS-4564. Prepare client should check every OM individually for the
prepared check based on Txn ID. (#1692)
---
.../main/java/org/apache/hadoop/ozone/OmUtils.java | 27 +++++++
.../ozone/om/protocol/OzoneManagerProtocol.java | 16 ++++
...OzoneManagerProtocolClientSideTranslatorPB.java | 12 +++
.../java/org/apache/hadoop/ozone/TestOmUtils.java | 31 ++++++++
.../dist/src/main/compose/ozone-ha/test.sh | 4 +
.../dist/src/main/smoketest/omha/om-prepare.robot | 44 +++++++++++
.../hadoop/ozone/om/TestOzoneManagerPrepare.java | 2 +-
.../src/main/proto/OmClientProtocol.proto | 20 +++++
.../ozone/om/request/upgrade/OMPrepareRequest.java | 21 ++++--
...OzoneManagerProtocolServerSideTranslatorPB.java | 4 +-
.../protocolPB/OzoneManagerRequestHandler.java | 29 ++++++++
.../hadoop/ozone/admin/om/PrepareSubCommand.java | 87 +++++++++++++++++++++-
12 files changed, 287 insertions(+), 10 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index fc3e61b..c0235dc 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -31,10 +31,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
+import java.util.Set;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.hadoop.fs.Path;
@@ -53,6 +55,8 @@ import org.apache.hadoop.security.token.SecretManager;
import com.google.common.base.Joiner;
import org.apache.commons.lang3.StringUtils;
+
+import static org.apache.hadoop.hdds.HddsUtils.getHostName;
import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
@@ -260,6 +264,7 @@ public final class OmUtils {
case DBUpdates:
case ListMultipartUploads:
case FinalizeUpgradeProgress:
+ case PrepareStatus:
return true;
case CreateVolume:
case SetVolumeProperty:
@@ -699,4 +704,26 @@ public final class OmUtils {
return keyName;
}
+
+
+  /**
+   * For a given service ID, return the set of configured OM hosts.
+   * Nodes whose configured RPC address yields no host name are skipped,
+   * so the returned set may be smaller than the number of node IDs, and
+   * may be empty.
+   * @param conf configuration
+   * @param omServiceId service id
+   * @return Set of hosts (host names only, without ports).
+   */
+  public static Set<String> getOmHostsFromConfig(OzoneConfiguration conf,
+      String omServiceId) {
+    Collection<String> omNodeIds = OmUtils.getOMNodeIds(conf,
+        omServiceId);
+    Set<String> omHosts = new HashSet<>();
+    // With no node IDs configured a single null node ID is iterated;
+    // presumably addKeySuffixes then yields the un-suffixed address key
+    // (non-HA setup) -- TODO confirm.
+    for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+      String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+          omServiceId, nodeId);
+      String rpcAddrStr = OmUtils.getOmRpcAddress(conf, rpcAddrKey);
+      Optional<String> hostName = getHostName(rpcAddrStr);
+      hostName.ifPresent(omHosts::add);
+    }
+    return omHosts;
+  }
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 1bf9434..07e8044 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
import org.apache.hadoop.ozone.security.OzoneDelegationTokenSelector;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
@@ -602,4 +604,18 @@ public interface OzoneManagerProtocol
throws IOException {
return -1;
}
+
+ /**
+ * Check if Ozone Manager is 'prepared' at a specific Txn Id.
+ * This default implementation does not contact any OM: it always returns
+ * status PREPARE_NOT_STARTED with a current Txn index of -1.
+ * @param txnId Txn Id at which the OM is expected to be prepared
+ * @return PrepareStatus response
+ * @throws IOException on exception.
+ */
+ default PrepareStatusResponse getOzoneManagerPrepareStatus(long txnId)
+ throws IOException {
+ return PrepareStatusResponse.newBuilder()
+ .setCurrentTxnIndex(-1)
+ .setStatus(PrepareStatus.PREPARE_NOT_STARTED)
+ .build();
+ }
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index cb0edcd..222883b 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -1575,6 +1575,18 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
return prepareResponse.getTxnID();
}
+  /**
+   * Sends a PrepareStatus request for {@code txnId} to the OM and returns the
+   * PrepareStatusResponse carried in the reply.
+   *
+   * @param txnId transaction ID at which the OM is expected to be prepared
+   * @return the prepare status response from the OM
+   * @throws IOException if submitting or handling the request fails
+   */
+  @Override
+  public PrepareStatusResponse getOzoneManagerPrepareStatus(long txnId)
+      throws IOException {
+    OMRequest statusRequest = createOMRequest(Type.PrepareStatus)
+        .setPrepareStatusRequest(
+            PrepareStatusRequest.newBuilder().setTxnID(txnId).build())
+        .build();
+    return handleError(submitRequest(statusRequest))
+        .getPrepareStatusResponse();
+  }
+
@VisibleForTesting
public OmTransport getTransport() {
return transport;
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
index 7cb9819..e4e0d76 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
@@ -23,11 +23,15 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import static org.apache.hadoop.ozone.OmUtils.getOmHostsFromConfig;
import static org.apache.hadoop.ozone.OmUtils.getOzoneManagerServiceId;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_INTERNAL_SERVICE_ID;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -153,5 +157,32 @@ public class TestOmUtils {
public void checkMaxTransactionID() {
Assert.assertEquals((long) (Math.pow(2, 54) - 2), OmUtils.MAX_TRXN_ID);
}
+
+ /**
+ * Verifies getOmHostsFromConfig returns host names (ports stripped) for
+ * every node of a three-node service, returns the host of a single-node
+ * service whose address has no port, and returns an empty set for a
+ * service ID that is not configured.
+ */
+ @Test
+ public void testGetOmHostsFromConfig() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ String serviceId = "myOmId";
+
+ // Three-node service; every address carries a port.
+ conf.set(OZONE_OM_NODES_KEY + "." + serviceId, "omA,omB,omC");
+ conf.set(OZONE_OM_ADDRESS_KEY + "." + serviceId + ".omA", "omA-host:9861");
+ conf.set(OZONE_OM_ADDRESS_KEY + "." + serviceId + ".omB", "omB-host:9861");
+ conf.set(OZONE_OM_ADDRESS_KEY + "." + serviceId + ".omC", "omC-host:9861");
+
+ // Second, single-node service whose address has no port.
+ String serviceId2 = "myOmId2";
+ conf.set(OZONE_OM_NODES_KEY + "." + serviceId2, "om1");
+ conf.set(OZONE_OM_ADDRESS_KEY + "." + serviceId2 + ".om1", "om1-host");
+
+ Set<String> hosts = getOmHostsFromConfig(conf, serviceId);
+ Assert.assertEquals(3, hosts.size());
+ Assert.assertTrue(hosts.contains("omA-host"));
+ Assert.assertTrue(hosts.contains("omB-host"));
+ Assert.assertTrue(hosts.contains("omC-host"));
+
+ hosts = getOmHostsFromConfig(conf, serviceId2);
+ Assert.assertEquals(1, hosts.size());
+ Assert.assertTrue(hosts.contains("om1-host"));
+
+ // Unknown service ID: nothing configured, so no hosts.
+ Assert.assertTrue(getOmHostsFromConfig(conf, "newId").isEmpty());
+ }
}
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha/test.sh
b/hadoop-ozone/dist/src/main/compose/ozone-ha/test.sh
index a14aa9c..7e024ed 100755
--- a/hadoop-ozone/dist/src/main/compose/ozone-ha/test.sh
+++ b/hadoop-ozone/dist/src/main/compose/ozone-ha/test.sh
@@ -28,6 +28,10 @@ start_docker_env
execute_robot_test scm basic/ozone-shell-single.robot
+# The prepare test must run last: once the OMs are prepared, no new write
+# requests are allowed. TODO: add a cancel-prepare test so this can change.
+execute_robot_test scm omha/om-prepare.robot
+
stop_docker_env
generate_report
diff --git a/hadoop-ozone/dist/src/main/smoketest/omha/om-prepare.robot
b/hadoop-ozone/dist/src/main/smoketest/omha/om-prepare.robot
new file mode 100644
index 0000000..9426d3e
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/smoketest/omha/om-prepare.robot
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+*** Settings ***
+Documentation       Smoke test for preparing the Ozone Managers of an OM HA cluster.
+Library             OperatingSystem
+Library             String
+Library             BuiltIn
+Resource            ../commonlib.robot
+Test Timeout        5 minutes
+Test Setup          Run Keyword if    '${SECURITY_ENABLED}' == 'true'    Kinit test user    testuser    testuser.keytab
+Suite Setup         Create Specific OM data for prepare
+
+*** Keywords ***
+Create Specific OM data for prepare
+    # Freon data to make sure there is a reasonable number of transactions in the system.
+    Execute              ozone freon rk --replication-type=RATIS --num-of-volumes 1 --num-of-buckets 1 --num-of-keys 100
+    ${random} =          Generate Random String    5    [NUMBERS]
+    Set Suite Variable   ${volume_name}    ${random}-volume-for-prepare
+    Set Suite Variable   ${bucket_name}    ${random}-bucket-for-prepare
+    Execute              ozone sh volume create /${volume_name}
+    Execute              ozone sh bucket create /${volume_name}/${bucket_name}
+    Execute              ozone sh key put /${volume_name}/${bucket_name}/prepare-key /opt/hadoop/NOTICE.txt
+
+*** Test Cases ***
+Prepare Ozone Manager
+    # The prepare command itself polls every OM until all are prepared or it
+    # times out, and ${result} never changes after Execute returns, so the
+    # output only needs to be checked once (retrying cannot change the result).
+    ${result} =        Execute      ozone admin om prepare -id=omservice
+    Should contain     ${result}    OM Preparation successful!
+
+Checks if the expected data is present in OM
+    ${result} =        Execute      ozone sh key info /${volume_name}/${bucket_name}/prepare-key
+    Should contain     ${result}    \"name\" : \"prepare-key\"
\ No newline at end of file
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
index 20aff56..e5104a8 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
@@ -52,7 +52,7 @@ public class TestOzoneManagerPrepare extends
TestOzoneManagerHA {
private final String keyPrefix = "key";
private final int timeoutMillis = 30000;
- private final static Long PREPARE_FLUSH_WAIT_TIMEOUT_SECONDS = 300L;
+ private final static Long PREPARE_FLUSH_WAIT_TIMEOUT_SECONDS = 120L;
private final static Long PREPARE_FLUSH_INTERVAL_SECONDS = 5L;
/**
diff --git
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index d0df304..1d29d2a 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -75,6 +75,7 @@ enum Type {
FinalizeUpgrade = 54;
FinalizeUpgradeProgress = 55;
Prepare = 56;
+ PrepareStatus = 57;
GetDelegationToken = 61;
RenewDelegationToken = 62;
@@ -147,6 +148,7 @@ message OMRequest {
optional FinalizeUpgradeRequest finalizeUpgradeRequest =
54;
optional FinalizeUpgradeProgressRequest finalizeUpgradeProgressRequest =
55;
optional PrepareRequest prepareRequest =
56;
+ optional PrepareStatusRequest prepareStatusRequest =
57;
optional hadoop.common.GetDelegationTokenRequestProto
getDelegationTokenRequest = 61;
optional hadoop.common.RenewDelegationTokenRequestProto
renewDelegationTokenRequest= 62;
@@ -223,6 +225,7 @@ message OMResponse {
optional FinalizeUpgradeResponse finalizeUpgradeResponse =
54;
optional FinalizeUpgradeProgressResponse finalizeUpgradeProgressResponse =
55;
optional PrepareResponse prepareResponse =
56;
+ optional PrepareStatusResponse prepareStatusResponse =
57;
optional GetDelegationTokenResponseProto getDelegationTokenResponse = 61;
optional RenewDelegationTokenResponseProto renewDelegationTokenResponse = 62;
@@ -1078,6 +1081,23 @@ message PrepareResponse {
required uint64 txnID = 1;
}
+// Asks a single OM whether it has been prepared at a given transaction.
+message PrepareStatusRequest {
+ // Transaction ID to check against; the prepare CLI sends the txn ID
+ // returned by the Prepare request.
+ required uint64 txnID = 1;
+}
+
+message PrepareStatusResponse {
+ enum PrepareStatus {
+ // TODO
+ // HDDS-4569 may introduce new states here, like marker file found
+ // but with a different txn id. They can be added as they become needed.
+ PREPARE_NOT_STARTED = 1;
+ PREPARE_IN_PROGRESS = 2;
+ PREPARE_COMPLETED = 3;
+ }
+ required PrepareStatus status = 1;
+ // The OM's Ratis snapshot index at the time the status was computed
+ // (the default OzoneManagerProtocol implementation sets -1).
+ optional uint64 currentTxnIndex = 2;
+}
+
message ServicePort {
enum Type {
RPC = 1;
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
index 84d7122..87b7657 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
@@ -116,14 +116,19 @@ public class OMPrepareRequest extends OMClientRequest {
// TODO: Create marker file with txn index.
- LOG.info("OM prepared at log index {}. Returning response {}",
+ LOG.info("OM {} prepared at log index {}. Returning response {}",
+ ozoneManager.getOMNodeId(),
ozoneManager.getRatisSnapshotIndex(), omResponse);
} catch (OMException e) {
+ LOG.error("Prepare Request Apply failed in {}. ",
+ ozoneManager.getOMNodeId(), e);
response = new OMPrepareResponse(
createErrorOMResponse(responseBuilder, e));
} catch (InterruptedException | IOException e) {
// Set error code so that prepare failure does not cause the OM to
// terminate.
+ LOG.error("Prepare Request Apply failed in {}. ",
+ ozoneManager.getOMNodeId(), e);
response = new OMPrepareResponse(
createErrorOMResponse(responseBuilder, new OMException(e,
OMException.ResultCodes.PREPARE_FAILED)));
@@ -195,11 +200,15 @@ public class OMPrepareRequest extends OMClientRequest {
RaftLog raftLog = impl.getState().getLog();
long raftLogIndex = raftLog.getLastEntryTermIndex().getIndex();
- // Ensure that Ratis's in memory snapshot index is the same as the index
- // of its last log entry.
- if (snapshotIndex != raftLogIndex) {
- throw new IOException("Snapshot index " + snapshotIndex + " does not " +
- "match last log index " + raftLogIndex);
+ // We can have a case where the log has a meta transaction after the
+ // prepare request or another prepare request. If there is another
+ // prepare request, this one will end up purging that request.
+ // This means that an OM cannot support 2 prepare requests in the
+ // transaction pipeline (un-applied) at the same time.
+ if (raftLogIndex > snapshotIndex) {
+ LOG.warn("Snapshot index {} does not " +
+ "match last log index {}.", snapshotIndex, raftLogIndex);
+ snapshotIndex = raftLogIndex;
}
CompletableFuture<Long> purgeFuture =
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index a1ae676..9d77e50 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.protocolPB;
import static
org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils.getRequest;
+import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.PrepareStatus;
import java.io.IOException;
import java.util.Optional;
@@ -185,7 +186,8 @@ public class OzoneManagerProtocolServerSideTranslatorPB
implements
private OMResponse submitReadRequestToOM(OMRequest request)
throws ServiceException {
// Check if this OM is the leader.
- if (omRatisServer.isLeader()) {
+ if (omRatisServer.isLeader() ||
+ request.getCmdType().equals(PrepareStatus)) {
return handler.handleReadRequest(request);
} else {
throw createNotLeaderException();
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index c2f9822..e0e7abb 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -74,6 +74,9 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Multipa
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadListPartsResponse;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListResponse;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
@@ -95,6 +98,8 @@ import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadInfo;
import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartInfo;
+import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus.PREPARE_COMPLETED;
+import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus.PREPARE_IN_PROGRESS;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
import org.slf4j.Logger;
@@ -214,6 +219,11 @@ public class OzoneManagerRequestHandler implements
RequestHandler {
responseBuilder
.setFinalizeUpgradeProgressResponse(upgradeProgressResponse);
break;
+ case PrepareStatus:
+ PrepareStatusResponse prepareStatusResponse =
+ getPrepareStatus(request.getPrepareStatusRequest());
+ responseBuilder.setPrepareStatusResponse(prepareStatusResponse);
+ break;
default:
responseBuilder.setSuccess(false);
responseBuilder.setMessage("Unrecognized Command Type: " + cmdType);
@@ -623,6 +633,25 @@ public class OzoneManagerRequestHandler implements
RequestHandler {
.build();
}
+ private PrepareStatusResponse getPrepareStatus(PrepareStatusRequest request)
+ throws IOException {
+ // TODO After HDDS-4569,
+ // When there is a global "prepared" state in OM, we can return
+ // PREPARE_NOT_STARTED instead of PREPARE_IN_PROGRESS appropriately.
+ PrepareStatus prepareStatus = null;
+ long txnID = request.getTxnID();
+ long ratisSnapshotIndex = impl.getRatisSnapshotIndex();
+ if (ratisSnapshotIndex != txnID) {
+ LOG.info("Last Txn Id = {}, PrepareStatusRequest Txn Id = {}",
+ ratisSnapshotIndex, request.getTxnID());
+ prepareStatus = PREPARE_IN_PROGRESS;
+ } else {
+ prepareStatus = PREPARE_COMPLETED;
+ }
+ return PrepareStatusResponse.newBuilder().setStatus(prepareStatus)
+ .setCurrentTxnIndex(ratisSnapshotIndex).build();
+ }
+
protected OzoneManager getOzoneManager() {
return impl;
}
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/PrepareSubCommand.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/PrepareSubCommand.java
index 9bc0737..e8e9dfb 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/PrepareSubCommand.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/PrepareSubCommand.java
@@ -17,10 +17,21 @@
package org.apache.hadoop.ozone.admin.om;
+import static org.apache.hadoop.ozone.OmUtils.getOmHostsFromConfig;
+import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus.PREPARE_COMPLETED;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
import picocli.CommandLine;
@@ -53,7 +64,7 @@ public class PrepareSubCommand implements Callable<Void> {
names = {"-tawt", "--transaction-apply-wait-timeout"},
description = "Max time in SECONDS to wait for all transactions before" +
"the prepare request to be applied to the OM DB.",
- defaultValue = "300",
+ defaultValue = "120",
hidden = true
)
private long txnApplyWaitTimeSeconds;
@@ -67,13 +78,85 @@ public class PrepareSubCommand implements Callable<Void> {
)
private long txnApplyCheckIntervalSeconds;
+ // Pause, in seconds, between rounds of per-OM prepare status checks.
+ @CommandLine.Option(
+ names = {"-pct", "--prepare-check-interval"},
+ description = "Time in SECONDS to wait between successive checks for OM"
+
+ " preparation.",
+ defaultValue = "10",
+ hidden = true
+ )
+ private long prepareCheckInterval;
+
+ // Upper bound, in seconds, on polling the OMs for prepare completion,
+ // measured from when the prepare request returns.
+ @CommandLine.Option(
+ names = {"-pt", "--prepare-timeout"},
+ description = "Max time in SECONDS to wait for all OMs to be prepared",
+ defaultValue = "300",
+ hidden = true
+ )
+ private long prepareTimeOut;
+
@Override
public Void call() throws Exception {
OzoneManagerProtocol client = parent.createOmClient(omServiceId);
long prepareTxnId = client.prepareOzoneManager(txnApplyWaitTimeSeconds,
txnApplyCheckIntervalSeconds);
System.out.println("Ozone Manager Prepare Request successfully returned " +
- "with Txn Id " + prepareTxnId);
+ "with Transaction Id : [" + prepareTxnId + "].");
+
+ Map<String, Boolean> omPreparedStatusMap = new HashMap<>();
+ Set<String> omHosts = getOmHostsFromConfig(
+ parent.getParent().getOzoneConf(), omServiceId);
+ omHosts.forEach(h -> omPreparedStatusMap.put(h, false));
+ Duration pTimeout = Duration.of(prepareTimeOut, ChronoUnit.SECONDS);
+ Duration pInterval = Duration.of(prepareCheckInterval, ChronoUnit.SECONDS);
+
+ System.out.println();
+ System.out.println("Checking individual OM instances for prepare request "
+
+ "completion...");
+ long endTime = System.currentTimeMillis() + pTimeout.toMillis();
+ int expectedNumPreparedOms = omPreparedStatusMap.size();
+ int currentNumPreparedOms = 0;
+ while (System.currentTimeMillis() < endTime &&
+ currentNumPreparedOms < expectedNumPreparedOms) {
+ for (Map.Entry<String, Boolean> e : omPreparedStatusMap.entrySet()) {
+ if (!e.getValue()) {
+ String omHost = e.getKey();
+ try (OzoneManagerProtocol singleOmClient =
+ parent.createOmClient(omServiceId, omHost, false)) {
+ PrepareStatusResponse response =
+ singleOmClient.getOzoneManagerPrepareStatus(prepareTxnId);
+ PrepareStatus status = response.getStatus();
+ System.out.println("OM : [" + omHost + "], Prepare " +
+ "Status : [" + status.name() + "], Current Transaction Id : ["
+
+ response.getCurrentTxnIndex() + "]");
+ if (status.equals(PREPARE_COMPLETED)) {
+ e.setValue(true);
+ currentNumPreparedOms++;
+ }
+ } catch (IOException ioEx) {
+ System.out.println("Exception while checking preparation " +
+ "completeness for [" + omHost +
+ "], Error : [" + ioEx.getMessage() + "]");
+ }
+ }
+ }
+ if (currentNumPreparedOms < expectedNumPreparedOms) {
+ System.out.println("Waiting for " + prepareCheckInterval +
+ " seconds before retrying...");
+ Thread.sleep(pInterval.toMillis());
+ }
+ }
+ if (currentNumPreparedOms < expectedNumPreparedOms) {
+ throw new Exception("OM Preparation failed since all OMs are not " +
+ "prepared yet.");
+ } else {
+ System.out.println();
+ System.out.println("OM Preparation successful! ");
+ System.out.println("No new write requests will be allowed until " +
+ "preparation is cancelled or upgrade/downgrade is done.");
+ }
+
return null;
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]