acelyc111 commented on code in PR #1388:
URL: 
https://github.com/apache/incubator-pegasus/pull/1388#discussion_r1136497157


##########
src/runtime/ranger/ranger_resource_policy_manager.h:
##########
@@ -95,5 +134,11 @@ class ranger_resource_policy_manager
 
     FRIEND_TEST(ranger_resource_policy_manager_test, 
parse_policies_from_json_for_test);
 };
+
+// Try to get the database name of 'app_name'.
+// When using Ranger for ACL, the constraint table naming rule is
+// "{database_name}.{table_name}", use "." to split database name and table 
name.
+// Return an empty string if 'app_name' is not a valid Ranger rule table name.
+std::string get_database_name_from_app_name(const std::string &app_name);

Review Comment:
   Add a test to check its functionality.



##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -17,19 +17,58 @@
 
 #include <ctype.h>
 #include <algorithm>
+#include <chrono>
+#include <iosfwd>
+#include <memory>
 #include <unordered_set>
 #include <utility>
 
+// Disable class-memaccess warning to facilitate compilation with gcc>7
+// https://github.com/Tencent/rapidjson/issues/1700
+#pragma GCC diagnostic push
+#if defined(__GNUC__) && __GNUC__ >= 8
+#pragma GCC diagnostic ignored "-Wclass-memaccess"
+#endif
+#include <rapidjson/document.h>
+
+#pragma GCC diagnostic pop
+
+#include "common/replica_envs.h"
+#include "common/replication.codes.h"
+#include "common/replication_common.h"
+#include "dsn.layer2_types.h"
+#include "fmt/core.h"
 #include "meta/meta_options.h"
 #include "meta/meta_service.h"
+#include "meta/meta_state_service.h"
+#include "meta/server_state.h"
+#include "meta_admin_types.h"
 #include "ranger_resource_policy_manager.h"
+#include "rapidjson/allocators.h"
 #include "runtime/ranger/ranger_resource_policy.h"
+#include "runtime/task/async_calls.h"
+#include "runtime/task/task.h"
 #include "runtime/task/task_code.h"
+#include "utils/blob.h"
+#include "utils/flags.h"
 #include "utils/fmt_logging.h"
