This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new a699e6a95 KUDU-3385 Integrate Kudu with Ranger KMS
a699e6a95 is described below

commit a699e6a95e32459467270b389633d1fe2f815145
Author: Attila Bukor <[email protected]>
AuthorDate: Sat Jul 2 00:22:56 2022 +0200

    KUDU-3385 Integrate Kudu with Ranger KMS
    
    To make data at rest encryption actually secure, the cluster key needs
    to be generated and stored securely. Hadoop/Ranger KMS was chosen for
    this purpose, which are API compatible.
    
    This patch implements a Ranger KMS key provider and the Ranger client,
    and also wraps up initialization of keys and policies in MiniRanger and
    MiniRangerKMS. KeyProvider interface and its default mplementation had
    to be moved into fs/ from server/ to avoid introducing circular
    dependencies.
    
    It also removes the 'hidden' tag from the encryption-related flags as it
    is now safe to use.
    
    Change-Id: I681bed35f44cd03c1db69847c04faa460745f206
    Reviewed-on: http://gerrit.cloudera.org:8080/18692
    Reviewed-by: Alexey Serbin <[email protected]>
    Tested-by: Attila Bukor <[email protected]>
---
 build-support/dist_test.py                         |   1 +
 build-support/run_dist_test.py                     |  10 +-
 src/kudu/consensus/raft_consensus_quorum-test.cc   |  10 +-
 src/kudu/fs/CMakeLists.txt                         |   7 +-
 .../{server => fs}/default_key_provider-test.cc    |  12 +-
 src/kudu/{server => fs}/default_key_provider.h     |  39 +++-
 src/kudu/fs/fs.proto                               |   8 +-
 src/kudu/fs/fs_manager.cc                          | 100 +++++++--
 src/kudu/fs/fs_manager.h                           |  19 +-
 src/kudu/{server => fs}/key_provider.h             |  10 +-
 .../ranger_kms_key_provider.cc}                    |  30 +--
 .../ranger_kms_key_provider.h}                     |  28 ++-
 .../integration-tests/master_failover-itest.cc     |  15 ++
 .../integration-tests/master_migration-itest.cc    |  17 ++
 src/kudu/integration-tests/raft_consensus-itest.cc |   8 +-
 src/kudu/integration-tests/security-itest.cc       |  11 +
 src/kudu/mini-cluster/external_mini_cluster.cc     |  49 ++++-
 src/kudu/mini-cluster/external_mini_cluster.h      |  24 ++-
 src/kudu/mini-cluster/internal_mini_cluster.cc     |  10 +-
 src/kudu/ranger-kms/CMakeLists.txt                 |  14 +-
 src/kudu/ranger-kms/mini_ranger_kms.cc             | 226 ++++++++++++++++-----
 src/kudu/ranger-kms/mini_ranger_kms.h              |  16 +-
 src/kudu/ranger-kms/mini_ranger_kms_configs.h      |  71 ++++++-
 src/kudu/ranger-kms/ranger_kms_client.cc           | 113 +++++++++++
 .../ranger_kms_client.h}                           |  27 ++-
 src/kudu/ranger/mini_ranger.cc                     |  12 +-
 src/kudu/ranger/mini_ranger.h                      |  26 ++-
 src/kudu/ranger/mini_ranger_configs.h              |  13 ++
 src/kudu/server/CMakeLists.txt                     |   1 -
 src/kudu/server/server_base.cc                     |   5 +-
 src/kudu/server/server_base_options.cc             |  17 +-
 src/kudu/server/server_base_options.h              |   2 +
 src/kudu/tools/kudu-admin-test.cc                  |   4 +
 src/kudu/tools/kudu-tool-test.cc                   |  40 +++-
 src/kudu/tools/tool_action_common.cc               |  26 ++-
 src/kudu/tools/tool_action_fs.cc                   |  29 ++-
 src/kudu/tools/tool_action_master.cc               |  25 ++-
 src/kudu/tserver/tablet_copy_client-test.cc        |  10 +-
 src/kudu/tserver/tablet_server-test-base.cc        |   6 +-
 src/kudu/util/env_posix.cc                         |   6 +-
 src/kudu/util/test_util.cc                         |  16 +-
 src/kudu/util/test_util.h                          |   4 +-
 42 files changed, 901 insertions(+), 216 deletions(-)

diff --git a/build-support/dist_test.py b/build-support/dist_test.py
index 00d4c786a..432050a14 100755
--- a/build-support/dist_test.py
+++ b/build-support/dist_test.py
@@ -112,6 +112,7 @@ DEPS_FOR_ALL = \
      "build/latest/bin/postgres-lib",
      "build/latest/bin/postgres-share",
      "build/latest/bin/postgresql.jar",
+     "build/latest/bin/ranger_kms-home",
      "build/latest/bin/ranger-home",
 
      # Add the Kudu HMS plugin.
diff --git a/build-support/run_dist_test.py b/build-support/run_dist_test.py
index 8d8a5a577..0a3ceeb82 100755
--- a/build-support/run_dist_test.py
+++ b/build-support/run_dist_test.py
@@ -150,7 +150,8 @@ def main():
   # are used in mini_hms.cc and mini_ranger.cc.
   env['HIVE_HOME'] = glob.glob(os.path.join(ROOT, "thirdparty/src/hive-*"))[0]
   env['HADOOP_HOME'] = glob.glob(os.path.join(ROOT, 
"thirdparty/src/hadoop-*"))[0]
-  env['RANGER_HOME'] = glob.glob(os.path.join(ROOT, 
"thirdparty/src/ranger-*"))[0]
+  env['RANGER_HOME'] = glob.glob(os.path.join(ROOT, 
"thirdparty/src/ranger-*-admin"))[0]
+  env['RANGER_KMS_HOME'] = glob.glob(os.path.join(ROOT, 
"thirdparty/src/ranger-*-kms"))[0]
   env['JAVA_HOME'] = glob.glob("/usr/lib/jvm/java-1.8.0-*")[0]
 
   # Restore the symlinks to the chrony binaries and Postgres and Ranger
@@ -169,8 +170,10 @@ def main():
                os.path.join(bin_path, "postgres-share"))
     os.symlink(glob.glob(os.path.join(ROOT, 
"thirdparty/src/postgresql-*/postgresql-*.jar"))[0],
                os.path.join(bin_path, "postgresql.jar"))
