WHBANG commented on code in PR #1388:
URL: https://github.com/apache/incubator-pegasus/pull/1388#discussion_r1138081143


##########
src/runtime/ranger/ranger_resource_policy_manager.cpp:
##########
@@ -171,5 +215,382 @@ void 
ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V
         policies.emplace_back(pi);
     }
 }
+
+dsn::error_code ranger_resource_policy_manager::update_policies_from_ranger_service()
+{
+    // Pull the latest policies from Ranger and load them. A newer policy set is handed to
+    // start_to_dump_and_sync_policies(), which dumps and syncs it from queued tasks; when the
+    // policies are unchanged, only the tables' app envs are re-synced here.
+    std::string policies_json;
+    ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&policies_json),
+                              "Pull Ranger policies failed.");
+    LOG_DEBUG("Pull Ranger policies success.");
+
+    const auto load_err = load_policies_from_json(policies_json);
+    if (load_err != dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
+        ERR_LOG_AND_RETURN_NOT_OK(load_err, "Parse Ranger policies failed.");
+        start_to_dump_and_sync_policies();
+        return dsn::ERR_OK;
+    }
+
+    LOG_DEBUG("Skip to update local policies.");
+    // A newly created table starts with empty app envs, so the envs must be re-synced
+    // periodically whether or not the Ranger policies themselves have changed.
+    const auto sync_err = sync_policies_to_app_envs();
+    ERR_LOG_AND_RETURN_NOT_OK(sync_err, "Sync policies to app envs failed.");
+    LOG_DEBUG("Sync policies to app envs succeeded.");
+    return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::pull_policies_from_ranger_service(
+    std::string *ranger_policies) const
+{
+    // Fetch the policies of FLAGS_ranger_service_name from FLAGS_ranger_service_url with curl
+    // and store the raw response body in `*ranger_policies`.
+    //
+    // Returns ERR_SYNC_RANGER_POLICIES_FAILED if the command fails or yields an empty body;
+    // `*ranger_policies` is only written on success.
+    //
+    // --fail: on an HTTP error (status >= 400) curl prints no body and exits non-zero, so an
+    //         error page is never passed on to the JSON parser as if it were the policies.
+    // --silent --show-error: no progress meter, but curl's error messages are kept.
+    const std::string cmd = fmt::format("curl --fail --silent --show-error {}/{}",
+                                        FLAGS_ranger_service_url,
+                                        FLAGS_ranger_service_name);
+    std::stringstream resp;
+    if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
+        return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+    }
+
+    // An empty body cannot contain any policies (e.g. curl suppressed an error response);
+    // report it as a pull failure rather than as a confusing parse error later on.
+    std::string body = resp.str();
+    if (body.empty()) {
+        return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
+    }
+
+    *ranger_policies = std::move(body);
+    return dsn::ERR_OK;
+}
+
+dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const std::string &data)
+{
+    // Load the policies pulled from the Ranger service (`data`, a JSON document shaped like
+    // the demo below) into `_all_resource_policies`, keyed by resource type.
+    //
+    // Returns:
+    //   - ERR_RANGER_POLICIES_NO_NEED_UPDATE if the remote policy version is not newer than
+    //     the local one;
+    //   - an error such as ERR_RANGER_PARSE_ACL if the document is malformed;
+    //   - ERR_OK once the new policies and their version have been adopted.
+    // On every error path `_all_resource_policies` and `_local_policy_version` are left
+    // untouched, so the next pull retries the load.
+    //
+    // The Ranger policy pulled from Ranger service demo.
+    /*
+    {
+        "serviceName": "PEGASUS1",
+        "serviceId": 1069,
+        "policyVersion": 60,
+        "policyUpdateTime": 1673254471000,
+        "policies": [{
+            "id": 5334,
+            "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
+            "isEnabled": true,
+            "version": 13,
+            "service": "PEGASUS1",
+            "name": "all - database",
+            "policyType": 0,
+            "policyPriority": 0,
+            "description": "Policy for all - database",
+            "isAuditEnabled": true,
+            "resources": {
+                "database": {
+                    "values": ["PEGASUS1"],
+                    "isExcludes": false,
+                    "isRecursive": true
+                }
+            },
+            "policyItems": [{
+                "accesses": [{
+                    "type": "create",
+                    "isAllowed": true
+                }, {
+                    "type": "drop",
+                    "isAllowed": true
+                }, {
+                    "type": "control",
+                    "isAllowed": true
+                }, {
+                    "type": "metadata",
+                    "isAllowed": true
+                }, {
+                    "type": "list",
+                    "isAllowed": true
+                }],
+                "users": ["PEGASUS1"],
+                "groups": [],
+                "roles": [],
+                "conditions": [],
+                "delegateAdmin": true
+            }],
+            "denyPolicyItems": [],
+            "allowExceptions": [],
+            "denyExceptions": [],
+            "dataMaskPolicyItems": [],
+            "rowFilterPolicyItems": [],
+            "serviceType": "pegasus",
+            "options": {},
+            "validitySchedules": [],
+            "policyLabels": [],
+            "zoneName": "",
+            "isDenyAllElse": false
+        }],
+        "auditMode": "audit-default",
+        "serviceConfig": {}
+    }
+    */
+    rapidjson::Document doc;
+    doc.Parse(data.c_str());
+    // Every member lookup below requires an object; reject non-JSON input (e.g. an HTML error
+    // page) up front instead of tripping rapidjson's assertions.
+    if (doc.HasParseError() || !doc.IsObject()) {
+        return dsn::ERR_RANGER_PARSE_ACL;
+    }
+
+    // Check if it is needed to update policies.
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
+    if (!doc["policyVersion"].IsInt()) {
+        return dsn::ERR_RANGER_PARSE_ACL;
+    }
+    const int remote_policy_version = doc["policyVersion"].GetInt();
+    if (_local_policy_version == remote_policy_version) {
+        LOG_DEBUG("Ranger policy version: {}, no need to update.", _local_policy_version);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    if (_local_policy_version > remote_policy_version) {
+        LOG_WARNING("Local Ranger policy version ({}) is larger than remote version ({}), please "
+                    "check Ranger services ({}).",
+                    _local_policy_version,
+                    remote_policy_version,
+                    FLAGS_ranger_service_name);
+        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
+    }
+
+    // Build the new policies aside and commit them only after the whole document has been
+    // parsed, so a malformed document cannot leave `_all_resource_policies` half-populated.
+    all_resource_policies new_policies;
+
+    // TODO(wanghao): it's optional
+    // Provide a DATABASE default policy for legacy tables.
+    // ranger_resource_policy default_database_policy;
+    // ranger_resource_policy::create_default_database_policy(default_database_policy);
+    // new_policies[enum_to_string(resource_type::kDatabase)] = {default_database_policy};
+
+    RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
+    const rapidjson::Value &policies = doc["policies"];
+    RETURN_ERR_IF_NOT_ARRAY(policies);
+    for (const auto &policy : policies.GetArray()) {
+        if (!policy.IsObject()) {
+            return dsn::ERR_RANGER_PARSE_ACL;
+        }
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
+        // 1. Check if the policy is enabled or not.
+        if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
+            continue;
+        }
+
+        // 2. Parse resource type.
+        RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
+        if (!policy["resources"].IsObject()) {
+            return dsn::ERR_RANGER_PARSE_ACL;
+        }
+        std::map<std::string, std::unordered_set<std::string>> values_of_resource_type;
+        for (const auto &resource : policy["resources"].GetObject()) {
+            if (!resource.value.IsObject()) {
+                return dsn::ERR_RANGER_PARSE_ACL;
+            }
+            RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
+            RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
+            std::unordered_set<std::string> values;
+            for (const auto &v : (resource.value)["values"].GetArray()) {
+                if (!v.IsString()) {
+                    return dsn::ERR_RANGER_PARSE_ACL;
+                }
+                values.insert(v.GetString());
+            }
+            values_of_resource_type.emplace(resource.name.GetString(), std::move(values));
+        }
+
+        // 3. Construct ACL policy.
+        ranger_resource_policy resource_policy;
+        CONTINUE_IF_MISSING_MEMBER(policy, "name");
+        // A non-string name is treated like a missing one: the policy is skipped.
+        if (!policy["name"].IsString()) {
+            continue;
+        }
+        resource_policy.name = policy["name"].GetString();
+
+        resource_type rt = resource_type::kUnknown;
+        do {
+            // TODO(wanghao): refactor the following code
+            // parse Ranger policies json string into `values_of_resource_type`, distinguish
+            // resource types by `values_of_resource_type.size()`
+            if (values_of_resource_type.size() == 1) {
+                auto iter = values_of_resource_type.find("global");
+                if (iter != values_of_resource_type.end()) {
+                    rt = resource_type::kGlobal;
+                    break;
+                }
+                iter = values_of_resource_type.find("database");
+                if (iter != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter->second;
+                    rt = resource_type::kDatabase;
+                    break;
+                }
+            } else if (values_of_resource_type.size() == 2) {
+                auto iter1 = values_of_resource_type.find("database");
+                auto iter2 = values_of_resource_type.find("table");
+                if (iter1 != values_of_resource_type.end() &&
+                    iter2 != values_of_resource_type.end()) {
+                    resource_policy.database_names = iter1->second;
+                    resource_policy.table_names = iter2->second;
+                    rt = resource_type::kDatabaseTable;
+                    break;
+                }
+            }
+            return dsn::ERR_RANGER_PARSE_ACL;
+        } while (false);
+
+        // The allow/deny items and their exceptions are parsed only when present: rapidjson's
+        // operator[] asserts on a missing member, so absent sections are skipped.
+        const auto parse_items = [&](const char *key, auto &target) {
+            if (policy.HasMember(key)) {
+                parse_policies_from_json(policy[key], target);
+            }
+        };
+        parse_items("policyItems", resource_policy.policies.allow_policies);
+        parse_items("denyPolicyItems", resource_policy.policies.deny_policies);
+        parse_items("allowExceptions", resource_policy.policies.allow_policies_exclude);
+        parse_items("denyExceptions", resource_policy.policies.deny_policies_exclude);
+
+        // 4. Add the ACL policy.
+        new_policies[enum_to_string(rt)].emplace_back(std::move(resource_policy));
+    }
+
+    // Commit the policies together with the version they belong to: a later pull of the same
+    // version is then skipped, while a failed load above keeps the old version so the next
+    // pull retries it.
+    _all_resource_policies = std::move(new_policies);
+    _local_policy_version = remote_policy_version;
+    return dsn::ERR_OK;
+}
+
+void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
+{
+    LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
+    // Created now, but only enqueued once the meta root is known to exist on remote storage.
+    dsn::task_ptr dump_and_sync_task = dsn::tasking::create_task(
+        LPC_USE_RANGER_ACCESS_CONTROL, &_tracker, [this]() { dump_and_sync_policies(); });
+
+    auto on_root_created = [this, dump_and_sync_task](dsn::error_code err) {
+        const bool root_exists = err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST;
+        if (!root_exists) {
+            // Only a timeout is tolerated and retried after a delay; any other error trips
+            // CHECK_EQ.
+            CHECK_EQ(err, dsn::ERR_TIMEOUT);
+            LOG_ERROR("Create Ranger policy meta root timeout, retry later.");
+            dsn::tasking::enqueue(LPC_USE_RANGER_ACCESS_CONTROL,
+                                  &_tracker,
+                                  [this]() { start_to_dump_and_sync_policies(); },
+                                  0,
+                                  kLoadRangerPolicyRetryDelayMs);
+            return;
+        }
+        LOG_DEBUG("Create Ranger policy meta root succeed.");
+        dump_and_sync_task->enqueue();
+    };
+
+    _meta_svc->get_remote_storage()->create_node(
+        _ranger_policy_meta_root, LPC_USE_RANGER_ACCESS_CONTROL, on_root_created);
+}
+
+void ranger_resource_policy_manager::dump_and_sync_policies()
+{
+    // Persist the loaded policies, refresh the GLOBAL/DATABASE caches, then push the
+    // table-level policies into the tables' app envs.
+    LOG_DEBUG("Start to sync Ranger policies to remote storage.");
+
+    // This only issues the write; its outcome (and any retry) is reported from the
+    // remote-storage callback inside dump_policies_to_remote_storage().
+    dump_policies_to_remote_storage();
+    LOG_DEBUG("Dump Ranger policies to remote storage has been issued.");
+
+    update_cached_policies();
+    LOG_DEBUG("Update using resources policies succeed.");
+
+    if (sync_policies_to_app_envs() != dsn::ERR_OK) {
+        LOG_ERROR("Sync policies to app envs failed.");
+    } else {
+        LOG_DEBUG("Sync policies to app envs succeeded.");
+    }
+}
+
+void ranger_resource_policy_manager::dump_policies_to_remote_storage()
+{
+    // Encode the current `_all_resource_policies` with json_forwarder and write it to the
+    // Ranger policy meta root. A timed-out write is retried after
+    // kLoadRangerPolicyRetryDelayMs by calling this function again, which re-encodes
+    // `_all_resource_policies` as it is at retry time; other errors are only logged.
+    dsn::blob encoded_policies =
+        json::json_forwarder<all_resource_policies>::encode(_all_resource_policies);
+    _meta_svc->get_remote_storage()->set_data(
+        _ranger_policy_meta_root,
+        encoded_policies,
+        LPC_USE_RANGER_ACCESS_CONTROL,
+        [this](dsn::error_code err) {
+            if (err == dsn::ERR_TIMEOUT) {
+                LOG_ERROR("Dump Ranger policies to remote storage timeout, retry later.");
+                dsn::tasking::enqueue(LPC_USE_RANGER_ACCESS_CONTROL,
+                                      &_tracker,
+                                      [this]() { dump_policies_to_remote_storage(); },
+                                      0,
+                                      kLoadRangerPolicyRetryDelayMs);
+            } else if (err != dsn::ERR_OK) {
+                LOG_ERROR("Dump Ranger policies to remote storage failed.");
+            } else {
+                LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
+            }
+        });
+}
+
+void ranger_resource_policy_manager::update_cached_policies()
+{
+    // Refresh the GLOBAL and DATABASE policy caches from `_all_resource_policies`.
+    //
+    // The entries are copied rather than swapped out. A timed-out
+    // dump_policies_to_remote_storage() retries by re-encoding `_all_resource_policies`, so
+    // the map must still hold the complete, current policy set after the caches are
+    // refreshed. A missing entry yields an empty cache without inserting into the map.
+    const auto policies_of = [this](resource_type rt) -> resource_policies {
+        const auto iter = _all_resource_policies.find(enum_to_string(rt));
+        return iter == _all_resource_policies.end() ? resource_policies() : iter->second;
+    };
+    // Copy outside the locks so the critical sections only cover the swaps; the previous
+    // cache contents end up in these locals and are released after the locks.
+    resource_policies global_policies = policies_of(resource_type::kGlobal);
+    resource_policies database_policies = policies_of(resource_type::kDatabase);
+    {
+        utils::auto_write_lock l(_global_policies_lock);
+        _global_policies_cache.swap(global_policies);
+        // TODO(wanghao): provide a query method
+    }
+    {
+        utils::auto_write_lock l(_database_policies_lock);
+        _database_policies_cache.swap(database_policies);
+        // TODO(wanghao): provide a query method
+    }
+}
+
+dsn::error_code ranger_resource_policy_manager::sync_policies_to_app_envs()
+{
+    const auto &table_policies =
+        
_all_resource_policies.find(enum_to_string(resource_type::kDatabaseTable));
+    if (table_policies == _all_resource_policies.end()) {
+        LOG_INFO("DATABASE_TABLE level policy is empty, skip to sync app 
envs.");
+        return dsn::ERR_OK;
+    }
+
+    dsn::replication::configuration_list_apps_response list_resp;
+    dsn::replication::configuration_list_apps_request list_req;
+    list_req.status = dsn::app_status::AS_AVAILABLE;
+    _meta_svc->get_server_state()->list_apps(list_req, list_resp);
+    ERR_LOG_AND_RETURN_NOT_OK(list_resp.err, "list_apps failed.");
+    for (const auto &app : list_resp.infos) {
+        std::string database_name = 
get_database_name_from_app_name(app.app_name);
+        std::string table_name;
+        if (database_name.empty()) {
+            database_name = "*";
+            table_name = app.app_name;
+        } else {
+            table_name = app.app_name.substr(database_name.size());
+        }

Review Comment:
   Keep it separate, just get the `database_name` at the database level ACL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to