+#include "utils/process_utils.h"
+#include "utils/smart_pointers.h"
+#include "utils/strings.h"
 
 namespace dsn {
 namespace ranger {
 
+DSN_DEFINE_uint32(security,
+                  update_ranger_policy_interval_sec,
+                  5,
+                  "The interval seconds of meta "
+                  "server to pull the latest "
+                  "access control policy from "
+                  "Ranger service.");
+DSN_DEFINE_string(ranger, ranger_service_url, "", "Apache Ranger service 
url.");
+DSN_DEFINE_string(ranger, ranger_service_name, "", "use policy name.");

Review Comment:
   The description is not very clear; could you improve it, please?



##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +212,362 @@ void 
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
         policies.emplace_back(pi);
     }
 }
+
+dsn::error_code 
ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+    std::string ranger_policies;
+    
ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+                              "Pull Ranger policies failed.");
+    LOG_DEBUG("Pull Ranger policies success.");
+
+    auto err_code = load_policies_from_json(ranger_policies);
+    if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+        LOG_DEBUG("Skip to update local policies.");
+        // for the newly created table, its app envs must be empty. This needs 
to be executed
+        // periodically to update the table's app envs, regardless of whether 
the Ranger policy is
+        // updated or not.
+        CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies 
to app envs failed.");
+        LOG_DEBUG("Sync policies to app envs succeeded.");
+        return dsn::ERR_OK;
+    }
+    ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed.");
+
+    start_to_dump_and_sync_policies();
+
+    return dsn::ERR_OK;
+}
+
+dsn::error_code 
ranger_resource_policy_manager::pull_policies_from_ranger_service(
+    std::string *ranger_policies) const
+{
+    std::string cmd =
+        fmt::format("curl {}/{}", FLAGS_ranger_service_url, 
FLAGS_ranger_service_name);
+    std::stringstream resp;
+    if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
+        return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+    }
+
+    *ranger_policies = resp.str();
+    return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const 
std::string &data)
+{
+    // The Ranger policy pulled from Ranger service demo.
+    /*
+    {
+        "serviceName": "PEGASUS1",
+        "serviceId": 1069,
+        "policyVersion": 60,
+        "policyUpdateTime": 1673254471000,
+        "policies": [{
+            "id": 5334,
+            "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
+            "isEnabled": true,
+            "version": 13,
+            "service": "PEGASUS1",
+            "name": "all - database",
+            "policyType": 0,
+            "policyPriority": 0,
+            "description": "Policy for all - database",
+            "isAuditEnabled": true,
+            "resources": {
+                "database": {
+                    "values": ["PEGASUS1"],
+                    "isExcludes": false,
+                    "isRecursive": true
+                }
+            },
+            "policyItems": [{
+                "accesses": [{
+                    "type": "create",
+                    "isAllowed": true
+                }, {
+                    "type": "drop",
+                    "isAllowed": true
+                }, {
+                    "type": "control",
+                    "isAllowed": true
+                }, {
+                    "type": "metadata",
+                    "isAllowed": true
+                }, {
+                    "type": "list",
+                    "isAllowed": true
+                }],
+                "users": ["PEGASUS1"],
+                "groups": [],
+                "roles": [],
+                "conditions": [],
+                "delegateAdmin": true
+            }],
+            "denyPolicyItems": [],
+            "allowExceptions": [],
+            "denyExceptions": [],
+            "dataMaskPolicyItems": [],
+            "rowFilterPolicyItems": [],
+            "serviceType": "pegasus",
+            "options": {},
+            "validitySchedules": [],
+            "policyLabels": [],
+            "zoneName": "",
+            "isDenyAllElse": false
+        }],
+        "auditMode": "audit-default",
+        "serviceConfig": {}
+    }
+    */
+    rapidjson::Document doc;
+    doc.Parse(data.c_str());
+
+    // Check if it is needed to update policies.
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
+    int remote_policy_version = doc["policyVersion"].GetInt();
+    if (_local_policy_version == remote_policy_version) {
+        LOG_DEBUG("Ranger policy version: {}, no need to update.", 
_local_policy_version);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    if (_local_policy_version > remote_policy_version) {
+        LOG_WARNING("Local Ranger policy version ({}) is larger than remote 
version ({}), please "
+                    "check Ranger services ({}).",
+                    _local_policy_version,
+                    remote_policy_version,
+                    FLAGS_ranger_service_name);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    if (_local_policy_version == 0) {
+        _local_policy_version = remote_policy_version;
+    }
+
+    // Update policies.
+    _all_resource_policies.clear();
+
+    // TODO(wanghao): it's optional
+    // Provide a DATABASE default policy for legacy tables.
+    // ranger_resource_policy default_database_policy;
+    // 
ranger_resource_policy::create_default_database_policy(default_database_policy);
+    // _all_resource_policies[enum_to_string(resource_type::kDatabase)] = 
{default_database_policy};
+
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
+    const rapidjson::Value &policies = doc["policies"];
+    RETURN_ERR_IF_NOT_ARRAY(policies);
+    for (const auto &policy : policies.GetArray()) {
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
+        // 1. Check if the policy is enabled or not.
+        if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
+            continue;
+        }
+
+        // 2. Parse resource type.
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
+        std::map<std::string, std::unordered_set<std::string>> 
values_of_resource_type;
+        for (const auto &resource : policy["resources"].GetObject()) {
+            RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
+            RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
+            std::unordered_set<std::string> values;
+            for (const auto &v : (resource.value)["values"].GetArray()) {
+                values.insert(v.GetString());
+            }
+            
values_of_resource_type.emplace(std::make_pair(resource.name.GetString(), 
values));
+        }
+
+        // 3. Construct ACL policy.
+        ranger_resource_policy resource_policy;
+        CONTINUE_IF_MISSING_MEMBER(policy, "name");
+        resource_policy.name = policy["name"].GetString();
+
+        resource_type rt = resource_type::kUnknown;
+        do {
+            // TODO(wanghao): refactor the following code
+            // parse Ranger policies json string into 
`values_of_resource_type`, distinguish
+            // resource types by `values_of_resource_type.size()`
+            if (values_of_resource_type.size() == 1) {
+                auto iter = values_of_resource_type.find("global");
+                if (iter != values_of_resource_type.end()) {
+                    rt = resource_type::kGlobal;
+                    break;
+                }
+                iter = values_of_resource_type.find("database");
+                if (iter != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter->second;
+                    rt = resource_type::kDatabase;
+                    break;
+                }
+            } else if (values_of_resource_type.size() == 2) {
+                auto iter1 = values_of_resource_type.find("database");
+                auto iter2 = values_of_resource_type.find("table");
+                if (iter1 != values_of_resource_type.end() &&
+                    iter2 != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter1->second;
+                    resource_policy.table_names = iter2->second;
+                    rt = resource_type::kDatabaseTable;
+                    break;
+                }
+            }
+            return dsn::ERR_RANGER_PARSE_ACL;
+        } while (false);
+
+        parse_policies_from_json(policy["policyItems"], 
resource_policy.policies.allow_policies);
+        parse_policies_from_json(policy["denyPolicyItems"], 
resource_policy.policies.deny_policies);
+        parse_policies_from_json(policy["allowExceptions"],
+                                 
resource_policy.policies.allow_policies_exclude);
+        parse_policies_from_json(policy["denyExceptions"],
+                                 
resource_policy.policies.deny_policies_exclude);
+
+        // 4. Add the ACL policy.
+        auto ret = _all_resource_policies.emplace(enum_to_string(rt),
+                                                  
resource_policies({resource_policy}));
+        if (!ret.second) {
+            ret.first->second.emplace_back(resource_policy);
+        }
+    }
+
+    return dsn::ERR_OK;
+}
+
+void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
+{
+    LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
+    dsn::task_ptr sync_task = dsn::tasking::create_task(
+        LPC_CM_GET_RANGER_POLICY, &_tracker, [this]() { 
dump_and_sync_policies(); });
+    _meta_svc->get_remote_storage()->create_node(
+        _ranger_policy_meta_root, LPC_CM_GET_RANGER_POLICY, [this, 
sync_task](dsn::error_code err) {
+            if (err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST) {
+                LOG_DEBUG("Create Ranger policy meta root succeed.");
+                sync_task->enqueue();
+                return;
+            }
+            CHECK_EQ(err, dsn::ERR_TIMEOUT);
+            LOG_ERROR("Create Ranger policy meta root timeout, try it later.");
+            dsn::tasking::enqueue(LPC_CM_GET_RANGER_POLICY,
+                                  &_tracker,
+                                  [this]() { 
start_to_dump_and_sync_policies(); },
+                                  0,
+                                  load_ranger_policy_retry_delay_ms);
+        });
+}
+
+void ranger_resource_policy_manager::dump_and_sync_policies()
+{
+    LOG_DEBUG("Start to sync Ranger policies to remote storage.");
+
+    dump_policies_to_remote_storage();
+    LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+
+    update_cached_policies();
+    LOG_DEBUG("Update using resources policies succeed.");
+
+    CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies to 
app envs failed.");
+    LOG_DEBUG("Sync policies to app envs succeeded.");
+}
+
+void ranger_resource_policy_manager::dump_policies_to_remote_storage()
+{
+    dsn::blob value = 
json::json_forwarder<all_resource_policies>::encode(_all_resource_policies);
+    _meta_svc->get_remote_storage()->set_data(
+        _ranger_policy_meta_root, value, LPC_CM_GET_RANGER_POLICY, 
[this](dsn::error_code e) {
+            if (e == dsn::ERR_OK) {
+                LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+                return;
+            }
+            CHECK_EQ_MSG(e, dsn::ERR_TIMEOUT, "Dump Ranger policies to remote 
storage failed.");
+            LOG_ERROR("Dump Ranger policies to remote storage timeout, retry 
later.");
+            dsn::tasking::enqueue(LPC_CM_GET_RANGER_POLICY,
+                                  &_tracker,
+                                  [this]() { 
dump_policies_to_remote_storage(); },
+                                  0,
+                                  load_ranger_policy_retry_delay_ms);
+        });
+}
+
+void ranger_resource_policy_manager::update_cached_policies()
+{
+    {
+        utils::auto_write_lock l(_global_policies_lock);
+        
_global_policies_cache.swap(_all_resource_policies[enum_to_string(resource_type::kGlobal)]);
+        // TODO(wanghao): provide a query method
+    }
+    {
+        utils::auto_write_lock l(_database_policies_lock);
+        _database_policies_cache.swap(
+            _all_resource_policies[enum_to_string(resource_type::kDatabase)]);
+        // TODO(wanghao): provide a query method
+    }
+}
+
+dsn::error_code ranger_resource_policy_manager::sync_policies_to_app_envs()
+{
+    const auto &table_policies =
+        
_all_resource_policies.find(enum_to_string(resource_type::kDatabaseTable));
+    if (table_policies == _all_resource_policies.end()) {
+        LOG_INFO("DATABASE_TABLE level policy is empty, skip to sync app 
envs.");
+        return dsn::ERR_OK;
+    }
+
+    dsn::replication::configuration_list_apps_response list_resp;
+    dsn::replication::configuration_list_apps_request list_req;
+    list_req.status = dsn::app_status::AS_AVAILABLE;
+    _meta_svc->get_server_state()->list_apps(list_req, list_resp);
+    ERR_LOG_AND_RETURN_NOT_OK(list_resp.err, "list_apps failed.");
+    for (const auto &app : list_resp.infos) {
+        std::string database_name = 
get_database_name_from_app_name(app.app_name);
+        std::string table_name;
+        if (database_name.empty()) {
+            database_name = "*";
+            table_name = app.app_name;
+        } else {
+            table_name = app.app_name.substr(database_name.size());
+        }
+
+        auto req = 
dsn::make_unique<dsn::replication::configuration_update_app_env_request>();
+        req->__set_app_name(app.app_name);
+        req->__set_keys(
+            
{dsn::replication::replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES});
+        bool has_match_policy = false;

Review Comment:
   Rename to `is_policy_matched`?



##########
src/runtime/ranger/ranger_resource_policy_manager.h:
##########
@@ -73,20 +76,56 @@ class ranger_resource_policy_manager
     static void parse_policies_from_json(const rapidjson::Value &data,
                                          std::vector<policy_item> &policies);
 
+    // Update policies from Ranger service.
+    dsn::error_code update_policies_from_ranger_service();
+
+    // Pull policies in JSON format from Ranger service.
+    dsn::error_code pull_policies_from_ranger_service(std::string 
*ranger_policies) const;

Review Comment:
   I guess most of the newly added functions can be declared as private, right?



##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +212,362 @@ void 
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
         policies.emplace_back(pi);
     }
 }
+
+dsn::error_code 
ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+    std::string ranger_policies;
+    
ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+                              "Pull Ranger policies failed.");
+    LOG_DEBUG("Pull Ranger policies success.");
+
+    auto err_code = load_policies_from_json(ranger_policies);
+    if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+        LOG_DEBUG("Skip to update local policies.");
+        // for the newly created table, its app envs must be empty. This needs 
to be executed
+        // periodically to update the table's app envs, regardless of whether 
the Ranger policy is
+        // updated or not.
+        CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies 
to app envs failed.");

Review Comment:
   The server will crash if the sync fails. Is it too strict to assert that the 
result must be OK?



##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +212,362 @@ void 
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
         policies.emplace_back(pi);
     }
 }
+
+dsn::error_code 
ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+    std::string ranger_policies;
+    
ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+                              "Pull Ranger policies failed.");
+    LOG_DEBUG("Pull Ranger policies success.");
+
+    auto err_code = load_policies_from_json(ranger_policies);
+    if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+        LOG_DEBUG("Skip to update local policies.");
+        // for the newly created table, its app envs must be empty. This needs 
to be executed
+        // periodically to update the table's app envs, regardless of whether 
the Ranger policy is
+        // updated or not.
+        CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies 
to app envs failed.");
+        LOG_DEBUG("Sync policies to app envs succeeded.");
+        return dsn::ERR_OK;
+    }
+    ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed.");
+
+    start_to_dump_and_sync_policies();
+
+    return dsn::ERR_OK;
+}
+
+dsn::error_code 
ranger_resource_policy_manager::pull_policies_from_ranger_service(
+    std::string *ranger_policies) const
+{
+    std::string cmd =
+        fmt::format("curl {}/{}", FLAGS_ranger_service_url, 
FLAGS_ranger_service_name);
+    std::stringstream resp;
+    if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
+        return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+    }
+
+    *ranger_policies = resp.str();
+    return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const 
std::string &data)
+{
+    // The Ranger policy pulled from Ranger service demo.
+    /*
+    {
+        "serviceName": "PEGASUS1",
+        "serviceId": 1069,
+        "policyVersion": 60,
+        "policyUpdateTime": 1673254471000,
+        "policies": [{
+            "id": 5334,
+            "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
+            "isEnabled": true,
+            "version": 13,
+            "service": "PEGASUS1",
+            "name": "all - database",
+            "policyType": 0,
+            "policyPriority": 0,
+            "description": "Policy for all - database",
+            "isAuditEnabled": true,
+            "resources": {
+                "database": {
+                    "values": ["PEGASUS1"],
+                    "isExcludes": false,
+                    "isRecursive": true
+                }
+            },
+            "policyItems": [{
+                "accesses": [{
+                    "type": "create",
+                    "isAllowed": true
+                }, {
+                    "type": "drop",
+                    "isAllowed": true
+                }, {
+                    "type": "control",
+                    "isAllowed": true
+                }, {
+                    "type": "metadata",
+                    "isAllowed": true
+                }, {
+                    "type": "list",
+                    "isAllowed": true
+                }],
+                "users": ["PEGASUS1"],
+                "groups": [],
+                "roles": [],
+                "conditions": [],
+                "delegateAdmin": true
+            }],
+            "denyPolicyItems": [],
+            "allowExceptions": [],
+            "denyExceptions": [],
+            "dataMaskPolicyItems": [],
+            "rowFilterPolicyItems": [],
+            "serviceType": "pegasus",
+            "options": {},
+            "validitySchedules": [],
+            "policyLabels": [],
+            "zoneName": "",
+            "isDenyAllElse": false
+        }],
+        "auditMode": "audit-default",
+        "serviceConfig": {}
+    }
+    */
+    rapidjson::Document doc;
+    doc.Parse(data.c_str());
+
+    // Check if it is needed to update policies.
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
+    int remote_policy_version = doc["policyVersion"].GetInt();
+    if (_local_policy_version == remote_policy_version) {
+        LOG_DEBUG("Ranger policy version: {}, no need to update.", 
_local_policy_version);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    if (_local_policy_version > remote_policy_version) {
+        LOG_WARNING("Local Ranger policy version ({}) is larger than remote 
version ({}), please "
+                    "check Ranger services ({}).",
+                    _local_policy_version,
+                    remote_policy_version,
+                    FLAGS_ranger_service_name);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    if (_local_policy_version == 0) {
+        _local_policy_version = remote_policy_version;
+    }
+
+    // Update policies.
+    _all_resource_policies.clear();
+
+    // TODO(wanghao): it's optional
+    // Provide a DATABASE default policy for legacy tables.
+    // ranger_resource_policy default_database_policy;
+    // 
ranger_resource_policy::create_default_database_policy(default_database_policy);
+    // _all_resource_policies[enum_to_string(resource_type::kDatabase)] = 
{default_database_policy};
+
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
+    const rapidjson::Value &policies = doc["policies"];
+    RETURN_ERR_IF_NOT_ARRAY(policies);
+    for (const auto &policy : policies.GetArray()) {
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
+        // 1. Check if the policy is enabled or not.
+        if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
+            continue;
+        }
+
+        // 2. Parse resource type.
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
+        std::map<std::string, std::unordered_set<std::string>> 
values_of_resource_type;
+        for (const auto &resource : policy["resources"].GetObject()) {
+            RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
+            RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
+            std::unordered_set<std::string> values;
+            for (const auto &v : (resource.value)["values"].GetArray()) {
+                values.insert(v.GetString());
+            }
+            
values_of_resource_type.emplace(std::make_pair(resource.name.GetString(), 
values));
+        }
+
+        // 3. Construct ACL policy.
+        ranger_resource_policy resource_policy;
+        CONTINUE_IF_MISSING_MEMBER(policy, "name");
+        resource_policy.name = policy["name"].GetString();
+
+        resource_type rt = resource_type::kUnknown;
+        do {
+            // TODO(wanghao): refactor the following code
+            // parse Ranger policies json string into 
`values_of_resource_type`, distinguish
+            // resource types by `values_of_resource_type.size()`
+            if (values_of_resource_type.size() == 1) {
+                auto iter = values_of_resource_type.find("global");
+                if (iter != values_of_resource_type.end()) {
+                    rt = resource_type::kGlobal;
+                    break;
+                }
+                iter = values_of_resource_type.find("database");
+                if (iter != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter->second;
+                    rt = resource_type::kDatabase;
+                    break;
+                }
+            } else if (values_of_resource_type.size() == 2) {
+                auto iter1 = values_of_resource_type.find("database");
+                auto iter2 = values_of_resource_type.find("table");
+                if (iter1 != values_of_resource_type.end() &&
+                    iter2 != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter1->second;
+                    resource_policy.table_names = iter2->second;
+                    rt = resource_type::kDatabaseTable;
+                    break;
+                }
+            }
+            return dsn::ERR_RANGER_PARSE_ACL;
+        } while (false);
+
+        parse_policies_from_json(policy["policyItems"], 
resource_policy.policies.allow_policies);
+        parse_policies_from_json(policy["denyPolicyItems"], 
resource_policy.policies.deny_policies);
+        parse_policies_from_json(policy["allowExceptions"],
+                                 
resource_policy.policies.allow_policies_exclude);
+        parse_policies_from_json(policy["denyExceptions"],
+                                 
resource_policy.policies.deny_policies_exclude);
+
+        // 4. Add the ACL policy.
+        auto ret = _all_resource_policies.emplace(enum_to_string(rt),
+                                                  
resource_policies({resource_policy}));
+        if (!ret.second) {
+            ret.first->second.emplace_back(resource_policy);
+        }
+    }
+
+    return dsn::ERR_OK;
+}
+
+void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
+{
+    LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
+    dsn::task_ptr sync_task = dsn::tasking::create_task(
+        LPC_CM_GET_RANGER_POLICY, &_tracker, [this]() { 
dump_and_sync_policies(); });
+    _meta_svc->get_remote_storage()->create_node(
+        _ranger_policy_meta_root, LPC_CM_GET_RANGER_POLICY, [this, 
sync_task](dsn::error_code err) {
+            if (err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST) {
+                LOG_DEBUG("Create Ranger policy meta root succeed.");
+                sync_task->enqueue();
+                return;
+            }
+            CHECK_EQ(err, dsn::ERR_TIMEOUT);
+            LOG_ERROR("Create Ranger policy meta root timeout, try it later.");

Review Comment:
   ```suggestion
               LOG_ERROR("Create Ranger policy meta root timeout, retry 
later.");
   ```



##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -83,9 +122,11 @@ const std::map<std::string, access_type> 
kAccessTypeMaping({{"READ", access_type
                                                             {"CONTROL", 
access_type::kControl}});
 } // anonymous namespace
 
+std::chrono::milliseconds load_ranger_policy_retry_delay_ms(10000);

Review Comment:
   How about declaring it as a const variable and renaming it to 
`kLoadRangerPolicyRetryDelayMs`?



##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +212,362 @@ void 
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
         policies.emplace_back(pi);
     }
 }
+
+dsn::error_code 
ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+    std::string ranger_policies;
+    
ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+                              "Pull Ranger policies failed.");
+    LOG_DEBUG("Pull Ranger policies success.");
+
+    auto err_code = load_policies_from_json(ranger_policies);
+    if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+        LOG_DEBUG("Skip to update local policies.");
+        // for the newly created table, its app envs must be empty. This needs 
to be executed
+        // periodically to update the table's app envs, regardless of whether 
the Ranger policy is
+        // updated or not.
+        CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies 
to app envs failed.");
+        LOG_DEBUG("Sync policies to app envs succeeded.");
+        return dsn::ERR_OK;
+    }
+    ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed.");
+
+    start_to_dump_and_sync_policies();
+
+    return dsn::ERR_OK;
+}
+
+dsn::error_code 
ranger_resource_policy_manager::pull_policies_from_ranger_service(
+    std::string *ranger_policies) const
+{
+    std::string cmd =
+        fmt::format("curl {}/{}", FLAGS_ranger_service_url, 
FLAGS_ranger_service_name);
+    std::stringstream resp;
+    if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
+        return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+    }
+
+    *ranger_policies = resp.str();
+    return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const 
std::string &data)
+{
+    // The Ranger policy pulled from Ranger service demo.
+    /*
+    {
+        "serviceName": "PEGASUS1",
+        "serviceId": 1069,
+        "policyVersion": 60,
+        "policyUpdateTime": 1673254471000,
+        "policies": [{
+            "id": 5334,
+            "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
+            "isEnabled": true,
+            "version": 13,
+            "service": "PEGASUS1",
+            "name": "all - database",
+            "policyType": 0,
+            "policyPriority": 0,
+            "description": "Policy for all - database",
+            "isAuditEnabled": true,
+            "resources": {
+                "database": {
+                    "values": ["PEGASUS1"],
+                    "isExcludes": false,
+                    "isRecursive": true
+                }
+            },
+            "policyItems": [{
+                "accesses": [{
+                    "type": "create",
+                    "isAllowed": true
+                }, {
+                    "type": "drop",
+                    "isAllowed": true
+                }, {
+                    "type": "control",
+                    "isAllowed": true
+                }, {
+                    "type": "metadata",
+                    "isAllowed": true
+                }, {
+                    "type": "list",
+                    "isAllowed": true
+                }],
+                "users": ["PEGASUS1"],
+                "groups": [],
+                "roles": [],
+                "conditions": [],
+                "delegateAdmin": true
+            }],
+            "denyPolicyItems": [],
+            "allowExceptions": [],
+            "denyExceptions": [],
+            "dataMaskPolicyItems": [],
+            "rowFilterPolicyItems": [],
+            "serviceType": "pegasus",
+            "options": {},
+            "validitySchedules": [],
+            "policyLabels": [],
+            "zoneName": "",
+            "isDenyAllElse": false
+        }],
+        "auditMode": "audit-default",
+        "serviceConfig": {}
+    }
+    */
+    rapidjson::Document doc;
+    doc.Parse(data.c_str());
+
+    // Check if it is needed to update policies.
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
+    int remote_policy_version = doc["policyVersion"].GetInt();
+    if (_local_policy_version == remote_policy_version) {
+        LOG_DEBUG("Ranger policy version: {}, no need to update.", 
_local_policy_version);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    if (_local_policy_version > remote_policy_version) {
+        LOG_WARNING("Local Ranger policy version ({}) is larger than remote 
version ({}), please "
+                    "check Ranger services ({}).",
+                    _local_policy_version,
+                    remote_policy_version,
+                    FLAGS_ranger_service_name);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    if (_local_policy_version == 0) {
+        _local_policy_version = remote_policy_version;
+    }
+
+    // Update policies.
+    _all_resource_policies.clear();
+
+    // TODO(wanghao): it's optional
+    // Provide a DATABASE default policy for legacy tables.
+    // ranger_resource_policy default_database_policy;
+    // 
ranger_resource_policy::create_default_database_policy(default_database_policy);
+    // _all_resource_policies[enum_to_string(resource_type::kDatabase)] = 
{default_database_policy};
+
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
+    const rapidjson::Value &policies = doc["policies"];
+    RETURN_ERR_IF_NOT_ARRAY(policies);
+    for (const auto &policy : policies.GetArray()) {
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
+        // 1. Check if the policy is enabled or not.
+        if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
+            continue;
+        }
+
+        // 2. Parse resource type.
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
+        std::map<std::string, std::unordered_set<std::string>> 
values_of_resource_type;
+        for (const auto &resource : policy["resources"].GetObject()) {
+            RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
+            RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
+            std::unordered_set<std::string> values;
+            for (const auto &v : (resource.value)["values"].GetArray()) {
+                values.insert(v.GetString());
+            }
+            
values_of_resource_type.emplace(std::make_pair(resource.name.GetString(), 
values));
+        }
+
+        // 3. Construct ACL policy.
+        ranger_resource_policy resource_policy;
+        CONTINUE_IF_MISSING_MEMBER(policy, "name");
+        resource_policy.name = policy["name"].GetString();
+
+        resource_type rt = resource_type::kUnknown;
+        do {
+            // TODO(wanghao): refactor the following code
+            // parse Ranger policies json string into 
`values_of_resource_type`, distinguish
+            // resource types by `values_of_resource_type.size()`
+            if (values_of_resource_type.size() == 1) {
+                auto iter = values_of_resource_type.find("global");
+                if (iter != values_of_resource_type.end()) {
+                    rt = resource_type::kGlobal;
+                    break;
+                }
+                iter = values_of_resource_type.find("database");
+                if (iter != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter->second;
+                    rt = resource_type::kDatabase;
+                    break;
+                }
+            } else if (values_of_resource_type.size() == 2) {
+                auto iter1 = values_of_resource_type.find("database");
+                auto iter2 = values_of_resource_type.find("table");
+                if (iter1 != values_of_resource_type.end() &&
+                    iter2 != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter1->second;
+                    resource_policy.table_names = iter2->second;
+                    rt = resource_type::kDatabaseTable;
+                    break;
+                }
+            }
+            return dsn::ERR_RANGER_PARSE_ACL;
+        } while (false);
+
+        parse_policies_from_json(policy["policyItems"], 
resource_policy.policies.allow_policies);
+        parse_policies_from_json(policy["denyPolicyItems"], 
resource_policy.policies.deny_policies);
+        parse_policies_from_json(policy["allowExceptions"],
+                                 
resource_policy.policies.allow_policies_exclude);
+        parse_policies_from_json(policy["denyExceptions"],
+                                 
resource_policy.policies.deny_policies_exclude);
+
+        // 4. Add the ACL policy.
+        auto ret = _all_resource_policies.emplace(enum_to_string(rt),
+                                                  
resource_policies({resource_policy}));
+        if (!ret.second) {
+            ret.first->second.emplace_back(resource_policy);
+        }
+    }
+
+    return dsn::ERR_OK;
+}
+
+void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
+{
+    LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
+    dsn::task_ptr sync_task = dsn::tasking::create_task(
+        LPC_CM_GET_RANGER_POLICY, &_tracker, [this]() { 
dump_and_sync_policies(); });
+    _meta_svc->get_remote_storage()->create_node(
+        _ranger_policy_meta_root, LPC_CM_GET_RANGER_POLICY, [this, 
sync_task](dsn::error_code err) {

Review Comment:
   I'm a little curious why 'GET' is used in `LPC_CM_GET_RANGER_POLICY`. Isn't 
it in fact a 'put' operation?



##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +212,362 @@ void 
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
         policies.emplace_back(pi);
     }
 }
+
+dsn::error_code 
ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+    std::string ranger_policies;
+    
ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+                              "Pull Ranger policies failed.");
+    LOG_DEBUG("Pull Ranger policies success.");
+
+    auto err_code = load_policies_from_json(ranger_policies);
+    if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+        LOG_DEBUG("Skip to update local policies.");
+        // for the newly created table, its app envs must be empty. This needs 
to be executed
+        // periodically to update the table's app envs, regardless of whether 
the Ranger policy is
+        // updated or not.
+        CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies 
to app envs failed.");
+        LOG_DEBUG("Sync policies to app envs succeeded.");
+        return dsn::ERR_OK;
+    }
+    ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed.");
+
+    start_to_dump_and_sync_policies();
+
+    return dsn::ERR_OK;
+}
+
+dsn::error_code 
ranger_resource_policy_manager::pull_policies_from_ranger_service(
+    std::string *ranger_policies) const
+{
+    std::string cmd =
+        fmt::format("curl {}/{}", FLAGS_ranger_service_url, 
FLAGS_ranger_service_name);
+    std::stringstream resp;
+    if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
+        return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+    }
+
+    *ranger_policies = resp.str();
+    return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const 
std::string &data)
+{
+    // The Ranger policy pulled from Ranger service demo.
+    /*
+    {
+        "serviceName": "PEGASUS1",
+        "serviceId": 1069,
+        "policyVersion": 60,
+        "policyUpdateTime": 1673254471000,
+        "policies": [{
+            "id": 5334,
+            "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
+            "isEnabled": true,
+            "version": 13,
+            "service": "PEGASUS1",
+            "name": "all - database",
+            "policyType": 0,
+            "policyPriority": 0,
+            "description": "Policy for all - database",
+            "isAuditEnabled": true,
+            "resources": {
+                "database": {
+                    "values": ["PEGASUS1"],
+                    "isExcludes": false,
+                    "isRecursive": true
+                }
+            },
+            "policyItems": [{
+                "accesses": [{
+                    "type": "create",
+                    "isAllowed": true
+                }, {
+                    "type": "drop",
+                    "isAllowed": true
+                }, {
+                    "type": "control",
+                    "isAllowed": true
+                }, {
+                    "type": "metadata",
+                    "isAllowed": true
+                }, {
+                    "type": "list",
+                    "isAllowed": true
+                }],
+                "users": ["PEGASUS1"],
+                "groups": [],
+                "roles": [],
+                "conditions": [],
+                "delegateAdmin": true
+            }],
+            "denyPolicyItems": [],
+            "allowExceptions": [],
+            "denyExceptions": [],
+            "dataMaskPolicyItems": [],
+            "rowFilterPolicyItems": [],
+            "serviceType": "pegasus",
+            "options": {},
+            "validitySchedules": [],
+            "policyLabels": [],
+            "zoneName": "",
+            "isDenyAllElse": false
+        }],
+        "auditMode": "audit-default",
+        "serviceConfig": {}
+    }
+    */
+    rapidjson::Document doc;
+    doc.Parse(data.c_str());
+
+    // Check if it is needed to update policies.
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
+    int remote_policy_version = doc["policyVersion"].GetInt();
+    if (_local_policy_version == remote_policy_version) {
+        LOG_DEBUG("Ranger policy version: {}, no need to update.", 
_local_policy_version);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    if (_local_policy_version > remote_policy_version) {
+        LOG_WARNING("Local Ranger policy version ({}) is larger than remote 
version ({}), please "
+                    "check Ranger services ({}).",
+                    _local_policy_version,
+                    remote_policy_version,
+                    FLAGS_ranger_service_name);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    if (_local_policy_version == 0) {
+        _local_policy_version = remote_policy_version;
+    }
+
+    // Update policies.
+    _all_resource_policies.clear();
+
+    // TODO(wanghao): it's optional
+    // Provide a DATABASE default policy for legacy tables.
+    // ranger_resource_policy default_database_policy;
+    // 
ranger_resource_policy::create_default_database_policy(default_database_policy);
+    // _all_resource_policies[enum_to_string(resource_type::kDatabase)] = 
{default_database_policy};
+
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
+    const rapidjson::Value &policies = doc["policies"];
+    RETURN_ERR_IF_NOT_ARRAY(policies);
+    for (const auto &policy : policies.GetArray()) {
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
+        // 1. Check if the policy is enabled or not.
+        if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
+            continue;
+        }
+
+        // 2. Parse resource type.
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
+        std::map<std::string, std::unordered_set<std::string>> 
values_of_resource_type;
+        for (const auto &resource : policy["resources"].GetObject()) {
+            RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
+            RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
+            std::unordered_set<std::string> values;
+            for (const auto &v : (resource.value)["values"].GetArray()) {
+                values.insert(v.GetString());
+            }
+            
values_of_resource_type.emplace(std::make_pair(resource.name.GetString(), 
values));
+        }
+
+        // 3. Construct ACL policy.
+        ranger_resource_policy resource_policy;
+        CONTINUE_IF_MISSING_MEMBER(policy, "name");
+        resource_policy.name = policy["name"].GetString();
+
+        resource_type rt = resource_type::kUnknown;
+        do {
+            // TODO(wanghao): refactor the following code
+            // parse Ranger policies json string into 
`values_of_resource_type`, distinguish
+            // resource types by `values_of_resource_type.size()`
+            if (values_of_resource_type.size() == 1) {
+                auto iter = values_of_resource_type.find("global");
+                if (iter != values_of_resource_type.end()) {
+                    rt = resource_type::kGlobal;
+                    break;
+                }
+                iter = values_of_resource_type.find("database");
+                if (iter != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter->second;
+                    rt = resource_type::kDatabase;
+                    break;
+                }
+            } else if (values_of_resource_type.size() == 2) {
+                auto iter1 = values_of_resource_type.find("database");
+                auto iter2 = values_of_resource_type.find("table");
+                if (iter1 != values_of_resource_type.end() &&
+                    iter2 != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter1->second;
+                    resource_policy.table_names = iter2->second;
+                    rt = resource_type::kDatabaseTable;
+                    break;
+                }
+            }
+            return dsn::ERR_RANGER_PARSE_ACL;
+        } while (false);
+
+        parse_policies_from_json(policy["policyItems"], 
resource_policy.policies.allow_policies);
+        parse_policies_from_json(policy["denyPolicyItems"], 
resource_policy.policies.deny_policies);
+        parse_policies_from_json(policy["allowExceptions"],
+                                 
resource_policy.policies.allow_policies_exclude);
+        parse_policies_from_json(policy["denyExceptions"],
+                                 
resource_policy.policies.deny_policies_exclude);
+
+        // 4. Add the ACL policy.
+        auto ret = _all_resource_policies.emplace(enum_to_string(rt),
+                                                  
resource_policies({resource_policy}));
+        if (!ret.second) {
+            ret.first->second.emplace_back(resource_policy);
+        }
+    }
+
+    return dsn::ERR_OK;
+}
+
+void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
+{
+    LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
+    dsn::task_ptr sync_task = dsn::tasking::create_task(
+        LPC_CM_GET_RANGER_POLICY, &_tracker, [this]() { 
dump_and_sync_policies(); });
+    _meta_svc->get_remote_storage()->create_node(
+        _ranger_policy_meta_root, LPC_CM_GET_RANGER_POLICY, [this, 
sync_task](dsn::error_code err) {
+            if (err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST) {
+                LOG_DEBUG("Create Ranger policy meta root succeed.");
+                sync_task->enqueue();
+                return;
+            }
+            CHECK_EQ(err, dsn::ERR_TIMEOUT);
+            LOG_ERROR("Create Ranger policy meta root timeout, try it later.");
+            dsn::tasking::enqueue(LPC_CM_GET_RANGER_POLICY,
+                                  &_tracker,
+                                  [this]() { 
start_to_dump_and_sync_policies(); },
+                                  0,
+                                  load_ranger_policy_retry_delay_ms);
+        });
+}
+
+void ranger_resource_policy_manager::dump_and_sync_policies()
+{
+    LOG_DEBUG("Start to sync Ranger policies to remote storage.");
+
+    dump_policies_to_remote_storage();
+    LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+
+    update_cached_policies();
+    LOG_DEBUG("Update using resources policies succeed.");
+
+    CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies to 
app envs failed.");
+    LOG_DEBUG("Sync policies to app envs succeeded.");
+}
+
+void ranger_resource_policy_manager::dump_policies_to_remote_storage()
+{
+    dsn::blob value = 
json::json_forwarder<all_resource_policies>::encode(_all_resource_policies);
+    _meta_svc->get_remote_storage()->set_data(
+        _ranger_policy_meta_root, value, LPC_CM_GET_RANGER_POLICY, 
[this](dsn::error_code e) {
+            if (e == dsn::ERR_OK) {
+                LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+                return;
+            }
+            CHECK_EQ_MSG(e, dsn::ERR_TIMEOUT, "Dump Ranger policies to remote 
storage failed.");
+            LOG_ERROR("Dump Ranger policies to remote storage timeout, retry 
later.");
+            dsn::tasking::enqueue(LPC_CM_GET_RANGER_POLICY,
+                                  &_tracker,
+                                  [this]() { 
dump_policies_to_remote_storage(); },
+                                  0,
+                                  load_ranger_policy_retry_delay_ms);
+        });
+}
+
+void ranger_resource_policy_manager::update_cached_policies()
+{
+    {
+        utils::auto_write_lock l(_global_policies_lock);
+        
_global_policies_cache.swap(_all_resource_policies[enum_to_string(resource_type::kGlobal)]);
+        // TODO(wanghao): provide a query method
+    }
+    {
+        utils::auto_write_lock l(_database_policies_lock);
+        _database_policies_cache.swap(
+            _all_resource_policies[enum_to_string(resource_type::kDatabase)]);
+        // TODO(wanghao): provide a query method
+    }
+}
+
+dsn::error_code ranger_resource_policy_manager::sync_policies_to_app_envs()
+{
+    const auto &table_policies =
+        
_all_resource_policies.find(enum_to_string(resource_type::kDatabaseTable));
+    if (table_policies == _all_resource_policies.end()) {
+        LOG_INFO("DATABASE_TABLE level policy is empty, skip to sync app 
envs.");
+        return dsn::ERR_OK;
+    }
+
+    dsn::replication::configuration_list_apps_response list_resp;
+    dsn::replication::configuration_list_apps_request list_req;
+    list_req.status = dsn::app_status::AS_AVAILABLE;
+    _meta_svc->get_server_state()->list_apps(list_req, list_resp);
+    ERR_LOG_AND_RETURN_NOT_OK(list_resp.err, "list_apps failed.");
+    for (const auto &app : list_resp.infos) {
+        std::string database_name = 
get_database_name_from_app_name(app.app_name);
+        std::string table_name;
+        if (database_name.empty()) {
+            database_name = "*";
+            table_name = app.app_name;
+        } else {
+            table_name = app.app_name.substr(database_name.size());
+        }

Review Comment:
   Add a new function like `get_table_name_from_app_name`?



##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +212,362 @@ void 
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
         policies.emplace_back(pi);
     }
 }
+
+dsn::error_code 
ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+    std::string ranger_policies;
+    
ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+                              "Pull Ranger policies failed.");
+    LOG_DEBUG("Pull Ranger policies success.");
+
+    auto err_code = load_policies_from_json(ranger_policies);
+    if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+        LOG_DEBUG("Skip to update local policies.");
+        // for the newly created table, its app envs must be empty. This needs 
to be executed
+        // periodically to update the table's app envs, regardless of whether 
the Ranger policy is
+        // updated or not.
+        CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies 
to app envs failed.");
+        LOG_DEBUG("Sync policies to app envs succeeded.");
+        return dsn::ERR_OK;
+    }
+    ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed.");
+
+    start_to_dump_and_sync_policies();
+
+    return dsn::ERR_OK;
+}
+
+dsn::error_code 
ranger_resource_policy_manager::pull_policies_from_ranger_service(
+    std::string *ranger_policies) const
+{
+    std::string cmd =
+        fmt::format("curl {}/{}", FLAGS_ranger_service_url, 
FLAGS_ranger_service_name);
+    std::stringstream resp;
+    if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
+        return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+    }
+
+    *ranger_policies = resp.str();
+    return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const 
std::string &data)
+{
+    // The Ranger policy pulled from Ranger service demo.
+    /*
+    {
+        "serviceName": "PEGASUS1",
+        "serviceId": 1069,
+        "policyVersion": 60,
+        "policyUpdateTime": 1673254471000,
+        "policies": [{
+            "id": 5334,
+            "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
+            "isEnabled": true,
+            "version": 13,
+            "service": "PEGASUS1",
+            "name": "all - database",
+            "policyType": 0,
+            "policyPriority": 0,
+            "description": "Policy for all - database",
+            "isAuditEnabled": true,
+            "resources": {
+                "database": {
+                    "values": ["PEGASUS1"],
+                    "isExcludes": false,
+                    "isRecursive": true
+                }
+            },
+            "policyItems": [{
+                "accesses": [{
+                    "type": "create",
+                    "isAllowed": true
+                }, {
+                    "type": "drop",
+                    "isAllowed": true
+                }, {
+                    "type": "control",
+                    "isAllowed": true
+                }, {
+                    "type": "metadata",
+                    "isAllowed": true
+                }, {
+                    "type": "list",
+                    "isAllowed": true
+                }],
+                "users": ["PEGASUS1"],
+                "groups": [],
+                "roles": [],
+                "conditions": [],
+                "delegateAdmin": true
+            }],
+            "denyPolicyItems": [],
+            "allowExceptions": [],
+            "denyExceptions": [],
+            "dataMaskPolicyItems": [],
+            "rowFilterPolicyItems": [],
+            "serviceType": "pegasus",
+            "options": {},
+            "validitySchedules": [],
+            "policyLabels": [],
+            "zoneName": "",
+            "isDenyAllElse": false
+        }],
+        "auditMode": "audit-default",
+        "serviceConfig": {}
+    }
+    */
+    rapidjson::Document doc;
+    doc.Parse(data.c_str());
+
+    // Check if it is needed to update policies.
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
+    int remote_policy_version = doc["policyVersion"].GetInt();
+    if (_local_policy_version == remote_policy_version) {
+        LOG_DEBUG("Ranger policy version: {}, no need to update.", 
_local_policy_version);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    if (_local_policy_version > remote_policy_version) {
+        LOG_WARNING("Local Ranger policy version ({}) is larger than remote 
version ({}), please "
+                    "check Ranger services ({}).",
+                    _local_policy_version,
+                    remote_policy_version,
+                    FLAGS_ranger_service_name);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    if (_local_policy_version == 0) {
+        _local_policy_version = remote_policy_version;
+    }
+
+    // Update policies.
+    _all_resource_policies.clear();
+
+    // TODO(wanghao): it's optional
+    // Provide a DATABASE default policy for legacy tables.
+    // ranger_resource_policy default_database_policy;
+    // 
ranger_resource_policy::create_default_database_policy(default_database_policy);
+    // _all_resource_policies[enum_to_string(resource_type::kDatabase)] = 
{default_database_policy};
+
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
+    const rapidjson::Value &policies = doc["policies"];
+    RETURN_ERR_IF_NOT_ARRAY(policies);
+    for (const auto &policy : policies.GetArray()) {
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
+        // 1. Check if the policy is enabled or not.
+        if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
+            continue;
+        }
+
+        // 2. Parse resource type.
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
+        std::map<std::string, std::unordered_set<std::string>> 
values_of_resource_type;
+        for (const auto &resource : policy["resources"].GetObject()) {
+            RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
+            RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
+            std::unordered_set<std::string> values;
+            for (const auto &v : (resource.value)["values"].GetArray()) {
+                values.insert(v.GetString());
+            }
+            
values_of_resource_type.emplace(std::make_pair(resource.name.GetString(), 
values));
+        }
+
+        // 3. Construct ACL policy.
+        ranger_resource_policy resource_policy;
+        CONTINUE_IF_MISSING_MEMBER(policy, "name");
+        resource_policy.name = policy["name"].GetString();
+
+        resource_type rt = resource_type::kUnknown;
+        do {
+            // TODO(wanghao): refactor the following code
+            // parse Ranger policies json string into 
`values_of_resource_type`, distinguish
+            // resource types by `values_of_resource_type.size()`
+            if (values_of_resource_type.size() == 1) {
+                auto iter = values_of_resource_type.find("global");
+                if (iter != values_of_resource_type.end()) {
+                    rt = resource_type::kGlobal;
+                    break;
+                }
+                iter = values_of_resource_type.find("database");
+                if (iter != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter->second;
+                    rt = resource_type::kDatabase;
+                    break;
+                }
+            } else if (values_of_resource_type.size() == 2) {
+                auto iter1 = values_of_resource_type.find("database");
+                auto iter2 = values_of_resource_type.find("table");
+                if (iter1 != values_of_resource_type.end() &&
+                    iter2 != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter1->second;
+                    resource_policy.table_names = iter2->second;
+                    rt = resource_type::kDatabaseTable;
+                    break;
+                }
+            }
+            return dsn::ERR_RANGER_PARSE_ACL;
+        } while (false);
+
+        parse_policies_from_json(policy["policyItems"], 
resource_policy.policies.allow_policies);
+        parse_policies_from_json(policy["denyPolicyItems"], 
resource_policy.policies.deny_policies);
+        parse_policies_from_json(policy["allowExceptions"],
+                                 
resource_policy.policies.allow_policies_exclude);
+        parse_policies_from_json(policy["denyExceptions"],
+                                 
resource_policy.policies.deny_policies_exclude);
+
+        // 4. Add the ACL policy.
+        auto ret = _all_resource_policies.emplace(enum_to_string(rt),
+                                                  
resource_policies({resource_policy}));
+        if (!ret.second) {
+            ret.first->second.emplace_back(resource_policy);
+        }
+    }
+
+    return dsn::ERR_OK;
+}
+
+void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
+{
+    LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
+    dsn::task_ptr sync_task = dsn::tasking::create_task(
+        LPC_CM_GET_RANGER_POLICY, &_tracker, [this]() { 
dump_and_sync_policies(); });
+    _meta_svc->get_remote_storage()->create_node(
+        _ranger_policy_meta_root, LPC_CM_GET_RANGER_POLICY, [this, 
sync_task](dsn::error_code err) {
+            if (err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST) {
+                LOG_DEBUG("Create Ranger policy meta root succeed.");
+                sync_task->enqueue();
+                return;
+            }
+            CHECK_EQ(err, dsn::ERR_TIMEOUT);
+            LOG_ERROR("Create Ranger policy meta root timeout, try it later.");
+            dsn::tasking::enqueue(LPC_CM_GET_RANGER_POLICY,
+                                  &_tracker,
+                                  [this]() { 
start_to_dump_and_sync_policies(); },
+                                  0,
+                                  load_ranger_policy_retry_delay_ms);
+        });
+}
+
+void ranger_resource_policy_manager::dump_and_sync_policies()
+{
+    LOG_DEBUG("Start to sync Ranger policies to remote storage.");
+
+    dump_policies_to_remote_storage();
+    LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+
+    update_cached_policies();
+    LOG_DEBUG("Update using resources policies succeed.");
+
+    CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies to 
app envs failed.");

Review Comment:
   Same. The server will crash if the sync fails; is it too strict to assert 
that the result must be OK?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to