-    os.symlink(glob.glob(os.path.join(ROOT, "thirdparty/src/ranger-*"))[0],
+    os.symlink(glob.glob(os.path.join(ROOT, 
"thirdparty/src/ranger-*-admin"))[0],
                os.path.join(bin_path, "ranger-home"))
+    os.symlink(glob.glob(os.path.join(ROOT, "thirdparty/src/ranger-*-kms"))[0],
+               os.path.join(bin_path, "ranger_kms-home"))
     os.symlink(os.path.join(ROOT, "thirdparty/installed/common/opt/hadoop"),
                os.path.join(bin_path, "hadoop-home"))
     # When building Ranger, we symlink conf.dist to conf. Overwrite the link we
@@ -178,6 +181,9 @@ def main():
     os.unlink(os.path.join(bin_path, 
"ranger-home/ews/webapp/WEB-INF/classes/conf"))
     os.symlink(os.path.join(bin_path, 
"ranger-home/ews/webapp/WEB-INF/classes/conf.dist"),
                os.path.join(bin_path, 
"ranger-home/ews/webapp/WEB-INF/classes/conf"))
+    os.unlink(os.path.join(bin_path, 
"ranger_kms-home/ews/webapp/WEB-INF/classes/conf"))
+    os.symlink(os.path.join(bin_path, 
"ranger_kms-home/ews/webapp/WEB-INF/classes/conf.dist"),
+               os.path.join(bin_path, 
"ranger_kms-home/ews/webapp/WEB-INF/classes/conf"))
 
   env['LD_LIBRARY_PATH'] = ":".join(
     [os.path.join(ROOT, "build/dist-test-system-libs/")] +
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc 
b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 92251b21f..0a06db858 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -134,11 +134,17 @@ class RaftConsensusQuorumTest : public KuduTest {
       opts.wal_root = test_path;
       opts.data_roots = { test_path };
       unique_ptr<FsManager> fs_manager(new FsManager(env_, opts));
-      string server_key = GetEncryptionKey();
+      string server_key;
+      string server_key_iv;
+      string server_key_version;
+      GetEncryptionKey(&server_key, &server_key_iv, &server_key_version);
       if (server_key.empty()) {
         RETURN_NOT_OK(fs_manager->CreateInitialFileSystemLayout());
       } else {
-        RETURN_NOT_OK(fs_manager->CreateInitialFileSystemLayout(nullopt, 
server_key));
+        RETURN_NOT_OK(fs_manager->CreateInitialFileSystemLayout(nullopt,
+                                                                server_key,
+                                                                server_key_iv,
+                                                                
server_key_version));
       }
       RETURN_NOT_OK(fs_manager->Open());
 
diff --git a/src/kudu/fs/CMakeLists.txt b/src/kudu/fs/CMakeLists.txt
index 514935efe..2a880cf9b 100644
--- a/src/kudu/fs/CMakeLists.txt
+++ b/src/kudu/fs/CMakeLists.txt
@@ -36,12 +36,14 @@ add_library(kudu_fs
   file_block_manager.cc
   fs_manager.cc
   fs_report.cc
-  log_block_manager.cc)
+  log_block_manager.cc
+  ranger_kms_key_provider.cc)
 
 target_link_libraries(kudu_fs
   fs_proto
   kudu_util
-  gutil)
+  gutil
+  ranger_kms_client)
 
 add_library(kudu_fs_test_util
   log_block_manager-test-util.cc)
@@ -57,6 +59,7 @@ SET_KUDU_TEST_LINK_LIBS(kudu_fs kudu_fs_test_util)
 ADD_KUDU_TEST(block_manager-test)
 ADD_KUDU_TEST(block_manager-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(data_dirs-test)
+ADD_KUDU_TEST(default_key_provider-test)
 ADD_KUDU_TEST(dir_util-test)
 ADD_KUDU_TEST(error_manager-test)
 ADD_KUDU_TEST(fs_manager-test)
diff --git a/src/kudu/server/default_key_provider-test.cc 
b/src/kudu/fs/default_key_provider-test.cc
similarity index 80%
copy from src/kudu/server/default_key_provider-test.cc
copy to src/kudu/fs/default_key_provider-test.cc
index 22d0b7f07..fa9700f8f 100644
--- a/src/kudu/server/default_key_provider-test.cc
+++ b/src/kudu/fs/default_key_provider-test.cc
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "kudu/server/default_key_provider.h"
+#include "kudu/fs/default_key_provider.h"
 
 #include <string>
 
@@ -35,13 +35,13 @@ class DefaultKeyProviderTest : public KuduTest {
 };
 
 TEST_F(DefaultKeyProviderTest, TestEncryptAndDecrypt) {
-  string key = "foo";
   string encrypted_key;
+  string iv;
+  string version;
   string decrypted_key;
-  ASSERT_OK(key_provider_.EncryptServerKey(key, &encrypted_key));
-  ASSERT_OK(key_provider_.DecryptServerKey(encrypted_key, &decrypted_key));
-  ASSERT_NE(key, encrypted_key);
-  ASSERT_EQ(key, decrypted_key);
+  ASSERT_OK(key_provider_.GenerateEncryptedServerKey(&encrypted_key, &iv, 
&version));
+  ASSERT_OK(key_provider_.DecryptServerKey(encrypted_key, iv, version, 
&decrypted_key));
+  ASSERT_NE(encrypted_key, decrypted_key);
 }
 
 } // namespace security
diff --git a/src/kudu/server/default_key_provider.h 
b/src/kudu/fs/default_key_provider.h
similarity index 53%
rename from src/kudu/server/default_key_provider.h
rename to src/kudu/fs/default_key_provider.h
index 33bab8098..35eb4e15a 100644
--- a/src/kudu/server/default_key_provider.h
+++ b/src/kudu/fs/default_key_provider.h
@@ -18,8 +18,11 @@
 #pragma once
 
 #include <string>
+#include <openssl/rand.h>
 
-#include "kudu/server/key_provider.h"
+#include "kudu/fs/key_provider.h"
+#include "kudu/gutil/strings/escaping.h"
+#include "kudu/util/openssl_util.h"
 
 namespace kudu {
 namespace security {
@@ -28,24 +31,40 @@ class DefaultKeyProvider : public KeyProvider {
 public:
   ~DefaultKeyProvider() override {}
   Status DecryptServerKey(const std::string& encrypted_server_key,
+                          const std::string& /*iv*/,
+                          const std::string& /*key_version*/,
                           std::string* server_key) override {
-    return EncryptServerKey(encrypted_server_key, server_key);
-  }
-
-  Status EncryptServerKey(const std::string& server_key,
-                          std::string* encrypted_server_key) override {
-    *encrypted_server_key = server_key;
+    *server_key = strings::a2b_hex(encrypted_server_key);
 #ifdef __linux__
-    memfrob(encrypted_server_key->data(), server_key.length());
+    memfrob(server_key->data(), server_key->length());
 #else
     // On Linux, memfrob() bitwise XORs the data with the magic number that is
     // the answer to the ultimate question of life, the universe, and
     // everything. On Mac, we do this manually.
     const uint8_t kMagic = 42;
-    for (auto i = 0; i < server_key.length(); ++i) {
-      encrypted_server_key->data()[i] ^= kMagic;
+    for (auto i = 0; i < server_key->length(); ++i) {
+      server_key->data()[i] ^= kMagic;
     }
 #endif
+    *server_key = strings::b2a_hex(*server_key);
+    return Status::OK();
+  }
+
+  Status GenerateEncryptedServerKey(std::string* server_key,
+                                    std::string* iv,
+                                    std::string* key_version) override {
+    uint8_t key_bytes[32];
+    uint8_t iv_bytes[32];
+    int num_bytes = 16;
+    std::string dek;
+    OPENSSL_RET_NOT_OK(RAND_bytes(key_bytes, num_bytes),
+                       "Failed to generate random key");
+    strings::b2a_hex(key_bytes, &dek, num_bytes);
+    OPENSSL_RET_NOT_OK(RAND_bytes(iv_bytes, num_bytes),
+                       "Failed to generate random key");
+    strings::b2a_hex(iv_bytes, iv, num_bytes);
+    DecryptServerKey(dek, *iv, *key_version, server_key);
+    *key_version = "clusterkey@0";
     return Status::OK();
   }
 };
diff --git a/src/kudu/fs/fs.proto b/src/kudu/fs/fs.proto
index 19c0051cd..849aa591a 100644
--- a/src/kudu/fs/fs.proto
+++ b/src/kudu/fs/fs.proto
@@ -36,7 +36,13 @@ message InstanceMetadataPB {
   required string format_stamp = 2;
 
   // Encrypted server key used to encrypt/decrypt file keys on this server.
-  optional bytes server_key = 3;
+  optional string server_key = 3;
+
+  // Initialization vector for the server key.
+  optional string server_key_iv = 4;
+
+  // Server key version.
+  optional string server_key_version = 5;
 
   // TODO: add a "node type" (TS/Master?)
 }
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 375616042..68df1e3ca 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -22,7 +22,6 @@
 #include <functional>
 #include <initializer_list>
 #include <iostream>
-#include <openssl/rand.h>
 #include <unordered_map>
 #include <unordered_set>
 #include <utility>
@@ -32,11 +31,14 @@
 
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/data_dirs.h"
+#include "kudu/fs/default_key_provider.h"
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
+#include "kudu/fs/ranger_kms_key_provider.h"
+#include "kudu/fs/key_provider.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stringprintf.h"
@@ -48,15 +50,13 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
 #include "kudu/gutil/walltime.h"
-#include "kudu/server/default_key_provider.h"
-#include "kudu/server/key_provider.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/flag_validators.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/oid_generator.h"
-#include "kudu/util/openssl_util.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/scoped_cleanup.h"
@@ -128,6 +128,35 @@ METRIC_DEFINE_gauge_int64(server, 
log_block_manager_containers_processing_time_s
                           "files during the startup",
                           kudu::MetricLevel::kDebug);
 
+DEFINE_string(encryption_key_provider, "default",
+              "Key provider implementation to generate and decrypt server 
keys. "
+              "Valid values are: 'default' (not for production usage), and 
'ranger-kms'.");
+
+DEFINE_validator(encryption_key_provider, [](const char* /*n*/, const 
std::string& value) {
+  return value == "default" || value == "ranger-kms";
+});
+
+DEFINE_string(ranger_kms_url, "",
+              "URL of the Ranger KMS server. Must be set when 
'encryption_key_provider' "
+              "is set to 'ranger-kms'.");
+
+DEFINE_string(encryption_cluster_key_name, "kudu_cluster_key",
+              "Name of the cluster key that is used to encrypt server 
encryption keys as "
+              "stored in Ranger KMS.");
+
+bool ValidateRangerKMSFlags() {
+  if (FLAGS_encryption_key_provider == "ranger-kms") {
+    if (FLAGS_ranger_kms_url.empty() || 
FLAGS_encryption_cluster_key_name.empty()) {
+      LOG(ERROR) << "If 'encryption_key_provider' is set to 'ranger-kms', then 
"
+                    "'ranger_kms_url' and 'encryption_cluster_key_name' must 
also be set.";
+      return false;
+    }
+  }
+  return true;
+}
+
+GROUP_FLAG_VALIDATOR(validate_ranger_kms_flags, ValidateRangerKMSFlags);
+
 DECLARE_bool(encrypt_data_at_rest);
 DECLARE_int32(encryption_key_length);
 
@@ -146,6 +175,7 @@ using kudu::fs::UpdateInstanceBehavior;
 using kudu::fs::WritableBlock;
 using kudu::pb_util::SecureDebugString;
 using kudu::security::DefaultKeyProvider;
+using kudu::security::RangerKMSKeyProvider;
 using std::optional;
 using std::ostream;
 using std::string;
@@ -153,6 +183,7 @@ using std::unique_ptr;
 using std::unordered_map;
 using std::unordered_set;
 using std::vector;
+using strings::a2b_hex;
 using strings::Substitute;
 
 namespace kudu {
@@ -197,7 +228,12 @@ FsManager::FsManager(Env* env, FsManagerOpts opts)
   DCHECK(opts_.update_instances == UpdateInstanceBehavior::DONT_UPDATE ||
          !opts_.read_only) << "FsManager can only be for updated if not in 
read-only mode";
   if (FLAGS_encrypt_data_at_rest) {
-    key_provider_.reset(new DefaultKeyProvider());
+    if (FLAGS_encryption_key_provider == "ranger-kms") {
+      key_provider_.reset(new RangerKMSKeyProvider(FLAGS_ranger_kms_url,
+                                                   
FLAGS_encryption_cluster_key_name));
+    } else {
+      key_provider_.reset(new DefaultKeyProvider());
+    }
   }
 }
 
@@ -465,11 +501,14 @@ Status FsManager::Open(FsReport* report, Timer* 
read_instance_metadata_files,
 
   if (!server_key().empty() && key_provider_) {
     string server_key;
-    RETURN_NOT_OK(key_provider_->DecryptServerKey(this->server_key(), 
&server_key));
+    RETURN_NOT_OK(key_provider_->DecryptServerKey(this->server_key(),
+                                                  this->server_key_iv(),
+                                                  this->server_key_version(),
+                                                  &server_key));
     // 'server_key' is a hexadecimal string and SetEncryptionKey expects bits
     // (hex / 2 = bytes * 8 = bits).
     env_->SetEncryptionKey(reinterpret_cast<const uint8_t*>(
-                             strings::a2b_hex(server_key).c_str()),
+                             a2b_hex(server_key).c_str()),
                            server_key.length() * 4);
   }
 
@@ -553,7 +592,9 @@ Status FsManager::Open(FsReport* report, Timer* 
read_instance_metadata_files,
 }
 
 Status FsManager::CreateInitialFileSystemLayout(optional<string> uuid,
-                                                optional<string> server_key) {
+                                                optional<string> server_key,
+                                                optional<string> server_key_iv,
+                                                optional<string> 
server_key_version) {
   CHECK(!opts_.read_only);
 
   RETURN_NOT_OK(Init());
@@ -577,7 +618,11 @@ Status 
FsManager::CreateInitialFileSystemLayout(optional<string> uuid,
   //
   // Files/directories created will NOT be synchronized to disk.
   InstanceMetadataPB metadata;
-  RETURN_NOT_OK_PREPEND(CreateInstanceMetadata(std::move(uuid), 
std::move(server_key), &metadata),
+  RETURN_NOT_OK_PREPEND(CreateInstanceMetadata(std::move(uuid),
+                                               std::move(server_key),
+                                               std::move(server_key_iv),
+                                               std::move(server_key_version),
+                                               &metadata),
                         "unable to create instance metadata");
   RETURN_NOT_OK_PREPEND(FsManager::CreateFileSystemRoots(
       canonicalized_all_fs_roots_, metadata, &created_dirs, &created_files),
@@ -673,6 +718,8 @@ Status FsManager::CreateFileSystemRoots(
 
 Status FsManager::CreateInstanceMetadata(optional<string> uuid,
                                          optional<string> server_key,
+                                         optional<string> server_key_iv,
+                                         optional<string> server_key_version,
                                          InstanceMetadataPB* metadata) {
   if (uuid) {
     string canonicalized_uuid;
@@ -681,19 +728,22 @@ Status FsManager::CreateInstanceMetadata(optional<string> 
uuid,
   } else {
     metadata->set_uuid(oid_generator_.Next());
   }
-  if (server_key) {
-    RETURN_NOT_OK(key_provider_->EncryptServerKey(*server_key,
-                                                  
metadata->mutable_server_key()));
+  if (server_key && server_key_iv && server_key_version) {
+    metadata->set_server_key(*server_key);
+    metadata->set_server_key_iv(*server_key_iv);
+    metadata->set_server_key_version(*server_key_version);
+  } else if (server_key || server_key_iv || server_key_version) {
+    return Status::InvalidArgument(
+        "'server_key', 'server_key_iv', and 'server_key_version' must be 
specified "
+        "together (either all of them must be specified, or none of them).");
   } else if (FLAGS_encrypt_data_at_rest) {
-    uint8_t key_bytes[32];
-    int num_bytes = FLAGS_encryption_key_length / 8;
-    DCHECK(num_bytes <= sizeof(key_bytes));
-    OPENSSL_RET_NOT_OK(RAND_bytes(key_bytes, num_bytes),
-                       "Failed to generate random key");
-    string plain_server_key;
-    strings::b2a_hex(key_bytes, &plain_server_key, num_bytes);
-    RETURN_NOT_OK(key_provider_->EncryptServerKey(plain_server_key,
-                                                  
metadata->mutable_server_key()));
+    string key_version;
+    RETURN_NOT_OK_PREPEND(
+        
key_provider_->GenerateEncryptedServerKey(metadata->mutable_server_key(),
+                                                  
metadata->mutable_server_key_iv(),
+                                                  &key_version),
+        "failed to generate encrypted server key");
+    metadata->set_server_key_version(key_version);
   }
 
   string time_str;
@@ -730,6 +780,14 @@ const string& FsManager::server_key() const {
   return CHECK_NOTNULL(metadata_.get())->server_key();
 }
 
+const string& FsManager::server_key_iv() const {
+  return CHECK_NOTNULL(metadata_.get())->server_key_iv();
+}
+
+const string& FsManager::server_key_version() const {
+  return CHECK_NOTNULL(metadata_.get())->server_key_version();
+}
+
 vector<string> FsManager::GetDataRootDirs() const {
   // Get the data subdirectory for each data root.
   return dd_manager_->GetDirs();
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index 705e90104..30a15626d 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -205,14 +205,17 @@ class FsManager {
               std::atomic<int>* containers_total = nullptr );
 
   // Create the initial filesystem layout. If 'uuid' is provided, uses it as
-  // uuid of the filesystem. Otherwise generates one at random. If 'server_key'
-  // is provided, it is used as the server key of the filesystem. Otherwise, if
-  // encryption is enabled, generates one at random.
+  // uuid of the filesystem. Otherwise generates one at random. If 
'server_key',
+  // 'server_key_iv', and 'server_key_version' are provided, they are used as
+  // the server key of the filesystem. Otherwise, if encryption is enabled,
+  // generates one at random.
   //
   // Returns an error if the file system is already initialized.
   Status CreateInitialFileSystemLayout(
       std::optional<std::string> uuid = std::nullopt,
-      std::optional<std::string> server_key = std::nullopt);
+      std::optional<std::string> server_key = std::nullopt,
+      std::optional<std::string> server_key_iv = std::nullopt,
+      std::optional<std::string> server_key_version = std::nullopt);
 
   // ==========================================================================
   //  Error handling helpers
@@ -303,6 +306,12 @@ class FsManager {
   // crash. If the file system is not encrypted, it returns an empty string.
   const std::string& server_key() const;
 
+  // Return the initialization vector for the server key.
+  const std::string& server_key_iv() const;
+
+  // Return the version of the server key.
+  const std::string& server_key_version() const;
+
   // ==========================================================================
   //  file-system helpers
   // ==========================================================================
@@ -364,6 +373,8 @@ class FsManager {
   // Create a new InstanceMetadataPB.
   Status CreateInstanceMetadata(std::optional<std::string> uuid,
                                 std::optional<std::string> server_key,
+                                std::optional<std::string> server_key_iv,
+                                std::optional<std::string> server_key_version,
                                 InstanceMetadataPB* metadata);
 
   // Save a InstanceMetadataPB to the filesystem.
diff --git a/src/kudu/server/key_provider.h b/src/kudu/fs/key_provider.h
similarity index 76%
copy from src/kudu/server/key_provider.h
copy to src/kudu/fs/key_provider.h
index 0c52282f7..b5a558402 100644
--- a/src/kudu/server/key_provider.h
+++ b/src/kudu/fs/key_provider.h
@@ -31,11 +31,15 @@ class KeyProvider {
 
   // Decrypts the server key.
   virtual Status DecryptServerKey(const std::string& encrypted_server_key,
+                                  const std::string& iv,
+                                  const std::string& key_version,
                                   std::string* server_key) = 0;
 
-  // Encrypts the server key.
-  virtual Status EncryptServerKey(const std::string& server_key,
-                                  std::string* encrypted_server_key) = 0;
+  // Generates an encrypted server key.
+  virtual Status GenerateEncryptedServerKey(std::string* encrypted_server_key,
+                                            std::string* iv,
+                                            std::string* key_version) = 0;
+
 };
 } // namespace security
 } // namespace kudu
diff --git a/src/kudu/server/default_key_provider-test.cc 
b/src/kudu/fs/ranger_kms_key_provider.cc
similarity index 54%
rename from src/kudu/server/default_key_provider-test.cc
rename to src/kudu/fs/ranger_kms_key_provider.cc
index 22d0b7f07..d0beb493f 100644
--- a/src/kudu/server/default_key_provider-test.cc
+++ b/src/kudu/fs/ranger_kms_key_provider.cc
@@ -15,34 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "kudu/server/default_key_provider.h"
-
 #include <string>
 
-#include <gtest/gtest.h>
-
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
+#include "kudu/fs/ranger_kms_key_provider.h"
 
 using std::string;
 
 namespace kudu {
 namespace security {
 
-class DefaultKeyProviderTest : public KuduTest {
- protected:
-  DefaultKeyProvider key_provider_;
-};
-
-TEST_F(DefaultKeyProviderTest, TestEncryptAndDecrypt) {
-  string key = "foo";
-  string encrypted_key;
-  string decrypted_key;
-  ASSERT_OK(key_provider_.EncryptServerKey(key, &encrypted_key));
-  ASSERT_OK(key_provider_.DecryptServerKey(encrypted_key, &decrypted_key));
-  ASSERT_NE(key, encrypted_key);
-  ASSERT_EQ(key, decrypted_key);
+Status RangerKMSKeyProvider::DecryptServerKey(const std::string& 
encrypted_server_key,
+                                              const std::string& iv,
+                                              const std::string& key_version,
+                                              std::string* server_key) {
+  return client_.DecryptKey(encrypted_server_key, iv, key_version, server_key);
 }
 
+Status RangerKMSKeyProvider::GenerateEncryptedServerKey(std::string* 
encrypted_server_key,
+                                                        std::string* iv,
+                                                        std::string* 
key_version) {
+  return client_.GenerateEncryptedServerKey(encrypted_server_key, iv, 
key_version);
+}
 } // namespace security
 } // namespace kudu
diff --git a/src/kudu/server/key_provider.h 
b/src/kudu/fs/ranger_kms_key_provider.h
similarity index 54%
copy from src/kudu/server/key_provider.h
copy to src/kudu/fs/ranger_kms_key_provider.h
index 0c52282f7..eef22b3c2 100644
--- a/src/kudu/server/key_provider.h
+++ b/src/kudu/fs/ranger_kms_key_provider.h
@@ -18,24 +18,34 @@
 #pragma once
 
 #include <string>
+#include <utility>
 
+#include "kudu/fs/key_provider.h"
+#include "kudu/ranger-kms/ranger_kms_client.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 namespace security {
-
-// An interface for encrypting and decrypting Kudu's server keys.
-class KeyProvider {
+class RangerKMSKeyProvider : public KeyProvider {
  public:
-  virtual ~KeyProvider() = default;
+  ~RangerKMSKeyProvider() override {}
+
+  RangerKMSKeyProvider(std::string kms_url, std::string cluster_key_name)
+      : client_(std::move(kms_url), std::move(cluster_key_name)) {}
 
   // Decrypts the server key.
-  virtual Status DecryptServerKey(const std::string& encrypted_server_key,
-                                  std::string* server_key) = 0;
+  Status DecryptServerKey(const std::string& encrypted_server_key,
+                          const std::string& iv,
+                          const std::string& key_version,
+                          std::string* server_key) override;
+
+  // Generates an encrypted server key.
+  Status GenerateEncryptedServerKey(std::string* encrypted_server_key,
+                                    std::string* iv,
+                                    std::string* key_version) override;
 
-  // Encrypts the server key.
-  virtual Status EncryptServerKey(const std::string& server_key,
-                                  std::string* encrypted_server_key) = 0;
+ private:
+  RangerKMSClient client_;
 };
 } // namespace security
 } // namespace kudu
diff --git a/src/kudu/integration-tests/master_failover-itest.cc 
b/src/kudu/integration-tests/master_failover-itest.cc
index 95ea9a25b..cf79eb35f 100644
--- a/src/kudu/integration-tests/master_failover-itest.cc
+++ b/src/kudu/integration-tests/master_failover-itest.cc
@@ -38,6 +38,7 @@
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/master/sys_catalog.h" // IWYU pragma: keep
 #include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/util/env.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h" // IWYU pragma: keep
@@ -369,6 +370,10 @@ TEST_P(MasterFailoverTest, TestMasterUUIDResolution) {
 TEST_P(MasterFailoverTest, TestMasterPermanentFailure) {
   const string kBinPath = cluster_->GetBinaryPath("kudu");
   Random r(SeedRandom());
+  string encryption_args;
+  if (Env::Default()->IsEncryptionEnabled()) {
+    encryption_args = "--encrypt_data_at_rest=1";
+  }
 
   // Repeat the test for each master.
   for (int i = 0; i < cluster_->num_masters(); i++) {
@@ -398,6 +403,10 @@ TEST_P(MasterFailoverTest, TestMasterPermanentFailure) {
           "--fs_data_dirs=" + other_master->data_dir(),
           master::SysCatalogTable::kSysCatalogTabletId
       };
+      if (!encryption_args.empty()) {
+        args.emplace_back(encryption_args);
+      }
+
       string output;
       ASSERT_OK(Subprocess::Call(args, "", &output));
       StripWhiteSpace(&output);
@@ -426,6 +435,9 @@ TEST_P(MasterFailoverTest, TestMasterPermanentFailure) {
           "--fs_data_dirs=" + failed_master->data_dir(),
           "--uuid=" + uuid
       };
+      if (!encryption_args.empty()) {
+        args.emplace_back(encryption_args);
+      }
       ASSERT_OK(Subprocess::Call(args));
     }
 
@@ -440,6 +452,9 @@ TEST_P(MasterFailoverTest, TestMasterPermanentFailure) {
           master::SysCatalogTable::kSysCatalogTabletId,
           other_master->bound_rpc_hostport().ToString()
       };
+      if (!encryption_args.empty()) {
+        args.emplace_back(encryption_args);
+      }
       ASSERT_OK(Subprocess::Call(args));
     }
 
diff --git a/src/kudu/integration-tests/master_migration-itest.cc 
b/src/kudu/integration-tests/master_migration-itest.cc
index afa33c869..aa146a490 100644
--- a/src/kudu/integration-tests/master_migration-itest.cc
+++ b/src/kudu/integration-tests/master_migration-itest.cc
@@ -120,6 +120,11 @@ TEST_F(MasterMigrationTest, TestEndToEndMigration) {
   // List of every master UUIDs.
   vector<string> uuids = { cluster->master()->uuid() };
 
+  string encryption_flags;
+  if (Env::Default()->IsEncryptionEnabled()) {
+    encryption_flags = "--encrypt_data_at_rest=true";
+  }
+
   // Format a filesystem tree for each of the new masters and get the uuids.
   for (int i = 1; i < kNumMasters; i++) {
     string data_root = cluster->GetDataPath(Substitute("master-$0", i));
@@ -134,6 +139,9 @@ TEST_F(MasterMigrationTest, TestEndToEndMigration) {
           "--fs_wal_dir=" + wal_dir,
           "--fs_data_dirs=" + data_root
       };
+      if (!encryption_flags.empty()) {
+        args.emplace_back(encryption_flags);
+      }
       ASSERT_OK(Subprocess::Call(args));
     }
     {
@@ -145,6 +153,9 @@ TEST_F(MasterMigrationTest, TestEndToEndMigration) {
           "--fs_wal_dir=" + wal_dir,
           "--fs_data_dirs=" + data_root
       };
+      if (!encryption_flags.empty()) {
+        args.emplace_back(encryption_flags);
+      }
       string uuid;
       ASSERT_OK(Subprocess::Call(args, "", &uuid));
       StripWhiteSpace(&uuid);
@@ -164,6 +175,9 @@ TEST_F(MasterMigrationTest, TestEndToEndMigration) {
         "--fs_data_dirs=" + data_root,
         SysCatalogTable::kSysCatalogTabletId
     };
+    if (!encryption_flags.empty()) {
+      args.emplace_back(encryption_flags);
+    }
     for (int i = 0; i < kNumMasters; i++) {
       args.emplace_back(Substitute("$0:$1", uuids[i], 
master_rpc_addresses[i].ToString()));
     }
@@ -193,6 +207,9 @@ TEST_F(MasterMigrationTest, TestEndToEndMigration) {
         SysCatalogTable::kSysCatalogTabletId,
         cluster->master()->bound_rpc_hostport().ToString()
     };
+    if (!encryption_flags.empty()) {
+      args.emplace_back(encryption_flags);
+    }
     ASSERT_OK(Subprocess::Call(args));
   }
 
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc 
b/src/kudu/integration-tests/raft_consensus-itest.cc
index 795d5c091..bfefe1abd 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -828,7 +828,13 @@ TEST_P(RaftConsensusParamEncryptionITest, 
TestCatchupAfterOpsEvicted) {
     // process, as both of them access encrypted files.
     SetEncryptionFlags(true);
     kTsFlags.emplace_back("--encrypt_data_at_rest=true");
-    kTsFlags.emplace_back("--test_server_key=" + GetEncryptionKey());
+    string server_key;
+    string server_key_iv;
+    string server_key_version;
+    GetEncryptionKey(&server_key, &server_key_iv, &server_key_version);
+    kTsFlags.emplace_back("--test_server_key=" + server_key);
+    kTsFlags.emplace_back("--test_server_key_iv=" + server_key_iv);
+    kTsFlags.emplace_back("--test_server_key_version=" + server_key_version);
   }
 
   NO_FATALS(BuildAndStart(kTsFlags));
diff --git a/src/kudu/integration-tests/security-itest.cc 
b/src/kudu/integration-tests/security-itest.cc
index 2fa920fe7..9b702fe0a 100644
--- a/src/kudu/integration-tests/security-itest.cc
+++ b/src/kudu/integration-tests/security-itest.cc
@@ -682,6 +682,17 @@ TEST_F(SecurityITest, 
TestRequireAuthenticationSecureCluster) {
   SmokeTestCluster(client, /* transactional */ false);
 }
 
+TEST_F(SecurityITest, TestEncryptionWithKMSIntegration) {
+  cluster_opts_.enable_ranger = true;
+  cluster_opts_.enable_ranger_kms = true;
+  ASSERT_OK(StartCluster());
+
+  shared_ptr<KuduClient> client;
+  KuduClientBuilder b;
+  ASSERT_OK(cluster_->CreateClient(&b, &client));
+  SmokeTestCluster(client, /* transactional */ false);
+}
+
 class EncryptionPolicyTest :
     public SecurityITest,
     public ::testing::WithParamInterface<tuple<
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc 
b/src/kudu/mini-cluster/external_mini_cluster.cc
index 7a765b83b..480581adb 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -34,11 +34,12 @@
 
 #include "kudu/client/client.h"
 #include "kudu/client/master_rpc.h"
+#include "kudu/fs/default_key_provider.h"
 #include "kudu/fs/fs.pb.h"
+#include "kudu/fs/key_provider.h"
 #include "kudu/postgres/mini_postgres.h"
 #include "kudu/ranger-kms/mini_ranger_kms.h"
 #include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/server/key_provider.h"
 #if !defined(NO_CHRONY)
 #include "kudu/clock/test/mini_chronyd.h"
 #endif
@@ -59,7 +60,6 @@
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/rpc/user_credentials.h"
 #include "kudu/security/test/mini_kdc.h"
-#include "kudu/server/default_key_provider.h"
 #include "kudu/server/server_base.pb.h"
 #include "kudu/server/server_base.proxy.h"
 #include "kudu/tablet/metadata.pb.h"
@@ -134,6 +134,7 @@ ExternalMiniClusterOptions::ExternalMiniClusterOptions()
       hms_mode(HmsMode::NONE),
       enable_ranger(false),
       enable_ranger_kms(false),
+      ranger_cluster_key("kuduclusterkey"),
       enable_encryption(FLAGS_encrypt_data_at_rest),
       logtostderr(true),
       start_process_timeout(MonoDelta::FromSeconds(70)),
@@ -369,13 +370,24 @@ Status ExternalMiniCluster::Start() {
     if (opts_.enable_kerberos) {
       string keytab;
       RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(
-      Substitute("rangerkms/[email protected]", host),
-                  &keytab),
-                  "could not create rangeradmin keytab");
+          Substitute("rangerkms/[email protected]", host),
+          &keytab),
+        "could not create ranger kms keytab");
+      string spnego_keytab;
+      RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(
+          Substitute("HTTP/[email protected]", host),
+          &spnego_keytab),
+        "could not create ranger kms keytab");
 
-      ranger_kms_->EnableKerberos(kdc_->GetEnvVars()["KRB5_CONFIG"], keytab);
+      ranger_kms_->EnableKerberos(kdc_->GetEnvVars()["KRB5_CONFIG"], keytab, 
spnego_keytab);
     }
+    RETURN_NOT_OK(kdc_->CreateUserPrincipal("keyadmin"));
+    RETURN_NOT_OK(kdc_->Kinit("keyadmin"));
     RETURN_NOT_OK_PREPEND(ranger_kms_->Start(), "Failed to start the Ranger 
KMS service");
+    
RETURN_NOT_OK_PREPEND(ranger_kms_->CreateClusterKey(opts_.ranger_cluster_key,
+                                                        
&opts_.ranger_cluster_key_version),
+                          "Failed to create cluster key");;
+    RETURN_NOT_OK(kdc_->Kinit("test-admin"));
   }
 
   // Start the HMS.
@@ -604,6 +616,11 @@ Status ExternalMiniCluster::AddTabletServer() {
   ExternalDaemonOptions opts;
   opts.messenger = messenger_;
   opts.enable_encryption = opts_.enable_encryption;
+  opts.enable_ranger_kms = opts_.enable_ranger_kms;
+  opts.ranger_cluster_key = opts_.ranger_cluster_key;
+  if (opts.enable_ranger_kms) {
+    opts.ranger_kms_url = ranger_kms_->url();
+  }
   opts.block_manager_type = opts_.block_manager_type;
   opts.exe = GetBinaryPath(kKuduBinaryName);
   opts.wal_dir = GetWalPath(daemon_id);
@@ -688,11 +705,21 @@ Status ExternalMiniCluster::CreateMaster(const 
vector<HostPort>& master_rpc_addr
     flags.emplace_back(Substitute("--ranger_config_path=$0",
                                   JoinPathSegments(cluster_root(),
                                                    "ranger-client")));
+    flags.emplace_back("--trusted_user_acl=test-admin");
   }
   if (!opts_.master_alias_prefix.empty()) {
     flags.emplace_back(Substitute("--host_for_tests=$0.$1",
                                   opts_.master_alias_prefix, idx));
   }
+
+  if (opts_.enable_encryption) {
+    flags.emplace_back("--encrypt_data_at_rest=true");
+    if (opts_.enable_ranger_kms) {
+      flags.emplace_back("--encryption_key_provider=ranger-kms");
+      flags.emplace_back(Substitute("--encryption_cluster_key_name=$0", 
opts_.ranger_cluster_key));
+      flags.emplace_back(Substitute("--ranger_kms_url=$0", 
ranger_kms_->url()));
+    }
+  }
   // Add custom master flags.
   copy(opts_.extra_master_flags.begin(), opts_.extra_master_flags.end(),
        std::back_inserter(flags));
@@ -1170,6 +1197,11 @@ std::vector<std::string> 
ExternalDaemon::GetDaemonFlags(const ExternalDaemonOpti
 
   if (opts.enable_encryption) {
     flags.emplace_back("--encrypt_data_at_rest=true");
+    if (opts.enable_ranger_kms) {
+      flags.emplace_back("--encryption_key_provider=ranger-kms");
+      flags.emplace_back(Substitute("--encryption_cluster_key_name=$0", 
opts.ranger_cluster_key));
+      flags.emplace_back(Substitute("--ranger_kms_url=$0", 
opts.ranger_kms_url));
+    }
   }
 
   // If large keys are not enabled.
@@ -1305,7 +1337,10 @@ Status ExternalDaemon::SetServerKey() {
   RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(env(), path, &instance, 
pb_util::NOT_SENSITIVE));
   if (!instance.server_key().empty()) {
     string key;
-    RETURN_NOT_OK(key_provider_->DecryptServerKey(instance.server_key(), 
&key));
+    RETURN_NOT_OK(key_provider_->DecryptServerKey(instance.server_key(),
+                                                  instance.server_key_iv(),
+                                                  
instance.server_key_version(),
+                                                  &key));
     LOG(INFO) << "Setting key " << key;
     env()->SetEncryptionKey(reinterpret_cast<const 
uint8_t*>(a2b_hex(key).c_str()), key.size() * 4);
   }
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h 
b/src/kudu/mini-cluster/external_mini_cluster.h
index b42494c44..b06e66ccb 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -48,6 +48,10 @@ class Env;
 class NodeInstancePB;
 class Sockaddr;
 class Subprocess;
+namespace ranger {
+class MiniRanger;
+}  // namespace ranger
+
 namespace rangerkms {
 class MiniRangerKMS;
 }  // namespace rangerkms
@@ -83,10 +87,6 @@ namespace postgres {
 class MiniPostgres;
 } // namespace postgres
 
-namespace ranger {
-class MiniRanger;
-} // namespace ranger
-
 namespace server {
 class ServerStatusPB;
 } // namespace server
@@ -227,6 +227,16 @@ struct ExternalMiniClusterOptions {
   // Default: false.
   bool enable_ranger_kms;
 
+  // Cluster key in Ranger.
+  //
+  // Default: "".
+  std::string ranger_cluster_key;
+
+  // Cluster key version in Ranger.
+  //
+  // Default: "".
+  std::string ranger_cluster_key_version;
+
   // If true, enable data at rest encryption.
   //
   // Default: false.
@@ -586,11 +596,13 @@ class ExternalMiniCluster : public MiniCluster {
 struct ExternalDaemonOptions {
   ExternalDaemonOptions()
       : logtostderr(false),
-        enable_encryption(false) {
+        enable_encryption(false),
+        enable_ranger_kms(false) {
   }
 
   bool logtostderr;
   bool enable_encryption;
+  bool enable_ranger_kms;
   std::shared_ptr<rpc::Messenger> messenger;
   std::string block_manager_type;
   std::string exe;
@@ -601,6 +613,8 @@ struct ExternalDaemonOptions {
   std::string perf_record_filename;
   std::vector<std::string> extra_flags;
   MonoDelta start_process_timeout;
+  std::string ranger_kms_url;
+  std::string ranger_cluster_key;
 };
 
 class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.cc 
b/src/kudu/mini-cluster/internal_mini_cluster.cc
index 7bab85cba..932453ac2 100644
--- a/src/kudu/mini-cluster/internal_mini_cluster.cc
+++ b/src/kudu/mini-cluster/internal_mini_cluster.cc
@@ -156,7 +156,10 @@ Status InternalMiniCluster::StartMasters() {
     for (int i = 0; i < num_masters; i++) {
       auto mini_master(std::make_shared<MiniMaster>(
           GetMasterFsRoot(i), master_rpc_addrs[i]));
-      mini_master->mutable_options()->server_key = 
KuduTest::GetEncryptionKey();
+      auto* options = mini_master->mutable_options();
+      KuduTest::GetEncryptionKey(&options->server_key,
+                                 &options->server_key_iv,
+                                 &options->server_key_version);
       if (num_masters > 1 || opts_.supply_single_master_addr) {
         mini_master->SetMasterAddresses(master_rpc_addrs);
       }
@@ -214,7 +217,10 @@ Status InternalMiniCluster::AddTabletServer(const 
HostPort& hp) {
   unique_ptr<MiniTabletServer> tablet_server(
       new MiniTabletServer(GetTabletServerFsRoot(new_idx), hp, 
opts_.num_data_dirs));
   tablet_server->options()->master_addresses = master_rpc_addrs();
-  tablet_server->options()->server_key = KuduTest::GetEncryptionKey();
+  auto* options = tablet_server->options();
+  KuduTest::GetEncryptionKey(&options->server_key,
+                             &options->server_key_iv,
+                             &options->server_key_version);
 
   RETURN_NOT_OK(tablet_server->Start());
   mini_tablet_servers_.emplace_back(std::move(tablet_server));
diff --git a/src/kudu/ranger-kms/CMakeLists.txt 
b/src/kudu/ranger-kms/CMakeLists.txt
index a119668b8..1dae6591c 100644
--- a/src/kudu/ranger-kms/CMakeLists.txt
+++ b/src/kudu/ranger-kms/CMakeLists.txt
@@ -33,6 +33,18 @@ set(MINI_RANGER_KMS_DEPS
 add_library(mini_ranger_kms ${MINI_RANGER_KMS_SRCS})
 target_link_libraries(mini_ranger_kms ${MINI_RANGER_KMS_DEPS})
 
+#######################################
+# ranger_kms_client
+#######################################
+set(RANGER_KMS_CLIENT_SRCS
+        ranger_kms_client.cc)
+set(RANGER_KMS_CLIENT_DEPS
+        kudu_curl_util
+        kudu_util)
+
+add_library(ranger_kms_client ${RANGER_KMS_CLIENT_SRCS})
+target_link_libraries(ranger_kms_client ${RANGER_KMS_CLIENT_DEPS})
+
 #######################################
 # Unit tests
 #######################################
@@ -40,4 +52,4 @@ target_link_libraries(mini_ranger_kms ${MINI_RANGER_KMS_DEPS})
 SET_KUDU_TEST_LINK_LIBS(
         itest_util
         mini_postgres
-        mini_ranger_kms)
\ No newline at end of file
+        mini_ranger_kms)
diff --git a/src/kudu/ranger-kms/mini_ranger_kms.cc 
b/src/kudu/ranger-kms/mini_ranger_kms.cc
index 6ff39f1f9..7f78005d8 100644
--- a/src/kudu/ranger-kms/mini_ranger_kms.cc
+++ b/src/kudu/ranger-kms/mini_ranger_kms.cc
@@ -18,7 +18,7 @@
 #include "kudu/ranger-kms/mini_ranger_kms.h"
 
 #include <csignal>
-
+#include <initializer_list>
 #include <ostream>
 #include <string>
 #include <vector>
@@ -32,6 +32,7 @@
 #include "kudu/util/easy_json.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/faststring.h"
+#include "kudu/util/jsonreader.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/slice.h"
@@ -91,41 +92,58 @@ Status MiniRangerKMS::CreateConfigs(const std::string& 
conf_dir) {
   ranger_kms_url_ = Substitute("http://$0:$1";, host_, port_);
 
   // Write config files
-  RETURN_NOT_OK(WriteStringToFile(env_,
-                                  GetRangerKMSInstallProperties(bin_dir(),
-                                                                host_,
-                                                                
mini_pg_->bound_port(),
-                                                                
mini_ranger_->admin_url()),
-                                  JoinPathSegments(kms_home, 
"install.properties")));
-
-  RETURN_NOT_OK(WriteStringToFile(env_,
-                                  GetRangerKMSSiteXml(host_,
-                                                      port_,
-                                                      
JoinPathSegments(kms_home, "ews/webapp"),
-                                                      conf_dir),
-                                  JoinPathSegments(kms_home, 
"ranger-kms-site.xml")));
-
-  RETURN_NOT_OK(WriteStringToFile(env_,
-                                  GetRangerKMSDbksSiteXml(host_,
-                                                          
mini_pg_->bound_port(),
-                                                          "postgresql.jar"),
-                                  JoinPathSegments(kms_home, 
"dbks-site.xml")));
-
-  RETURN_NOT_OK(WriteStringToFile(env_,
-                                  GetRangerKMSLog4jProperties("info"),
-                                  JoinPathSegments(kms_home, 
"log4j.properties")));
-
-  RETURN_NOT_OK(WriteStringToFile(env_,
-                                  
GetRangerKMSSecurityXml(mini_ranger_->admin_url(), kms_home),
-                                  JoinPathSegments(kms_home, 
"ranger-kms-security.xml")));
-
-  RETURN_NOT_OK(WriteStringToFile(env_,
-                                  GetKMSSiteXml(kerberos_, ktpath_),
-                                  JoinPathSegments(kms_home, "kms-site.xml")));
-
-  RETURN_NOT_OK(WriteStringToFile(env_,
-                                  GetRangerKMSPolicymgrSSLXml(),
-                                  JoinPathSegments(kms_home, 
"ranger-kms-policymgr-ssl.xml")));
+  RETURN_NOT_OK(WriteStringToFile(
+    env_, GetRangerKMSInstallProperties(
+            bin_dir(),
+            host_,
+            mini_pg_->bound_port(),
+            mini_ranger_->admin_url()),
+    JoinPathSegments(kms_home, "install.properties")));
+
+  RETURN_NOT_OK(WriteStringToFile(
+          env_,
+          GetRangerKMSSiteXml(host_,
+                              port_,
+                              JoinPathSegments(kms_home, "ews/webapp"),
+                              conf_dir),
+          JoinPathSegments(kms_home, "ranger-kms-site.xml")));
+
+  RETURN_NOT_OK(WriteStringToFile(
+          env_, GetRangerKMSDbksSiteXml(mini_pg_->host(),
+                                        mini_pg_->bound_port(),
+                                        "postgresql.jar",
+                                        host_,
+                                        ktpath_),
+          JoinPathSegments(kms_home, "dbks-site.xml")));
+
+  RETURN_NOT_OK(WriteStringToFile(
+          env_, GetRangerKMSLog4jProperties("info"),
+          JoinPathSegments(kms_home, "log4j.properties")
+          ));
+
+  RETURN_NOT_OK(WriteStringToFile(
+          env_, GetRangerKMSSecurityXml(mini_ranger_->admin_url(), kms_home),
+          JoinPathSegments(kms_home, "ranger-kms-security.xml")
+          ));
+
+  RETURN_NOT_OK(WriteStringToFile(
+          env_, GetKMSSiteXml(kerberos_, spnego_ktpath_, host_),
+          JoinPathSegments(kms_home, "kms-site.xml")
+          ));
+
+  RETURN_NOT_OK(WriteStringToFile(
+          env_, GetRangerKMSPolicymgrSSLXml(),
+          JoinPathSegments(kms_home, "ranger-kms-policymgr-ssl.xml")
+          ));
+
+  RETURN_NOT_OK(env_util::CopyFile(
+        env_, JoinPathSegments(mini_ranger_->ranger_admin_home(), 
"ranger-admin-site.xml"),
+        JoinPathSegments(kms_home, "ranger-admin-site.xml"), 
WritableFileOptions()));
+
+  RETURN_NOT_OK(WriteStringToFile(
+        env_, GetRangerKMSCoreSiteXml(/*secure=*/true),
+        JoinPathSegments(kms_home, "core-site.xml")
+        ));
 
   return Status::OK();
 }
@@ -147,11 +165,12 @@ Status MiniRangerKMS::DbSetup(const std::string 
&kms_home, const std::string &ew
 
   static const vector<string> files_to_copy = {
     "dbks-site.xml",
-    "install.properties"
-    "kms-site.xml"
+    "install.properties",
+    "kms-site.xml",
     "log4j.properties",
     "ranger-kms-policymgr-ssl.xml",
-    "ranger-kms-security.xml"
+    "ranger-kms-security.xml",
+    "ranger-kms-site.xml"
   };
 
   for (const auto& file : files_to_copy) {
@@ -194,17 +213,11 @@ Status MiniRangerKMS::StartRangerKMS() {
     RETURN_NOT_OK(FindHomeDir("java", bin_dir, &java_home_));
 
     const string kKMSHome = ranger_kms_home();
-    // kms_home/ews
     const string kEwsDir = JoinPathSegments(kKMSHome, "ews");
-    // kms_home/ews/webapp
     const string kWebAppDir = JoinPathSegments(kEwsDir, "webapp");
-    // kms_home/ews/webapp/WEB-INF
     const string kWebInfDir = JoinPathSegments(kWebAppDir, "WEB-INF");
-    // kms_home/ews/webapp/WEB-INF/classes
     const string kClassesDir = JoinPathSegments(kWebInfDir, "classes");
-    // kms_home/ews/webapp/WEB-INF/classes/conf
     const string kConfDir = JoinPathSegments(kClassesDir, "conf");
-    // kms_home/ews/webapp/WEB-INF/classes/lib
     const string kLibDir = JoinPathSegments(kClassesDir, "lib");
 
     RETURN_NOT_OK(InitRangerKMS(kKMSHome, &fresh_install));
@@ -242,7 +255,6 @@ Status MiniRangerKMS::StartRangerKMS() {
     if (kerberos_) {
       args.emplace_back(Substitute("-Djava.security.krb5.conf=$0", 
krb5_config_));
     }
-
     args.emplace_back("-cp");
     args.emplace_back(classpath);
     args.emplace_back("org.apache.ranger.server.tomcat.EmbeddedServer");
@@ -272,21 +284,123 @@ Status MiniRangerKMS::StartRangerKMS() {
 }
 
 Status MiniRangerKMS::CreateKMSService() {
-  string service_name = "kms";
+  // Create the actual service.
+  const string kServiceName = "kms";
   EasyJson service;
-  service.Set("name", service_name);
-  service.Set("type", service_name);
+  service.Set("name", kServiceName);
+  service.Set("type", "kms");
 
   EasyJson configs = service.Set("configs", EasyJson::kObject);
-  configs.Set("policy.download.auth.users", service_name);
-  configs.Set("tag.download.auth.users", service_name);
-  configs.Set("provider", ranger_kms_url_);
-  configs.Set("username", "rangerkms");
-  configs.Set("password", "rangerkms");
+  configs.Set("policy.download.auth.users", "keyadmin,rangerkms");
+  configs.Set("provider", Substitute("kms://http@$0:$1/kms", host_, port_));
+  configs.Set("username", "keyadmin");
+  configs.Set("password", "keyadmin");
 
   RETURN_NOT_OK_PREPEND(mini_ranger_->PostToRanger("service/plugins/services", 
service),
-                        Substitute("Failed to create $0 service", 
service_name));
-  LOG(INFO) << Substitute("Created $0 service", service_name);
+                        Substitute("Failed to create $0 service", 
kServiceName));
+  LOG(INFO) << Substitute("Created $0 service", kServiceName);
+
+  {
+    // Create kudu user
+    EasyJson user;
+    user.Set("groupList", EasyJson::kArray);
+    user.Set("status", 1);
+    auto roleList = user.Set("userRoleList", EasyJson::kArray);
+    roleList.PushBack("ROLE_USER");
+    user.Set("name", "kudu");
+    user.Set("password", "KuduPass123");
+    user.Set("firstName", "kudu");
+    user.Set("lastName", "kudu");
+    user.Set("emailAddress", "");
+    
RETURN_NOT_OK_PREPEND(mini_ranger_->PostToRanger("service/xusers/secure/users", 
user),
+                          "Failed to create kudu user");
+    LOG(INFO) << "Created kudu user";
+  }
+
+  {
+    // Create rangerkms user
+    EasyJson user;
+    user.Set("groupList", EasyJson::kArray);
+    user.Set("status", 1);
+    auto roleList = user.Set("userRoleList", EasyJson::kArray);
+    roleList.PushBack("ROLE_USER");
+    user.Set("name", "rangerkms");
+    user.Set("password", "rangerkmsPass123");
+    user.Set("firstName", "rangerkms");
+    user.Set("lastName", "rangerkms");
+    user.Set("emailAddress", "");
+    
RETURN_NOT_OK_PREPEND(mini_ranger_->PostToRanger("service/xusers/secure/users", 
user),
+                          "Failed to create rangerkms user");
+    LOG(INFO) << "Created rangerkms user";
+  }
+
+  {
+    // Create policy allowing Kudu to create keys, generate and decrypt EEKs.
+    EasyJson policy;
+    policy.Set("service", kServiceName);
+    policy.Set("name", "all");
+    policy.Set("keyname", "*");
+    policy.Set("isEnabled", true);
+    EasyJson resources = policy.Set("resources", EasyJson::kObject);
+    resources.Set("keyname", "*");
+    EasyJson policy_items = policy.Set("policyItems", EasyJson::kArray);
+    {
+      EasyJson item = policy_items.PushBack(EasyJson::kObject);
+      EasyJson users = item.Set("users", EasyJson::kArray);
+      users.PushBack("kudu");
+      EasyJson accesses = item.Set("accesses", EasyJson::kArray);
+      for (auto a : {"getmetadata", "generateeek", "create", "decrypteek"}) {
+        EasyJson access = accesses.PushBack(EasyJson::kObject);
+        access.Set("type", a);
+        access.Set("isAllowed", true);
+      }
+    }
+    {
+      EasyJson item = policy_items.PushBack(EasyJson::kObject);
+      item.Set("delegateAdmin", true);
+      EasyJson users = item.Set("users", EasyJson::kArray);
+      users.PushBack("keyadmin");
+      users.PushBack("rangerkms");
+      EasyJson accesses = item.Set("accesses", EasyJson::kArray);
+      for (auto a : {"getmetadata", "generateeek", "create", "decrypteek", 
"getkeys", "get"}) {
+        EasyJson access = accesses.PushBack(EasyJson::kObject);
+        access.Set("type", a);
+        access.Set("isAllowed", true);
+      }
+    }
+
+    RETURN_NOT_OK_PREPEND(mini_ranger_->PostToRanger(
+        "service/plugins/policies?deleteIfExists=true", policy, 
/*secure=*/true),
+      "Failed to add policy");
+    LOG(INFO) << "Added ranger policy";
+  }
+
+  return Status::OK();
+}
+
+Status MiniRangerKMS::CreateClusterKey(const string& name, string* version) {
+
+  EasyJson payload;
+  payload.Set("name", name);
+  payload.Set("cipher", "AES/CTR/NoPadding");
+  payload.Set("length", 128);
+  payload.Set("description", name);
+  LOG(INFO) << payload.ToString();
+  EasyCurl curl;
+  curl.set_auth(CurlAuthType::SPNEGO);
+  string url = Substitute("$0:$1/kms/v1/keys", host_, port_);
+  LOG(INFO) << url;
+  faststring resp;
+  string tmp;
+  Status s = curl.PostToURL(url, payload.ToString(), &resp, {"Content-Type: 
application/json"});
+  if (!s.ok()) {
+    LOG(ERROR) << resp.ToString();
+    return s;
+  }
+  JsonReader r(resp.ToString());
+  RETURN_NOT_OK(r.Init());
+  RETURN_NOT_OK(r.ExtractString(r.root(), "versionName", version));
+
   return Status::OK();
 }
 
diff --git a/src/kudu/ranger-kms/mini_ranger_kms.h 
b/src/kudu/ranger-kms/mini_ranger_kms.h
index 262f8105e..d118f967a 100644
--- a/src/kudu/ranger-kms/mini_ranger_kms.h
+++ b/src/kudu/ranger-kms/mini_ranger_kms.h
@@ -30,10 +30,10 @@
 #include "kudu/util/env.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/status.h"
+#include "kudu/util/subprocess.h" // IWYU pragma: keep
 #include "kudu/util/test_util.h"
 
 namespace kudu {
-class Subprocess;
 namespace postgres {
 class MiniPostgres;
 }  // namespace postgres
@@ -76,11 +76,20 @@ class MiniRangerKMS {
 
   Status GetKeys() const;
 
+  Status CreateClusterKey(const std::string& name, std::string* version) 
WARN_UNUSED_RESULT;
+
+
   void EnableKerberos(std::string krb5_config,
-                      std::string ktpath) {
+                      std::string ktpath,
+                      std::string spnego_ktpath) {
     kerberos_ = true;
     krb5_config_ = std::move(krb5_config);
     ktpath_ = std::move(ktpath);
+    spnego_ktpath_ = std::move(spnego_ktpath);
+  }
+
+  std::string url() const {
+    return strings::Substitute("$0:$1/kms", host_, port_);
   }
 
  private:
@@ -103,7 +112,7 @@ class MiniRangerKMS {
 
   // Returns RangerKMS' home directory.
   std::string ranger_kms_home() const {
-    return JoinPathSegments(data_root_, "rangerkms");
+    return JoinPathSegments(data_root_, "ranger-kms");
   }
 
   std::string bin_dir() const {
@@ -152,6 +161,7 @@ class MiniRangerKMS {
   bool kerberos_;
   std::string krb5_config_;
   std::string ktpath_;
+  std::string spnego_ktpath_;
 
   Env* env_;
   EasyCurl curl_;
diff --git a/src/kudu/ranger-kms/mini_ranger_kms_configs.h 
b/src/kudu/ranger-kms/mini_ranger_kms_configs.h
index 8b5011d8a..f34089b65 100644
--- a/src/kudu/ranger-kms/mini_ranger_kms_configs.h
+++ b/src/kudu/ranger-kms/mini_ranger_kms_configs.h
@@ -131,13 +131,33 @@ inline std::string GetRangerKMSSiteXml(const std::string& 
kms_host,
     <name>kms.config.dir</name>
     <value>$3</value>
   </property>
+  <property>
+    <name>hadoop.kms.proxyuser.ranger.groups</name>
+    <value>*</value>
+  </property>
+  <property>
+    <name>hadoop.kms.proxyuser.ranger.hosts</name>
+    <value>*</value>
+  </property>
+  <property>
+    <name>hadoop.kms.authentication.kerberos.name.rules</name>
+    <value>RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/ranger/
+RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/rangertagsync/
+RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/rangerusersync/
+RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/keyadmin/
+RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/atlas/
+DEFAULT</value>
+  </property>
+
 </configuration>)";
   return strings::Substitute(kRangerKMSSiteXmlTemplate, kms_host, kms_port, 
webapp_dir, conf_dir);
 }
 
 inline std::string GetRangerKMSDbksSiteXml(const std::string& pg_host,
                                            const uint16_t pg_port,
-                                           const std::string& pg_driver) {
+                                           const std::string& pg_driver,
+                                           const std::string& host,
+                                           const std::string& keytab) {
   constexpr const char* const kRangerKMSDbksSiteXmlTemplate = R"(
 <configuration>
   <property>
@@ -269,11 +289,11 @@ inline std::string GetRangerKMSDbksSiteXml(const 
std::string& pg_host,
   </property>
   <property>
     <name>ranger.ks.kerberos.principal</name>
-    <value>rangerkms/[email protected]</value>
+    <value>rangerkms/[email protected]</value>
   </property>
   <property>
     <name>ranger.ks.kerberos.keytab</name>
-    <value />
+    <value>$4</value>
   </property>
   <property>
     <name>ranger.kms.keysecure.enabled</name>
@@ -343,7 +363,8 @@ inline std::string GetRangerKMSDbksSiteXml(const 
std::string& pg_host,
   </property>
 </configuration>
 )";
-  return strings::Substitute(kRangerKMSDbksSiteXmlTemplate, pg_host, pg_port, 
pg_driver);
+  return strings::Substitute(kRangerKMSDbksSiteXmlTemplate, pg_host, pg_port,
+                             pg_driver, host, keytab);
 }
 
 inline std::string GetRangerKMSLog4jProperties(const std::string& log_level) {
@@ -447,7 +468,8 @@ inline std::string GetRangerKMSSecurityXml(const 
std::string& ranger_url,
   return strings::Substitute(kRangerKmsSecurityXmlTemplate, ranger_url, 
kms_home, kms_home);
 }
 
-inline std::string GetKMSSiteXml(bool secure, const std::string& keytab) {
+inline std::string GetKMSSiteXml(bool secure, const std::string& keytab, const 
std::string& host) {
+
   constexpr const char* const kmsSiteXml = R"(
   <configuration>
 
@@ -535,7 +557,7 @@ inline std::string GetKMSSiteXml(bool secure, const 
std::string& keytab) {
 
   <property>
     <name>hadoop.kms.authentication.kerberos.principal</name>
-    <value>HTTP/localhost</value>
+    <value>HTTP/[email protected]</value>
     <description>
       The Kerberos principal to use for the HTTP endpoint.
       The principal must start with 'HTTP/' as per the Kerberos HTTP SPNEGO 
specification.
@@ -629,9 +651,9 @@ inline std::string GetKMSSiteXml(bool secure, const 
std::string& keytab) {
 </configuration>
 )";
   if (secure) {
-    return strings::Substitute(kmsSiteXml, "kerberos", keytab);
+    return strings::Substitute(kmsSiteXml, "kerberos", keytab, host);
   }
-  return strings::Substitute(kmsSiteXml, "simple", keytab);
+  return strings::Substitute(kmsSiteXml, "simple", keytab, host);
 }
 
 inline std::string GetRangerKMSAuditXml() {
@@ -680,6 +702,39 @@ inline std::string GetRangerKMSAuditXml() {
   return kRangerKMSAuditXml;
 }
 
+// Gets the core-site.xml that configures authentication.
+inline std::string GetRangerKMSCoreSiteXml(bool secure) {
+  // core-site.xml containing authentication method.
+  //
+  // $0: authn method (simple or kerberos)
+  const char* kCoreSiteTemplate = R"(
+<configuration>
+  <property>
+    <name>hadoop.security.authentication</name>
+    <value>$0</value>
+  </property>
+  <property>
+    <name>hadoop.security.group.mapping</name>
+    <value>org.apache.hadoop.security.NullGroupsMapping</value>
+  </property>
+  <property>
+    <name>hadoop.security.auth_to_local</name>
+    <value>RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/ranger/
+RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/rangertagsync/
+RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/rangerusersync/
+RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/keyadmin/
+RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/atlas/
+DEFAULT</value>
+  </property>
+</configuration>
+)";
+  if (secure) {
+    return strings::Substitute(kCoreSiteTemplate, "kerberos", "true");
+  }
+
+  return strings::Substitute(kCoreSiteTemplate, "simple", "false");
+}
+
 inline std::string GetRangerKMSPolicymgrSSLXml() {
   constexpr const char* const kRangerKMSPolicymgrSSLXml = R"(
 <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
diff --git a/src/kudu/ranger-kms/ranger_kms_client.cc 
b/src/kudu/ranger-kms/ranger_kms_client.cc
new file mode 100644
index 000000000..29b8c69db
--- /dev/null
+++ b/src/kudu/ranger-kms/ranger_kms_client.cc
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/ranger-kms/ranger_kms_client.h"
+
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+#include <rapidjson/document.h>
+
+#include "kudu/gutil/strings/escaping.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/curl_util.h"
+#include "kudu/util/easy_json.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/jsonreader.h"
+
+using rapidjson::Value;
+using std::string;
+using std::vector;
+using strings::a2b_hex;
+using strings::b2a_hex;
+using strings::Substitute;
+using strings::WebSafeBase64Escape;
+using strings::WebSafeBase64Unescape;
+
+namespace kudu {
+namespace security {
+
+Status RangerKMSClient::DecryptKey(const string& encrypted_key,
+                                   const string& iv,
+                                   const string& key_version,
+                                   string* decrypted_key) {
+  EasyJson payload;
+  payload.Set("name", cluster_key_name_);
+  string iv_plain = a2b_hex(iv);
+  string iv_b64;
+  WebSafeBase64Escape(iv_plain, &iv_b64);
+  payload.Set("iv", iv_b64);
+  string eek_plain = a2b_hex(encrypted_key);
+  string eek_b64;
+  WebSafeBase64Escape(eek_plain, &eek_b64);
+  payload.Set("material", eek_b64);
+  EasyCurl curl;
+  curl.set_auth(CurlAuthType::SPNEGO);
+  string url = Substitute("$0/v1/keyversion/$1/_eek?eek_op=decrypt",
+                          kms_url_, key_version);
+  faststring resp;
+  RETURN_NOT_OK_PREPEND(
+      curl.PostToURL(url, payload.ToString(), &resp, {"Content-Type: 
application/json"}),
+      "failed to decrypt server key");
+  JsonReader r(resp.ToString());
+  RETURN_NOT_OK(r.Init());
+  string dek_b64;
+  RETURN_NOT_OK(r.ExtractString(r.root(), "material", &dek_b64));
+  string dek_plain;
+  WebSafeBase64Unescape(dek_b64, &dek_plain);
+  *decrypted_key = b2a_hex(dek_plain);
+  return Status::OK();
+}
+
+Status RangerKMSClient::GenerateEncryptedServerKey(string* encrypted_key,
+                                                   string* iv,
+                                                   string* key_version) {
+  EasyCurl curl;
+  curl.set_auth(CurlAuthType::SPNEGO);
+  string url = Substitute("$0/v1/key/$1/_eek?eek_op=generate&num_keys=1",
+                          kms_url_, cluster_key_name_);
+  faststring resp;
+  RETURN_NOT_OK_PREPEND(curl.FetchURL(url, &resp), "failed to generate server 
key");
+  JsonReader r(resp.ToString());
+  RETURN_NOT_OK(r.Init());
+  vector<const Value*> keys;
+  RETURN_NOT_OK(r.ExtractObjectArray(r.root(), nullptr, &keys));
+  string iv_b64;
+  DCHECK_GT(keys.size(), 0);
+  const Value* key = keys[0];
+  RETURN_NOT_OK(r.ExtractString(key, "iv", &iv_b64));
+  string iv_plain;
+  if (!WebSafeBase64Unescape(iv_b64, &iv_plain)) {
+    return Status::Corruption("Invalid IV received");
+  }
+  *iv = b2a_hex(iv_plain);
+  RETURN_NOT_OK(r.ExtractString(key, "versionName", key_version));
+  const Value* ekv = nullptr;
+  RETURN_NOT_OK(r.ExtractObject(key, "encryptedKeyVersion", &ekv));
+  string key_b64;
+  RETURN_NOT_OK(r.ExtractString(ekv, "material", &key_b64));
+  string key_plain;
+  if (!WebSafeBase64Unescape(key_b64, &key_plain)) {
+    return Status::Corruption("Invalid encryption key received");
+  }
+  *encrypted_key = b2a_hex(key_plain);
+  return Status::OK();
+}
+
+} // namespace security
+} // namespace kudu
diff --git a/src/kudu/server/key_provider.h 
b/src/kudu/ranger-kms/ranger_kms_client.h
similarity index 59%
rename from src/kudu/server/key_provider.h
rename to src/kudu/ranger-kms/ranger_kms_client.h
index 0c52282f7..52e2c5708 100644
--- a/src/kudu/server/key_provider.h
+++ b/src/kudu/ranger-kms/ranger_kms_client.h
@@ -18,24 +18,31 @@
 #pragma once
 
 #include <string>
+#include <utility>
 
 #include "kudu/util/status.h"
 
 namespace kudu {
 namespace security {
-
-// An interface for encrypting and decrypting Kudu's server keys.
-class KeyProvider {
+class RangerKMSClient {
  public:
-  virtual ~KeyProvider() = default;
+  RangerKMSClient(std::string kms_url, std::string cluster_key_name)
+    : kms_url_(std::move(kms_url)),
+      cluster_key_name_(std::move(cluster_key_name)) {}
+
+  Status DecryptKey(const std::string& encrypted_key,
+                    const std::string& iv,
+                    const std::string& key_version,
+                    std::string* decrypted_key);
+
+  Status GenerateEncryptedServerKey(std::string* encrypted_key,
+                                    std::string* iv,
+                                    std::string* key_version);
 
-  // Decrypts the server key.
-  virtual Status DecryptServerKey(const std::string& encrypted_server_key,
-                                  std::string* server_key) = 0;
+ private:
+  std::string kms_url_;
+  std::string cluster_key_name_;
 
-  // Encrypts the server key.
-  virtual Status EncryptServerKey(const std::string& server_key,
-                                  std::string* encrypted_server_key) = 0;
 };
 } // namespace security
 } // namespace kudu
diff --git a/src/kudu/ranger/mini_ranger.cc b/src/kudu/ranger/mini_ranger.cc
index a342e29a4..e8ce7232c 100644
--- a/src/kudu/ranger/mini_ranger.cc
+++ b/src/kudu/ranger/mini_ranger.cc
@@ -332,10 +332,16 @@ Status MiniRanger::AddPolicy(AuthorizationPolicy policy) {
   return Status::OK();
 }
 
-Status MiniRanger::PostToRanger(string url, EasyJson payload) {
+Status MiniRanger::PostToRanger(const string& url, const EasyJson& payload, 
bool secure) {
+  EasyCurl curl;
+  if (secure) {
+    curl.set_auth(CurlAuthType::SPNEGO);
+  } else {
+    curl.set_auth(CurlAuthType::BASIC, "admin", "admin");
+  }
   faststring result;
-  RETURN_NOT_OK_PREPEND(curl_.PostToURL(JoinPathSegments(ranger_admin_url_, 
std::move(url)),
-                                        std::move(payload.ToString()), &result,
+  RETURN_NOT_OK_PREPEND(curl.PostToURL(JoinPathSegments(ranger_admin_url_, 
url),
+                                        payload.ToString(), &result,
                                         {"Content-Type: application/json"}),
                         Substitute("Error received from Ranger: $0", 
result.ToString()));
   return Status::OK();
diff --git a/src/kudu/ranger/mini_ranger.h b/src/kudu/ranger/mini_ranger.h
index 056335017..1f8b3910c 100644
--- a/src/kudu/ranger/mini_ranger.h
+++ b/src/kudu/ranger/mini_ranger.h
@@ -30,7 +30,6 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/ranger/ranger.pb.h"
-#include "kudu/util/curl_util.h"
 #include "kudu/util/env.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/status.h"
@@ -39,9 +38,15 @@
 
 namespace kudu {
 class EasyJson;
+
 namespace postgres {
 class MiniPostgres;
 }  // namespace postgres
+}  // namespace kudu
+
+using kudu::postgres::MiniPostgres;
+
+namespace kudu {
 
 namespace ranger {
 
@@ -84,9 +89,7 @@ class MiniRanger {
       host_(std::move(host)),
       mini_pg_(std::move(mini_pg)),
       kerberos_(false),
-      env_(Env::Default()) {
-        curl_.set_auth(CurlAuthType::BASIC, "admin", "admin");
-      }
+      env_(Env::Default()) {}
 
   // Starts Ranger and its dependencies.
   Status Start() WARN_UNUSED_RESULT;
@@ -119,10 +122,17 @@ class MiniRanger {
     return ranger_admin_url_;
   }
 
+
+  // Returns Ranger admin's home directory.
+  std::string ranger_admin_home() const {
+    return JoinPathSegments(data_root_, "ranger-admin");
+  }
+
   bool IsRunning() const { return process_ && process_->IsStarted(); }
 
   // Sends a POST request to Ranger with 'payload'.
-  Status PostToRanger(std::string url, EasyJson payload) WARN_UNUSED_RESULT;
+  Status PostToRanger(const std::string& url, const EasyJson& payload, bool 
secure = false)
+    WARN_UNUSED_RESULT;
 
 private:
   // Starts the Ranger service.
@@ -144,11 +154,6 @@ private:
   // Creates a Kudu service in Ranger.
   Status CreateKuduService() WARN_UNUSED_RESULT;
 
-  // Returns Ranger admin's home directory.
-  std::string ranger_admin_home() const {
-    return JoinPathSegments(data_root_, "ranger-admin");
-  }
-
   std::string bin_dir() const {
     std::string exe;
     CHECK_OK(env_->GetExecutablePath(&exe));
@@ -188,7 +193,6 @@ private:
   std::string krb5_config_;
 
   Env* env_;
-  EasyCurl curl_;
 
   uint16_t port_ = 0;
 
diff --git a/src/kudu/ranger/mini_ranger_configs.h 
b/src/kudu/ranger/mini_ranger_configs.h
index 2dcf19d1d..c7d6e3d03 100644
--- a/src/kudu/ranger/mini_ranger_configs.h
+++ b/src/kudu/ranger/mini_ranger_configs.h
@@ -294,6 +294,10 @@ inline std::string GetRangerAdminDefaultSiteXml(const 
std::string& pg_driver,
     <value>rangerlogger</value>
     <description/>
   </property>
+  <property>
+    <name>ranger.users.roles.list</name>
+    <value>ROLE_SYS_ADMIN, ROLE_USER, ROLE_OTHER, ROLE_ANON, ROLE_KEY_ADMIN, 
ROLE_ADMIN_AUDITOR, ROLE_KEY_ADMIN_AUDITOR</value>
+  </property>
 </configuration>
 )";
   return strings::Substitute(kRangerAdminDefaultSiteTemplate, pg_driver,
@@ -385,6 +389,15 @@ inline std::string GetRangerCoreSiteXml(bool secure) {
     <name>hadoop.security.group.mapping</name>
     <value>org.apache.hadoop.security.NullGroupsMapping</value>
   </property>
+  <property>
+    <name>hadoop.security.auth_to_local</name>
+    <value>RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/ranger/
+RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/rangertagsync/
+RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/rangerusersync/
+RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/keyadmin/
+RULE:[2:$$1@$$0]([email protected])s/(.*)@KRBTEST.COM/atlas/
+DEFAULT</value>
+  </property>
 </configuration>
 )";
   if (secure) {
diff --git a/src/kudu/server/CMakeLists.txt b/src/kudu/server/CMakeLists.txt
index a61b2d141..98a603560 100644
--- a/src/kudu/server/CMakeLists.txt
+++ b/src/kudu/server/CMakeLists.txt
@@ -87,4 +87,3 @@ ADD_KUDU_TEST(rpc_server-test)
 ADD_KUDU_TEST(webserver-test)
 
 SET_KUDU_TEST_LINK_LIBS(server_process)
-ADD_KUDU_TEST(default_key_provider-test)
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index a5dbfb866..b49d8efb7 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -614,7 +614,10 @@ Status ServerBase::Init() {
     if (options_.server_key.empty()) {
       s = fs_manager_->CreateInitialFileSystemLayout();
     } else {
-      s = fs_manager_->CreateInitialFileSystemLayout(std::nullopt, 
options_.server_key);
+      s = fs_manager_->CreateInitialFileSystemLayout(std::nullopt,
+                                                     options_.server_key,
+                                                     options_.server_key_iv,
+                                                     
options_.server_key_version);
     }
     if (s.IsAlreadyPresent()) {
       return s.CloneAndPrepend("FS layout already exists; not overwriting 
existing layout");
diff --git a/src/kudu/server/server_base_options.cc 
b/src/kudu/server/server_base_options.cc
index 47b9c08fc..d2d984b19 100644
--- a/src/kudu/server/server_base_options.cc
+++ b/src/kudu/server/server_base_options.cc
@@ -47,6 +47,19 @@ DEFINE_string(test_server_key, "",
               "consecutive startups. It should only be used in tests.");
 TAG_FLAG(test_server_key, hidden);
 
+
+DEFINE_string(test_server_key_iv, "",
+              "Server key IV in plain-text to be persisted into the instance 
file. "
+              "It is only used when creating the file system, it's disregarded 
on "
+              "consecutive startups. It should only be used in tests.");
+TAG_FLAG(test_server_key_iv, hidden);
+
+
+DEFINE_string(test_server_key_version, "",
+              "Server key version in plain-text to be persisted into the 
instance file. "
+              "It is only used when creating the file system, it's disregarded 
on "
+              "consecutive startups. It should only be used in tests.");
+TAG_FLAG(test_server_key_version, hidden);
 namespace kudu {
 namespace server {
 
@@ -55,7 +68,9 @@ ServerBaseOptions::ServerBaseOptions()
     dump_info_path(FLAGS_server_dump_info_path),
     dump_info_format(FLAGS_server_dump_info_format),
     metrics_log_interval_ms(FLAGS_metrics_log_interval_ms),
-    server_key(FLAGS_test_server_key) {
+    server_key(FLAGS_test_server_key),
+    server_key_iv(FLAGS_test_server_key_iv),
+    server_key_version(FLAGS_test_server_key_version) {
 }
 
 } // namespace server
diff --git a/src/kudu/server/server_base_options.h 
b/src/kudu/server/server_base_options.h
index c972d6db2..133cf83da 100644
--- a/src/kudu/server/server_base_options.h
+++ b/src/kudu/server/server_base_options.h
@@ -46,6 +46,8 @@ struct ServerBaseOptions {
   int32_t metrics_log_interval_ms;
 
   std::string server_key;
+  std::string server_key_iv;
+  std::string server_key_version;
 
  protected:
   ServerBaseOptions();
diff --git a/src/kudu/tools/kudu-admin-test.cc 
b/src/kudu/tools/kudu-admin-test.cc
index 3b9b12d62..9482afb09 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -74,6 +74,7 @@
 #include "kudu/tools/tool_test_util.h"
 #include "kudu/tserver/tablet_server-test-base.h"
 #include "kudu/util/cow_object.h"
+#include "kudu/util/env.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
@@ -3068,6 +3069,9 @@ vector<string> RebuildMasterCmd(const 
ExternalMiniCluster& cluster,
   if (!tables.empty()) {
     command.emplace_back(Substitute("-tables=$0", tables));
   }
+  if (Env::Default()->IsEncryptionEnabled()) {
+    command.emplace_back("--encrypt_data_at_rest=true");
+  }
   if (log_to_stderr) {
     command.emplace_back("--logtostderr");
   }
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 23601cedf..697edc4a3 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -1633,10 +1633,12 @@ TEST_F(ToolTest, TestFsCheck) {
 }
 
 TEST_F(ToolTest, TestFsCheckLiveServer) {
+  string encryption_args = env_->IsEncryptionEnabled() ? GetEncryptionArgs() : 
"";
   NO_FATALS(StartExternalMiniCluster());
-  string args = Substitute("fs check --fs_wal_dir $0 --fs_data_dirs $1",
+  string args = Substitute("fs check --fs_wal_dir $0 --fs_data_dirs $1 $2",
                            cluster_->GetWalPath("master-0"),
-                           JoinStrings(cluster_->GetDataPaths("master-0"), 
","));
+                           JoinStrings(cluster_->GetDataPaths("master-0"), 
","),
+                           encryption_args);
   NO_FATALS(RunFsCheck(args, 0, "", {}, 0));
   args += " --repair";
   string stdout;
@@ -1676,6 +1678,29 @@ TEST_F(ToolTest, TestFsFormatWithUuid) {
   ASSERT_EQ(fs.uuid(), original_uuid);
 }
 
+TEST_F(ToolTest, TestFsFormatWithServerKey) {
+  const string kTestDir = GetTestPath("test");
+  ObjectIdGenerator generator;
+  string original_uuid = generator.Next();
+  string server_key = "00010203040506070809101112131442";
+  string server_key_iv = "42141312111009080706050403020100";
+  string server_key_version = "kuduclusterkey@0";
+  NO_FATALS(RunActionStdoutNone(Substitute(
+      "fs format --fs_wal_dir=$0 --uuid=$1 --server_key=$2 "
+      "--server_key_iv=$3 --server_key_version=$4",
+      kTestDir, original_uuid, server_key, server_key_iv, 
server_key_version)));
+  FsManager fs(env_, FsManagerOpts(kTestDir));
+  ASSERT_OK(fs.Open());
+
+  string canonicalized_uuid;
+  ASSERT_OK(generator.Canonicalize(fs.uuid(), &canonicalized_uuid));
+  ASSERT_EQ(canonicalized_uuid, fs.uuid());
+  ASSERT_EQ(original_uuid, fs.uuid());
+  ASSERT_EQ(server_key, fs.server_key());
+  ASSERT_EQ(server_key_iv, fs.server_key_iv());
+  ASSERT_EQ(server_key_version, fs.server_key_version());
+}
+
 TEST_F(ToolTest, TestFsDumpUuid) {
   const string kTestDir = GetTestPath("test");
   string uuid;
@@ -8061,6 +8086,17 @@ TEST_F(UnregisterTServerTest, 
TestUnregisterTServerNotPresumedDead) {
 }
 
 TEST_F(ToolTest, TestLocalReplicaCopyLocal) {
+  // TODO(abukor): Rewrite the test to make sure it works with encryption
+  // enabled.
+  //
+  // Right now, this test would fail with encryption enabled, as the local
+  // replica copy shares an Env between the source and the destination and they
+  // use different instance files. This shouldn't be a problem in real life, as
+  // its meant to copy tablets between disks on the same server, which would
+  // share UUIDs and server keys.
+  if (FLAGS_encrypt_data_at_rest) {
+    GTEST_SKIP();
+  }
   // Create replicas and fill some data.
   InternalMiniClusterOptions opts;
   opts.num_data_dirs = 3;
diff --git a/src/kudu/tools/tool_action_common.cc 
b/src/kudu/tools/tool_action_common.cc
index c2aceb824..d2969ac6a 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -54,6 +54,9 @@
 #include "kudu/consensus/log_util.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/fs/fs.pb.h"
+#include "kudu/fs/default_key_provider.h"
+#include "kudu/fs/key_provider.h"
+#include "kudu/fs/ranger_kms_key_provider.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/escaping.h"
@@ -165,6 +168,10 @@ DECLARE_bool(show_values);
 DEFINE_string(instance_file, "",
               "Path to the instance file containing the encrypted encryption 
key.");
 
+DECLARE_string(encryption_key_provider);
+DECLARE_string(ranger_kms_url);
+DECLARE_string(encryption_cluster_key_name);
+
 bool ValidateTimeoutSettings() {
   if (FLAGS_timeout_ms < FLAGS_negotiation_timeout_ms) {
     LOG(ERROR) << strings::Substitute(
@@ -215,6 +222,9 @@ using kudu::rpc::MessengerBuilder;
 using kudu::rpc::RequestIdPB;
 using kudu::rpc::ResponseCallback;
 using kudu::rpc::RpcController;
+using kudu::security::KeyProvider;
+using kudu::security::DefaultKeyProvider;
+using kudu::security::RangerKMSKeyProvider;
 using kudu::server::GenericServiceProxy;
 using kudu::server::GetFlagsRequestPB;
 using kudu::server::GetFlagsResponsePB;
@@ -911,7 +921,21 @@ Status SetServerKey() {
 
   if (string key = instance.server_key();
       !key.empty()) {
-    Env::Default()->SetEncryptionKey(reinterpret_cast<const 
uint8_t*>(a2b_hex(key).c_str()),
+    unique_ptr<security::KeyProvider> key_provider;
+    if (FLAGS_encryption_key_provider == "ranger-kms"
+        || FLAGS_encryption_key_provider == "ranger_kms") {
+      key_provider.reset(new RangerKMSKeyProvider(FLAGS_ranger_kms_url,
+                                                  
FLAGS_encryption_cluster_key_name));
+    } else {
+      key_provider.reset(new DefaultKeyProvider());
+    }
+
+    string server_key;
+    RETURN_NOT_OK(key_provider->DecryptServerKey(instance.server_key(),
+                                                instance.server_key_iv(),
+                                                instance.server_key_version(),
+                                                &server_key));
+    Env::Default()->SetEncryptionKey(reinterpret_cast<const 
uint8_t*>(a2b_hex(server_key).c_str()),
                                      key.length() * 4);
   }
 
diff --git a/src/kudu/tools/tool_action_fs.cc b/src/kudu/tools/tool_action_fs.cc
index 18d19e22e..987535ef6 100644
--- a/src/kudu/tools/tool_action_fs.cc
+++ b/src/kudu/tools/tool_action_fs.cc
@@ -70,6 +70,7 @@
 #include "kudu/util/compression/compression.pb.h"
 #include "kudu/util/env.h"
 #include "kudu/util/faststring.h"
+#include "kudu/util/flag_validators.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/slice.h"
@@ -87,6 +88,23 @@ DEFINE_string(uuid, "",
               "If not provided, one is generated");
 DEFINE_string(server_key, "",
               "The encrypted server key to use in the filesystem.");
+DEFINE_string(server_key_iv, "",
+              "The server key IV to use in the filesystem.");
+DEFINE_string(server_key_version, "",
+              "The server key version to use in the filesystem.");
+
+bool ServerKeySetTogether() {
+  if (FLAGS_server_key.empty() != FLAGS_server_key_iv.empty()
+      || FLAGS_server_key.empty() != FLAGS_server_key_version.empty()) {
+    LOG(ERROR) << "'server_key', 'server_key_iv', and 'server_key_version' 
must "
+                  "either all be set, or none of them must be set.";
+    return false;
+  }
+  return true;
+}
+
+GROUP_FLAG_VALIDATOR(server_key_set_together, ServerKeySetTogether);
+
 DEFINE_bool(repair, false,
             "Repair any inconsistencies in the filesystem.");
 
@@ -230,13 +248,20 @@ Status Format(const RunnerContext& /*context*/) {
   FsManager fs_manager(Env::Default(), FsManagerOpts());
   optional<string> uuid;
   optional<string> server_key;
+  optional<string> server_key_iv;
+  optional<string> server_key_version;
   if (!FLAGS_uuid.empty()) {
     uuid = FLAGS_uuid;
   }
-  if (!FLAGS_server_key.empty()) {
+  if (!FLAGS_server_key.empty()
+      && !FLAGS_server_key_iv.empty()
+      && !FLAGS_server_key_version.empty()) {
     server_key = FLAGS_server_key;
+    server_key_iv = FLAGS_server_key_iv;
+    server_key_version = FLAGS_server_key_version;
   }
-  return fs_manager.CreateInitialFileSystemLayout(uuid, server_key);
+  return fs_manager.CreateInitialFileSystemLayout(uuid, server_key,
+                                                  server_key_iv, 
server_key_version);
 }
 
 Status DumpUuid(const RunnerContext& /*context*/) {
diff --git a/src/kudu/tools/tool_action_master.cc 
b/src/kudu/tools/tool_action_master.cc
index 088e31217..da1d551bb 100644
--- a/src/kudu/tools/tool_action_master.cc
+++ b/src/kudu/tools/tool_action_master.cc
@@ -353,20 +353,27 @@ Status CopyRemoteSystemCatalog(const string& 
kudu_abs_path,
     return Status::RuntimeError("Failed to find source master to copy system 
catalog");
   }
 
+  vector<string> delete_args =
+      { kudu_abs_path, "local_replica", "delete", 
master::SysCatalogTable::kSysCatalogTabletId,
+        "--fs_wal_dir=" + FLAGS_fs_wal_dir, "--fs_data_dirs=" + 
FLAGS_fs_data_dirs,
+        "-clean_unsafe" };
+  vector<string> copy_args =
+      { kudu_abs_path, "local_replica", "copy_from_remote",
+        master::SysCatalogTable::kSysCatalogTabletId, src_master,
+        "--fs_wal_dir=" + FLAGS_fs_wal_dir, "--fs_data_dirs=" + 
FLAGS_fs_data_dirs };
+
+  if (Env::Default()->IsEncryptionEnabled()) {
+    delete_args.emplace_back("--encrypt_data_at_rest=true");
+    copy_args.emplace_back("--encrypt_data_at_rest=true");
+  }
+
   LOG(INFO) << Substitute("Deleting system catalog on $0", dst_master_str);
   RETURN_NOT_OK_PREPEND(
-      Subprocess::Call(
-          { kudu_abs_path, "local_replica", "delete", 
master::SysCatalogTable::kSysCatalogTabletId,
-            "--fs_wal_dir=" + FLAGS_fs_wal_dir, "--fs_data_dirs=" + 
FLAGS_fs_data_dirs,
-            "-clean_unsafe" }),
+      Subprocess::Call(delete_args),
       "Failed to delete system catalog");
-
   LOG(INFO) << Substitute("Copying system catalog from master $0", src_master);
   RETURN_NOT_OK_PREPEND(
-      Subprocess::Call(
-      { kudu_abs_path, "local_replica", "copy_from_remote",
-        master::SysCatalogTable::kSysCatalogTabletId, src_master,
-        "--fs_wal_dir=" + FLAGS_fs_wal_dir, "--fs_data_dirs=" + 
FLAGS_fs_data_dirs }),
+      Subprocess::Call(copy_args),
       "Failed to copy system catalog");
 
   return Status::OK();
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc 
b/src/kudu/tserver/tablet_copy_client-test.cc
index 6e34a7533..633495cfc 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -123,11 +123,17 @@ class TabletCopyClientTest : public TabletCopyTest {
     metric_entity_ = METRIC_ENTITY_server.Instantiate(&metric_registry_, 
"test");
     opts.metric_entity = metric_entity_;
     fs_manager_.reset(new FsManager(Env::Default(), opts));
-    string server_key = KuduTest::GetEncryptionKey();
+    string server_key;
+    string server_key_iv;
+    string server_key_version;
+    GetEncryptionKey(&server_key, &server_key_iv, &server_key_version);
     if (server_key.empty()) {
       ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
     } else {
-      ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout(std::nullopt, 
server_key));
+      ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout(std::nullopt,
+                                                           server_key,
+                                                           server_key_iv,
+                                                           
server_key_version));
     }
     ASSERT_OK(fs_manager_->Open());
     ASSERT_OK(ResetTabletCopyClient());
diff --git a/src/kudu/tserver/tablet_server-test-base.cc 
b/src/kudu/tserver/tablet_server-test-base.cc
index 38787b30d..c28f9ebac 100644
--- a/src/kudu/tserver/tablet_server-test-base.cc
+++ b/src/kudu/tserver/tablet_server-test-base.cc
@@ -66,6 +66,8 @@ DEFINE_int32(rpc_timeout, 1000, "Timeout for RPC calls, in 
seconds");
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_int32(heartbeat_rpc_timeout_ms);
 DECLARE_string(test_server_key);
+DECLARE_string(test_server_key_iv);
+DECLARE_string(test_server_key_version);
 
 METRIC_DEFINE_entity(test);
 
@@ -97,7 +99,9 @@ TabletServerTestBase::TabletServerTestBase()
   // purposefully specify non-running Master servers.
   FLAGS_heartbeat_rpc_timeout_ms = 1000;
 
-  FLAGS_test_server_key = GetEncryptionKey();
+  GetEncryptionKey(&FLAGS_test_server_key,
+                   &FLAGS_test_server_key_iv,
+                   &FLAGS_test_server_key_version);
 }
 
 // Starts the tablet server, override to start it later.
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index 7957db5d9..7a68628bf 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -205,9 +205,9 @@ TAG_FLAG(env_inject_lock_failure_globs, hidden);
 
 DEFINE_bool(encrypt_data_at_rest, false,
             "Whether sensitive files should be encrypted on the file system.");
-TAG_FLAG(encrypt_data_at_rest, hidden);
 DEFINE_int32(encryption_key_length, 128, "Encryption key length.");
-TAG_FLAG(encryption_key_length, hidden);
+TAG_FLAG(encryption_key_length, advanced);
+
 DEFINE_validator(encryption_key_length,
                  [](const char* /*n*/, int32 v) { return v == 128 || v == 192 
|| v == 256; });
 
@@ -2286,7 +2286,7 @@ class PosixEnv : public Env {
         eh.algorithm = EncryptionAlgorithm::AES256ECB;
         break;
       default:
-        LOG(FATAL) << "Illegal key size";
+        LOG(FATAL) << "Illegal key size: " << key_size;
     }
     memcpy(eh.key, server_key, key_size / 8);
     server_key_ = eh;
diff --git a/src/kudu/util/test_util.cc b/src/kudu/util/test_util.cc
index ba5f16967..8779d659b 100644
--- a/src/kudu/util/test_util.cc
+++ b/src/kudu/util/test_util.cc
@@ -86,6 +86,9 @@ static const char* const kEncryptDataInTests = 
"KUDU_ENCRYPT_DATA_IN_TESTS";
 static const int kEncryptionKeySize = 16;
 static const uint8_t kEncryptionKey[kEncryptionKeySize] =
   {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 42};
+static const uint8_t kEncryptionKeyIv[kEncryptionKeySize] =
+  {42, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+static const char* const kEncryptionKeyVersion = "kuduclusterkey@0";
 
 static const uint64_t kTestBeganAtMicros = Env::Default()->NowMicros();
 
@@ -205,13 +208,16 @@ void KuduTest::SetEncryptionFlags(bool enable_encryption) 
{
   }
 }
 
-const string KuduTest::GetEncryptionKey() {
+void KuduTest::GetEncryptionKey(string* key, string* iv, string* version) {
   if (FLAGS_encrypt_data_at_rest) {
-    string key;
-    strings::b2a_hex(kEncryptionKey, &key, kEncryptionKeySize);
-    return key;
+    strings::b2a_hex(kEncryptionKey, key, kEncryptionKeySize);
+    strings::b2a_hex(kEncryptionKeyIv, iv, kEncryptionKeySize);
+    *version = kEncryptionKeyVersion;
+  } else {
+    *key = "";
+    *iv = "";
+    *version = "";
   }
-  return "";
 }
 
 ///////////////////////////////////////////////////
diff --git a/src/kudu/util/test_util.h b/src/kudu/util/test_util.h
index 5009b85a6..be1c37ad0 100644
--- a/src/kudu/util/test_util.h
+++ b/src/kudu/util/test_util.h
@@ -73,8 +73,8 @@ class KuduTest : public ::testing::Test {
   // variables so that we don't pick up the user's credentials.
   static void OverrideKrb5Environment();
 
-  // Returns the encryption key used by the test.
-  static const std::string GetEncryptionKey();
+  // Returns the encryption key, IV, and version used by the test.
+  static void GetEncryptionKey(std::string* key, std::string* iv, std::string* 
version);
 
  protected:
   // Returns absolute path based on a unit test-specific work directory, given

Reply via email to