This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-4944
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-4944 by this push:
     new 8d1d957  HDDS-6022. [Multi-Tenant] Implement DeleteTenant: `ozone 
tenant delete` (#2857)
8d1d957 is described below

commit 8d1d957ee09756b47ce83b2e60fdb58fb8c856f5
Author: Siyao Meng <[email protected]>
AuthorDate: Thu Dec 9 14:40:50 2021 -0800

    HDDS-6022. [Multi-Tenant] Implement DeleteTenant: `ozone tenant delete` 
(#2857)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   1 +
 .../apache/hadoop/ozone/client/ObjectStore.java    |  68 ++++----
 .../apache/hadoop/ozone/client/OzoneVolume.java    |  30 ++++
 .../org/apache/hadoop/ozone/client/TenantArgs.java |  82 ++++++++++
 .../org/apache/hadoop/ozone/client/VolumeArgs.java |   2 +
 .../ozone/client/protocol/ClientProtocol.java      |  59 +++----
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  78 ++++++---
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   1 -
 .../org/apache/hadoop/ozone/audit/OMAction.java    |   1 -
 .../hadoop/ozone/om/exceptions/OMException.java    |   5 +-
 .../hadoop/ozone/om/helpers/OmDBTenantInfo.java    |  10 +-
 .../hadoop/ozone/om/helpers/OmTenantArgs.java      |  59 ++++++-
 .../hadoop/ozone/om/helpers/OmVolumeArgs.java      |  55 ++++++-
 .../hadoop/ozone/om/lock/OzoneManagerLock.java     |   1 -
 .../ozone/om/protocol/OzoneManagerProtocol.java    |  23 ++-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  23 ++-
 .../smoketest/security/ozone-secure-tenant.robot   | 102 +++++++++---
 .../hadoop/ozone/shell/TestOzoneTenantShell.java   | 166 +++++++++++++++----
 .../src/main/proto/OmClientProtocol.proto          |  43 +++--
 .../hadoop/ozone/om/OMMultiTenantManager.java      |  11 ++
 .../hadoop/ozone/om/OMMultiTenantManagerImpl.java  |  13 ++
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |   2 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  86 +++++-----
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   3 -
 .../om/request/s3/security/OMSetSecretRequest.java |  41 +++--
 .../s3/tenant/OMAssignUserToTenantRequest.java     |  86 +++++-----
 .../s3/tenant/OMTenantAssignAdminRequest.java      |  22 +--
 .../request/s3/tenant/OMTenantCreateRequest.java   |  96 ++++++-----
 .../request/s3/tenant/OMTenantDeleteRequest.java   | 175 ++++++++++++++++++++-
 .../request/s3/tenant/OMTenantModifyRequest.java   |  50 ------
 .../request/s3/tenant/OMTenantRequestHelper.java   |  86 ++++++++--
 .../s3/tenant/OMTenantRevokeAdminRequest.java      |  18 ++-
 .../tenant/OMTenantRevokeUserAccessIdRequest.java  |  40 ++---
 .../om/request/volume/OMVolumeDeleteRequest.java   |  10 ++
 .../response/s3/tenant/OMTenantCreateResponse.java |   4 +-
 .../response/s3/tenant/OMTenantDeleteResponse.java |  98 ++++++++++++
 .../ozone/security/acl/OzoneNativeAuthorizer.java  |   3 +
 .../s3/security/TestS3GetSecretRequest.java        |   1 +
 .../shell/tenant/TenantAssignAdminHandler.java     |   3 -
 .../ozone/shell/tenant/TenantCreateHandler.java    |  35 ++---
 .../ozone/shell/tenant/TenantDeleteHandler.java    |  35 ++++-
 .../ozone/shell/tenant/TenantSetSecretHandler.java |   3 +-
 42 files changed, 1291 insertions(+), 439 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 7bc326a..21906cd 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -191,6 +191,7 @@ public final class OzoneConsts {
   public static final String OM_S3_VOLUME_PREFIX = "s3";
   public static final String OM_S3_SECRET = "S3Secret:";
   public static final String OM_PREFIX = "Prefix:";
+  public static final String OM_TENANT = "Tenant:";
 
   /**
    *   Max chunk size limit.
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
index 2639208..5b5b664 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.TenantInfoList;
 import org.apache.hadoop.ozone.om.helpers.TenantUserInfoValue;
 import org.apache.hadoop.ozone.om.helpers.TenantUserList;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantResponse;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -195,37 +196,46 @@ public class ObjectStore {
   }
 
   /**
-   * Create tenant.
-   * @param tenantName tenant name.
+   * Create a tenant.
+   * @param tenantId tenant name.
    * @throws IOException
    */
-  public void createTenant(String tenantName) throws IOException {
-    proxy.createTenant(tenantName);
-  }
-
-  // TODO: createTenant with tenantArgs
-//  /**
-//   * Create tenant.
-//   * @param tenantName tenant name.
-//   * @param tenantArgs tenant arguments.
-//   * @throws IOException
-//   */
-//  public void createTenant(String tenantName, OmTenantArgs tenantArgs)
-//      throws IOException {
-//    proxy.createTenant(tenantName, tenantArgs);
-//  }
+  public void createTenant(String tenantId) throws IOException {
+    proxy.createTenant(tenantId);
+  }
+
+  /**
+   * Create a tenant with extra arguments.
+   *
+   * @param tenantId tenant name.
+   * @param tenantArgs extra tenant arguments like volume name.
+   * @throws IOException
+   */
+  public void createTenant(String tenantId, TenantArgs tenantArgs)
+      throws IOException {
+    proxy.createTenant(tenantId, tenantArgs);
+  }
+
+  /**
+   * Delete a tenant.
+   * @param tenantId tenant name.
+   * @throws IOException
+   */
+  public DeleteTenantResponse deleteTenant(String tenantId) throws IOException 
{
+    return proxy.deleteTenant(tenantId);
+  }
 
   /**
    * Assign user accessId to tenant.
    * @param username user name to be assigned.
-   * @param tenantName tenant name.
+   * @param tenantId tenant name.
    * @param accessId Specified accessId.
    * @throws IOException
    */
   // TODO: Rename this to tenantAssignUserAccessId ?
   public S3SecretValue tenantAssignUserAccessId(
-      String username, String tenantName, String accessId) throws IOException {
-    return proxy.tenantAssignUserAccessId(username, tenantName, accessId);
+      String username, String tenantId, String accessId) throws IOException {
+    return proxy.tenantAssignUserAccessId(username, tenantId, accessId);
   }
 
   /**
@@ -240,29 +250,29 @@ public class ObjectStore {
   /**
    * Assign admin role to an accessId in a tenant.
    * @param accessId access ID.
-   * @param tenantName tenant name.
+   * @param tenantId tenant name.
    * @param delegated true if making delegated admin.
    * @throws IOException
    */
-  public void tenantAssignAdmin(String accessId, String tenantName,
-      boolean delegated) throws IOException {
-    proxy.tenantAssignAdmin(accessId, tenantName, delegated);
+  public void tenantAssignAdmin(String accessId, String tenantId,
+                                boolean delegated) throws IOException {
+    proxy.tenantAssignAdmin(accessId, tenantId, delegated);
   }
 
   /**
    * Revoke admin role of an accessId from a tenant.
    * @param accessId access ID.
-   * @param tenantName tenant name.
+   * @param tenantId tenant name.
    * @throws IOException
    */
-  public void tenantRevokeAdmin(String accessId, String tenantName)
+  public void tenantRevokeAdmin(String accessId, String tenantId)
       throws IOException {
-    proxy.tenantRevokeAdmin(accessId, tenantName);
+    proxy.tenantRevokeAdmin(accessId, tenantId);
   }
 
-  public TenantUserList listUsersInTenant(String tenantName, String userPrefix)
+  public TenantUserList listUsersInTenant(String tenantId, String userPrefix)
       throws IOException {
-    return proxy.listUsersInTenant(tenantName, userPrefix);
+    return proxy.listUsersInTenant(tenantId, userPrefix);
   }
 
   /**
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java
index 3847b12..5c11088 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java
@@ -93,6 +93,19 @@ public class OzoneVolume extends WithMetadata {
   private int listCacheSize;
 
   private OzoneObj ozoneObj;
+  /**
+   * Reference count on this Ozone volume.
+   *
+   * When reference count is larger than zero, it indicates that at least one
+   * "lock" is held on the volume by some Ozone feature (e.g. multi-tenancy).
+   * Volume delete operation will be denied in this case, and user should be
+   * prompted to release the lock first via the interface provided by that
+   * feature.
+   *
+   * Volumes created using CLI, ObjectStore API or upgraded from older OM DB
+   * will have reference count set to zero by default.
+   */
+  private long refCount;
 
   /**
    * Constructs OzoneVolume instance.
@@ -134,6 +147,20 @@ public class OzoneVolume extends WithMetadata {
   }
 
   /**
+   * @param refCount volume reference count.
+   */
+  @SuppressWarnings("parameternumber")
+  public OzoneVolume(ConfigurationSource conf, ClientProtocol proxy,
+      String name, String admin, String owner, long quotaInBytes,
+      long quotaInNamespace, long usedNamespace, long creationTime,
+      long modificationTime, List<OzoneAcl> acls,
+      Map<String, String> metadata, long refCount) {
+    this(conf, proxy, name, admin, owner, quotaInBytes, quotaInNamespace,
+        usedNamespace, creationTime, modificationTime, acls, metadata);
+    this.refCount = refCount;
+  }
+
+  /**
    * @param modificationTime modification time of the volume.
    */
   @SuppressWarnings("parameternumber")
@@ -444,6 +471,9 @@ public class OzoneVolume extends WithMetadata {
     proxy.deleteBucket(name, bucketName);
   }
 
+  public long getRefCount() {
+    return refCount;
+  }
 
   /**
    * An Iterator to iterate over {@link OzoneBucket} list.
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/TenantArgs.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/TenantArgs.java
new file mode 100644
index 0000000..f1b67a5
--- /dev/null
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/TenantArgs.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class encapsulates the arguments for creating a tenant.
+ */
+public final class TenantArgs {
+
+  /**
+   * Name of the volume to be created for the tenant.
+   */
+  private final String volumeName;
+
+  /**
+   * Private constructor, constructed via builder.
+   * @param volumeName Volume name.
+   */
+  private TenantArgs(String volumeName) {
+    this.volumeName = volumeName;
+  }
+
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  /**
+   * Returns new builder class that builds a TenantArgs.
+   *
+   * @return Builder
+   */
+  public static TenantArgs.Builder newBuilder() {
+    return new TenantArgs.Builder();
+  }
+
+  /**
+   * Builder for TenantArgs.
+   */
+  @SuppressWarnings("checkstyle:hiddenfield")
+  public static class Builder {
+    private String volumeName;
+
+    /**
+     * Constructs a builder.
+     */
+    public Builder() {
+    }
+
+    public TenantArgs.Builder setVolumeName(String volumeName) {
+      this.volumeName = volumeName;
+      return this;
+    }
+
+    /**
+     * Constructs a TenantArgs.
+     * @return TenantArgs.
+     */
+    public TenantArgs build() {
+      Preconditions.checkNotNull(volumeName);
+      return new TenantArgs(volumeName);
+    }
+  }
+
+}
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/VolumeArgs.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/VolumeArgs.java
index 35fcdfa..9d683c5 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/VolumeArgs.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/VolumeArgs.java
@@ -106,6 +106,7 @@ public final class VolumeArgs {
   public List<OzoneAcl> getAcls() {
     return acls;
   }
+
   /**
    * Returns new builder class that builds a OmVolumeArgs.
    *
@@ -118,6 +119,7 @@ public final class VolumeArgs {
   /**
    * Builder for OmVolumeArgs.
    */
+  @SuppressWarnings("checkstyle:hiddenfield")
   public static class Builder {
     private String adminName;
     private String ownerName;
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 5d2a420..616a5dc 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadList;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.TenantArgs;
 import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -51,6 +52,7 @@ import org.apache.hadoop.ozone.om.helpers.TenantInfoList;
 import org.apache.hadoop.ozone.om.helpers.TenantUserInfoValue;
 import org.apache.hadoop.ozone.om.helpers.TenantUserList;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRoleInfo;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
@@ -588,39 +590,40 @@ public interface ClientProtocol {
   void revokeS3Secret(String kerberosID) throws IOException;
 
   /**
-   * Create tenant.
-   * @param tenantName tenant name.
+   * Create a tenant.
+   * @param tenantId tenant name.
    * @throws IOException
    */
-  void createTenant(String tenantName) throws IOException;
+  void createTenant(String tenantId) throws IOException;
 
-  // TODO
-//  /**
-//   * Modify tenant.
-//   * @param tenantName tenant name.
-//   * @throws IOException
-//   */
-//  void modifyTenant(String tenantName) throws IOException;
-//
-//  /**
-//   * Delete tenant.
-//   * @param tenantName tenant name.
-//   * @throws IOException
-//   */
-//  void deleteTenant(String tenantName) throws IOException;
+  /**
+   * Create a tenant with args.
+   *
+   * @param tenantId
+   * @param tenantArgs extra arguments e.g. volume name
+   * @throws IOException
+   */
+  void createTenant(String tenantId, TenantArgs tenantArgs) throws IOException;
 
   /**
-   * Assign user to tenant.
+   * Delete a tenant.
+   * @param tenantId tenant name.
+   * @throws IOException
+   */
+  DeleteTenantResponse deleteTenant(String tenantId) throws IOException;
+
+  /**
+   * Assign a user to a tenant.
    * @param username user name to be assigned.
-   * @param tenantName tenant name.
+   * @param tenantId tenant name.
    * @param accessId access ID.
    * @throws IOException
    */
-  S3SecretValue tenantAssignUserAccessId(String username, String tenantName,
+  S3SecretValue tenantAssignUserAccessId(String username, String tenantId,
       String accessId) throws IOException;
 
   /**
-   * Revoke user accessId to tenant.
+   * Revoke a user accessId previously assign to a tenant.
    * @param accessId accessId to be revoked.
    * @throws IOException
    */
@@ -629,20 +632,20 @@ public interface ClientProtocol {
   /**
    * Assign admin role to an accessId in a tenant.
    * @param accessId access ID.
-   * @param tenantName tenant name.
+   * @param tenantId tenant name.
    * @param delegated true if making delegated admin.
    * @throws IOException
    */
-  void tenantAssignAdmin(String accessId, String tenantName,
-      boolean delegated) throws IOException;
+  void tenantAssignAdmin(String accessId, String tenantId, boolean delegated)
+      throws IOException;
 
   /**
    * Revoke admin role of an accessId from a tenant.
    * @param accessId access ID.
-   * @param tenantName tenant name.
+   * @param tenantId tenant name.
    * @throws IOException
    */
-  void tenantRevokeAdmin(String accessId, String tenantName) throws 
IOException;
+  void tenantRevokeAdmin(String accessId, String tenantId) throws IOException;
 
   /**
    * Get tenant info for a user.
@@ -655,12 +658,12 @@ public interface ClientProtocol {
 
   /**
    * Get List of users in a tenant.
-   * @param tenantName tenant name
+   * @param tenantId tenant name
    * @param prefix optional prefix
    * @return List of username, accessIds in tenant.
    * @throws IOException on server error.
    */
-  TenantUserList listUsersInTenant(String tenantName, String prefix)
+  TenantUserList listUsersInTenant(String tenantId, String prefix)
       throws IOException;
 
   /**
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 7e81ba4..830a707 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.ozone.client.OzoneMultipartUpload;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadList;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.TenantArgs;
 import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
@@ -96,6 +97,7 @@ import 
org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmRenameKeys;
+import org.apache.hadoop.ozone.om.helpers.OmTenantArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
@@ -111,6 +113,7 @@ import 
org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
 import 
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRoleInfo;
 import org.apache.hadoop.ozone.security.GDPRSymmetricKey;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
@@ -401,7 +404,8 @@ public class RpcClient implements ClientProtocol {
         volume.getCreationTime(),
         volume.getModificationTime(),
         volume.getAcls(),
-        volume.getMetadata());
+        volume.getMetadata(),
+        volume.getRefCount());
   }
 
   @Override
@@ -668,29 +672,63 @@ public class RpcClient implements ClientProtocol {
    * {@inheritDoc}
    */
   @Override
-  public void createTenant(String tenantName) throws IOException {
-    Preconditions.checkArgument(Strings.isNotBlank(tenantName),
-        "tenantName cannot be null or empty.");
-    ozoneManagerClient.createTenant(tenantName);
+  public void createTenant(String tenantId) throws IOException {
+    createTenant(tenantId, TenantArgs.newBuilder()
+        .setVolumeName(tenantId).build());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void createTenant(String tenantId, TenantArgs tenantArgs)
+      throws IOException {
+    Preconditions.checkArgument(Strings.isNotBlank(tenantId),
+        "tenantId cannot be null or empty.");
+    Preconditions.checkNotNull(tenantArgs);
+
+    final String volumeName = tenantArgs.getVolumeName();
+    verifyVolumeName(volumeName);
+
+    OmTenantArgs.Builder builder = OmTenantArgs.newBuilder();
+    builder.setTenantId(tenantId);
+    builder.setVolumeName(volumeName);
+    // TODO: Add more fields
+    // TODO: Include OmVolumeArgs in (Om)TenantArgs as well for volume 
creation?
+
+    LOG.info("Creating Tenant: '{}', with new volume: '{}'",
+        tenantId, volumeName);
+
+    ozoneManagerClient.createTenant(builder.build());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public DeleteTenantResponse deleteTenant(String tenantId) throws IOException 
{
+    Preconditions.checkArgument(Strings.isNotBlank(tenantId),
+        "tenantId cannot be null or empty.");
+    return ozoneManagerClient.deleteTenant(tenantId);
   }
 
   /**
    * Assign user to tenant.
    * @param username user name to be assigned.
-   * @param tenantName tenant name.
+   * @param tenantId tenant name.
    * @throws IOException
    */
   @Override
   public S3SecretValue tenantAssignUserAccessId(
-      String username, String tenantName, String accessId) throws IOException {
+      String username, String tenantId, String accessId) throws IOException {
     Preconditions.checkArgument(Strings.isNotBlank(username),
         "username can't be null or empty.");
-    Preconditions.checkArgument(Strings.isNotBlank(tenantName),
-        "tenantName can't be null or empty.");
+    Preconditions.checkArgument(Strings.isNotBlank(tenantId),
+        "tenantId can't be null or empty.");
     Preconditions.checkArgument(Strings.isNotBlank(accessId),
         "accessId can't be null or empty.");
     return ozoneManagerClient.tenantAssignUserAccessId(
-        username, tenantName, accessId);
+        username, tenantId, accessId);
   }
 
   /**
@@ -708,33 +746,33 @@ public class RpcClient implements ClientProtocol {
   /**
    * Assign admin role to an accessId in a tenant.
    * @param accessId access ID.
-   * @param tenantName tenant name.
+   * @param tenantId tenant name.
    * @param delegated true if making delegated admin.
    * @throws IOException
    */
   @Override
-  public void tenantAssignAdmin(String accessId, String tenantName,
+  public void tenantAssignAdmin(String accessId, String tenantId,
       boolean delegated)
       throws IOException {
     Preconditions.checkArgument(Strings.isNotBlank(accessId),
         "accessId can't be null or empty.");
-    // tenantName can be empty
-    ozoneManagerClient.tenantAssignAdmin(accessId, tenantName, delegated);
+    // tenantId can be empty
+    ozoneManagerClient.tenantAssignAdmin(accessId, tenantId, delegated);
   }
 
   /**
    * Revoke admin role of an accessId from a tenant.
    * @param accessId access ID.
-   * @param tenantName tenant name.
+   * @param tenantId tenant name.
    * @throws IOException
    */
   @Override
-  public void tenantRevokeAdmin(String accessId, String tenantName)
+  public void tenantRevokeAdmin(String accessId, String tenantId)
       throws IOException {
     Preconditions.checkArgument(Strings.isNotBlank(accessId),
         "accessId can't be null or empty.");
-    // tenantName can be empty
-    ozoneManagerClient.tenantRevokeAdmin(accessId, tenantName);
+    // tenantId can be empty
+    ozoneManagerClient.tenantRevokeAdmin(accessId, tenantId);
   }
 
   /**
@@ -762,9 +800,9 @@ public class RpcClient implements ClientProtocol {
   }
 
   @Override
-  public TenantUserList listUsersInTenant(String tenantName, String prefix)
+  public TenantUserList listUsersInTenant(String tenantId, String prefix)
       throws IOException {
-    return ozoneManagerClient.listUsersInTenant(tenantName, prefix);
+    return ozoneManagerClient.listUsersInTenant(tenantId, prefix);
   }
 
   @Override
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 047878f..43bb2bd 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -306,7 +306,6 @@ public final class OmUtils {
     case RevokeS3Secret:
     case PurgePaths:
     case CreateTenant:
-    case ModifyTenant:
     case DeleteTenant:
     case TenantAssignUserAccessId:
     case TenantRevokeUserAccessId:
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
index 6f83395..b985dd3 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
@@ -75,7 +75,6 @@ public enum OMAction implements AuditAction {
   REVOKE_S3_SECRET,
 
   CREATE_TENANT,
-  MODIFY_TENANT,
   DELETE_TENANT,
   LIST_TENANT,
 
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index 8c2f38f..0d164a9 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -252,6 +252,9 @@ public class OMException extends IOException {
     TENANT_USER_ACCESSID_ALREADY_EXISTS,
     INVALID_TENANT_USER_NAME,
     INVALID_ACCESSID,
-    TENANT_AUTHORIZER_ERROR
+    TENANT_AUTHORIZER_ERROR,
+
+    VOLUME_IS_REFERENCED,
+    TENANT_NOT_EMPTY
   }
 }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDBTenantInfo.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDBTenantInfo.java
index 302d403..7345863 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDBTenantInfo.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmDBTenantInfo.java
@@ -29,11 +29,11 @@ public final class OmDBTenantInfo {
    */
   private final String tenantId;
   /**
-   * Name of the tenant's bucket namespace.
+   * Name of the bucket namespace (volume name).
    */
   private final String bucketNamespaceName;
   /**
-   * Name of the tenant's account namespace.
+   * Name of the account namespace.
    */
   private final String accountNamespaceName;
   /**
@@ -101,6 +101,12 @@ public final class OmDBTenantInfo {
     return new OmDBTenantInfo(tInfo);
   }
 
+  /**
+   * Returns the bucket namespace name. a.k.a. volume name.
+   *
+   * Note: This returns an empty string ("") if the tenant is somehow not
+   * associated with a volume. Should never return null.
+   */
   public String getBucketNamespaceName() {
     return bucketNamespaceName;
   }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmTenantArgs.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmTenantArgs.java
index 8c500d2..518a08b 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmTenantArgs.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmTenantArgs.java
@@ -18,18 +18,75 @@
  */
 package org.apache.hadoop.ozone.om.helpers;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class is used for storing Ozone tenant arguments.
  */
 public class OmTenantArgs {
-  /* Tenant name */
+
+  /**
+   * Tenant name.
+   */
   private final String tenantId;
 
+  /**
+   * Volume name to be created for this tenant.
+   * Default volume name would be the same as tenant name if unspecified.
+   */
+  private final String volumeName;
+
   public OmTenantArgs(String tenantId) {
     this.tenantId = tenantId;
+    this.volumeName = this.tenantId;
+  }
+
+  public OmTenantArgs(String tenantId, String volumeName) {
+    this.tenantId = tenantId;
+    this.volumeName = volumeName;
   }
 
   public String getTenantId() {
     return tenantId;
   }
+
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  public static OmTenantArgs.Builder newBuilder() {
+    return new OmTenantArgs.Builder();
+  }
+
+  /**
+   * Builder for OmTenantArgs.
+   */
+  @SuppressWarnings("checkstyle:hiddenfield")
+  public static class Builder {
+    private String tenantId;
+    private String volumeName;
+
+    /**
+     * Constructs a builder.
+     */
+    public Builder() {
+    }
+
+    public Builder setTenantId(String tenantId) {
+      this.tenantId = tenantId;
+      return this;
+    }
+
+    public Builder setVolumeName(String volumeName) {
+      this.volumeName = volumeName;
+      return this;
+    }
+
+    public OmTenantArgs build() {
+      Preconditions.checkNotNull(tenantId);
+      Preconditions.checkNotNull(volumeName);
+      return new OmTenantArgs(tenantId, volumeName);
+    }
+  }
+
 }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
index 85165d6..c59200a 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.ozone.om.helpers;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashMap;
@@ -48,6 +47,19 @@ public final class OmVolumeArgs extends WithObjectID 
implements Auditable {
   private long quotaInNamespace;
   private long usedNamespace;
   private List<OzoneAcl> acls;
+  /**
+   * Reference count on this Ozone volume.
+   *
+   * When reference count is larger than zero, it indicates that at least one
+   * "lock" is held on the volume by some Ozone feature (e.g. multi-tenancy).
+   * Volume delete operation will be denied in this case, and user should be
+   * prompted to release the lock first via the interface provided by that
+   * feature.
+   *
+   * Volumes created using CLI, ObjectStore API or upgraded from older OM DB
+   * will have reference count set to zero by default.
+   */
+  private long refCount;
 
   /**
    * Private constructor, constructed via builder.
@@ -69,7 +81,7 @@ public final class OmVolumeArgs extends WithObjectID 
implements Auditable {
   private OmVolumeArgs(String adminName, String ownerName, String volume,
       long quotaInBytes, long quotaInNamespace, long usedNamespace,
       Map<String, String> metadata, List<OzoneAcl> acls, long creationTime,
-      long modificationTime, long objectID, long updateID) {
+      long modificationTime, long objectID, long updateID, long refCount) {
     this.adminName = adminName;
     this.ownerName = ownerName;
     this.volume = volume;
@@ -82,8 +94,29 @@ public final class OmVolumeArgs extends WithObjectID 
implements Auditable {
     this.modificationTime = modificationTime;
     this.objectID = objectID;
     this.updateID = updateID;
+    this.refCount = refCount;
   }
 
+  public long getRefCount() {
+    Preconditions.checkState(refCount >= 0L, "refCount should not be 
negative");
+    return refCount;
+  }
+
+  /**
+   * Increase refCount by 1.
+   */
+  public void incRefCount() {
+    this.refCount++;
+  }
+
+  /**
+   * Decrease refCount by 1.
+   */
+  public void decRefCount() {
+    Preconditions.checkState(this.refCount > 0L,
+        "refCount should not become negative");
+    this.refCount--;
+  }
 
   public void setOwnerName(String newOwner) {
     this.ownerName = newOwner;
@@ -263,6 +296,7 @@ public final class OmVolumeArgs extends WithObjectID 
implements Auditable {
     private List<OzoneAcl> acls;
     private long objectID;
     private long updateID;
+    private long refCount;
 
     /**
      * Sets the Object ID for this Object.
@@ -347,11 +381,17 @@ public final class OmVolumeArgs extends WithObjectID 
implements Auditable {
       return this;
     }
 
-    public Builder addOzoneAcls(OzoneAcl acl) throws IOException {
+    public Builder addOzoneAcls(OzoneAcl acl) {
       OzoneAclUtil.addAcl(acls, acl);
       return this;
     }
 
+    public void setRefCount(long refCount) {
+      Preconditions.checkState(refCount >= 0L,
+          "refCount should not be negative");
+      this.refCount = refCount;
+    }
+
     /**
      * Constructs a CreateVolumeArgument.
      * @return CreateVolumeArgs.
@@ -362,7 +402,7 @@ public final class OmVolumeArgs extends WithObjectID 
implements Auditable {
       Preconditions.checkNotNull(volume);
       return new OmVolumeArgs(adminName, ownerName, volume, quotaInBytes,
           quotaInNamespace, usedNamespace, metadata, acls, creationTime,
-          modificationTime, objectID, updateID);
+          modificationTime, objectID, updateID, refCount);
     }
 
   }
@@ -383,6 +423,7 @@ public final class OmVolumeArgs extends WithObjectID 
implements Auditable {
         .setModificationTime(modificationTime)
         .setObjectID(objectID)
         .setUpdateID(updateID)
+        .setRefCount(refCount)
         .build();
   }
 
@@ -401,7 +442,8 @@ public final class OmVolumeArgs extends WithObjectID 
implements Auditable {
         volInfo.getCreationTime(),
         volInfo.getModificationTime(),
         volInfo.getObjectID(),
-        volInfo.getUpdateID());
+        volInfo.getUpdateID(),
+        volInfo.getRefCount());
   }
 
   @Override
@@ -413,6 +455,7 @@ public final class OmVolumeArgs extends WithObjectID 
implements Auditable {
         ", creationTime='" + creationTime + '\'' +
         ", quotaInBytes='" + quotaInBytes + '\'' +
         ", usedNamespace='" + usedNamespace + '\'' +
+        ", refCount='" + refCount + '\'' +
         '}';
   }
 
@@ -433,6 +476,6 @@ public final class OmVolumeArgs extends WithObjectID 
implements Auditable {
 
     return new OmVolumeArgs(adminName, ownerName, volume, quotaInBytes,
         quotaInNamespace, usedNamespace, cloneMetadata, cloneAcls,
-        creationTime, modificationTime, objectID, updateID);
+        creationTime, modificationTime, objectID, updateID, refCount);
   }
 }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
index 2c03573..ca3de18 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
@@ -390,7 +390,6 @@ public class OzoneManagerLock {
 
     S3_SECRET_LOCK((byte) 4, "S3_SECRET_LOCK"), // 31
     PREFIX_LOCK((byte) 5, "PREFIX_LOCK"); //63
-//    TENANT_LOCK((byte) 6, "TENANT_LOCK"); // 127
 
     // level of the resource
     private byte lockLevel;
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 63c1000..0c8c833 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -40,6 +40,7 @@ import 
org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmRenameKeys;
+import org.apache.hadoop.ozone.om.helpers.OmTenantArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
@@ -51,6 +52,7 @@ import org.apache.hadoop.ozone.om.helpers.TenantInfoList;
 import org.apache.hadoop.ozone.om.helpers.TenantUserInfoValue;
 import org.apache.hadoop.ozone.om.helpers.TenantUserList;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
@@ -479,19 +481,26 @@ public interface OzoneManagerProtocol
   void revokeS3Secret(String kerberosID) throws IOException;
 
   /**
-   * Create tenant.
-   * @param tenantName tenant name.
+   * Create a tenant.
+   * @param omTenantArgs OmTenantArgs
    * @throws IOException
    */
-  void createTenant(String tenantName) throws IOException;
+  void createTenant(OmTenantArgs omTenantArgs) throws IOException;
 
-  // TODO: modify, delete
+  /**
+   * Delete a tenant.
+   * @param tenantId tenant name.
+   * @return DeleteTenantResponse
+   * @throws IOException
+   */
+  DeleteTenantResponse deleteTenant(String tenantId) throws IOException;
 
   /**
-   * Assign user to tenant.
+   * Assign user to a tenant.
    * @param username user name to be assigned.
    * @param tenantName tenant name.
    * @param accessId access ID.
+   * @return S3SecretValue
    * @throws IOException
    */
   S3SecretValue tenantAssignUserAccessId(String username, String tenantName,
@@ -501,7 +510,7 @@ public interface OzoneManagerProtocol
 
   // TODO: modify, delete
   /**
-   * Revoke user accessId to tenant.
+   * Revoke user accessId to a tenant.
    * @param accessId accessId to be revoked.
    * @throws IOException
    */
@@ -518,7 +527,7 @@ public interface OzoneManagerProtocol
       boolean delegated) throws IOException;
 
   /**
-   * Revoke admin role of an accessId from a tenant.
+   * Revoke admin role of an accessId in a tenant.
    * @param accessId access ID.
    * @param tenantName tenant name.
    * @throws IOException
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 97e3c2f..b861b94 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -49,6 +49,7 @@ import 
org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmRenameKeys;
+import org.apache.hadoop.ozone.om.helpers.OmTenantArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
@@ -928,14 +929,13 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
 
   /**
    * {@inheritDoc}
-   *
-   * TODO: Add a variant that uses OmTenantArgs?
    */
   @Override
-  public void createTenant(String tenantArgs)
-      throws IOException {
+  public void createTenant(OmTenantArgs omTenantArgs) throws IOException {
     final CreateTenantRequest request = CreateTenantRequest.newBuilder()
-        .setTenantName(tenantArgs)
+        .setTenantName(omTenantArgs.getTenantId())
+        .setVolumeName(omTenantArgs.getVolumeName())
+        // TODO: Add more args like policy names later
         .build();
     final OMRequest omRequest = createOMRequest(Type.CreateTenant)
         .setCreateTenantRequest(request)
@@ -944,6 +944,19 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
     handleError(omResponse);
   }
 
+  @Override
+  public DeleteTenantResponse deleteTenant(String tenantId) throws IOException 
{
+    final DeleteTenantRequest request = DeleteTenantRequest.newBuilder()
+        .setTenantId(tenantId)
+        .build();
+    final OMRequest omRequest = createOMRequest(Type.DeleteTenant)
+        .setDeleteTenantRequest(request)
+        .build();
+    final OMResponse omResponse = submitRequest(omRequest);
+
+    return handleError(omResponse).getDeleteTenantResponse();
+  }
+
   /**
    * {@inheritDoc}
    *
diff --git 
a/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot 
b/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
index 3ad8c10..507be54 100644
--- a/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
@@ -19,10 +19,12 @@ Library             OperatingSystem
 Library             String
 Library             BuiltIn
 Resource            ../commonlib.robot
+Resource            ../s3/commonawslib.robot
 Test Timeout        5 minutes
 
 *** Variables ***
 ${RANGER_ENDPOINT_URL}  https://ranger:6182
+${S3G_ENDPOINT_URL}     http://s3g:9878
 
 *** Keywords ***
 Init Ranger MockServer
@@ -32,43 +34,103 @@ Init Ranger MockServer
 *** Test Cases ***
 Secure Tenant Create Tenant Success with Cluster Admin
     Run Keyword         Init Ranger MockServer
-    Run Keyword   Kinit test user     testuser     testuser.keytab
+    Run Keyword         Kinit test user     testuser     testuser.keytab
     ${output} =         Execute          ozone tenant create tenantone
                         Should contain   ${output}         Created tenant 
'tenantone'
 
-Secure Tenant Assign User Success
-    ${output} =         Execute          ozone tenant user assign bob 
--tenant=tenantone
-                        Should contain   ${output}         Assigned 'bob' to 
'tenantone'
+Secure Tenant Assign User Success with Cluster Admin
+    ${output} =         Execute          ozone tenant user assign testuser 
--tenant=tenantone
+                        Should contain   ${output}         Assigned 'testuser' 
to 'tenantone'
+    ${accessId} =       Get Regexp Matches   ${output}     (?<=export 
AWS_ACCESS_KEY_ID=).*
+    ${secretKey} =      Get Regexp Matches   ${output}     (?<=export 
AWS_SECRET_ACCESS_KEY=).*
+    ${accessId} =       Set Variable         ${accessId[0]}
+    ${secretKey} =      Set Variable         ${secretKey[0]}
+                        Set Global Variable  ${ACCESS_ID}   ${accessId}
+                        Set Global Variable  ${SECRET_KEY}  ${secretKey}
+
+Secure Tenant Assign User Failure to Non-existent Tenant
+    ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant user assign 
testuser --tenant=thistenantdoesnotexist
+                        Should contain   ${output}         Tenant 
'thistenantdoesnotexist' doesn't exist
 
 Secure Tenant GetUserInfo Success
-    ${output} =         Execute          ozone tenant user info bob
-                        Should contain   ${output}         Tenant 'tenantone' 
with accessId 'tenantone$bob'
+    ${output} =         Execute          ozone tenant user info testuser
+                        Should contain   ${output}         Tenant 'tenantone' 
with accessId 'tenantone$testuser'
+
+Secure Tenant Create Bucket 1 Success via S3 API
+                        Execute          aws configure set aws_access_key_id 
${ACCESS_ID}
+                        Execute          aws configure set 
aws_secret_access_key ${SECRET_KEY}
+    ${output} =         Execute          aws s3api --endpoint-url 
${S3G_ENDPOINT_URL} create-bucket --bucket bucket-test1
+                        Should contain   ${output}         bucket-test1
+    ${output} =         Execute          aws s3api --endpoint-url 
${S3G_ENDPOINT_URL} list-buckets
+                        Should contain   ${output}         bucket-test1
 
 Secure Tenant SetSecret Success with Cluster Admin
-    ${output} =         Execute          ozone tenant user setsecret 
'tenantone$bob' --secret=somesecret1 --export
+    ${output} =         Execute          ozone tenant user setsecret 
'tenantone$testuser' --secret=somesecret1 --export
                         Should contain   ${output}         export 
AWS_SECRET_ACCESS_KEY='somesecret1'
 
-Secure Tenant SetSecret Failure For Invalid Secret Input 1
-    ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant user 
setsecret 'tenantone$bob' --secret='' --export
+Secure Tenant SetSecret Failure For Invalid Secret 1
+    ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant user 
setsecret 'tenantone$testuser' --secret='' --export
                         Should contain   ${output}         secretKey cannot be 
null or empty.
 
-Secure Tenant SetSecret Failure For Invalid Secret Input 2
-    ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant user 
setsecret 'tenantone$bob' --secret=short --export
+Secure Tenant SetSecret Failure For Invalid Secret 2
+    ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant user 
setsecret 'tenantone$testuser' --secret=short --export
                         Should contain   ${output}         Secret key length 
should be at least 8 characters
 
 Secure Tenant GetSecret Success
-    ${output} =         Execute          ozone tenant user getsecret 
'tenantone$bob' --export
+    ${output} =         Execute          ozone tenant user getsecret 
'tenantone$testuser' --export
                         Should contain   ${output}         export 
AWS_SECRET_ACCESS_KEY='somesecret1'
 
-Secure Tenant Assign User Failure
-    ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant user assign 
bob --tenant=thistenantdoesnotexist
-                        Should contain   ${output}         Tenant 
'thistenantdoesnotexist' doesn't exist
+Secure Tenant Delete Bucket 1 Failure With Old SecretKey via S3 API
+    ${rc}  ${output} =  Run And Return Rc And Output  aws s3api --endpoint-url 
${S3G_ENDPOINT_URL} delete-bucket --bucket bucket-test1
+                        Should Be True ${rc} > 0
 
-Secure Tenant Create Tenant Failure with Regular (non-admin) user
-    Run Keyword   Kinit test user     testuser2    testuser2.keytab
+Secure Tenant Delete Bucket 1 Success With Newly Set SecretKey via S3 API
+                        Execute          aws configure set 
aws_secret_access_key 'somesecret1'
+    ${output} =         Execute          aws s3api --endpoint-url 
${S3G_ENDPOINT_URL} delete-bucket --bucket bucket-test1
+
+Secure Tenant Delete Tenant Failure Tenant Not Empty
+    ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant delete 
tenantone
+                        Should contain   ${output}         TENANT_NOT_EMPTY 
Tenant 'tenantone' is not empty. All accessIds associated to this tenant must 
be revoked before the tenant can be deleted. See `ozone tenant user revoke`
+
+Secure Tenant Create Tenant Failure with Regular User
+    Run Keyword         Kinit test user     testuser2    testuser2.keytab
     ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant create 
tenanttwo
-                        Should contain   ${output}         Failed to create 
tenant 'tenanttwo': User 'testuser2/[email protected]' is not an Ozone admin.
+                        Should contain   ${output}         PERMISSION_DENIED 
User 'testuser2/[email protected]' is not an Ozone admin.
 
-Secure Tenant SetSecret Failure with Regular (non-admin) user
-    ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant user 
set-secret 'tenantone$bob' --secret=somesecret2 --export
+Secure Tenant SetSecret Failure with Regular User
+    ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant user 
set-secret 'tenantone$testuser' --secret=somesecret2 --export
                         Should contain   ${output}         Permission denied. 
Requested accessId
+
+Secure Tenant Create Bucket 2 Success with somesecret1 via S3 API
+    ${output} =         Execute          aws s3api --endpoint-url 
${S3G_ENDPOINT_URL} create-bucket --bucket bucket-test2
+                        Should contain   ${output}         bucket-test2
+
+Secure Tenant Delete Bucket 2 Failure with somesecret2 via S3 API
+                        Execute          aws configure set 
aws_secret_access_key 'somesecret2'
+    ${rc}  ${output} =  Run And Return Rc And Output  aws s3api --endpoint-url 
${S3G_ENDPOINT_URL} delete-bucket --bucket bucket-test2
+                        Should Be True ${rc} > 0
+
+Secure Tenant Delete Bucket 2 Success with somesecret1 via S3 API
+                        Execute          aws configure set 
aws_secret_access_key 'somesecret1'
+    ${output} =         Execute          aws s3api --endpoint-url 
${S3G_ENDPOINT_URL} delete-bucket --bucket bucket-test2
+
+Secure Tenant Revoke User AccessId Success with Cluster Admin
+    Run Keyword         Kinit test user     testuser     testuser.keytab
+    ${output} =         Execute          ozone tenant user revoke 
'tenantone$testuser'
+                        Should contain   ${output}         Revoked accessId 
'tenantone$testuser'.
+
+Secure Tenant Create Bucket 3 Failure with Revoked AccessId via S3 API
+    ${rc}  ${output} =  Run And Return Rc And Output  aws s3api --endpoint-url 
${S3G_ENDPOINT_URL} create-bucket --bucket bucket-test3
+                        Should Be True ${rc} > 0
+
+Secure Tenant Delete Tenant Success with Cluster Admin
+    ${output} =         Execute          ozone tenant delete tenantone
+                        Should contain   ${output}         Deleted tenant 
'tenantone'.
+
+Secure Tenant Delete Volume Success with Cluster Admin
+    ${output} =         Execute          ozone sh volume delete tenantone
+                        Should contain   ${output}         Volume tenantone is 
deleted
+
+Secure Tenant List Tenant Expect Empty Result
+    ${output} =         Execute          ozone tenant list
+                        Should not contain   ${output}     tenantone
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
index cacd212..5c2a345 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
@@ -74,8 +74,6 @@ public class TestOzoneTenantShell {
 
   static {
     System.setProperty("log4j.configurationFile", "auditlog.properties");
-    System.setProperty("log4j2.contextSelector",
-        "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
   }
 
   private static final String DEFAULT_ENCODING = UTF_8.name();
@@ -92,6 +90,7 @@ public class TestOzoneTenantShell {
 
   private static OzoneConfiguration conf = null;
   private static MiniOzoneCluster cluster = null;
+  private static OzoneShell ozoneSh = null;
   private static TenantShell tenantShell = null;
 
   private final ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -141,6 +140,7 @@ public class TestOzoneTenantShell {
     testFile.getParentFile().mkdirs();
     testFile.createNewFile();
 
+    ozoneSh = new OzoneShell();
     tenantShell = new TenantShell();
 
     // Init cluster
@@ -170,6 +170,10 @@ public class TestOzoneTenantShell {
     if (baseDir != null) {
       FileUtil.fullyDelete(baseDir, true);
     }
+
+    if (AUDIT_LOG_FILE.exists()) {
+      AUDIT_LOG_FILE.delete();
+    }
   }
 
   @Before
@@ -197,13 +201,16 @@ public class TestOzoneTenantShell {
     System.setErr(OLD_ERR);
   }
 
-  private void execute(GenericCli shell, String[] args) {
+  /**
+   * Returns exit code.
+   */
+  private int execute(GenericCli shell, String[] args) {
     LOG.info("Executing shell command with args {}", Arrays.asList(args));
     CommandLine cmd = shell.getCmd();
 
     CommandLine.IExecutionExceptionHandler exceptionHandler =
         (ex, commandLine, parseResult) -> {
-          commandLine.getErr().println(ex.getMessage());
+          new PrintStream(err, true, 
DEFAULT_ENCODING).println(ex.getMessage());
           return commandLine.getCommandSpec().exitCodeOnExecutionException();
         };
 
@@ -212,17 +219,17 @@ public class TestOzoneTenantShell {
     String[] argsWithHAConf = getHASetConfStrings(args);
 
     cmd.setExecutionExceptionHandler(exceptionHandler);
-    cmd.execute(argsWithHAConf);
+    return cmd.execute(argsWithHAConf);
   }
 
   /**
    * Helper that appends HA service id to args.
    */
-  private void executeHA(GenericCli shell, String[] args) {
+  private int executeHA(GenericCli shell, String[] args) {
     final String[] newArgs = new String[args.length + 1];
     System.arraycopy(args, 0, newArgs, 0, args.length);
     newArgs[args.length] = "--om-service-id=" + omServiceId;
-    execute(shell, newArgs);
+    return execute(shell, newArgs);
   }
 
   /**
@@ -333,6 +340,14 @@ public class TestOzoneTenantShell {
     }
   }
 
+  private void deleteVolume(String volumeName) throws IOException {
+    int exitC = execute(ozoneSh, new String[] {"volume", "delete", 
volumeName});
+    checkOutput(out, "Volume " + volumeName + " is deleted\n", true);
+    checkOutput(err, "", true);
+    // Exit code should be 0
+    Assert.assertEquals(0, exitC);
+  }
+
   @Test
   public void testAssignAdmin() throws IOException {
 
@@ -343,8 +358,8 @@ public class TestOzoneTenantShell {
     checkOutput(out, "Created tenant", false);
     checkOutput(err, "", true);
 
-    // Loop assign-revoke 3 times
-    for (int i = 0; i < 3; i++) {
+    // Loop assign-revoke 4 times
+    for (int i = 0; i < 4; i++) {
       executeHA(tenantShell, new String[] {
           "user", "assign", userName, "--tenant=" + tenantName});
       checkOutput(out, "export AWS_ACCESS_KEY_ID=", false);
@@ -369,8 +384,16 @@ public class TestOzoneTenantShell {
       checkOutput(err, "Revoked accessId", false);
     }
 
-    // TODO: delete tenant is not implemented yet
+    // Clean up
     executeHA(tenantShell, new String[] {"delete", tenantName});
+    checkOutput(out, "Deleted tenant '" + tenantName + "'.\n", false);
+    checkOutput(err, "", true);
+    deleteVolume(tenantName);
+
+    // Sanity check: tenant list should be empty
+    executeHA(tenantShell, new String[] {"list"});
+    checkOutput(out, "", true);
+    checkOutput(err, "", true);
   }
 
   /**
@@ -394,11 +417,13 @@ public class TestOzoneTenantShell {
     checkOutput(out, "Created tenant 'finance'.\n", true);
     checkOutput(err, "", true);
 
-//    lines = FileUtils.readLines(AUDIT_LOG_FILE, (String)null);
-    // FIXME: The check below is unstable.
-    //  Occasionally lines.size() == 0 leads to ArrayIndexOutOfBoundsException
-    //  Likely due to audit log not flushed in time at time of check.
-//    checkOutput(lines.get(lines.size() - 1), "ret=SUCCESS", false);
+    executeHA(tenantShell, new String[] {"list"});
+    checkOutput(out, "finance\n", true);
+    checkOutput(err, "", true);
+
+    lines = FileUtils.readLines(AUDIT_LOG_FILE, (String)null);
+    Assert.assertTrue(lines.size() > 0);
+    checkOutput(lines.get(lines.size() - 1), "ret=SUCCESS", false);
 
     // Check volume creation
     OmVolumeArgs volArgs = cluster.getOzoneManager().getVolumeInfo("finance");
@@ -407,8 +432,7 @@ public class TestOzoneTenantShell {
     // Creating the tenant with the same name again should fail
     executeHA(tenantShell, new String[] {"create", "finance"});
     checkOutput(out, "", true);
-    checkOutput(err, "Failed to create tenant 'finance':"
-        + " Tenant already exists\n", true);
+    checkOutput(err, "Failed to create tenant 'finance'", false);
 
     executeHA(tenantShell, new String[] {"create", "research"});
     checkOutput(out, "Created tenant 'research'.\n", true);
@@ -539,13 +563,73 @@ public class TestOzoneTenantShell {
         + "more than once. User 'bob' is already assigned to tenant 'research' 
"
         + "with accessId 'research$bob'.\n", true);
 
+    executeHA(tenantShell, new String[] {"list"});
+    checkOutput(out, "dev\nfinance\nresearch\n", true);
+    checkOutput(err, "", true);
+
     // Clean up
     executeHA(tenantShell, new String[] {
         "user", "revoke", "research$bob"});
     checkOutput(out, "", true);
     checkOutput(err, "Revoked accessId", false);
 
-    // TODO: Clean up: remove tenant when tenant remove CLI is implemented
+    executeHA(tenantShell, new String[] {"delete", "research"});
+    checkOutput(out, "Deleted tenant 'research'.\n", false);
+    checkOutput(err, "", true);
+    deleteVolume("research");
+
+    executeHA(tenantShell, new String[] {
+        "user", "revoke", "finance$bob"});
+    checkOutput(out, "", true);
+    checkOutput(err, "Revoked accessId", false);
+
+    executeHA(tenantShell, new String[] {"list"});
+    checkOutput(out, "dev\nfinance\n", true);
+    checkOutput(err, "", true);
+
+    executeHA(tenantShell, new String[] {"delete", "finance"});
+    checkOutput(out, "Deleted tenant 'finance'.\n", false);
+    checkOutput(err, "", true);
+    deleteVolume("finance");
+
+    executeHA(tenantShell, new String[] {"list"});
+    checkOutput(out, "dev\n", true);
+    checkOutput(err, "", true);
+
+    // Attempt to delete tenant with accessIds still assigned to it, should 
fail
+    int exitCode = executeHA(tenantShell, new String[] {"delete", "dev"});
+    Assert.assertTrue("Tenant delete should fail!", exitCode != 0);
+    checkOutput(out, "", true);
+    checkOutput(err, "Failed to delete tenant 'dev'", false);
+
+    // Delete dev volume should fail because the volume reference count > 0L
+    exitCode = execute(ozoneSh, new String[] {"volume", "delete", "dev"});
+    Assert.assertTrue("Volume delete should fail!", exitCode != 0);
+    checkOutput(out, "", true);
+    checkOutput(err, "Volume reference count is not zero (1). "
+        + "Ozone features are enabled on this volume. "
+        + "Try `ozone tenant delete <tenantId>` first.\n", true);
+
+    executeHA(tenantShell, new String[] {"list"});
+    checkOutput(out, "dev\n", true);
+    checkOutput(err, "", true);
+
+    // Revoke accessId first
+    executeHA(tenantShell, new String[] {
+        "user", "revoke", "dev$bob"});
+    checkOutput(out, "", true);
+    checkOutput(err, "Revoked accessId", false);
+
+    // Then delete tenant, should succeed
+    executeHA(tenantShell, new String[] {"delete", "dev"});
+    checkOutput(out, "Deleted tenant 'dev'.\n", false);
+    checkOutput(err, "", true);
+    deleteVolume("dev");
+
+    // Sanity check: tenant list should be empty
+    executeHA(tenantShell, new String[] {"list"});
+    checkOutput(out, "", true);
+    checkOutput(err, "", true);
   }
 
   @Test
@@ -591,7 +675,20 @@ public class TestOzoneTenantShell {
     checkOutput(out, "", true);
     checkOutput(err, "Revoked accessId", false);
 
-    // TODO: Clean up: remove tenant when tenant remove CLI is implemented
+    executeHA(tenantShell, new String[] {
+        "user", "revoke", "tenant1$bob"});
+    checkOutput(out, "", true);
+    checkOutput(err, "Revoked accessId", false);
+
+    executeHA(tenantShell, new String[] {"delete", "tenant1"});
+    checkOutput(out, "Deleted tenant 'tenant1'.\n", false);
+    checkOutput(err, "", true);
+    deleteVolume("tenant1");
+
+    // Sanity check: tenant list should be empty
+    executeHA(tenantShell, new String[] {"list"});
+    checkOutput(out, "", true);
+    checkOutput(err, "", true);
   }
 
   @Test
@@ -628,13 +725,13 @@ public class TestOzoneTenantShell {
     checkOutput(err, "", true);
 
     // Set empty secret key should fail
-    executeHA(tenantShell, new String[] {
+    int exitCode = executeHA(tenantShell, new String[] {
         "user", "setsecret", tenantName + "$alice",
         "--secret=short", "--export"});
+    Assert.assertTrue("Command should have non-zero exit code!", exitCode != 
0);
     checkOutput(out, "", true);
-    checkOutput(err, "", true);
-    // Note: Exception thrown from OM to the client stderr is somehow not
-    //  captured in err, but is printed to the console output.
+    checkOutput(err, "Secret key length should be at least 8 characters\n",
+        true);
 
     // Get secret should still give the previous secret key
     executeHA(tenantShell, new String[] {
@@ -668,13 +765,18 @@ public class TestOzoneTenantShell {
         .createUserForTesting("bob",  new String[] {"usergroup"});
 
     ugiBob.doAs((PrivilegedExceptionAction<Void>) () -> {
-      executeHA(tenantShell, new String[] {
+      int exitC = executeHA(tenantShell, new String[] {
           "user", "setsecret", tenantName + "$alice",
           "--secret=somesecret2", "--export"});
+      Assert.assertTrue("Should return non-zero exit code!", exitC != 0);
       checkOutput(out, "", true);
-      checkOutput(err, "", true);
-      // Note: Exception thrown from OM to the client stderr is somehow not
-      //  captured in err, but is printed to the console output.
+      checkOutput(err, "Permission denied. Requested accessId "
+          + "'tenant-test-set-secret$alice' and user doesn't satisfy any of:\n"
+          + "1) accessId match current username: 'bob';\n"
+          + "2) is an OM admin;\n"
+          + "3) user is assigned to a tenant under this accessId;\n"
+          + "4) user is an admin of the tenant where the accessId is "
+          + "assigned\n", true);
       return null;
     });
 
@@ -711,6 +813,14 @@ public class TestOzoneTenantShell {
     checkOutput(out, "", true);
     checkOutput(err, "Revoked accessId", false);
 
-    // TODO: Clean up: remove tenant when tenant remove CLI is implemented
+    executeHA(tenantShell, new String[] {"delete", tenantName});
+    checkOutput(out, "Deleted tenant '" + tenantName + "'.\n", false);
+    checkOutput(err, "", true);
+    deleteVolume(tenantName);
+
+    // Sanity check: tenant list should be empty
+    executeHA(tenantShell, new String[] {"list"});
+    checkOutput(out, "", true);
+    checkOutput(err, "", true);
   }
 }
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index b0ff9a9..de4a497 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -104,9 +104,8 @@ enum Type {
   PurgePaths = 94;
 
   CreateTenant = 95;
-  ModifyTenant = 96;
-  DeleteTenant = 97;
-  ListTenant = 103;  // TODO: Renumber this when rebasing
+  DeleteTenant = 97;  // TODO: Renumber when rebasing
+  ListTenant = 103;
 
   TenantGetUserInfo = 98;
   TenantAssignUserAccessId = 99;
@@ -203,7 +202,6 @@ message OMRequest {
   optional PurgePathsRequest                purgePathsRequest              = 
94;
 
   optional CreateTenantRequest              CreateTenantRequest            = 
95;
-  optional ModifyTenantRequest              ModifyTenantRequest            = 
96;
   optional DeleteTenantRequest              DeleteTenantRequest            = 
97;
   optional ListTenantRequest                ListTenantRequest              = 
103;  // TODO: Renumber this when rebasing
 
@@ -297,7 +295,6 @@ message OMResponse {
 
   // Skipped 94 to align with OMRequest
   optional CreateTenantResponse              CreateTenantResponse          = 
95;
-  optional ModifyTenantResponse              ModifyTenantResponse          = 
96;
   optional DeleteTenantResponse              DeleteTenantResponse          = 
97;
   optional ListTenantResponse                ListTenantResponse            = 
103;  // TODO: Renumber this when rebasing
 
@@ -412,10 +409,13 @@ enum Status {
     INVALID_TENANT_NAME = 77;
 
     ACCESSID_NOT_FOUND = 78;
-    TENANT_USER_ACCESSID_ALREADY_EXISTS = 79;  // TODO: Remove if not used
+    TENANT_USER_ACCESSID_ALREADY_EXISTS = 79;
     INVALID_TENANT_USER_NAME = 80;
     INVALID_ACCESSID = 81;
     TENANT_AUTHORIZER_ERROR = 82;
+
+    VOLUME_IS_REFERENCED = 83;
+    TENANT_NOT_EMPTY = 84;
 }
 
 /**
@@ -463,6 +463,7 @@ message VolumeInfo {
     optional uint64 modificationTime = 10;
     optional int64 quotaInNamespace = 11 [default = -2];
     optional uint64 usedNamespace = 12;
+    optional int64 refCount = 13;
 }
 
 /**
@@ -1420,7 +1421,7 @@ message ListTenantRequest {
 }
 
 message ListTenantResponse {
-    optional bool success = 1;
+    optional bool success = 1;  // TODO: Remove this field
     repeated TenantInfo tenantInfo = 2;
 }
 
@@ -1434,12 +1435,12 @@ message TenantListUserRequest {
 }
 
 message TenantGetUserInfoResponse {
-    optional bool success = 1;
+    optional bool success = 1;  // TODO: Remove this field
     optional TenantUserInfo tenantUserInfo = 2;
 }
 
 message TenantListUserResponse {
-    optional bool success = 1;
+    optional bool success = 1;  // TODO: Remove this field
     optional string tenantName = 2;
     repeated TenantUserAccessId userAccessIdInfo = 3;
 }
@@ -1468,14 +1469,11 @@ message RevokeS3SecretRequest {
 message CreateTenantRequest {
     optional string tenantName = 1;
     optional string tenantDefaultPolicyName = 2;
-}
-
-message ModifyTenantRequest {
-    optional string tenantName = 1;
+    optional string volumeName = 3;
 }
 
 message DeleteTenantRequest {
-    optional string tenantName = 1;
+    optional string tenantId = 1;
 }
 
 message TenantAssignUserAccessIdRequest {
@@ -1505,32 +1503,29 @@ message GetS3VolumeRequest {
 }
 
 message CreateTenantResponse {
-    optional bool success = 1;
-}
-
-message ModifyTenantResponse {
-    optional bool success = 1;
+    optional bool success = 1;  // TODO: Remove this field
 }
 
 message DeleteTenantResponse {
-    optional bool success = 1;
+    optional string volumeName = 1;
+    optional int64 volRefCount = 2;
 }
 
 message TenantAssignUserAccessIdResponse {
-    optional bool success = 1;
+    optional bool success = 1;  // TODO: Remove this field
     optional S3Secret s3Secret = 2;
 }
 
 message TenantRevokeUserAccessIdResponse {
-    optional bool success = 1;
+    optional bool success = 1;  // TODO: Remove this field
 }
 
 message TenantAssignAdminResponse {
-    optional bool success = 1;
+    optional bool success = 1;  // TODO: Remove this field
 }
 
 message TenantRevokeAdminResponse {
-    optional bool success = 1;
+    optional bool success = 1;  // TODO: Remove this field
 }
 
 message GetS3VolumeResponse {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java
index c4d1793..f44dafc 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java
@@ -128,6 +128,17 @@ public interface OMMultiTenantManager {
   void revokeUserAccessId(String accessID) throws IOException;
 
   /**
+   * A placeholder method to remove a failed-to-assign accessId from
+   * tenantCache.
+   * Triggered in OMAssignUserToTenantRequest#handleRequestFailure.
+   * Most likely becomes unnecessary if we move OMMTM call to the end of the
+   * request (current it runs in preExecute).
+   * TODO: Remove this if unneeded when Ranger thread patch lands.
+   */
+  void removeUserAccessIdFromCache(String accessID, String userPrincipal,
+                                   String tenantName);
+
+  /**
    * Given an accessId, return kerberos user name for the tenant user.
    */
   String getUserNameGivenAccessId(String accessId);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java
index c9aa568..b073936 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java
@@ -318,6 +318,19 @@ public class OMMultiTenantManagerImpl implements 
OMMultiTenantManager {
     }
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  public void removeUserAccessIdFromCache(String accessID, String 
userPrincipal,
+                                          String tenantName) {
+    try {
+      tenantCache.get(tenantName).getTenantUsers().remove(
+          new ImmutablePair<>(userPrincipal, accessID));
+    } catch (NullPointerException e) {
+      // tenantCache is somehow empty. Ignore for now.
+      // But how?
+    }
+  }
 
   @Override
   public String getUserNameGivenAccessId(String accessId) {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 5f1a43a..6b08935 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -141,7 +141,7 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager {
    * |----------------------------------------------------------------------|
    * | principalToAccessIdsTable | Principal -> OmDBKerberosPrincipalInfo   |
    * |----------------------------------------------------------------------|
-   * | tenantStateTable          | tenant name -> OmDBTenantInfo            |
+   * | tenantStateTable          | tenantId -> OmDBTenantInfo               |
    * |----------------------------------------------------------------------|
    * | tenantGroupTable          | accessId -> [tenant group A, B, ...]     |
    * |----------------------------------------------------------------------|
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index e8b1ea0..899749a 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -50,7 +50,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Optional;
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.crypto.key.KeyProvider;
@@ -131,6 +130,7 @@ import 
org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmRenameKeys;
+import org.apache.hadoop.ozone.om.helpers.OmTenantArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
@@ -156,6 +156,7 @@ import 
org.apache.hadoop.ozone.om.snapshot.OzoneManagerSnapshotProvider;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
 import org.apache.hadoop.ozone.om.upgrade.OMUpgradeFinalizer;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRoleInfo;
@@ -3081,51 +3082,54 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
    */
   public S3SecretValue getS3Secret(String kerberosID, boolean createIfNotExist)
           throws IOException {
-    throw new NotImplementedException(
-            "non-Ratis getS3Secret(String, boolean) is not implemented");
+    throw new UnsupportedOperationException("OzoneManager does not require " +
+        "this to be implemented. As write requests use a new approach");
   }
 
-  /**
-   * Create tenant.
-   */
-  public void createTenant(String tenantName) throws IOException {
-    throw new NotImplementedException(
-        "non-Ratis createTenant() is not implemented");
+  @Override
+  public void createTenant(OmTenantArgs omTenantArgs) {
+    throw new UnsupportedOperationException("OzoneManager does not require " +
+        "this to be implemented. As write requests use a new approach");
+  }
+
+  @Override
+  public DeleteTenantResponse deleteTenant(String tenantId) {
+    throw new UnsupportedOperationException("OzoneManager does not require " +
+        "this to be implemented. As write requests use a new approach");
   }
 
   /**
    * Assign user accessId to tenant.
    */
   public S3SecretValue tenantAssignUserAccessId(
-      String username, String tenantName, String accessId) throws IOException {
-    throw new NotImplementedException(
-        "non-Ratis tenantAssignUserAccessId() is not implemented");
+      String username, String tenantId, String accessId) throws IOException {
+    throw new UnsupportedOperationException("OzoneManager does not require " +
+        "this to be implemented. As write requests use a new approach");
   }
 
   /**
    * Revoke user accessId to tenant.
    */
   public void tenantRevokeUserAccessId(String accessId) throws IOException {
-    throw new NotImplementedException(
-        "non-Ratis tenantRevokeUserAccessId() is not implemented");
+    throw new UnsupportedOperationException("OzoneManager does not require " +
+        "this to be implemented. As write requests use a new approach");
   }
 
   /**
-   * Assign admin role to an accessId in a tenant.
+   * Assign admin role to a user by an accessId in a tenant.
    */
-  public void tenantAssignAdmin(String accessId, String tenantName,
-      boolean delegated) throws IOException {
-    throw new NotImplementedException(
-        "non-Ratis tenantAssignAdmin() is not implemented");
+  public void tenantAssignAdmin(String accessId, String tenantId,
+                                boolean delegated) {
+    throw new UnsupportedOperationException("OzoneManager does not require " +
+        "this to be implemented. As write requests use a new approach");
   }
 
   /**
    * Revoke admin role of an accessId from a tenant.
    */
-  public void tenantRevokeAdmin(String accessId, String tenantName)
-      throws IOException {
-    throw new NotImplementedException(
-        "non-Ratis tenantRevokeAdmin() is not implemented");
+  public void tenantRevokeAdmin(String accessId, String tenantId) {
+    throw new UnsupportedOperationException("OzoneManager does not require " +
+        "this to be implemented. As write requests use a new approach");
   }
 
   /**
@@ -3144,6 +3148,8 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
 
     final List<TenantInfo> tenantInfoList = new ArrayList<>();
 
+    // TODO: Iterate cache first. See KeyManagerImpl#listStatus
+
     TableIterator<String, ? extends Table.KeyValue<String, OmDBTenantInfo>>
         iterator = metadataManager.getTenantStateTable().iterator();
 
@@ -3225,26 +3231,26 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   }
 
   @Override
-  public TenantUserList listUsersInTenant(String tenantName, String prefix)
+  public TenantUserList listUsersInTenant(String tenantId, String prefix)
       throws IOException {
 
-    if (StringUtils.isEmpty(tenantName)) {
+    if (StringUtils.isEmpty(tenantId)) {
       return null;
     }
 
     final Map<String, String> auditMap = new LinkedHashMap<>();
-    auditMap.put(OzoneConsts.TENANT, tenantName);
+    auditMap.put(OzoneConsts.TENANT, tenantId);
     auditMap.put(OzoneConsts.USER_PREFIX, prefix);
     try {
       String userName = getRemoteUser().getUserName();
-      if (!multiTenantManagr.isTenantAdmin(userName, tenantName)
+      if (!multiTenantManagr.isTenantAdmin(userName, tenantId)
           && !omAdminUsernames.contains(userName)) {
         throw new IOException("Only tenant and ozone admins can access this " +
             "API. '" + userName + "' is not an admin.");
       }
 
       final TenantUserList userList =
-          multiTenantManagr.listUsersInTenant(tenantName, prefix);
+          multiTenantManagr.listUsersInTenant(tenantId, prefix);
       AUDIT.logReadSuccess(buildAuditMessageForSuccess(
           OMAction.TENANT_LIST_USER, auditMap));
       return userList;
@@ -3258,16 +3264,20 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   @Override
   public OmVolumeArgs getS3Volume(String accessID) throws IOException {
 
-    String tenantId;
+    final String tenantId;
     try {
       tenantId = multiTenantManagr.getTenantForAccessID(accessID);
+      // TODO: Get volume name from DB. Do not assume the same. e.g.
+      //metadataManager.getTenantStateTable().get(tenantId)
+      //    .getBucketNamespaceName();
+      final String volumeName = tenantId;
       if (LOG.isDebugEnabled()) {
         LOG.debug("Get S3 volume request for access ID {} belonging to tenant" 
+
                 " {} is directed to the volume {}.", accessID, tenantId,
-            tenantId);  // TODO: Get volume name from DB. Do not assume the 
same
+            volumeName);
       }
       // This call performs acl checks and checks volume existence.
-      return getVolumeInfo(tenantId);
+      return getVolumeInfo(volumeName);
 
     } catch (OMException ex) {
       if (ex.getResult().equals(INVALID_ACCESSID)) {
@@ -4179,24 +4189,24 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   }
 
   public boolean isTenantAdmin(UserGroupInformation callerUgi,
-      String tenantName, Boolean delegated) {
+                               String tenantId, Boolean delegated) {
     if (callerUgi == null) {
       return false;
     } else {
-      return isTenantAdmin(callerUgi.getShortUserName(), tenantName, delegated)
-          || isTenantAdmin(callerUgi.getUserName(), tenantName, delegated);
+      return isTenantAdmin(callerUgi.getShortUserName(), tenantId, delegated)
+          || isTenantAdmin(callerUgi.getUserName(), tenantId, delegated);
     }
   }
 
   /**
    * Returns true if user is a tenant's admin, false otherwise.
    * @param username User name string.
-   * @param tenantName Tenant name string.
+   * @param tenantId Tenant name string.
    * @param delegated True if operation requires delegated admin permission.
    */
   public boolean isTenantAdmin(String username,
-      String tenantName, Boolean delegated) {
-    if (StringUtils.isEmpty(username) || StringUtils.isEmpty(tenantName)) {
+      String tenantId, Boolean delegated) {
+    if (StringUtils.isEmpty(username) || StringUtils.isEmpty(tenantId)) {
       return false;
     }
 
@@ -4213,7 +4223,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       for (final String accessId : principalInfo.getAccessIds()) {
         final OmDBAccessIdInfo accessIdInfo =
             getMetadataManager().getTenantAccessIdTable().get(accessId);
-        if (tenantName.equals(accessIdInfo.getTenantId())) {
+        if (tenantId.equals(accessIdInfo.getTenantId())) {
           if (!delegated) {
             return accessIdInfo.getIsAdmin();
           } else {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 0ce987b..057c994 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -84,7 +84,6 @@ import 
org.apache.hadoop.ozone.om.request.s3.tenant.OMAssignUserToTenantRequest;
 import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantAssignAdminRequest;
 import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantCreateRequest;
 import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantDeleteRequest;
-import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantModifyRequest;
 import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantRevokeAdminRequest;
 import 
org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantRevokeUserAccessIdRequest;
 import 
org.apache.hadoop.ozone.om.request.security.OMCancelDelegationTokenRequest;
@@ -264,8 +263,6 @@ public final class OzoneManagerRatisUtils {
       return new S3RevokeSecretRequest(omRequest);
     case CreateTenant:
       return new OMTenantCreateRequest(omRequest);
-    case ModifyTenant:
-      return new OMTenantModifyRequest(omRequest);
     case DeleteTenant:
       return new OMTenantDeleteRequest(omRequest);
     case TenantAssignUserAccessId:
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java
index d183015..423e18d 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java
@@ -146,35 +146,41 @@ public class OMSetSecretRequest extends OMClientRequest {
       final S3SecretValue newS3SecretValue;
       final OmDBAccessIdInfo newDBAccessIdInfo;
 
-      // Get accessId entry from multi-tenant TenantAccessIdTable
-      final OmDBAccessIdInfo omDBAccessIdInfo =
-          omMetadataManager.getTenantAccessIdTable().get(accessId);
-
-      // Check accessId existence in TenantAccessIdTable
-      if (omDBAccessIdInfo == null) {
-        // accessId doesn't exist in TenantAccessIdTable, check S3SecretTable
-        if (omMetadataManager.getS3SecretTable().get(accessId) == null) {
-          throw new OMException("accessId '" + accessId + "' not found.",
-              OMException.ResultCodes.ACCESSID_NOT_FOUND);
-        }
+      // Update legacy S3SecretTable, if the accessId entry exists
+      if (omMetadataManager.getS3SecretTable().get(accessId) == null) {
+        // S3SecretTable will be deprecated.
+        // It is acceptable to not have an accessId entry in it.
+        LOG.debug("accessId '{}' not found in S3SecretTable", accessId);
+        newS3SecretValue = null;
 
+      } else {
         // accessId found in S3SecretTable. Update S3SecretTable
         LOG.debug("Updating S3SecretTable cache entry");
         // Update S3SecretTable cache entry in this case
         newS3SecretValue = new S3SecretValue(accessId, secretKey);
-        newDBAccessIdInfo = null;
 
         omMetadataManager.getS3SecretTable().addCacheEntry(
             new CacheKey<>(accessId),
             new CacheValue<>(Optional.of(newS3SecretValue),
                 transactionLogIndex));
+      }
 
-      } else {
+      // Get accessId entry from multi-tenant TenantAccessIdTable
+      final OmDBAccessIdInfo omDBAccessIdInfo =
+          omMetadataManager.getTenantAccessIdTable().get(accessId);
+
+      // Check accessId existence in TenantAccessIdTable
+      if (omDBAccessIdInfo == null) {
+        // At some point we need to migrate entries from S3SecretTable
+        //  to TenantAccessIdTable, and S3SecretTable should eventually become
+        //  empty.
+        LOG.warn("accessId '{}' not found in TenantAccessIdTable", accessId);
+        newDBAccessIdInfo = null;
 
+      } else {
         // Update TenantAccessIdTable
         // Build new OmDBAccessIdInfo with updated secret
         LOG.debug("Updating TenantAccessIdTable cache entry");
-        newS3SecretValue = null;
         newDBAccessIdInfo = new OmDBAccessIdInfo.Builder()
             .setTenantId(omDBAccessIdInfo.getTenantId())
             .setKerberosPrincipal(omDBAccessIdInfo.getUserPrincipal())
@@ -190,6 +196,13 @@ public class OMSetSecretRequest extends OMClientRequest {
                 transactionLogIndex));
       }
 
+      // If neither S3SecretTable nor TenantAccessIdTable is updated, throw
+      //  ACCESSID_NOT_FOUND exception.
+      if (newS3SecretValue == null && newDBAccessIdInfo == null) {
+        throw new OMException("accessId '" + accessId + "' not found.",
+            OMException.ResultCodes.ACCESSID_NOT_FOUND);
+      }
+
       // Compose response
       final SetS3SecretResponse.Builder setSecretResponse =
           SetS3SecretResponse.newBuilder()
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMAssignUserToTenantRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMAssignUserToTenantRequest.java
index a2af9ca..ff74225 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMAssignUserToTenantRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMAssignUserToTenantRequest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.om.request.s3.tenant;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
@@ -120,11 +121,13 @@ public class OMAssignUserToTenantRequest extends 
OMClientRequest {
     // Caller should be an Ozone admin or tenant delegated admin
     checkTenantAdmin(ozoneManager, tenantName);
 
-    // Note: Tenant username _is_ the Kerberos principal of the user
+    // Note: Tenant username _is_ the user principal (short name)
     final String tenantUsername = request.getTenantUsername();
     final String accessId = request.getAccessId();
 
-    // Check tenantUsername (user's Kerberos principal) validity. TODO: Check
+    // Check tenantUsername (user principal) validity.
+    // TODO: Rename tenantUsername to userPrincipal,
+    //  INVALID_TENANT_USER_NAME to INVALID_TENANT_USER_PRINCIPAL, ...
     if (tenantUsername.contains(OzoneConsts.TENANT_NAME_USER_NAME_DELIMITER)) {
       throw new OMException("Invalid tenant username '" + tenantUsername +
           "'. Tenant username shouldn't contain delimiter.",
@@ -191,8 +194,16 @@ public class OMAssignUserToTenantRequest extends 
OMClientRequest {
       // Undo Authorizer states established in preExecute
       ozoneManager.getMultiTenantManager().revokeUserAccessId(
           request.getAccessId());
+    } catch (IOException ioEx) {
+      final String userPrincipal = request.getTenantUsername();
+      final String tenantName = request.getTenantName();
+      final String accessId = request.getAccessId();
+      ozoneManager.getMultiTenantManager().removeUserAccessIdFromCache(
+          accessId, userPrincipal, tenantName);
     } catch (Exception e) {
       // TODO: Ignore for now. See OMTenantCreateRequest#handleRequestFailure
+      // TODO: Temporary solution for remnant tenantCache entry. Might becomes
+      //  useless with Ranger thread impl. Can remove.
     }
   }
 
@@ -218,23 +229,25 @@ public class OMAssignUserToTenantRequest extends 
OMClientRequest {
 
     final TenantAssignUserAccessIdRequest request =
         getOmRequest().getTenantAssignUserAccessIdRequest();
-    final String tenantName = request.getTenantName();
+    final String tenantId = request.getTenantName();
     final String principal = request.getTenantUsername();
 
     assert(accessId.equals(request.getAccessId()));
     IOException exception = null;
 
-    final String volumeName = OMTenantRequestHelper.getTenantVolumeName(
-        omMetadataManager, tenantName);
+    String volumeName = null;
 
     try {
+      volumeName = OMTenantRequestHelper.getTenantVolumeName(
+          omMetadataManager, tenantId);
+
       acquiredVolumeLock = omMetadataManager.getLock().acquireWriteLock(
           VOLUME_LOCK, volumeName);
 
       // Expect tenant existence in tenantStateTable
-      if (!omMetadataManager.getTenantStateTable().isExist(tenantName)) {
-        LOG.error("tenant {} doesn't exist", tenantName);
-        throw new OMException("tenant '" + tenantName + "' doesn't exist",
+      if (!omMetadataManager.getTenantStateTable().isExist(tenantId)) {
+        LOG.error("tenant {} doesn't exist", tenantId);
+        throw new OMException("tenant '" + tenantId + "' doesn't exist",
             OMException.ResultCodes.TENANT_NOT_FOUND);
       }
 
@@ -259,42 +272,22 @@ public class OMAssignUserToTenantRequest extends 
OMClientRequest {
                 + "Ignoring.", existingAccId);
             throw new NullPointerException("accessIdInfo is null");
           }
-          if (tenantName.equals(accessIdInfo.getTenantId())) {
+          if (tenantId.equals(accessIdInfo.getTenantId())) {
             throw new OMException("The same user is not allowed to be assigned 
"
                 + "to the same tenant more than once. User '" + principal
-                + "' is already assigned to tenant '" + tenantName + "' with "
+                + "' is already assigned to tenant '" + tenantId + "' with "
                 + "accessId '" + existingAccId + "'.",
                 OMException.ResultCodes.TENANT_USER_ACCESSID_ALREADY_EXISTS);
           }
         }
       }
 
-      // Add to S3SecretTable. TODO: Remove later.
-      acquiredS3SecretLock = omMetadataManager.getLock()
-          .acquireWriteLock(S3_SECRET_LOCK, accessId);
-
-      // Expect accessId absence from S3SecretTable
-      // TODO: This table might be merged with tenantAccessIdTable later.
-      if (omMetadataManager.getS3SecretTable().isExist(accessId)) {
-        LOG.error("accessId '{}' already exists in S3SecretTable", accessId);
-        throw new OMException("accessId '" + accessId +
-            "' already exists in S3SecretTable",
-            OMException.ResultCodes.INVALID_REQUEST);
-      }
-
       final S3SecretValue s3SecretValue =
           new S3SecretValue(accessId, awsSecret);
-      // Add S3SecretTable cache entry
-      omMetadataManager.getS3SecretTable().addCacheEntry(
-          new CacheKey<>(accessId),
-          new CacheValue<>(Optional.of(s3SecretValue), transactionLogIndex));
-
-      omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK, accessId);
-      acquiredS3SecretLock = false;
 
       // Add to tenantAccessIdTable
       final OmDBAccessIdInfo omDBAccessIdInfo = new OmDBAccessIdInfo.Builder()
-          .setTenantId(tenantName)
+          .setTenantId(tenantId)
           .setKerberosPrincipal(principal)
           .setSharedSecret(s3SecretValue.getAwsSecret())
           .setIsAdmin(false)
@@ -320,7 +313,7 @@ public class OMAssignUserToTenantRequest extends 
OMClientRequest {
       // Add to tenantGroupTable
       // TODO: DOUBLE CHECK GROUP NAME USAGE
       final String defaultGroupName =
-          tenantName + OzoneConsts.DEFAULT_TENANT_USER_GROUP_SUFFIX;
+          tenantId + OzoneConsts.DEFAULT_TENANT_USER_GROUP_SUFFIX;
       omMetadataManager.getTenantGroupTable().addCacheEntry(
           new CacheKey<>(accessId),
           new CacheValue<>(Optional.of(defaultGroupName), 
transactionLogIndex));
@@ -332,6 +325,28 @@ public class OMAssignUserToTenantRequest extends 
OMClientRequest {
           new CacheKey<>(accessId),
           new CacheValue<>(Optional.of(roleName), transactionLogIndex));
 
+      // Add to S3SecretTable.
+      // Note: S3SecretTable will be deprecated in the future.
+      acquiredS3SecretLock = omMetadataManager.getLock()
+          .acquireWriteLock(S3_SECRET_LOCK, accessId);
+
+      // Expect accessId absence from S3SecretTable
+      // TODO: This table might be merged with tenantAccessIdTable later.
+      if (omMetadataManager.getS3SecretTable().isExist(accessId)) {
+        LOG.error("accessId '{}' already exists in S3SecretTable", accessId);
+        throw new OMException("accessId '" + accessId +
+            "' already exists in S3SecretTable",
+            OMException.ResultCodes.INVALID_REQUEST);
+      }
+
+      // Add S3SecretTable cache entry
+      omMetadataManager.getS3SecretTable().addCacheEntry(
+          new CacheKey<>(accessId),
+          new CacheValue<>(Optional.of(s3SecretValue), transactionLogIndex));
+
+      omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK, accessId);
+      acquiredS3SecretLock = false;
+
       // Generate response
       omResponse.setTenantAssignUserAccessIdResponse(
           TenantAssignUserAccessIdResponse.newBuilder().setSuccess(true)
@@ -359,12 +374,13 @@ public class OMAssignUserToTenantRequest extends 
OMClientRequest {
         omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK, accessId);
       }
       if (acquiredVolumeLock) {
+        Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
     }
 
     // Audit
-    auditMap.put(OzoneConsts.TENANT, tenantName);
+    auditMap.put(OzoneConsts.TENANT, tenantId);
     auditMap.put("user", principal);
     auditMap.put("accessId", accessId);
     auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
@@ -373,11 +389,11 @@ public class OMAssignUserToTenantRequest extends 
OMClientRequest {
 
     if (exception == null) {
       LOG.info("Assigned user '{}' to tenant '{}' with accessId '{}'",
-          principal, tenantName, accessId);
+          principal, tenantId, accessId);
       // TODO: omMetrics.incNumTenantAssignUser()
     } else {
       LOG.error("Failed to assign '{}' to tenant '{}' with accessId '{}': {}",
-          principal, tenantName, accessId, exception.getMessage());
+          principal, tenantId, accessId, exception.getMessage());
       // TODO: Check if the exception message is sufficient.
       // TODO: omMetrics.incNumTenantAssignUserFails()
     }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignAdminRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignAdminRequest.java
index 9e04d55..aebd924 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignAdminRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignAdminRequest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.om.request.s3.tenant;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
@@ -158,16 +159,18 @@ public class OMTenantAssignAdminRequest extends 
OMClientRequest {
     final TenantAssignAdminRequest request =
         getOmRequest().getTenantAssignAdminRequest();
     final String accessId = request.getAccessId();
-    final String tenantName = request.getTenantName();
+    final String tenantId = request.getTenantName();
     final boolean delegated = request.getDelegated();
 
-    boolean acquiredVolumeLock = false;  // TODO: use tenant lock instead, 
maybe
+    boolean acquiredVolumeLock = false;
     IOException exception = null;
 
-    final String volumeName = OMTenantRequestHelper.getTenantVolumeName(
-        omMetadataManager, tenantName);
+    String volumeName = null;
 
     try {
+      volumeName = OMTenantRequestHelper.getTenantVolumeName(
+          omMetadataManager, tenantId);
+
       acquiredVolumeLock = omMetadataManager.getLock().acquireWriteLock(
           VOLUME_LOCK, volumeName);
 
@@ -179,7 +182,7 @@ public class OMTenantAssignAdminRequest extends 
OMClientRequest {
             + accessId + "'.", OMException.ResultCodes.METADATA_ERROR);
       }
 
-      assert(oldAccessIdInfo.getTenantId().equals(tenantName));
+      assert(oldAccessIdInfo.getTenantId().equals(tenantId));
 
       // Update tenantAccessIdTable
       final OmDBAccessIdInfo newOmDBAccessIdInfo =
@@ -195,7 +198,7 @@ public class OMTenantAssignAdminRequest extends 
OMClientRequest {
           new CacheValue<>(Optional.of(newOmDBAccessIdInfo),
               transactionLogIndex));
 
-      // Update tenantRoleTable?
+      // TODO: Update tenantRoleTable?
 //      final String roleName = "role_admin";
 //      omMetadataManager.getTenantRoleTable().addCacheEntry(
 //          new CacheKey<>(accessId),
@@ -221,24 +224,25 @@ public class OMTenantAssignAdminRequest extends 
OMClientRequest {
             .add(omClientResponse, transactionLogIndex));
       }
       if (acquiredVolumeLock) {
+        Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
     }
 
     // Audit
-    auditMap.put(OzoneConsts.TENANT, tenantName);
+    auditMap.put(OzoneConsts.TENANT, tenantId);
     auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
         OMAction.TENANT_ASSIGN_ADMIN, auditMap, exception,
         getOmRequest().getUserInfo()));
 
     if (exception == null) {
       LOG.info("Assigned admin to accessId '{}' in tenant '{}', "
-              + "delegated: {}", accessId, tenantName, delegated);
+              + "delegated: {}", accessId, tenantId, delegated);
       // TODO: omMetrics.incNumTenantAssignAdmin()
     } else {
       LOG.error("Failed to assign admin to accessId '{}' in tenant '{}', "
               + "delegated: {}: {}",
-          accessId, tenantName, delegated, exception.getMessage());
+          accessId, tenantId, delegated, exception.getMessage());
       // TODO: omMetrics.incNumTenantAssignAdminFails()
     }
     return omClientResponse;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantCreateRequest.java
index d0c0de4..1d4777f 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantCreateRequest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.om.request.s3.tenant;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -76,7 +77,7 @@ import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_L
   - tenantStateTable: New entry
     - Key: tenant name. e.g. finance
     - Value: new OmDBTenantInfo for the tenant
-      - tenantName: finance
+      - tenantId: finance
       - bucketNamespaceName: finance
       - accountNamespaceName: finance
       - userPolicyGroupName: finance-users
@@ -110,12 +111,13 @@ public class OMTenantCreateRequest extends 
OMVolumeRequest {
 
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    final CreateTenantRequest request = 
getOmRequest().getCreateTenantRequest();
-    final String tenantId = request.getTenantName();
 
-    // Check Ozone admin privilege
+    // Check Ozone cluster admin privilege
     OMTenantRequestHelper.checkAdmin(ozoneManager);
 
+    final CreateTenantRequest request = 
getOmRequest().getCreateTenantRequest();
+    final String tenantId = request.getTenantName();
+
     // Check tenantId validity
     if (tenantId.contains(OzoneConsts.TENANT_NAME_USER_NAME_DELIMITER)) {
       throw new OMException("Invalid tenant name " + tenantId +
@@ -127,7 +129,7 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
     if (ozoneManager.getMetadataManager().getTenantStateTable()
         .isExist(tenantId)) {
       LOG.debug("tenant: {} already exists", tenantId);
-      throw new OMException("Tenant already exists",
+      throw new OMException("Tenant '" + tenantId + "' already exists",
           TENANT_ALREADY_EXISTS);
     }
 
@@ -141,7 +143,8 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
     // A caveat is that this assumes OM's auth_to_local is the same as
     //  the client's. Maybe move this logic to the client and pass VolumeArgs?
     final String owner = ugi.getShortUserName();
-    final String volumeName = tenantId;  // TODO: Configurable
+    // Volume name defaults to tenant name if unspecified in the request
+    final String volumeName = request.getVolumeName();
     // Validate volume name
     OmUtils.validateVolumeName(volumeName);
     // TODO: Refactor this and OMVolumeCreateRequest to improve 
maintainability.
@@ -150,8 +153,6 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
         .setAdminName(owner)
         .setOwnerName(owner)
         .build();
-    // Verify volume name
-    OmUtils.validateVolumeName(volumeInfo.getVolume());
 
     // TODO: Shall we check volume existence here as well?
 
@@ -180,6 +181,7 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
                 .setTenantName(tenantId))
         .setCreateVolumeRequest(
             CreateVolumeRequest.newBuilder().setVolumeInfo(updatedVolumeInfo))
+        // TODO: Can the three lines below be ignored?
         .setUserInfo(getUserInfo())
         .setCmdType(getOmRequest().getCmdType())
         .setClientId(getOmRequest().getClientId());
@@ -213,14 +215,16 @@ public class OMTenantCreateRequest extends 
OMVolumeRequest {
       OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
 
     OMClientResponse omClientResponse = null;
-    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
-        getOmRequest());
+    final OMResponse.Builder omResponse =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
     OmVolumeArgs omVolumeArgs;
-    boolean acquiredVolumeLock = false, acquiredUserLock = false;
+    boolean acquiredVolumeLock = false;
+    boolean acquiredUserLock = false;
+    boolean acquiredTenantLock = false;
     final String owner = getOmRequest().getUserInfo().getUserName();
     Map<String, String> auditMap = new HashMap<>();
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-    CreateTenantRequest request = getOmRequest().getCreateTenantRequest();
+    final CreateTenantRequest request = 
getOmRequest().getCreateTenantRequest();
     final String tenantId = request.getTenantName();
     final VolumeInfo volumeInfo =
         getOmRequest().getCreateVolumeRequest().getVolumeInfo();
@@ -231,8 +235,7 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
     final String tenantDefaultPolicies = request.getTenantDefaultPolicyName();
 
     try {
-      // Check ACL: requires volume create permission.
-      // TODO: do we need a tenant create permission ? probably not
+      // Check ACL: requires volume CREATE permission.
       if (ozoneManager.getAclsEnabled()) {
         checkAcls(ozoneManager, OzoneObj.ResourceType.VOLUME,
             OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.CREATE,
@@ -241,20 +244,50 @@ public class OMTenantCreateRequest extends 
OMVolumeRequest {
 
       acquiredVolumeLock = omMetadataManager.getLock().acquireWriteLock(
           VOLUME_LOCK, volumeName);
+
       // Check volume existence
       if (omMetadataManager.getVolumeTable().isExist(volumeName)) {
-        LOG.debug("volume: {} already exists", volumeName);
+        LOG.debug("volume: '{}' already exists", volumeName);
         throw new OMException("Volume already exists", VOLUME_ALREADY_EXISTS);
       }
+
+      // Create volume
+      acquiredUserLock = 
omMetadataManager.getLock().acquireWriteLock(USER_LOCK,
+          owner);
+
+      // TODO: dedup OMVolumeCreateRequest
+      omVolumeArgs = OmVolumeArgs.getFromProtobuf(volumeInfo);
+      omVolumeArgs.setObjectID(
+          ozoneManager.getObjectIdFromTxId(transactionLogIndex));
+      omVolumeArgs.setUpdateID(transactionLogIndex,
+          ozoneManager.isRatisEnabled());
+      // Set volume reference count to 1
+      omVolumeArgs.incRefCount();
+      Preconditions.checkState(omVolumeArgs.getRefCount() == 1,
+          "refCount should have been set to 1");
+      // Audit
+      auditMap = omVolumeArgs.toAuditMap();
+
+      PersistedUserVolumeInfo volumeList;
+      final String dbUserKey = omMetadataManager.getUserKey(owner);
+      volumeList = omMetadataManager.getUserTable().get(dbUserKey);
+      volumeList = addVolumeToOwnerList(volumeList, volumeName, owner,
+          ozoneManager.getMaxUserVolumeCount(), transactionLogIndex);
+      createVolume(omMetadataManager, omVolumeArgs, volumeList, dbVolumeKey,
+          dbUserKey, transactionLogIndex);
+      LOG.debug("volume: '{}' successfully created", dbVolumeKey);
+
+
       // Check tenant existence in tenantStateTable
       if (omMetadataManager.getTenantStateTable().isExist(tenantId)) {
-        LOG.debug("tenant: {} already exists", tenantId);
+        LOG.debug("tenant: '{}' already exists", tenantId);
         throw new OMException("Tenant already exists", TENANT_ALREADY_EXISTS);
       }
 
+      // Create tenant
       // Add to tenantStateTable. Redundant assignment for clarity
-      final String bucketNamespaceName = tenantId;
-      final String accountNamespaceName = volumeName;
+      final String bucketNamespaceName = volumeName;
+      final String accountNamespaceName = tenantId;  // TODO: Double check
       final String userPolicyGroupName =
           tenantId + OzoneConsts.DEFAULT_TENANT_USER_POLICY_SUFFIX;
       final String bucketPolicyGroupName =
@@ -277,28 +310,6 @@ public class OMTenantCreateRequest extends OMVolumeRequest 
{
           new CacheKey<>(bucketPolicyGroupName),
           new CacheValue<>(Optional.of(bucketPolicyId), transactionLogIndex));
 
-      // Create volume
-      acquiredUserLock = 
omMetadataManager.getLock().acquireWriteLock(USER_LOCK,
-          owner);
-
-      // TODO: dedup OMVolumeCreateRequest
-      omVolumeArgs = OmVolumeArgs.getFromProtobuf(volumeInfo);
-      omVolumeArgs.setObjectID(
-          ozoneManager.getObjectIdFromTxId(transactionLogIndex));
-      omVolumeArgs.setUpdateID(transactionLogIndex,
-          ozoneManager.isRatisEnabled());
-      // Audit
-      auditMap = omVolumeArgs.toAuditMap();
-
-      PersistedUserVolumeInfo volumeList;
-      String dbUserKey = omMetadataManager.getUserKey(owner);
-      volumeList = omMetadataManager.getUserTable().get(dbUserKey);
-      volumeList = addVolumeToOwnerList(volumeList, volumeName, owner,
-          ozoneManager.getMaxUserVolumeCount(), transactionLogIndex);
-      createVolume(omMetadataManager, omVolumeArgs, volumeList, dbVolumeKey,
-          dbUserKey, transactionLogIndex);
-      LOG.debug("volume:{} successfully created", dbVolumeKey);
-
       omResponse.setCreateTenantResponse(
           CreateTenantResponse.newBuilder().setSuccess(true).build()
       );
@@ -338,6 +349,7 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
         omMetadataManager.getLock().releaseWriteLock(USER_LOCK, owner);
       }
       if (acquiredVolumeLock) {
+        Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
     }
@@ -348,6 +360,10 @@ public class OMTenantCreateRequest extends OMVolumeRequest 
{
     auditLog(ozoneManager.getAuditLogger(),
         buildAuditMessage(OMAction.CREATE_TENANT, auditMap, exception,
             getOmRequest().getUserInfo()));
+    // Log CREATE_VOLUME as well since a volume is created
+    auditLog(ozoneManager.getAuditLogger(),
+        buildAuditMessage(OMAction.CREATE_VOLUME, auditMap, exception,
+            getOmRequest().getUserInfo()));
 
     if (exception == null) {
       LOG.info("Created tenant '{}' and volume '{}'", tenantId, volumeName);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
index 679e002..ff88449 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
@@ -18,18 +18,45 @@
  */
 package org.apache.hadoop.ozone.om.request.s3.tenant;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMTenantDeleteResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_NOT_EMPTY;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_NOT_FOUND;
+import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 
 /**
  * Handles OMTenantDelete request.
  */
 public class OMTenantDeleteRequest extends OMVolumeRequest {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMTenantDeleteRequest.class);
 
   public OMTenantDeleteRequest(OMRequest omRequest) {
     super(omRequest);
@@ -37,14 +64,158 @@ public class OMTenantDeleteRequest extends OMVolumeRequest 
{
 
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    return getOmRequest();
+
+    // Check Ozone cluster admin privilege
+    OMTenantRequestHelper.checkAdmin(ozoneManager);
+
+    // TODO: TBD: Call ozoneManager.getMultiTenantManager().deleteTenant() ?
+
+    return getOmRequest().toBuilder().setUserInfo(getUserInfo()).build();
   }
 
   @Override
+  @SuppressWarnings("methodlength")
   public OMClientResponse validateAndUpdateCache(
       OzoneManager ozoneManager, long transactionLogIndex,
       OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
 
-    return null;
+    OMClientResponse omClientResponse = null;
+    final OMResponse.Builder omResponse =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    boolean acquiredVolumeLock = false;
+    final Map<String, String> auditMap = new HashMap<>();
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    final DeleteTenantRequest request = 
getOmRequest().getDeleteTenantRequest();
+    final String tenantId = request.getTenantId();
+    String volumeName = null;
+    boolean decVolumeRefCount = true;
+
+    IOException exception = null;
+    OmVolumeArgs omVolumeArgs = null;
+
+    try {
+      // Check tenant existence in tenantStateTable
+      if (!omMetadataManager.getTenantStateTable().isExist(tenantId)) {
+        LOG.debug("tenant: {} does not exist", tenantId);
+        throw new OMException("Tenant '" + tenantId + "' does not exist",
+            TENANT_NOT_FOUND);
+      }
+
+      // Reading the TenantStateTable without lock as we don't have or need
+      // a TENANT_LOCK. The assumption is that OmDBTenantInfo is read-only
+      // once it is set during tenant creation.
+      final OmDBTenantInfo dbTenantInfo =
+          omMetadataManager.getTenantStateTable().get(tenantId);
+      volumeName = dbTenantInfo.getBucketNamespaceName();
+      assert(volumeName != null);
+
+      LOG.debug("Tenant '{}' has volume '{}'", tenantId, volumeName);
+      // decVolumeRefCount is true if volumeName is not empty string
+      decVolumeRefCount = volumeName.length() > 0;
+
+      // Acquire the volume lock
+      acquiredVolumeLock = omMetadataManager.getLock().acquireWriteLock(
+          VOLUME_LOCK, volumeName);
+
+      // Check if there are any accessIds in the tenant
+      if (!OMTenantRequestHelper.isTenantEmpty(omMetadataManager, tenantId)) {
+        LOG.warn("tenant: '{}' is not empty. Unable to delete the tenant",
+            tenantId);
+        throw new OMException("Tenant '" + tenantId + "' is not empty. " +
+            "All accessIds associated to this tenant must be revoked before " +
+            "the tenant can be deleted. See `ozone tenant user revoke`",
+            TENANT_NOT_EMPTY);
+      }
+
+      // Invalidate cache entries
+      omMetadataManager.getTenantStateTable().addCacheEntry(
+          new CacheKey<>(tenantId),
+          new CacheValue<>(Optional.absent(), transactionLogIndex));
+
+      final String userPolicyGroupName = dbTenantInfo.getUserPolicyGroupName();
+      omMetadataManager.getTenantPolicyTable().addCacheEntry(
+          new CacheKey<>(userPolicyGroupName),
+          new CacheValue<>(Optional.absent(), transactionLogIndex));
+
+      final String bucketPolicyGroupName =
+          dbTenantInfo.getBucketPolicyGroupName();
+      omMetadataManager.getTenantPolicyTable().addCacheEntry(
+          new CacheKey<>(bucketPolicyGroupName),
+          new CacheValue<>(Optional.absent(), transactionLogIndex));
+
+      // Decrement volume refCount
+      if (decVolumeRefCount) {
+        // Check Acl
+        if (ozoneManager.getAclsEnabled()) {
+          checkAcls(ozoneManager, OzoneObj.ResourceType.VOLUME,
+              OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE_ACL,
+              volumeName, null, null);
+        }
+
+        omVolumeArgs = getVolumeInfo(omMetadataManager, volumeName);
+        // Decrement volume ref count
+        omVolumeArgs.decRefCount();
+
+        // Update omVolumeArgs
+        final String dbVolumeKey = omMetadataManager.getVolumeKey(volumeName);
+        omMetadataManager.getVolumeTable().addCacheEntry(
+            new CacheKey<>(dbVolumeKey),
+            new CacheValue<>(Optional.of(omVolumeArgs), transactionLogIndex));
+
+        // TODO: Set response dbVolumeKey?
+      }
+
+      // Compose response
+
+      // If decVolumeRefCount is false, return -1 to the client, otherwise
+      // return the actual volume refCount. Note if the actual volume refCount
+      // becomes negative somehow, omVolumeArgs.decRefCount() would have thrown
+      // earlier.
+      final DeleteTenantResponse.Builder deleteTenantResponse =
+          DeleteTenantResponse.newBuilder()
+              .setVolumeName(volumeName)
+              .setVolRefCount(omVolumeArgs == null ? -1 :
+                  omVolumeArgs.getRefCount());
+
+      omClientResponse = new OMTenantDeleteResponse(
+          omResponse.setDeleteTenantResponse(deleteTenantResponse).build(),
+          volumeName, omVolumeArgs, tenantId, userPolicyGroupName,
+          bucketPolicyGroupName);
+
+    } catch (IOException ex) {
+      exception = ex;
+      omClientResponse = new OMTenantDeleteResponse(
+          createErrorOMResponse(omResponse, exception));
+    } finally {
+      addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
+          ozoneManagerDoubleBufferHelper);
+      if (acquiredVolumeLock) {
+        Preconditions.checkNotNull(volumeName);
+        omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
+      }
+    }
+
+    // Perform audit logging
+    auditMap.put(OzoneConsts.TENANT, tenantId);
+    // Audit volume ref count update
+    if (decVolumeRefCount) {
+      auditLog(ozoneManager.getAuditLogger(),
+          buildAuditMessage(OMAction.UPDATE_VOLUME,
+              buildVolumeAuditMap(volumeName),
+              exception, getOmRequest().getUserInfo()));
+    }
+    // Audit tenant deletion
+    auditLog(ozoneManager.getAuditLogger(),
+        buildAuditMessage(OMAction.DELETE_TENANT,
+            auditMap, exception, getOmRequest().getUserInfo()));
+
+    if (exception == null) {
+      LOG.info("Deleted tenant '{}' and volume '{}'", tenantId, volumeName);
+      // TODO: omMetrics.decNumTenants()
+    } else {
+      LOG.error("Failed to delete tenant '{}'", tenantId, exception);
+      // TODO: omMetrics.incNumTenantDeleteFails()
+    }
+    return omClientResponse;
   }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantModifyRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantModifyRequest.java
deleted file mode 100644
index e75e4ad..0000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantModifyRequest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.hadoop.ozone.om.request.s3.tenant;
-
-import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
-import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
-import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-
-import java.io.IOException;
-
-/**
- * Handles OMTenantModify request.
- */
-public class OMTenantModifyRequest extends OMVolumeRequest {
-
-  public OMTenantModifyRequest(OMRequest omRequest) {
-    super(omRequest);
-  }
-
-  @Override
-  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    return getOmRequest();
-  }
-
-  @Override
-  public OMClientResponse validateAndUpdateCache(
-      OzoneManager ozoneManager, long transactionLogIndex,
-      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
-
-    return null;
-  }
-}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRequestHelper.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRequestHelper.java
index 190861f..2d67bd3 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRequestHelper.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRequestHelper.java
@@ -19,15 +19,21 @@
 package org.apache.hadoop.ozone.om.request.s3.tenant;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBTenantInfo;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantUserAccessId;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Utility class that contains helper methods for OM tenant requests.
@@ -38,8 +44,8 @@ public final class OMTenantRequestHelper {
   }
 
   /**
-   * Passes check only when caller is an Ozone admin,
-   * throws OMException otherwise.
+   * Passes check only when caller is an Ozone (cluster) admin, throws
+   * OMException otherwise.
    * @throws OMException PERMISSION_DENIED
    */
   static void checkAdmin(OzoneManager ozoneManager) throws OMException {
@@ -94,26 +100,23 @@ public final class OMTenantRequestHelper {
    * Retrieve volume name of the tenant.
    */
   static String getTenantVolumeName(OMMetadataManager omMetadataManager,
-      String tenantName) {
+      String tenantId) throws IOException {
 
-    final OmDBTenantInfo tenantInfo;
-    try {
-      tenantInfo = omMetadataManager.getTenantStateTable().get(tenantName);
-    } catch (IOException e) {
-      throw new RuntimeException("Potential DB error. Unable to retrieve "
-          + "OmDBTenantInfo entry for tenant '" + tenantName + "'.");
-    }
+    final OmDBTenantInfo tenantInfo =
+        omMetadataManager.getTenantStateTable().get(tenantId);
 
     if (tenantInfo == null) {
-      throw new RuntimeException("Potential DB error or race condition. "
-          + "OmDBTenantInfo entry is missing for tenant '" + tenantName + 
"'.");
+      throw new OMException("Potential DB error or race condition. "
+          + "OmDBTenantInfo entry is missing for tenant '" + tenantId + "'.",
+          ResultCodes.TENANT_NOT_FOUND);
     }
 
-    final String volumeName = tenantInfo.getAccountNamespaceName();
+    final String volumeName = tenantInfo.getBucketNamespaceName();
 
-    if (StringUtils.isEmpty(tenantName)) {
-      throw new RuntimeException("Potential DB error. volumeName "
-          + "field is null or empty for tenantId '" + tenantName + "'.");
+    if (volumeName == null) {
+      throw new OMException("Potential DB error. volumeName "
+          + "field is null for tenantId '" + tenantId + "'.",
+          ResultCodes.VOLUME_NOT_FOUND);
     }
 
     return volumeName;
@@ -182,4 +185,55 @@ public final class OMTenantRequestHelper {
     return false;
   }
 
+  /**
+   * Scans (Slow!) TenantAccessIdTable for the given tenantId.
+   * Returns true if the tenant doesn't have any accessIds assigned to it
+   * (i.e. the tenantId is not found in this table for any existing accessIds);
+   * Returns false otherwise.
+   *
+   * @param metadataManager
+   * @param tenantId
+   * @return
+   * @throws IOException
+   */
+  static boolean isTenantEmpty(OMMetadataManager metadataManager,
+                               String tenantId) throws IOException {
+
+    // TODO: Do we need to iterate cache here as well? Very cumbersome if so.
+    //  This helper function is a placeholder for the isTenantEmpty check,
+    //  once tenantCache/Ranger is fixed this will be removed.
+    try (TableIterator<String,
+        ? extends Table.KeyValue<String, OmDBAccessIdInfo>> iter =
+             metadataManager.getTenantAccessIdTable().iterator()) {
+      while (iter.hasNext()) {
+        final OmDBAccessIdInfo accessIdInfo = iter.next().getValue();
+        if (accessIdInfo.getTenantId().equals(tenantId)) {
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Return true if the tenant doesn't have any accessIds assigned to it,
+   * false otherwise. Uses in-memory mapping tenantCache which can be seen as
+   * a reverse-mapping of tenantAccessIdTable (Fast).
+   * @param tenantManager
+   * @param tenantId
+   * @return
+   * @throws IOException
+   */
+  static boolean isTenantEmpty(OMMultiTenantManager tenantManager,
+                               String tenantId) throws IOException {
+    // TODO: OMMultiTenantManager#listUsersInTenant relies on the tenantCache
+    //  mapping which I believe is only updated on leader node in preExecute
+    //  (apart from it being populated on OM startup) right now.
+    //  So unless tenantCache is updated on follower nodes later as well,
+    //  we can't use listUsersInTenant to check tenant emptiness in followers.
+    final List<TenantUserAccessId> tenantUserAccessIdsList =
+        tenantManager.listUsersInTenant(tenantId, "").getUserAccessIds();
+    return tenantUserAccessIdsList.size() == 0;
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeAdminRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeAdminRequest.java
index d441a54..06f7c72 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeAdminRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeAdminRequest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.om.request.s3.tenant;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
@@ -139,15 +140,17 @@ public class OMTenantRevokeAdminRequest extends 
OMClientRequest {
     final TenantRevokeAdminRequest request =
         getOmRequest().getTenantRevokeAdminRequest();
     final String accessId = request.getAccessId();
-    final String tenantName = request.getTenantName();
+    final String tenantId = request.getTenantName();
 
     boolean acquiredVolumeLock = false;  // TODO: use tenant lock instead, 
maybe
     IOException exception = null;
 
-    final String volumeName = OMTenantRequestHelper.getTenantVolumeName(
-        omMetadataManager, tenantName);
+    String volumeName = null;
 
     try {
+      volumeName = OMTenantRequestHelper.getTenantVolumeName(
+          omMetadataManager, tenantId);
+
       acquiredVolumeLock = omMetadataManager.getLock().acquireWriteLock(
           VOLUME_LOCK, volumeName);
 
@@ -159,7 +162,7 @@ public class OMTenantRevokeAdminRequest extends 
OMClientRequest {
             + accessId + "'.", OMException.ResultCodes.METADATA_ERROR);
       }
 
-      assert(oldAccessIdInfo.getTenantId().equals(tenantName));
+      assert(oldAccessIdInfo.getTenantId().equals(tenantId));
 
       // Update tenantAccessIdTable
       final OmDBAccessIdInfo newOmDBAccessIdInfo =
@@ -203,23 +206,24 @@ public class OMTenantRevokeAdminRequest extends 
OMClientRequest {
             .add(omClientResponse, transactionLogIndex));
       }
       if (acquiredVolumeLock) {
+        Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
     }
 
     // Audit
-    auditMap.put(OzoneConsts.TENANT, tenantName);
+    auditMap.put(OzoneConsts.TENANT, tenantId);
     auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
         OMAction.TENANT_REVOKE_ADMIN, auditMap, exception,
         getOmRequest().getUserInfo()));
 
     if (exception == null) {
       LOG.info("Revoked admin of accessId '{}' from tenant '{}'",
-          accessId, tenantName);
+          accessId, tenantId);
       // TODO: omMetrics.incNumTenantRevokeAdmin()
     } else {
       LOG.error("Failed to revoke admin of accessId '{}' from tenant '{}': {}",
-          accessId, tenantName, exception.getMessage());
+          accessId, tenantId, exception.getMessage());
       // TODO: omMetrics.incNumTenantRevokeAdminFails()
     }
     return omClientResponse;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java
index 28808a3..c9436fb 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.om.request.s3.tenant;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -145,30 +146,23 @@ public class OMTenantRevokeUserAccessIdRequest extends 
OMClientRequest {
     final TenantRevokeUserAccessIdRequest request =
         getOmRequest().getTenantRevokeUserAccessIdRequest();
     final String accessId = request.getAccessId();
-    final String tenantName = request.getTenantName();
+    final String tenantId = request.getTenantName();
 
-    boolean acquiredVolumeLock = false;
     boolean acquiredS3SecretLock = false;
+    boolean acquiredVolumeLock = false;
     IOException exception = null;
 
-    final String volumeName = OMTenantRequestHelper.getTenantVolumeName(
-        omMetadataManager, tenantName);
     String userPrincipal = null;
 
+    String volumeName = null;
+
     try {
+      volumeName = OMTenantRequestHelper.getTenantVolumeName(
+          omMetadataManager, tenantId);
+
       acquiredVolumeLock =
           omMetadataManager.getLock().acquireWriteLock(VOLUME_LOCK, 
volumeName);
 
-      // Remove from S3SecretTable. TODO: Remove later.
-      acquiredS3SecretLock = omMetadataManager.getLock()
-          .acquireWriteLock(S3_SECRET_LOCK, accessId);
-      omMetadataManager.getS3SecretTable().addCacheEntry(
-          new CacheKey<>(accessId),
-          new CacheValue<>(Optional.absent(), transactionLogIndex));
-      omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK,
-          accessId);
-      acquiredS3SecretLock = false;
-
       // Remove accessId from principalToAccessIdsTable
       OmDBAccessIdInfo omDBAccessIdInfo =
           omMetadataManager.getTenantAccessIdTable().get(accessId);
@@ -201,6 +195,15 @@ public class OMTenantRevokeUserAccessIdRequest extends 
OMClientRequest {
           new CacheKey<>(accessId),
           new CacheValue<>(Optional.absent(), transactionLogIndex));
 
+      // Remove from S3SecretTable.
+      // Note: S3SecretTable will be deprecated in the future.
+      acquiredS3SecretLock = omMetadataManager.getLock()
+          .acquireWriteLock(S3_SECRET_LOCK, accessId);
+
+      omMetadataManager.getS3SecretTable().addCacheEntry(
+          new CacheKey<>(accessId),
+          new CacheValue<>(Optional.absent(), transactionLogIndex));
+
       // Generate response
       omResponse.setTenantRevokeUserAccessIdResponse(
           
TenantRevokeUserAccessIdResponse.newBuilder().setSuccess(true).build()
@@ -225,25 +228,26 @@ public class OMTenantRevokeUserAccessIdRequest extends 
OMClientRequest {
         omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK, accessId);
       }
       if (acquiredVolumeLock) {
+        Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
     }
 
     // Audit
-    auditMap.put(OzoneConsts.TENANT, tenantName);
+    auditMap.put(OzoneConsts.TENANT, tenantId);
     auditMap.put("accessId", accessId);
-    auditMap.put("user", userPrincipal);
+    auditMap.put("userPrincipal", userPrincipal);
     auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
         OMAction.TENANT_REVOKE_USER_ACCESSID, auditMap, exception,
         getOmRequest().getUserInfo()));
 
     if (exception == null) {
       LOG.info("Revoked user '{}' accessId '{}' to tenant '{}'",
-          userPrincipal, accessId, tenantName);
+          userPrincipal, accessId, tenantId);
       // TODO: omMetrics.incNumTenantRevokeUser()
     } else {
       LOG.error("Failed to revoke user '{}' accessId '{}' to tenant '{}': {}",
-          userPrincipal, accessId, tenantName, exception.getMessage());
+          userPrincipal, accessId, tenantId, exception.getMessage());
       // TODO: omMetrics.incNumTenantRevokeUserFails()
     }
     return omClientResponse;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
index 3ece840..5cc92ea 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
@@ -100,6 +100,16 @@ public class OMVolumeDeleteRequest extends OMVolumeRequest 
{
 
       OmVolumeArgs omVolumeArgs = getVolumeInfo(omMetadataManager, volume);
 
+      // Check reference count
+      final long volRefCount = omVolumeArgs.getRefCount();
+      if (volRefCount != 0L) {
+        LOG.debug("volume: {} has a non-zero ref count. won't delete", volume);
+        throw new OMException("Volume reference count is not zero (" +
+            volRefCount + "). Ozone features are enabled on this volume. " +
+            "Try `ozone tenant delete <tenantId>` first.",
+            OMException.ResultCodes.VOLUME_IS_REFERENCED);
+      }
+
       owner = omVolumeArgs.getOwnerName();
       acquiredUserLock = 
omMetadataManager.getLock().acquireWriteLock(USER_LOCK,
           owner);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantCreateResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantCreateResponse.java
index 3fa2485..abca5c5 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantCreateResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantCreateResponse.java
@@ -79,9 +79,9 @@ public class OMTenantCreateResponse extends OMClientResponse {
   public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
 
-    final String tenantName = omTenantInfo.getTenantId();
+    final String tenantId = omTenantInfo.getTenantId();
     omMetadataManager.getTenantStateTable().putWithBatch(
-        batchOperation, tenantName, omTenantInfo);
+        batchOperation, tenantId, omTenantInfo);
 
     final String userPolicyGroupName =
         omTenantInfo.getUserPolicyGroupName();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantDeleteResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantDeleteResponse.java
new file mode 100644
index 0000000..becbb16
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantDeleteResponse.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.tenant;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.TENANT_POLICY_TABLE;
+import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.TENANT_STATE_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;
+
+/**
+ * Response for DeleteTenant request.
+ */
+@CleanupTableInfo(cleanupTables = {
+    TENANT_STATE_TABLE,
+    TENANT_POLICY_TABLE,
+    VOLUME_TABLE
+})
+public class OMTenantDeleteResponse extends OMClientResponse {
+
+  private String volumeName;
+  private OmVolumeArgs omVolumeArgs;
+  private String tenantId;
+  private String userPolicyGroupName;
+  private String bucketPolicyGroupName;
+
+  public OMTenantDeleteResponse(@Nonnull OMResponse omResponse,
+                                @Nonnull String volumeName,
+                                @Nullable OmVolumeArgs omVolumeArgs,
+                                @Nonnull String tenantId,
+                                @Nonnull String userPolicyGroupName,
+                                @Nonnull String bucketPolicyGroupName) {
+    super(omResponse);
+    this.volumeName = volumeName;
+    this.omVolumeArgs = omVolumeArgs;
+    this.tenantId = tenantId;
+    this.userPolicyGroupName = userPolicyGroupName;
+    this.bucketPolicyGroupName = bucketPolicyGroupName;
+  }
+
+  /**
+   * For when the request is not successful.
+   * For a successful request, the other constructor should be used.
+   */
+  public OMTenantDeleteResponse(@Nonnull OMResponse omResponse) {
+    super(omResponse);
+    checkStatusNotOK();
+  }
+
+  @Override
+  protected void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    omMetadataManager.getTenantStateTable().deleteWithBatch(
+        batchOperation, tenantId);
+
+    omMetadataManager.getTenantPolicyTable().deleteWithBatch(
+        batchOperation, userPolicyGroupName);
+
+    omMetadataManager.getTenantPolicyTable().deleteWithBatch(
+        batchOperation, bucketPolicyGroupName);
+
+    if (volumeName.length() > 0) {
+      Preconditions.checkNotNull(omVolumeArgs);
+      Preconditions.checkState(omVolumeArgs.getVolume().equals(volumeName));
+
+      final String dbVolumeKey = omMetadataManager.getVolumeKey(volumeName);
+      omMetadataManager.getVolumeTable().putWithBatch(batchOperation,
+          dbVolumeKey, omVolumeArgs);
+    }
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
index b33a6af..ebcdb52 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.security.acl;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -227,6 +228,8 @@ public class OzoneNativeAuthorizer implements 
IAccessAuthorizer {
   }
 
   private boolean isAdmin(UserGroupInformation callerUgi) {
+    Preconditions.checkNotNull(callerUgi, "callerUgi should not be null!");
+
     if (ozAdmins == null) {
       return false;
     }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java
index 5fe57ff..a2b9587 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java
@@ -154,6 +154,7 @@ public class TestS3GetSecretRequest {
         .setCreateTenantRequest(
             CreateTenantRequest.newBuilder()
                 .setTenantName(tenantNameStr)
+                .setVolumeName(tenantNameStr)
                 .build()
         ).build();
   }
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantAssignAdminHandler.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantAssignAdminHandler.java
index 1e05e32..2029dec 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantAssignAdminHandler.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantAssignAdminHandler.java
@@ -39,9 +39,6 @@ import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PERM
     description = "Assign admin role to accessIds in a tenant")
 public class TenantAssignAdminHandler extends TenantHandler {
 
-  @CommandLine.Spec
-  private CommandLine.Model.CommandSpec spec;
-
   @CommandLine.Parameters(description = "List of accessIds", arity = "1..")
   private List<String> accessIds = new ArrayList<>();
 
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantCreateHandler.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantCreateHandler.java
index 86d9fbe..abbcedd 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantCreateHandler.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantCreateHandler.java
@@ -17,42 +17,33 @@
  */
 package org.apache.hadoop.ozone.shell.tenant;
 
-import org.apache.hadoop.hdds.cli.GenericCli;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.shell.OzoneAddress;
 import picocli.CommandLine;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * ozone tenant create.
  */
 @CommandLine.Command(name = "create",
-    description = "Create one or more tenants")
+    description = "Create a tenant."
+        + " This will also create a new Ozone volume for the tenant.")
 public class TenantCreateHandler extends TenantHandler {
 
-  @CommandLine.Spec
-  private CommandLine.Model.CommandSpec spec;
-
-  @CommandLine.Parameters(description = "List of tenant names")
-  private List<String> tenants = new ArrayList<>();
+  @CommandLine.Parameters(description = "Tenant name", arity = "1..1")
+  private String tenantId;
 
   @Override
-  protected void execute(OzoneClient client, OzoneAddress address) {
-    if (tenants.size() > 0) {
-      for (String tenantName : tenants) {
-        try {
-          client.getObjectStore().createTenant(tenantName);
-          out().println("Created tenant '" + tenantName + "'.");
-        } catch (IOException e) {
-          err().println("Failed to create tenant '" + tenantName + "': " +
-              e.getMessage());
-        }
-      }
-    } else {
-      GenericCli.missingSubcommand(spec);
+  protected void execute(OzoneClient client, OzoneAddress address)
+      throws IOException{
+    try {
+      client.getObjectStore().createTenant(tenantId);
+      // TODO: Add return value and print volume name?
+      out().println("Created tenant '" + tenantId + "'.");
+    } catch (IOException e) {
+      // Throw exception to make client exit code non-zero
+      throw new IOException("Failed to create tenant '" + tenantId + "'", e);
     }
   }
 }
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantDeleteHandler.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantDeleteHandler.java
index a1f2503..1f8067d 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantDeleteHandler.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantDeleteHandler.java
@@ -18,18 +18,45 @@
 package org.apache.hadoop.ozone.shell.tenant;
 
 import org.apache.hadoop.ozone.client.OzoneClient;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantResponse;
 import org.apache.hadoop.ozone.shell.OzoneAddress;
 import picocli.CommandLine;
 
+import java.io.IOException;
+
 /**
  * ozone tenant delete.
  */
[email protected](name = "delete",
-    description = "Delete a tenant")
[email protected](name = "delete", aliases = "remove",
+    description = "Delete an empty tenant. "
+        + "Will not remove the associated volume.")
 public class TenantDeleteHandler extends TenantHandler {
 
+  @CommandLine.Parameters(description = "Tenant name", arity = "1..1")
+  private String tenantId;
+
   @Override
-  protected void execute(OzoneClient client, OzoneAddress address) {
-    err().println("Not Implemented.");
+  protected void execute(OzoneClient client, OzoneAddress address)
+      throws IOException {
+    try {
+      final DeleteTenantResponse resp =
+          client.getObjectStore().deleteTenant(tenantId);
+      out().println("Deleted tenant '" + tenantId + "'.");
+      long volumeRefCount = resp.getVolRefCount();
+      assert(volumeRefCount >= 0L);
+      final String volumeName = resp.getVolumeName();
+      final String extraPrompt =
+          "But the associated volume '" + volumeName + "' is not removed. ";
+      if (volumeRefCount == 0L) {
+        out().println(extraPrompt + "To delete it, run"
+            + "\n    ozone sh volume delete " + volumeName + "\n");
+      } else {
+        out().println(extraPrompt + "And it is still referenced by some other "
+            + "Ozone features (refCount is " + volumeRefCount + ").");
+      }
+    } catch (IOException e) {
+      // Throw exception to make client exit code non-zero
+      throw new IOException("Failed to delete tenant '" + tenantId + "'", e);
+    }
   }
 }
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantSetSecretHandler.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantSetSecretHandler.java
index b1b6b49..9c3e9ff 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantSetSecretHandler.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantSetSecretHandler.java
@@ -68,7 +68,8 @@ public class TenantSetSecretHandler extends TenantHandler {
       if (omEx.getResult().equals(ACCESSID_NOT_FOUND)) {
         // Print to stderr here in order not to contaminate stdout just in
         // case -e is specified.
-        err().println("AccessId '" + accessId + "' doesn't exist");
+        throw new IOException("AccessId '" + accessId + "' doesn't exist",
+            omEx);
       } else {
         throw omEx;
       }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to