acelyc111 commented on code in PR #1388:
URL: 
https://github.com/apache/incubator-pegasus/pull/1388#discussion_r1138514791


##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +215,368 @@ void 
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
         policies.emplace_back(pi);
     }
 }
+
+dsn::error_code 
ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+    std::string ranger_policies;
+    
ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+                              "Pull Ranger policies failed.");
+    LOG_DEBUG("Pull Ranger policies success.");
+
+    auto err_code = load_policies_from_json(ranger_policies);
+    if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+        LOG_DEBUG("Skip to update local policies.");
+        // for the newly created table, its app envs must be empty. This needs to be executed

Review Comment:
   ```suggestion
           // For the newly created table, its app envs must be empty. This needs to be executed
   ```



##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +215,368 @@ void 
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
         policies.emplace_back(pi);
     }
 }
+
+dsn::error_code 
ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+    std::string ranger_policies;
+    
ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies),
+                              "Pull Ranger policies failed.");
+    LOG_DEBUG("Pull Ranger policies success.");
+
+    auto err_code = load_policies_from_json(ranger_policies);
+    if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+        LOG_DEBUG("Skip to update local policies.");
+        // for the newly created table, its app envs must be empty. This needs 
to be executed
+        // periodically to update the table's app envs, regardless of whether 
the Ranger policy is
+        // updated or not.
+        ERR_LOG_AND_RETURN_NOT_OK(sync_policies_to_app_envs(), "Sync policies 
to app envs failed.");
+        return dsn::ERR_OK;
+    }
+    ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed.");
+
+    start_to_dump_and_sync_policies();
+
+    return dsn::ERR_OK;
+}
+
+dsn::error_code 
ranger_resource_policy_manager::pull_policies_from_ranger_service(
+    std::string *ranger_policies) const
+{
+    std::string cmd =
+        fmt::format("curl {}/{}", FLAGS_ranger_service_url, 
FLAGS_ranger_service_name);
+    std::stringstream resp;
+    if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
+        return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+    }
+
+    *ranger_policies = resp.str();
+    return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const 
std::string &data)
+{
+    // The Ranger policy pulled from Ranger service demo.
+    /*
+    {
+        "serviceName": "PEGASUS1",
+        "serviceId": 1069,
+        "policyVersion": 60,
+        "policyUpdateTime": 1673254471000,
+        "policies": [{
+            "id": 5334,
+            "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
+            "isEnabled": true,
+            "version": 13,
+            "service": "PEGASUS1",
+            "name": "all - database",
+            "policyType": 0,
+            "policyPriority": 0,
+            "description": "Policy for all - database",
+            "isAuditEnabled": true,
+            "resources": {
+                "database": {
+                    "values": ["PEGASUS1"],
+                    "isExcludes": false,
+                    "isRecursive": true
+                }
+            },
+            "policyItems": [{
+                "accesses": [{
+                    "type": "create",
+                    "isAllowed": true
+                }, {
+                    "type": "drop",
+                    "isAllowed": true
+                }, {
+                    "type": "control",
+                    "isAllowed": true
+                }, {
+                    "type": "metadata",
+                    "isAllowed": true
+                }, {
+                    "type": "list",
+                    "isAllowed": true
+                }],
+                "users": ["PEGASUS1"],
+                "groups": [],
+                "roles": [],
+                "conditions": [],
+                "delegateAdmin": true
+            }],
+            "denyPolicyItems": [],
+            "allowExceptions": [],
+            "denyExceptions": [],
+            "dataMaskPolicyItems": [],
+            "rowFilterPolicyItems": [],
+            "serviceType": "pegasus",
+            "options": {},
+            "validitySchedules": [],
+            "policyLabels": [],
+            "zoneName": "",
+            "isDenyAllElse": false
+        }],
+        "auditMode": "audit-default",
+        "serviceConfig": {}
+    }
+    */
+    rapidjson::Document doc;
+    doc.Parse(data.c_str());
+
+    // Check if it is needed to update policies.
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
+    int remote_policy_version = doc["policyVersion"].GetInt();
+    if (_local_policy_version == remote_policy_version) {
+        LOG_DEBUG("Ranger policy version: {}, no need to update.", 
_local_policy_version);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    if (_local_policy_version > remote_policy_version) {
+        LOG_WARNING("Local Ranger policy version ({}) is larger than remote 
version ({}), please "
+                    "check Ranger services ({}).",
+                    _local_policy_version,
+                    remote_policy_version,
+                    FLAGS_ranger_service_name);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    _local_policy_version = remote_policy_version;
+
+    // Update policies.
+    _all_resource_policies.clear();
+
+    // TODO(wanghao): it's optional
+    // Provide a DATABASE default policy for legacy tables.
+    // ranger_resource_policy default_database_policy;
+    // 
ranger_resource_policy::create_default_database_policy(default_database_policy);
+    // _all_resource_policies[enum_to_string(resource_type::kDatabase)] = 
{default_database_policy};
+
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
+    const rapidjson::Value &policies = doc["policies"];
+    RETURN_ERR_IF_NOT_ARRAY(policies);
+    for (const auto &policy : policies.GetArray()) {
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
+        // 1. Check if the policy is enabled or not.
+        if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
+            continue;
+        }
+
+        // 2. Parse resource type.
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
+        std::map<std::string, std::unordered_set<std::string>> 
values_of_resource_type;
+        for (const auto &resource : policy["resources"].GetObject()) {
+            RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
+            RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
+            std::unordered_set<std::string> values;
+            for (const auto &v : (resource.value)["values"].GetArray()) {
+                values.insert(v.GetString());
+            }
+            
values_of_resource_type.emplace(std::make_pair(resource.name.GetString(), 
values));
+        }
+
+        // 3. Construct ACL policy.
+        ranger_resource_policy resource_policy;
+        CONTINUE_IF_MISSING_MEMBER(policy, "name");
+        resource_policy.name = policy["name"].GetString();
+
+        resource_type rt = resource_type::kUnknown;
+        do {
+            // TODO(wanghao): refactor the following code
+            // parse Ranger policies json string into 
`values_of_resource_type`, distinguish
+            // resource types by `values_of_resource_type.size()`
+            if (values_of_resource_type.size() == 1) {
+                auto iter = values_of_resource_type.find("global");
+                if (iter != values_of_resource_type.end()) {
+                    rt = resource_type::kGlobal;
+                    break;
+                }
+                iter = values_of_resource_type.find("database");
+                if (iter != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter->second;
+                    rt = resource_type::kDatabase;
+                    break;
+                }
+            } else if (values_of_resource_type.size() == 2) {
+                auto iter1 = values_of_resource_type.find("database");
+                auto iter2 = values_of_resource_type.find("table");
+                if (iter1 != values_of_resource_type.end() &&
+                    iter2 != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter1->second;
+                    resource_policy.table_names = iter2->second;
+                    rt = resource_type::kDatabaseTable;
+                    break;
+                }
+            }
+            return dsn::ERR_RANGER_PARSE_ACL;
+        } while (false);
+
+        parse_policies_from_json(policy["policyItems"], 
resource_policy.policies.allow_policies);
+        parse_policies_from_json(policy["denyPolicyItems"], 
resource_policy.policies.deny_policies);
+        parse_policies_from_json(policy["allowExceptions"],
+                                 
resource_policy.policies.allow_policies_exclude);
+        parse_policies_from_json(policy["denyExceptions"],
+                                 
resource_policy.policies.deny_policies_exclude);
+
+        // 4. Add the ACL policy.
+        auto ret = _all_resource_policies.emplace(enum_to_string(rt),
+                                                  
resource_policies({resource_policy}));
+        if (!ret.second) {
+            ret.first->second.emplace_back(resource_policy);
+        }
+    }
+
+    return dsn::ERR_OK;
+}
+
+void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
+{
+    LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
+    dsn::task_ptr sync_task = dsn::tasking::create_task(
+        LPC_USE_RANGER_ACCESS_CONTROL, &_tracker, [this]() { 
dump_and_sync_policies(); });
+    _meta_svc->get_remote_storage()->create_node(
+        _ranger_policy_meta_root,
+        LPC_USE_RANGER_ACCESS_CONTROL,
+        [this, sync_task](dsn::error_code err) {
+            if (err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST) {
+                LOG_DEBUG("Create Ranger policy meta root succeed.");
+                sync_task->enqueue();
+                return;
+            }
+            CHECK_EQ(err, dsn::ERR_TIMEOUT);
+            LOG_ERROR("Create Ranger policy meta root timeout, retry later.");
+            dsn::tasking::enqueue(LPC_USE_RANGER_ACCESS_CONTROL,
+                                  &_tracker,
+                                  [this]() { 
start_to_dump_and_sync_policies(); },
+                                  0,
+                                  kLoadRangerPolicyRetryDelayMs);
+        });
+}
+
+void ranger_resource_policy_manager::dump_and_sync_policies()
+{
+    LOG_DEBUG("Start to sync Ranger policies to remote storage.");
+
+    dump_policies_to_remote_storage();
+    LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+
+    update_cached_policies();
+    LOG_DEBUG("Update using resources policies succeed.");
+
+    if (dsn::ERR_OK == sync_policies_to_app_envs()) {

Review Comment:
   ```suggestion
       if (dsn::ERR_OK != sync_policies_to_app_envs()) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to