acelyc111 commented on code in PR #1706:
URL: 
https://github.com/apache/incubator-pegasus/pull/1706#discussion_r1448364338


##########
src/security/kms_client.cpp:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <initializer_list>
+#include <map>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include "absl/strings/escaping.h"
+#include "fmt/core.h"
+#include "http/http_client.h"
+#include "http/http_method.h"
+#include "http/http_status_code.h"
+#include "nlohmann/json.hpp"
+#include "nlohmann/json_fwd.hpp"
+#include "replica/replication_app_base.h"
+#include "security/kms_client.h"
+#include "utils/error_code.h"
+#include "utils/fmt_logging.h"
+
+namespace dsn {
+namespace security {
+
+#define RETURN_ERRS_NOT_TRUE(exp, code, ...)                                   
                    \
+    do {                                                                       
                    \
+        if (dsn_unlikely(!exp)) {                                              
                    \
+            return dsn::error_s::make(code, fmt::format(__VA_ARGS__));         
                    \
+        }                                                                      
                    \
+    } while (false);
+
+dsn::error_s KMSClient::DecryptEncryptionKey(const dsn::replication::kms_info 
&kms_info,
+                                             std::string *decrypted_key)
+{
+    nlohmann::json payload;
+    payload["name"] = cluster_key_name_;
+    std::string iv_plain = ::absl::HexStringToBytes(kms_info.iv);
+    std::string iv_b64;
+    ::absl::WebSafeBase64Escape(iv_plain, &iv_b64);
+    payload["iv"] = iv_b64;
+    std::string eek_plain = ::absl::HexStringToBytes(kms_info.eek);
+    std::string eek_b64;
+    ::absl::WebSafeBase64Escape(eek_plain, &eek_b64);
+    payload["material"] = eek_b64;
+
+    http_client client;
+    RETURN_NOT_OK(client.init());
+    RETURN_NOT_OK(client.set_auth(http_auth_type::SPNEGO));
+
+    std::vector<std::string> urls;
+    urls.reserve(kms_urls_.size());
+    for (const auto &url : kms_urls_) {
+        
urls.emplace_back(fmt::format("{}/v1/keyversion/{}/_eek?eek_op=decrypt", url, 
kms_info.kv));
+    }
+    client.clear_header_fields();
+    client.set_content_type("application/json");
+    client.set_accept("*/*");
+
+    RETURN_NOT_OK(client.with_post_method(payload.dump()));
+
+    nlohmann::json j;
+    for (const auto &url : urls) {
+        RETURN_NOT_OK(client.set_url(url));
+        std::string resp;
+        auto err = client.exec_method(&resp);
+        if (err.code() == ERR_NETWORK_FAILURE || err.code() == ERR_TIMEOUT) {
+            continue;
+        }
+        RETURN_NOT_OK(err);
+        http_status_code http_status;
+        client.get_http_status(http_status);
+        if (http_status == http_status_code::kOk) {
+            try {
+                j = nlohmann::json::parse(resp);
+            } catch (nlohmann::json::exception &exp) {
+                LOG_ERROR("encode kms_info to json failed: {}, data = [{}]", 
exp.what(), resp);
+            }
+            break;
+        }
+        LOG_WARNING(
+            "The http status is ({}), and url is ({})", 
get_http_status_message(http_status), url);

Review Comment:
   Reduce the indents to improve readability.
   ```suggestion
           if (http_status != http_status_code::kOk) {
               LOG_WARNING(...);
               continue;
           }
           
           try {
               j = nlohmann::json::parse(resp);
           } catch (nlohmann::json::exception &exp) {
               LOG_ERROR("encode kms_info to json failed: {}, data = [{}]", 
exp.what(), resp);
           }
           break;
   ```



##########
src/security/kms_client.cpp:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <initializer_list>
+#include <map>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include "absl/strings/escaping.h"
+#include "fmt/core.h"
+#include "http/http_client.h"
+#include "http/http_method.h"
+#include "http/http_status_code.h"
+#include "nlohmann/json.hpp"
+#include "nlohmann/json_fwd.hpp"
+#include "replica/replication_app_base.h"
+#include "security/kms_client.h"
+#include "utils/error_code.h"
+#include "utils/fmt_logging.h"
+
+namespace dsn {
+namespace security {
+
+#define RETURN_ERRS_NOT_TRUE(exp, code, ...)                                   
                    \
+    do {                                                                       
                    \
+        if (dsn_unlikely(!exp)) {                                              
                    \
+            return dsn::error_s::make(code, fmt::format(__VA_ARGS__));         
                    \
+        }                                                                      
                    \
+    } while (false);
+
+dsn::error_s KMSClient::DecryptEncryptionKey(const dsn::replication::kms_info 
&kms_info,
+                                             std::string *decrypted_key)
+{
+    nlohmann::json payload;
+    payload["name"] = cluster_key_name_;
+    std::string iv_plain = ::absl::HexStringToBytes(kms_info.iv);
+    std::string iv_b64;
+    ::absl::WebSafeBase64Escape(iv_plain, &iv_b64);
+    payload["iv"] = iv_b64;
+    std::string eek_plain = ::absl::HexStringToBytes(kms_info.eek);
+    std::string eek_b64;
+    ::absl::WebSafeBase64Escape(eek_plain, &eek_b64);
+    payload["material"] = eek_b64;
+
+    http_client client;
+    RETURN_NOT_OK(client.init());
+    RETURN_NOT_OK(client.set_auth(http_auth_type::SPNEGO));
+
+    std::vector<std::string> urls;
+    urls.reserve(kms_urls_.size());
+    for (const auto &url : kms_urls_) {
+        
urls.emplace_back(fmt::format("{}/v1/keyversion/{}/_eek?eek_op=decrypt", url, 
kms_info.kv));
+    }
+    client.clear_header_fields();
+    client.set_content_type("application/json");
+    client.set_accept("*/*");
+
+    RETURN_NOT_OK(client.with_post_method(payload.dump()));
+
+    nlohmann::json j;
+    for (const auto &url : urls) {
+        RETURN_NOT_OK(client.set_url(url));
+        std::string resp;
+        auto err = client.exec_method(&resp);
+        if (err.code() == ERR_NETWORK_FAILURE || err.code() == ERR_TIMEOUT) {
+            continue;
+        }
+        RETURN_NOT_OK(err);
+        http_status_code http_status;
+        client.get_http_status(http_status);
+        if (http_status == http_status_code::kOk) {
+            try {
+                j = nlohmann::json::parse(resp);
+            } catch (nlohmann::json::exception &exp) {
+                LOG_ERROR("encode kms_info to json failed: {}, data = [{}]", 
exp.what(), resp);
+            }
+            break;
+        }
+        LOG_WARNING(
+            "The http status is ({}), and url is ({})", 
get_http_status_message(http_status), url);
+    }
+
+    std::string dek_b64;
+    RETURN_ERRS_NOT_TRUE(j.contains("material"), ERR_INVALID_DATA, "Null 
material received");
+    dek_b64 = j.at("material");
+
+    std::string dek_plain;
+    RETURN_ERRS_NOT_TRUE(::absl::WebSafeBase64Unescape(dek_b64, &dek_plain), 
ERR_INVALID_DATA, "Invalid IV received");
+
+    *decrypted_key = ::absl::BytesToHexString(dek_plain);
+    return dsn::error_s::ok();
+}
+
+dsn::error_s KMSClient::GenerateEncryptionKeyFromKMS(const std::string 
&key_name,
+                                                     
dsn::replication::kms_info *kms_info)
+{
+    http_client client;
+    RETURN_NOT_OK(client.init());
+    RETURN_NOT_OK(client.set_auth(http_auth_type::SPNEGO));
+
+    std::vector<std::string> urls;
+    urls.reserve(kms_urls_.size());
+    for (const auto &url : kms_urls_) {
+        urls.emplace_back(
+            fmt::format("{}/v1/key/{}/_eek?eek_op=generate&num_keys=1", url, 
key_name));
+    }
+
+    nlohmann::json j = nlohmann::json::object();
+    for (const auto &url : urls) {
+        RETURN_NOT_OK(client.set_url(url));
+        RETURN_NOT_OK(client.with_get_method());
+        std::string resp;
+        auto err = client.exec_method(&resp);
+        if (err.code() == ERR_NETWORK_FAILURE || err.code() == ERR_TIMEOUT) {
+            continue;
+        }
+        RETURN_NOT_OK(err);
+        http_status_code http_status;
+        client.get_http_status(http_status);
+        if (http_status == http_status_code::kOk) {
+            try {
+                j = nlohmann::json::parse(resp).at(0);
+            } catch (nlohmann::json::exception &exp) {
+                LOG_ERROR("encode kms_info to json failed: {}, data = [{}]", 
exp.what(), resp);
+            }
+            break;
+        }
+        LOG_WARNING(
+            "The http status is ({}), and url is ({})", 
get_http_status_message(http_status), url);
+    }
+
+    RETURN_ERRS_NOT_TRUE(!j["versionName"].is_null(), ERR_INVALID_DATA, "Null 
versionName received");
+    j["versionName"].get_to(kms_info->kv);
+
+    std::string iv_b64;
+    RETURN_ERRS_NOT_TRUE(!j["iv"].is_null(), ERR_INVALID_DATA, "Null IV 
received");
+    j["iv"].get_to(iv_b64);
+
+    std::string iv_plain;
+    RETURN_ERRS_NOT_TRUE(::absl::WebSafeBase64Unescape(iv_b64, &iv_plain), 
ERR_INVALID_DATA, "Invalid IV received");
+    kms_info->iv = ::absl::BytesToHexString(iv_plain);
+
+    std::string key_b64;
+    RETURN_ERRS_NOT_TRUE(!j["encryptedKeyVersion"].is_null() && 
!j["encryptedKeyVersion"]["material"].is_null(), ERR_INVALID_DATA, "Null 
encryptedKeyVersion or material received");

Review Comment:
   This line is too long and the error message is ambiguous; check the two conditions separately, each with its own message.



##########
src/security/kms_client.cpp:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <initializer_list>
+#include <map>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include "absl/strings/escaping.h"
+#include "fmt/core.h"
+#include "http/http_client.h"
+#include "http/http_method.h"
+#include "http/http_status_code.h"
+#include "nlohmann/json.hpp"
+#include "nlohmann/json_fwd.hpp"
+#include "replica/replication_app_base.h"
+#include "security/kms_client.h"
+#include "utils/error_code.h"
+#include "utils/fmt_logging.h"
+
+namespace dsn {
+namespace security {
+
+#define RETURN_ERRS_NOT_TRUE(exp, code, ...)                                   
                    \
+    do {                                                                       
                    \
+        if (dsn_unlikely(!exp)) {                                              
                    \
+            return dsn::error_s::make(code, fmt::format(__VA_ARGS__));         
                    \
+        }                                                                      
                    \
+    } while (false);
+
+dsn::error_s KMSClient::DecryptEncryptionKey(const dsn::replication::kms_info 
&kms_info,

Review Comment:
   Add tests to check these functions. The tests can be skipped if no KMS config 
is provided; see how `TEST_P(HDFSClientTest, test_hdfs_read_write)` does it.



##########
src/replica/replica_stub.cpp:
##########
@@ -292,12 +304,62 @@ DSN_DEFINE_int32(
     "if tcmalloc reserved but not-used memory exceed this percentage of 
application allocated "
     "memory, replica server will release the exceeding memory back to 
operating system");
 
+DSN_DEFINE_string(
+    pegasus.server,
+    hadoop_kms_url,
+    "",
+    "Provide the comma-separated list of URLs from which to retrieve the "
+    "file system's server key. Example format: 
'hostname1:1234/kms,hostname2:1234/kms'.");
+
 DSN_DECLARE_bool(duplication_enabled);
 DSN_DECLARE_int32(fd_beacon_interval_seconds);
 DSN_DECLARE_int32(fd_check_interval_seconds);
 DSN_DECLARE_int32(fd_grace_seconds);
 DSN_DECLARE_int32(fd_lease_seconds);
 DSN_DECLARE_int32(gc_interval_ms);
+DSN_DECLARE_string(data_dirs);
+DSN_DEFINE_group_validator(encrypt_data_at_rest_pre_check, [](std::string 
&message) -> bool {
+    if (!dsn::security::FLAGS_enable_acl && FLAGS_encrypt_data_at_rest) {
+        message = fmt::format("[pegasus.server] encrypt_data_at_rest should be 
enabled only if "
+                              "[security] enable_acl is enabled.");
+        return false;
+    }
+    return true;
+});
+
+DSN_DEFINE_group_validator(encrypt_data_not_support_close, [](std::string 
&message) -> bool {

Review Comment:
   ```suggestion
   DSN_DEFINE_group_validator(encrypt_data_is_irreversible, [](std::string 
&message) -> bool {
   ```



##########
src/replica/replica_stub.cpp:
##########
@@ -292,12 +304,62 @@ DSN_DEFINE_int32(
     "if tcmalloc reserved but not-used memory exceed this percentage of 
application allocated "
     "memory, replica server will release the exceeding memory back to 
operating system");
 
+DSN_DEFINE_string(
+    pegasus.server,
+    hadoop_kms_url,
+    "",
+    "Provide the comma-separated list of URLs from which to retrieve the "
+    "file system's server key. Example format: 
'hostname1:1234/kms,hostname2:1234/kms'.");
+
 DSN_DECLARE_bool(duplication_enabled);
 DSN_DECLARE_int32(fd_beacon_interval_seconds);
 DSN_DECLARE_int32(fd_check_interval_seconds);
 DSN_DECLARE_int32(fd_grace_seconds);
 DSN_DECLARE_int32(fd_lease_seconds);
 DSN_DECLARE_int32(gc_interval_ms);
+DSN_DECLARE_string(data_dirs);
+DSN_DEFINE_group_validator(encrypt_data_at_rest_pre_check, [](std::string 
&message) -> bool {
+    if (!dsn::security::FLAGS_enable_acl && FLAGS_encrypt_data_at_rest) {
+        message = fmt::format("[pegasus.server] encrypt_data_at_rest should be 
enabled only if "
+                              "[security] enable_acl is enabled.");
+        return false;
+    }
+    return true;
+});
+
+DSN_DEFINE_group_validator(encrypt_data_not_support_close, [](std::string 
&message) -> bool {
+    std::vector<std::string> dirs;
+    std::string data_dirs;
+    // In some unit test FLAGS_data_dirs may not set.

Review Comment:
   Use `MOCK_TEST` to check it.



##########
src/security/kms_client.cpp:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <initializer_list>
+#include <map>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include "absl/strings/escaping.h"
+#include "fmt/core.h"
+#include "http/http_client.h"
+#include "http/http_method.h"
+#include "http/http_status_code.h"
+#include "nlohmann/json.hpp"
+#include "nlohmann/json_fwd.hpp"
+#include "replica/replication_app_base.h"
+#include "security/kms_client.h"
+#include "utils/error_code.h"
+#include "utils/fmt_logging.h"
+
+namespace dsn {
+namespace security {
+
+#define RETURN_ERRS_NOT_TRUE(exp, code, ...)                                   
                    \

Review Comment:
   Move it to src/utils/errors.h and submit it in a separate patch.



##########
src/replica/replica_stub.cpp:
##########
@@ -292,12 +304,62 @@ DSN_DEFINE_int32(
     "if tcmalloc reserved but not-used memory exceed this percentage of 
application allocated "
     "memory, replica server will release the exceeding memory back to 
operating system");
 
+DSN_DEFINE_string(
+    pegasus.server,
+    hadoop_kms_url,
+    "",
+    "Provide the comma-separated list of URLs from which to retrieve the "
+    "file system's server key. Example format: 
'hostname1:1234/kms,hostname2:1234/kms'.");
+
 DSN_DECLARE_bool(duplication_enabled);
 DSN_DECLARE_int32(fd_beacon_interval_seconds);
 DSN_DECLARE_int32(fd_check_interval_seconds);
 DSN_DECLARE_int32(fd_grace_seconds);
 DSN_DECLARE_int32(fd_lease_seconds);
 DSN_DECLARE_int32(gc_interval_ms);
+DSN_DECLARE_string(data_dirs);
+DSN_DEFINE_group_validator(encrypt_data_at_rest_pre_check, [](std::string 
&message) -> bool {
+    if (!dsn::security::FLAGS_enable_acl && FLAGS_encrypt_data_at_rest) {
+        message = fmt::format("[pegasus.server] encrypt_data_at_rest should be 
enabled only if "
+                              "[security] enable_acl is enabled.");
+        return false;
+    }
+    return true;
+});
+
+DSN_DEFINE_group_validator(encrypt_data_not_support_close, [](std::string 
&message) -> bool {
+    std::vector<std::string> dirs;
+    std::string data_dirs;
+    // In some unit test FLAGS_data_dirs may not set.
+    if (!dsn::utils::is_empty(FLAGS_data_dirs)) {
+        data_dirs = FLAGS_data_dirs;
+    } else {
+        return true;
+    }
+    ::absl::StrSplit(data_dirs.c_str(), dirs, ',');
+    std::string kms_path =
+        utils::filesystem::path_combine(dirs[0], 
dsn::replication::kms_info::kKmsInfo);
+    if (!FLAGS_encrypt_data_at_rest && 
utils::filesystem::path_exists(kms_path)) {
+        message = fmt::format("The kms_info file exists at ({}), but 
[pegasus.server] "
+                              "encrypt_data_at_rest is set to ({})."
+                              "Encryption in Pegasus is irreversible after its 
initial activation.",
+                              kms_path,
+                              FLAGS_encrypt_data_at_rest);
+        return false;
+    }
+    return true;
+});
+
+DSN_DEFINE_group_validator(encrypt_data_at_rest_with_kms_url, [](std::string 
&message) -> bool {
+    #ifndef MOCK_TEST

Review Comment:
   ```suggestion
   #ifndef MOCK_TEST
   ```



##########
src/security/kms_client.cpp:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <initializer_list>
+#include <map>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include "absl/strings/escaping.h"
+#include "fmt/core.h"
+#include "http/http_client.h"
+#include "http/http_method.h"
+#include "http/http_status_code.h"
+#include "nlohmann/json.hpp"
+#include "nlohmann/json_fwd.hpp"
+#include "replica/replication_app_base.h"
+#include "security/kms_client.h"
+#include "utils/error_code.h"
+#include "utils/fmt_logging.h"
+
+namespace dsn {
+namespace security {
+
+#define RETURN_ERRS_NOT_TRUE(exp, code, ...)                                   
                    \
+    do {                                                                       
                    \
+        if (dsn_unlikely(!exp)) {                                              
                    \
+            return dsn::error_s::make(code, fmt::format(__VA_ARGS__));         
                    \
+        }                                                                      
                    \
+    } while (false);
+
+dsn::error_s KMSClient::DecryptEncryptionKey(const dsn::replication::kms_info 
&kms_info,
+                                             std::string *decrypted_key)
+{
+    nlohmann::json payload;
+    payload["name"] = cluster_key_name_;
+    std::string iv_plain = ::absl::HexStringToBytes(kms_info.iv);
+    std::string iv_b64;
+    ::absl::WebSafeBase64Escape(iv_plain, &iv_b64);
+    payload["iv"] = iv_b64;
+    std::string eek_plain = ::absl::HexStringToBytes(kms_info.eek);
+    std::string eek_b64;
+    ::absl::WebSafeBase64Escape(eek_plain, &eek_b64);
+    payload["material"] = eek_b64;
+
+    http_client client;
+    RETURN_NOT_OK(client.init());
+    RETURN_NOT_OK(client.set_auth(http_auth_type::SPNEGO));
+
+    std::vector<std::string> urls;
+    urls.reserve(kms_urls_.size());
+    for (const auto &url : kms_urls_) {
+        
urls.emplace_back(fmt::format("{}/v1/keyversion/{}/_eek?eek_op=decrypt", url, 
kms_info.kv));
+    }
+    client.clear_header_fields();
+    client.set_content_type("application/json");
+    client.set_accept("*/*");
+
+    RETURN_NOT_OK(client.with_post_method(payload.dump()));
+
+    nlohmann::json j;
+    for (const auto &url : urls) {
+        RETURN_NOT_OK(client.set_url(url));
+        std::string resp;
+        auto err = client.exec_method(&resp);
+        if (err.code() == ERR_NETWORK_FAILURE || err.code() == ERR_TIMEOUT) {
+            continue;
+        }
+        RETURN_NOT_OK(err);
+        http_status_code http_status;
+        client.get_http_status(http_status);
+        if (http_status == http_status_code::kOk) {
+            try {
+                j = nlohmann::json::parse(resp);
+            } catch (nlohmann::json::exception &exp) {
+                LOG_ERROR("encode kms_info to json failed: {}, data = [{}]", 
exp.what(), resp);
+            }
+            break;
+        }
+        LOG_WARNING(
+            "The http status is ({}), and url is ({})", 
get_http_status_message(http_status), url);
+    }
+
+    std::string dek_b64;
+    RETURN_ERRS_NOT_TRUE(j.contains("material"), ERR_INVALID_DATA, "Null 
material received");
+    dek_b64 = j.at("material");
+
+    std::string dek_plain;
+    RETURN_ERRS_NOT_TRUE(::absl::WebSafeBase64Unescape(dek_b64, &dek_plain), 
ERR_INVALID_DATA, "Invalid IV received");
+
+    *decrypted_key = ::absl::BytesToHexString(dek_plain);
+    return dsn::error_s::ok();
+}
+
+dsn::error_s KMSClient::GenerateEncryptionKeyFromKMS(const std::string 
&key_name,
+                                                     
dsn::replication::kms_info *kms_info)
+{
+    http_client client;
+    RETURN_NOT_OK(client.init());
+    RETURN_NOT_OK(client.set_auth(http_auth_type::SPNEGO));
+
+    std::vector<std::string> urls;
+    urls.reserve(kms_urls_.size());
+    for (const auto &url : kms_urls_) {
+        urls.emplace_back(
+            fmt::format("{}/v1/key/{}/_eek?eek_op=generate&num_keys=1", url, 
key_name));
+    }
+
+    nlohmann::json j = nlohmann::json::object();
+    for (const auto &url : urls) {
+        RETURN_NOT_OK(client.set_url(url));
+        RETURN_NOT_OK(client.with_get_method());
+        std::string resp;
+        auto err = client.exec_method(&resp);
+        if (err.code() == ERR_NETWORK_FAILURE || err.code() == ERR_TIMEOUT) {
+            continue;
+        }
+        RETURN_NOT_OK(err);
+        http_status_code http_status;
+        client.get_http_status(http_status);
+        if (http_status == http_status_code::kOk) {

Review Comment:
   Refactor the code as above.



##########
src/replica/replica_stub.cpp:
##########
@@ -292,12 +304,62 @@ DSN_DEFINE_int32(
     "if tcmalloc reserved but not-used memory exceed this percentage of 
application allocated "
     "memory, replica server will release the exceeding memory back to 
operating system");
 
+DSN_DEFINE_string(
+    pegasus.server,
+    hadoop_kms_url,
+    "",
+    "Provide the comma-separated list of URLs from which to retrieve the "
+    "file system's server key. Example format: 
'hostname1:1234/kms,hostname2:1234/kms'.");
+
 DSN_DECLARE_bool(duplication_enabled);
 DSN_DECLARE_int32(fd_beacon_interval_seconds);
 DSN_DECLARE_int32(fd_check_interval_seconds);
 DSN_DECLARE_int32(fd_grace_seconds);
 DSN_DECLARE_int32(fd_lease_seconds);
 DSN_DECLARE_int32(gc_interval_ms);
+DSN_DECLARE_string(data_dirs);
+DSN_DEFINE_group_validator(encrypt_data_at_rest_pre_check, [](std::string 
&message) -> bool {
+    if (!dsn::security::FLAGS_enable_acl && FLAGS_encrypt_data_at_rest) {
+        message = fmt::format("[pegasus.server] encrypt_data_at_rest should be 
enabled only if "
+                              "[security] enable_acl is enabled.");
+        return false;
+    }
+    return true;
+});
+
+DSN_DEFINE_group_validator(encrypt_data_not_support_close, [](std::string 
&message) -> bool {
+    std::vector<std::string> dirs;
+    std::string data_dirs;
+    // In some unit test FLAGS_data_dirs may not set.
+    if (!dsn::utils::is_empty(FLAGS_data_dirs)) {
+        data_dirs = FLAGS_data_dirs;
+    } else {
+        return true;
+    }
+    ::absl::StrSplit(data_dirs.c_str(), dirs, ',');
+    std::string kms_path =
+        utils::filesystem::path_combine(dirs[0], 
dsn::replication::kms_info::kKmsInfo);
+    if (!FLAGS_encrypt_data_at_rest && 
utils::filesystem::path_exists(kms_path)) {
+        message = fmt::format("The kms_info file exists at ({}), but 
[pegasus.server] "
+                              "encrypt_data_at_rest is set to ({})."
+                              "Encryption in Pegasus is irreversible after its 
initial activation.",
+                              kms_path,
+                              FLAGS_encrypt_data_at_rest);
+        return false;
+    }
+    return true;
+});
+
+DSN_DEFINE_group_validator(encrypt_data_at_rest_with_kms_url, [](std::string 
&message) -> bool {
+    #ifndef MOCK_TEST
+    if (FLAGS_encrypt_data_at_rest && utils::is_empty(FLAGS_hadoop_kms_url)) {
+        message = fmt::format("[security] hadoop_kms_url should not be empty 
when [pegasus.server] encrypt_data_at_rest should be enabled only if "

Review Comment:
   ```suggestion
           message = fmt::format("[security] hadoop_kms_url should not be empty 
when [pegasus.server] encrypt_data_at_rest "
   ```



##########
src/replica/replica_stub.cpp:
##########
@@ -389,9 +451,40 @@ void replica_stub::initialize(const replication_options 
&opts, bool clear /* = f
         }
     }
 
+    std::string server_key;
+    dsn::replication::kms_info kms_info;
+    std::string kms_path = utils::filesystem::path_combine(
+        _options.data_dirs[0], dsn::replication::kms_info::kKmsInfo);
+    if (FLAGS_encrypt_data_at_rest && !utils::is_empty(FLAGS_hadoop_kms_url)) {
+        key_provider.reset(new dsn::security::KMSKeyProvider(
+            ::absl::StrSplit(FLAGS_hadoop_kms_url, ",", ::absl::SkipEmpty()), 
FLAGS_cluster_name));
+        auto ec = dsn::utils::load_rjobj_from_file(
+            kms_path, dsn::utils::FileDataType::kNonSensitive, &kms_info);
+        if (ec != dsn::ERR_OK) {
+            LOG_WARNING("It's normal to encounter a temporary inability to 
open the kms-info file "
+                        "during the first process launch. error_code = {}",
+                        ec);
+        }
+        // Upon the first launch, the encryption key should be empty. The 
process will then retrieve
+        // EEK, IV, and KV from KMS.
+        // After the first launch, the encryption key, obtained from the 
kms-info file, should not
+        // be empty. The process will then acquire the DEK from KMS.
+        if (!utils::filesystem::path_exists(kms_path)) {
+            CHECK_OK(key_provider->GenerateEncryptionKey(&kms_info), "Generate 
encryption key from kms failed");
+        }
+        CHECK_OK(key_provider->DecryptEncryptionKey(kms_info, &server_key), 
"Get decryption key failed from {}", kms_path);
+        FLAGS_server_key = server_key.c_str();
+    }
+
     // Initialize the file system manager.
     _fs_manager.initialize(_options.data_dirs, _options.data_dir_tags);
 
+    if (key_provider) {
+        auto err = dsn::utils::dump_rjobj_to_file(

Review Comment:
   Is it necessary to write the file when it already exists?



##########
src/replica/replica_stub.cpp:
##########
@@ -389,9 +451,40 @@ void replica_stub::initialize(const replication_options 
&opts, bool clear /* = f
         }
     }
 
+    std::string server_key;
+    dsn::replication::kms_info kms_info;
+    std::string kms_path = utils::filesystem::path_combine(
+        _options.data_dirs[0], dsn::replication::kms_info::kKmsInfo);
+    if (FLAGS_encrypt_data_at_rest && !utils::is_empty(FLAGS_hadoop_kms_url)) {
+        key_provider.reset(new dsn::security::KMSKeyProvider(
+            ::absl::StrSplit(FLAGS_hadoop_kms_url, ",", ::absl::SkipEmpty()), 
FLAGS_cluster_name));
+        auto ec = dsn::utils::load_rjobj_from_file(
+            kms_path, dsn::utils::FileDataType::kNonSensitive, &kms_info);
+        if (ec != dsn::ERR_OK) {

Review Comment:
   Check for the dsn::ERR_PATH_NOT_FOUND code to detect the first start of the 
server.
   For any other error code, log a fatal error and exit the process.



##########
src/replica/replica_stub.cpp:
##########
@@ -292,12 +304,62 @@ DSN_DEFINE_int32(
     "if tcmalloc reserved but not-used memory exceed this percentage of 
application allocated "
     "memory, replica server will release the exceeding memory back to 
operating system");
 
+DSN_DEFINE_string(
+    pegasus.server,
+    hadoop_kms_url,
+    "",
+    "Provide the comma-separated list of URLs from which to retrieve the "
+    "file system's server key. Example format: 
'hostname1:1234/kms,hostname2:1234/kms'.");
+
 DSN_DECLARE_bool(duplication_enabled);
 DSN_DECLARE_int32(fd_beacon_interval_seconds);
 DSN_DECLARE_int32(fd_check_interval_seconds);
 DSN_DECLARE_int32(fd_grace_seconds);
 DSN_DECLARE_int32(fd_lease_seconds);
 DSN_DECLARE_int32(gc_interval_ms);
+DSN_DECLARE_string(data_dirs);
+DSN_DEFINE_group_validator(encrypt_data_at_rest_pre_check, [](std::string 
&message) -> bool {
+    if (!dsn::security::FLAGS_enable_acl && FLAGS_encrypt_data_at_rest) {
+        message = fmt::format("[pegasus.server] encrypt_data_at_rest should be 
enabled only if "
+                              "[security] enable_acl is enabled.");
+        return false;
+    }
+    return true;
+});
+
+DSN_DEFINE_group_validator(encrypt_data_not_support_close, [](std::string 
&message) -> bool {
+    std::vector<std::string> dirs;

Review Comment:
   Move it closer to the place where it is used.



##########
src/replica/replica_stub.cpp:
##########
@@ -389,9 +451,40 @@ void replica_stub::initialize(const replication_options 
&opts, bool clear /* = f
         }
     }
 
+    std::string server_key;
+    dsn::replication::kms_info kms_info;
+    std::string kms_path = utils::filesystem::path_combine(
+        _options.data_dirs[0], dsn::replication::kms_info::kKmsInfo);
+    if (FLAGS_encrypt_data_at_rest && !utils::is_empty(FLAGS_hadoop_kms_url)) {
+        key_provider.reset(new dsn::security::KMSKeyProvider(
+            ::absl::StrSplit(FLAGS_hadoop_kms_url, ",", ::absl::SkipEmpty()), 
FLAGS_cluster_name));
+        auto ec = dsn::utils::load_rjobj_from_file(
+            kms_path, dsn::utils::FileDataType::kNonSensitive, &kms_info);
+        if (ec != dsn::ERR_OK) {
+            LOG_WARNING("It's normal to encounter a temporary inability to 
open the kms-info file "
+                        "during the first process launch. error_code = {}",
+                        ec);
+        }
+        // Upon the first launch, the encryption key should be empty. The 
process will then retrieve
+        // EEK, IV, and KV from KMS.
+        // After the first launch, the encryption key, obtained from the 
kms-info file, should not
+        // be empty. The process will then acquire the DEK from KMS.
+        if (!utils::filesystem::path_exists(kms_path)) {
+            CHECK_OK(key_provider->GenerateEncryptionKey(&kms_info), "Generate 
encryption key from kms failed");

Review Comment:
   Call GenerateEncryptionKey only when load_rjobj_from_file returns 
ERR_PATH_NOT_FOUND.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to