This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 78136dd0a feat(Ranger): refactor the logic when ranger performs ACL 
(#1518)
78136dd0a is described below

commit 78136dd0a5947cd33285a9d4ca0cb65abeccb685
Author: WHBANG <[email protected]>
AuthorDate: Thu Jun 15 15:48:41 2023 +0800

    feat(Ranger): refactor the logic when ranger performs ACL (#1518)
    
    https://github.com/apache/incubator-pegasus/issues/1054
    
    This patch fixes the judgment logic when ranger matches policies:
    
    1. Traverse all resource policies
       i. If the current policy matches deny_condition
          a. does not match any deny_exclude, returns kDenied, and the 
traversal ends
          b. A deny_exclude is matched, return kPending, and continue to the 
next policy judgment
       ii. No policy is matched or the return value is kPending, enter 2
    2. Traverse all resource policies again
       i. If the current policy matches allow_condition
          a. does not match any allow_exclude, returns kAllowed, and the 
traversal ends
          b. An allow_exclude is matched, return kPending, and continue to the 
next policy judgment
       ii. If the return value is kPending, it will return kDenied
    3. dose not match any policy, return kDenied
---
 src/runtime/ranger/access_type.h                   |  11 +
 src/runtime/ranger/ranger_resource_policy.cpp      | 248 +++++++++++++++++---
 src/runtime/ranger/ranger_resource_policy.h        | 208 ++++++++++++++++-
 .../ranger/ranger_resource_policy_manager.cpp      | 100 ++++----
 .../ranger/ranger_resource_policy_manager.h        |  12 +-
 src/runtime/security/meta_access_controller.cpp    |   4 +-
 src/runtime/security/replica_access_controller.cpp |   7 +-
 src/runtime/security/replica_access_controller.h   |   9 +-
 .../test/ranger_resource_policy_manager_test.cpp   | 256 ++++++++++++++++-----
 src/runtime/test/ranger_resource_policy_test.cpp   | 135 +++++++++--
 10 files changed, 832 insertions(+), 158 deletions(-)

diff --git a/src/runtime/ranger/access_type.h b/src/runtime/ranger/access_type.h
index dfa3d242d..b28795b65 100644
--- a/src/runtime/ranger/access_type.h
+++ b/src/runtime/ranger/access_type.h
@@ -20,6 +20,8 @@
 #include <cstdint>
 #include <type_traits>
 
+#include "utils/enum_helper.h"
+
 namespace dsn {
 namespace ranger {
 
@@ -35,6 +37,15 @@ enum class access_type : uint8_t
     kMetadata = 1 << 5,
     kControl = 1 << 6
 };
+ENUM_BEGIN(access_type, access_type::kInvalid)
+ENUM_REG(access_type::kRead)
+ENUM_REG(access_type::kWrite)
+ENUM_REG(access_type::kCreate)
+ENUM_REG(access_type::kDrop)
+ENUM_REG(access_type::kList)
+ENUM_REG(access_type::kMetadata)
+ENUM_REG(access_type::kControl)
+ENUM_END(access_type)
 
 using act = std::underlying_type<access_type>::type;
 
diff --git a/src/runtime/ranger/ranger_resource_policy.cpp 
b/src/runtime/ranger/ranger_resource_policy.cpp
index 86ab3d536..2342da754 100644
--- a/src/runtime/ranger/ranger_resource_policy.cpp
+++ b/src/runtime/ranger/ranger_resource_policy.cpp
@@ -18,6 +18,7 @@
 #include "ranger_resource_policy.h"
 
 #include "runtime/ranger/access_type.h"
+#include "utils/fmt_logging.h"
 
 namespace dsn {
 namespace ranger {
@@ -27,45 +28,240 @@ bool policy_item::match(const access_type &ac_type, const 
std::string &user_name
     return static_cast<bool>(access_types & ac_type) && users.count(user_name) 
!= 0;
 }
 
-bool acl_policies::allowed(const access_type &ac_type, const std::string 
&user_name) const
+template <>
+policy_check_status
+acl_policies::policies_check<policy_check_type::kAllow>(const access_type 
&ac_type,
+                                                        const std::string 
&user_name) const
 {
-    // 1. Check if it is not allowed.
-    for (const auto &deny_policy : deny_policies) {
-        // 1.1. In 'deny_policies'.
-        if (!deny_policy.match(ac_type, user_name)) {
+    return do_policies_check<policy_check_type::kAllow, 
policy_check_status::kAllowed>(ac_type,
+                                                                               
        user_name);
+}
+
+template <>
+policy_check_status
+acl_policies::policies_check<policy_check_type::kDeny>(const access_type 
&ac_type,
+                                                       const std::string 
&user_name) const
+{
+    return do_policies_check<policy_check_type::kDeny, 
policy_check_status::kDenied>(ac_type,
+                                                                               
      user_name);
+}
+
+template <>
+policy_check_status
+acl_policies::do_policies_check<policy_check_type::kAllow, 
policy_check_status::kAllowed>(
+    const access_type &ac_type, const std::string &user_name) const
+{
+    for (const auto &policy : allow_policies) {
+        // 1. Doesn't match an allow_policies.
+        if (!policy.match(ac_type, user_name)) {
             continue;
         }
-        bool in_deny_policies_exclude = false;
-        for (const auto &deny_policy_exclude : deny_policies_exclude) {
-            if (deny_policy_exclude.match(ac_type, user_name)) {
-                in_deny_policies_exclude = true;
-                break;
+        // 2. Matches a policy.
+        for (const auto &exclude_policy : allow_policies_exclude) {
+            if (exclude_policy.match(ac_type, user_name)) {
+                // 2.1. Matches an allow_policies_exclude.
+                return policy_check_status::kPending;
             }
         }
-        // 1.2. Not in any 'deny_policies_exclude', it's not allowed.
-        if (!in_deny_policies_exclude) {
-            return false;
-        }
+        // 2.2. Doesn't match any allow_exclude_policies.
+        return policy_check_status::kAllowed;
     }
+    // 3. Doesn't match any policy.
+    return policy_check_status::kNotMatched;
+}
 
-    // 2. Check if it is allowed.
-    for (const auto &allow_policy : allow_policies) {
-        // 2.1. In 'allow_policies'.
-        if (!allow_policy.match(ac_type, user_name)) {
+template <>
+policy_check_status
+acl_policies::do_policies_check<policy_check_type::kDeny, 
policy_check_status::kDenied>(
+    const access_type &ac_type, const std::string &user_name) const
+{
+    for (const auto &policy : deny_policies) {
+        // 1. Doesn't match a deny_policies.
+        if (!policy.match(ac_type, user_name)) {
             continue;
         }
-        for (const auto &allow_policy_exclude : allow_policies_exclude) {
-            // 2.2. In some 'allow_policies_exclude', it's not allowed.
-            if (allow_policy_exclude.match(ac_type, user_name)) {
-                return false;
+        // 2. Matches a policy.
+        for (const auto &exclude_policy : deny_policies_exclude) {
+            if (exclude_policy.match(ac_type, user_name)) {
+                // 2.1. Matches a deny_policies_exclude.
+                return policy_check_status::kPending;
+            }
+        }
+        // 2.2. Doesn't match any deny_exclude_policies.
+        return policy_check_status::kDenied;
+    }
+    // 3. Doesn't match any policy.
+    return policy_check_status::kNotMatched;
+}
+
+access_control_result
+check_ranger_resource_policy_allowed(const std::vector<ranger_resource_policy> 
&policies,
+                                     const access_type &ac_type,
+                                     const std::string &user_name,
+                                     const match_database_type &md_type,
+                                     const std::string &database_name,
+                                     const std::string &default_database_name)
+{
+    // Check if it is denied by any policy in current resource.
+    auto check_res = do_check_ranger_resource_policy<policy_check_type::kDeny>(
+        policies, ac_type, user_name, md_type, database_name, 
default_database_name);
+    if (access_control_result::kDenied == check_res) {
+        return access_control_result::kDenied;
+    }
+    CHECK(access_control_result::kPending == check_res, "the access control 
result must kPending.");
+
+    // Check if it is allowed by any policy in current resource.
+    check_res = do_check_ranger_resource_policy<policy_check_type::kAllow>(
+        policies, ac_type, user_name, md_type, database_name, 
default_database_name);
+    if (access_control_result::kAllowed == check_res) {
+        return access_control_result::kAllowed;
+    }
+    CHECK(access_control_result::kPending == check_res, "the access control 
result must kPending.");
+
+    // The check that does not match any policy in current reosource returns 
false.
+    return access_control_result::kDenied;
+}
+
+template <>
+access_control_result 
do_check_ranger_resource_policy<policy_check_type::kAllow>(
+    const std::vector<ranger_resource_policy> &policies,
+    const access_type &ac_type,
+    const std::string &user_name,
+    const match_database_type &md_type,
+    const std::string &database_name,
+    const std::string &default_database_name)
+{
+    for (const auto &policy : policies) {
+        if (match_database_type::kNeed == md_type) {
+            // Lagacy table not match any database.
+            if (database_name.empty() && policy.database_names.count("*") == 0 
&&
+                policy.database_names.count(default_database_name) == 0) {
+                continue;
+            }
+            // New table not match any database.
+            if (!database_name.empty() && policy.database_names.count("*") == 
0 &&
+                policy.database_names.count(database_name) == 0) {
+                continue;
             }
         }
-        // 2.3. Not in any 'allow_policies_exclude', it's allowed.
-        return true;
+        auto check_status =
+            policy.policies.policies_check<policy_check_type::kAllow>(ac_type, 
user_name);
+        if (policy_check_status::kAllowed == check_status) {
+            return access_control_result::kAllowed;
+        }
+
+        // In a 'allow_policies' and in a 'allow_policies_exclude' or not 
match.
+        CHECK(policy_check_status::kPending == check_status ||
+                  policy_check_status::kNotMatched == check_status,
+              "the policy check status must be kPending or kNotMatched");
     }
+    return access_control_result::kPending;
+}
 
-    // 3. Otherwise, it's not allowed.
-    return false;
+template <>
+access_control_result 
do_check_ranger_resource_policy<policy_check_type::kDeny>(
+    const std::vector<ranger_resource_policy> &policies,
+    const access_type &ac_type,
+    const std::string &user_name,
+    const match_database_type &md_type,
+    const std::string &database_name,
+    const std::string &default_database_name)
+{
+    for (const auto &policy : policies) {
+        if (match_database_type::kNeed == md_type) {
+            // Lagacy table not match any database.
+            if (database_name.empty() && policy.database_names.count("*") == 0 
&&
+                policy.database_names.count(default_database_name) == 0) {
+                continue;
+            }
+            // New table not match any database.
+            if (!database_name.empty() && policy.database_names.count("*") == 
0 &&
+                policy.database_names.count(database_name) == 0) {
+                continue;
+            }
+        }
+        auto check_status =
+            policy.policies.policies_check<policy_check_type::kDeny>(ac_type, 
user_name);
+        if (policy_check_status::kDenied == check_status) {
+            return access_control_result::kDenied;
+        }
+
+        // In a 'deny_policies' and in a 'deny_policies_exclude' or not match.
+        CHECK(policy_check_status::kPending == check_status ||
+                  policy_check_status::kNotMatched == check_status,
+              "the policy check status must be kPending or kNotMatched");
+    }
+    return access_control_result::kPending;
+}
+
+access_control_result check_ranger_database_table_policy_allowed(
+    const std::vector<matched_database_table_policy> &policies,
+    const access_type &ac_type,
+    const std::string &user_name)
+{
+    // Check if it is denied by any DATABASE_TABLE policy.
+    auto check_res = 
do_check_ranger_database_table_policy<policy_check_type::kDeny>(
+        policies, ac_type, user_name);
+    if (access_control_result::kDenied == check_res) {
+        return access_control_result::kDenied;
+    }
+    CHECK(access_control_result::kPending == check_res, "the access control 
result must kPending.");
+
+    // Check if it is allowed by any DATABASE_TABLE policy.
+    check_res = 
do_check_ranger_database_table_policy<policy_check_type::kAllow>(
+        policies, ac_type, user_name);
+    if (access_control_result::kAllowed == check_res) {
+        return access_control_result::kAllowed;
+    }
+    CHECK(access_control_result::kPending == check_res, "the access control 
result must kPending.");
+
+    // The check that does not match any DATABASE_TABLE policy returns false.
+    return access_control_result::kDenied;
+}
+
+template <>
+access_control_result 
do_check_ranger_database_table_policy<policy_check_type::kDeny>(
+    const std::vector<matched_database_table_policy> &policies,
+    const access_type &ac_type,
+    const std::string &user_name)
+{
+    for (const auto &policy : policies) {
+        auto check_status =
+            policy.policies.policies_check<policy_check_type::kDeny>(ac_type, 
user_name);
+        // When policy_check_type is 'kDeny' and in a 'deny_policies' and not 
in any
+        // 'deny_policies_exclude'.
+        if (policy_check_status::kDenied == check_status) {
+            return access_control_result::kDenied;
+        }
+
+        // In a 'policies' and in a 'policies_exclude' or not match.
+        CHECK(policy_check_status::kPending == check_status ||
+                  policy_check_status::kNotMatched == check_status,
+              "the policy check status must be kPending or kNotMatched");
+    }
+    return access_control_result::kPending;
+}
+
+template <>
+access_control_result 
do_check_ranger_database_table_policy<policy_check_type::kAllow>(
+    const std::vector<matched_database_table_policy> &policies,
+    const access_type &ac_type,
+    const std::string &user_name)
+{
+    for (const auto &policy : policies) {
+        auto check_status =
+            policy.policies.policies_check<policy_check_type::kAllow>(ac_type, 
user_name);
+        // When policy_check_type is 'kAllow' and in a 'allow_policies' and 
not in any
+        // 'allow_policies_exclude'.
+        if (policy_check_status::kAllowed == check_status) {
+            return access_control_result::kAllowed;
+        }
+        // In a 'policies' and in a 'policies_exclude' or not match.
+        CHECK(policy_check_status::kPending == check_status ||
+                  policy_check_status::kNotMatched == check_status,
+              "the policy check status must be kPending or kNotMatched");
+    }
+    return access_control_result::kPending;
 }
 
 } // namespace ranger
diff --git a/src/runtime/ranger/ranger_resource_policy.h 
b/src/runtime/ranger/ranger_resource_policy.h
index 61ea68017..95b93d595 100644
--- a/src/runtime/ranger/ranger_resource_policy.h
+++ b/src/runtime/ranger/ranger_resource_policy.h
@@ -23,10 +23,56 @@
 
 #include "access_type.h"
 #include "common/json_helper.h"
+#include "utils/enum_helper.h"
 
 namespace dsn {
 namespace ranger {
 
+// Types of policy checks.
+// kAllow means this checks for 'allow_policies' and 'allow_policies_exclude'.
+// kDeny means this checks for 'deny_policies' and 'deny_policies_exclude'.
+enum class policy_check_type
+{
+    kAllow = 0,
+    kDeny,
+    kInvalid
+};
+ENUM_BEGIN(policy_check_type, policy_check_type::kInvalid)
+ENUM_REG(policy_check_type::kAllow)
+ENUM_REG(policy_check_type::kDeny)
+ENUM_END(policy_check_type)
+
+// The return status code when a policy('kAllow' or 'kDeny' policy_check_type) 
is checked.
+// kAllowed means in a 'allow_policies' and not in any 
'allow_policies_exclude'.
+// kDenied means in a 'deny_policies' and not in any 'deny_policies_exclude'.
+// kNotMatched means not match any 'allow_policies' or 'deny_policies'.
+// kPending means in a 'allow_policies/deny_policies' and in a
+// 'allow_policies_exclude/deny_policies_exclude'.
+enum class policy_check_status
+{
+    kAllowed = 0,
+    kDenied,
+    kNotMatched,
+    kPending,
+    kInvalid
+};
+
+enum class access_control_result
+{
+    kAllowed = 0,
+    kDenied,
+    kPending
+};
+
+// Used to determine whether the policy needs to match the database. kNotNeed 
means no, kNeed means
+// yes.
+enum class match_database_type
+{
+    kNotNeed = 0,
+    kNeed
+
+};
+
 // Ranger policy data structure
 struct policy_item
 {
@@ -56,10 +102,37 @@ struct acl_policies
                               deny_policies,
                               deny_policies_exclude);
 
-    // Check whether the 'user_name' is allowed to access the resource by type 
of 'ac_type'.
-    bool allowed(const access_type &ac_type, const std::string &user_name) 
const;
+    // Check if 'allow_policies' or 'deny_policies' allow or deny 'user_name' 
to access the resource
+    // by type 'ac_type'.
+    template <policy_check_type check_type>
+    policy_check_status policies_check(const access_type &ac_type,
+                                       const std::string &user_name) const;
+
+    template <policy_check_type check_type, policy_check_status check_status>
+    policy_check_status do_policies_check(const access_type &ac_type,
+                                          const std::string &user_name) const;
 };
 
+template <>
+policy_check_status
+acl_policies::policies_check<policy_check_type::kAllow>(const access_type 
&ac_type,
+                                                        const std::string 
&user_name) const;
+
+template <>
+policy_check_status
+acl_policies::policies_check<policy_check_type::kDeny>(const access_type 
&ac_type,
+                                                       const std::string 
&user_name) const;
+
+template <>
+policy_check_status
+acl_policies::do_policies_check<policy_check_type::kAllow, 
policy_check_status::kAllowed>(
+    const access_type &ac_type, const std::string &user_name) const;
+
+template <>
+policy_check_status
+acl_policies::do_policies_check<policy_check_type::kDeny, 
policy_check_status::kDenied>(
+    const access_type &ac_type, const std::string &user_name) const;
+
 // A policy data structure definition of ranger resources
 struct ranger_resource_policy
 {
@@ -68,8 +141,137 @@ struct ranger_resource_policy
     std::unordered_set<std::string> table_names;
     acl_policies policies;
 
-    DEFINE_JSON_SERIALIZATION(name, database_names, table_names, policies)
+    DEFINE_JSON_SERIALIZATION(name, database_names, table_names, policies);
+};
+
+// A policy data structure definition of the DATABASE_TABLE resource, which 
will be set in
+// 'app_envs'
+struct matched_database_table_policy
+{
+    std::string matched_database_name;
+    std::string matched_table_name;
+    acl_policies policies;
+
+    DEFINE_JSON_SERIALIZATION(matched_database_name, matched_table_name, 
policies);
 };
 
+// Returns 'access_control_result::kAllowed' if 'policies' allows 'user_name' 
to access
+// 'database_name' via 'ac_type', returns 'access_control_result::kDenied' 
means not.
+// 'need_match_database' being true means that the 'policies' needs to be 
matched to the database
+// first, false means not.
+// If 'ac_type' is DATABASE access type, it needs to match database, if 
'ac_type' is a GLOBAL access
+// type, it does not need to match.
+/*
+                *** Ranger Policy Evaluation Flow ***
+
+                    +-----------------+
+                     \ Resource access \
+                      \    request      \
+                       +-------+---------+
+                               |
+                         +-----v-------+
+                        /               \
+                       /     Has a       \
+         +-----N------+  resource policy  <----------------N-----------------+
+         |             \  been matched ? /                                   |
+         |              \               /                                    |
+         |               +-----+-------+                                     |
+         |                     |                                             |
+         |                     Y                                             |
+         |                     |                                             |
+         |               +-----v-------+                  +-------------+    |
+         |              /               \                /               \   |
+         |             /    Has more     \              /   Has more      \  |
+         |      +----->   policies with   +---N--+---->+  policies with    +-+
+         |      |      \ Deny Condition? /       |      \ Allow Condition?/
+         |      |       \               /        |       \               /
+         |      |        +------+------+         |        +------+------+
+         |      |               |                |               |
+         |      |               Y                |               Y
+         |      |               |                |               |
+         |      |        +------v------+         |        +------v------+
+         |      |       /    Request    \        |       /    Request    \
+         |      |      / matches a deny  \       |      /matches an allow \
+         |      +--N--+ condition in the  +      +--N--+ condition in the  +
+         |      |      \     policy?     /       |      \    policy?      /
+         |      |       \               /        |       \               /
+         |      |        +------+------+         |        +------+------+
+         |      |               |                |               |
+         |      |               Y                |               Y
+         |      |               |                |               |
+         |      |        +------v------+         |        +------v------+
+         |      |       /    Request    \        |       /    Request    \
+         |      |      /  matches a deny \       |      / matches an allow\
+         |      +--Y--+   exclude in the  +      +--Y--+   exclude in the  +
+         |             \      policy?    /              \      policy?    /
+         |              \               /                \               /
+         |               +------+------+                  +------+------+
+         |                      |                                |
+         |                      N                                N
+         |                      |                                |
+   +-----v-----+         +------v------+                  +------v------+
+   |    DENY   |         |    DENY     |                  |    ALLOW    |
+   +-----------+         +-------------+                  +-------------+
+*/
+access_control_result
+check_ranger_resource_policy_allowed(const std::vector<ranger_resource_policy> 
&policies,
+                                     const access_type &ac_type,
+                                     const std::string &user_name,
+                                     const match_database_type &md_type,
+                                     const std::string &database_name,
+                                     const std::string &default_database_name);
+
+template <policy_check_type check_type>
+access_control_result
+do_check_ranger_resource_policy(const std::vector<ranger_resource_policy> 
&policies,
+                                const access_type &ac_type,
+                                const std::string &user_name,
+                                const match_database_type &md_type,
+                                const std::string &database_name,
+                                const std::string &default_database_name);
+
+template <>
+access_control_result 
do_check_ranger_resource_policy<policy_check_type::kAllow>(
+    const std::vector<ranger_resource_policy> &policies,
+    const access_type &ac_type,
+    const std::string &user_name,
+    const match_database_type &md_type,
+    const std::string &database_name,
+    const std::string &default_database_name);
+
+template <>
+access_control_result 
do_check_ranger_resource_policy<policy_check_type::kDeny>(
+    const std::vector<ranger_resource_policy> &policies,
+    const access_type &ac_type,
+    const std::string &user_name,
+    const match_database_type &md_type,
+    const std::string &database_name,
+    const std::string &default_database_name);
+
+// Return 'access_control_result::kAllowed' if 'policies' allow 'user_name' to 
access, this is used
+// for DATABASE_TABLE resource, returns 'access_control_result::kDenied' means 
not.
+access_control_result check_ranger_database_table_policy_allowed(
+    const std::vector<matched_database_table_policy> &policies,
+    const access_type &ac_type,
+    const std::string &user_name);
+
+template <policy_check_type check_type>
+access_control_result
+do_check_ranger_database_table_policy(const 
std::vector<matched_database_table_policy> &policies,
+                                      const access_type &ac_type,
+                                      const std::string &user_name);
+
+template <>
+access_control_result 
do_check_ranger_database_table_policy<policy_check_type::kDeny>(
+    const std::vector<matched_database_table_policy> &policies,
+    const access_type &ac_type,
+    const std::string &user_name);
+
+template <>
+access_control_result 
do_check_ranger_database_table_policy<policy_check_type::kAllow>(
+    const std::vector<matched_database_table_policy> &policies,
+    const access_type &ac_type,
+    const std::string &user_name);
+
 } // namespace ranger
 } // namespace dsn
diff --git a/src/runtime/ranger/ranger_resource_policy_manager.cpp 
b/src/runtime/ranger/ranger_resource_policy_manager.cpp
index 44d22d789..df3172b93 100644
--- a/src/runtime/ranger/ranger_resource_policy_manager.cpp
+++ b/src/runtime/ranger/ranger_resource_policy_manager.cpp
@@ -216,9 +216,8 @@ void ranger_resource_policy_manager::start()
                            std::chrono::milliseconds(1));
 }
 
-bool ranger_resource_policy_manager::allowed(const int rpc_code,
-                                             const std::string &user_name,
-                                             const std::string &database_name)
+access_control_result ranger_resource_policy_manager::allowed(
+    const int rpc_code, const std::string &user_name, const std::string 
&database_name) const
 {
     do {
         const auto &ac_type = _ac_type_of_global_rpcs.find(rpc_code);
@@ -227,16 +226,14 @@ bool ranger_resource_policy_manager::allowed(const int 
rpc_code,
             break;
         }
 
-        // Check if it is allowed by any GLOBAL policy.
+        // Check if it is denied by any GLOBAL policy.
         utils::auto_read_lock l(_global_policies_lock);
-        for (const auto &policy : _global_policies_cache) {
-            if (policy.policies.allowed(ac_type->second, user_name)) {
-                return true;
-            }
-        }
-
-        // The check that does not match any GLOBAL policy returns false.
-        return false;
+        return check_ranger_resource_policy_allowed(_global_policies_cache,
+                                                    ac_type->second,
+                                                    user_name,
+                                                    
match_database_type::kNotNeed,
+                                                    "",
+                                                    "");
     } while (false);
 
     do {
@@ -246,29 +243,18 @@ bool ranger_resource_policy_manager::allowed(const int 
rpc_code,
             break;
         }
 
-        // legacy table belongs to the default database.
-        std::string db_name =
-            database_name.empty() ? 
FLAGS_legacy_table_database_mapping_policy_name : database_name;
-
-        // Check if it is allowed by any DATABASE policy.
         utils::auto_read_lock l(_database_policies_lock);
-        for (const auto &policy : _database_policies_cache) {
-            if (!policy.policies.allowed(ac_type->second, user_name)) {
-                continue;
-            }
-            // "*" can match any table, including legacy table and new table.
-            if (policy.database_names.count("*") != 0 ||
-                policy.database_names.count(db_name) != 0) {
-                return true;
-            }
-        }
-
-        // The check that does not match any DATABASE policy returns false.
-        return false;
+        return check_ranger_resource_policy_allowed(
+            _database_policies_cache,
+            ac_type->second,
+            user_name,
+            match_database_type::kNeed,
+            database_name,
+            FLAGS_legacy_table_database_mapping_policy_name);
     } while (false);
 
-    // The check that does not match any policy returns false.
-    return false;
+    // The check that does not match any resource returns false.
+    return access_control_result::kDenied;
 }
 
 void ranger_resource_policy_manager::parse_policies_from_json(const 
rapidjson::Value &data,
@@ -599,34 +585,54 @@ dsn::error_code 
ranger_resource_policy_manager::sync_policies_to_app_envs()
         req->__set_app_name(app.app_name);
         req->__set_keys(
             
{dsn::replication::replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES});
-        bool is_policy_matched = false;
+        std::vector<matched_database_table_policy> 
matched_database_table_policies;
         for (const auto &policy : table_policies->second) {
-            // If this table does not match any database, its Ranger policies 
will be cleaned up.
+            // If this table does not match any database, this policy will be 
skipped and will not
+            // be written into app_envs.
             if (policy.database_names.count(database_name) == 0 &&
                 policy.database_names.count("*") == 0) {
                 continue;
             }
+            // If this table does not match any database table, this policy 
will be skipped and will
+            // not be written into app_envs.
+            if (policy.table_names.count(table_name) == 0 && 
policy.table_names.count("*") == 0) {
+                continue;
+            }
+            // This table matches a policy.
+            matched_database_table_policy database_table_policy(
+                {database_name, table_name, policy.policies});
+            // This table matches the policy whose database is "*".
+            if (policy.database_names.count(database_name) == 0) {
+                CHECK(policy.database_names.count("*") != 0,
+                      "the list of database_name must contain *");
+                database_table_policy.matched_database_name = "*";
+            }
+            // This table matches the policy whose database table is "*".
+            if (policy.table_names.count(table_name) == 0) {
+                CHECK(policy.table_names.count("*") != 0, "the list of 
table_name must contain *");
+                database_table_policy.matched_table_name = "*";
+            }
+            
matched_database_table_policies.emplace_back(database_table_policy);
+        }
+        if (matched_database_table_policies.empty()) {
+            // There is no matched policy, clear app Ranger policy
+            
req->__set_op(dsn::replication::app_env_operation::type::APP_ENV_OP_DEL);
 
-            is_policy_matched = true;
+            dsn::replication::update_app_env_rpc rpc(std::move(req), 
LPC_USE_RANGER_ACCESS_CONTROL);
+            _meta_svc->get_server_state()->del_app_envs(rpc);
+            _meta_svc->get_server_state()->wait_all_task();
+            LOG_AND_RETURN_NOT_OK(ERROR, rpc.response().err, "del_app_envs 
failed.");
+        } else {
             
req->__set_op(dsn::replication::app_env_operation::type::APP_ENV_OP_SET);
             req->__set_values(
-                
{json::json_forwarder<acl_policies>::encode(policy.policies).to_string()});
+                
{json::json_forwarder<std::vector<matched_database_table_policy>>::encode(
+                     matched_database_table_policies)
+                     .to_string()});
 
             dsn::replication::update_app_env_rpc rpc(std::move(req), 
LPC_USE_RANGER_ACCESS_CONTROL);
             _meta_svc->get_server_state()->set_app_envs(rpc);
             _meta_svc->get_server_state()->wait_all_task();
             LOG_AND_RETURN_NOT_OK(ERROR, rpc.response().err, "set_app_envs 
failed.");
-            break;
-        }
-
-        // There is no matched policy, clear the table's Ranger policies.
-        if (!is_policy_matched) {
-            
req->__set_op(dsn::replication::app_env_operation::type::APP_ENV_OP_DEL);
-
-            dsn::replication::update_app_env_rpc rpc(std::move(req), 
LPC_USE_RANGER_ACCESS_CONTROL);
-            _meta_svc->get_server_state()->del_app_envs(rpc);
-            _meta_svc->get_server_state()->wait_all_task();
-            LOG_AND_RETURN_NOT_OK(ERROR, rpc.response().err, "del_app_envs 
failed.");
         }
     }
 
diff --git a/src/runtime/ranger/ranger_resource_policy_manager.h 
b/src/runtime/ranger/ranger_resource_policy_manager.h
index 7e46bec31..52aeb2c72 100644
--- a/src/runtime/ranger/ranger_resource_policy_manager.h
+++ b/src/runtime/ranger/ranger_resource_policy_manager.h
@@ -74,9 +74,11 @@ public:
     // When using Ranger for ACL, periodically pull policies from Ranger 
service.
     void start();
 
-    // Return true if the 'user_name' is allowed to access 'database_name' via 
'rpc_code'.
-    bool
-    allowed(const int rpc_code, const std::string &user_name, const 
std::string &database_name);
+    // Return 'access_control_result::kAllowed' if the 'user_name' is allowed 
to access
+    // 'database_name' via 'rpc_code'.
+    access_control_result allowed(const int rpc_code,
+                                  const std::string &user_name,
+                                  const std::string &database_name) const;
 
 private:
     // Parse Ranger ACL policies from 'data' in JSON format into 'policies'.
@@ -118,8 +120,8 @@ private:
     std::string _ranger_policy_meta_root;
 
     replication::meta_service *_meta_svc;
-    utils::rw_lock_nr _global_policies_lock;
-    utils::rw_lock_nr _database_policies_lock;
+    mutable utils::rw_lock_nr _global_policies_lock;
+    mutable utils::rw_lock_nr _database_policies_lock;
 
     // The access type of RPCs which access global level resources.
     access_type_of_rpc_code _ac_type_of_global_rpcs;
diff --git a/src/runtime/security/meta_access_controller.cpp 
b/src/runtime/security/meta_access_controller.cpp
index 74b75f097..833481595 100644
--- a/src/runtime/security/meta_access_controller.cpp
+++ b/src/runtime/security/meta_access_controller.cpp
@@ -20,6 +20,7 @@
 #include <vector>
 
 #include "runtime/ranger/ranger_resource_policy_manager.h"
+#include "runtime/ranger/ranger_resource_policy.h"
 #include "runtime/rpc/network.h"
 #include "runtime/rpc/rpc_message.h"
 #include "runtime/task/task_code.h"
@@ -113,7 +114,8 @@ bool meta_access_controller::allowed(message_ex *msg, const 
std::string &app_nam
               user_name,
               msg->rpc_code(),
               database_name);
-    return _ranger_resource_policy_manager->allowed(rpc_code, user_name, 
database_name);
+    return _ranger_resource_policy_manager->allowed(rpc_code, user_name, 
database_name) ==
+           ranger::access_control_result::kAllowed;
 }
 
 void meta_access_controller::register_allowed_rpc_code_list(
diff --git a/src/runtime/security/replica_access_controller.cpp 
b/src/runtime/security/replica_access_controller.cpp
index 5832b05d3..c50a60ea3 100644
--- a/src/runtime/security/replica_access_controller.cpp
+++ b/src/runtime/security/replica_access_controller.cpp
@@ -69,7 +69,8 @@ bool replica_access_controller::allowed(message_ex *msg, 
ranger::access_type req
     // use Ranger policy for ACL.
     {
         utils::auto_read_lock l(_lock);
-        return _ranger_policies.allowed(req_type, user_name);
+        return check_ranger_database_table_policy_allowed(_ranger_policies, 
req_type, user_name) ==
+               ranger::access_control_result::kAllowed;
     }
 }
 
@@ -102,9 +103,9 @@ void 
replica_access_controller::update_ranger_policies(const std::string &polici
             return;
         }
     }
-    ranger::acl_policies tmp_policies;
+    matched_database_table_policies tmp_policies;
     auto tmp_policies_str = policies;
-    dsn::json::json_forwarder<ranger::acl_policies>::decode(
+    dsn::json::json_forwarder<matched_database_table_policies>::decode(
         dsn::blob::create_from_bytes(std::move(tmp_policies_str)), 
tmp_policies);
     {
         utils::auto_write_lock l(_lock);
diff --git a/src/runtime/security/replica_access_controller.h 
b/src/runtime/security/replica_access_controller.h
index f3790d5d4..dcaeaa758 100644
--- a/src/runtime/security/replica_access_controller.h
+++ b/src/runtime/security/replica_access_controller.h
@@ -19,8 +19,10 @@
 
 #include <string>
 #include <unordered_set>
+#include <vector>
 
 #include "access_controller.h"
+#include "common/json_helper.h"
 #include "runtime/ranger/access_type.h"
 #include "runtime/ranger/ranger_resource_policy.h"
 #include "utils/synchronize.h"
@@ -29,6 +31,9 @@ namespace dsn {
 class message_ex;
 
 namespace security {
+
+using matched_database_table_policies = 
std::vector<ranger::matched_database_table_policy>;
+
 class replica_access_controller : public access_controller
 {
 public:
@@ -46,6 +51,8 @@ public:
     // table changes
     void update_ranger_policies(const std::string &policies) override;
 
+    DEFINE_JSON_SERIALIZATION(_ranger_policies);
+
 private:
     // Security check to avoid allowed_users is not empty in special scenarios.
     void check_allowed_users_valid() const;
@@ -64,7 +71,7 @@ private:
     std::string _env_policies;
 
     // The Ranger policies for ACL.
-    ranger::acl_policies _ranger_policies;
+    matched_database_table_policies _ranger_policies;
 
     std::string _name;
 
diff --git a/src/runtime/test/ranger_resource_policy_manager_test.cpp 
b/src/runtime/test/ranger_resource_policy_manager_test.cpp
index 6660cab42..1971810c9 100644
--- a/src/runtime/test/ranger_resource_policy_manager_test.cpp
+++ b/src/runtime/test/ranger_resource_policy_manager_test.cpp
@@ -193,27 +193,174 @@ TEST(ranger_resource_policy_manager_test, 
ranger_resource_policy_serialized_test
     {
         access_type ac_type;
         std::string user_name;
-        bool expected_result;
-    } tests[] = {{access_type::kRead, "user", false},      
{access_type::kRead, "user1", true},
-                 {access_type::kWrite, "user1", true},     
{access_type::kCreate, "user1", false},
-                 {access_type::kDrop, "user1", false},     
{access_type::kList, "user1", true},
-                 {access_type::kMetadata, "user1", false}, 
{access_type::kControl, "user1", false},
-                 {access_type::kRead, "user2", true},      
{access_type::kWrite, "user2", false},
-                 {access_type::kCreate, "user2", false},   
{access_type::kDrop, "user2", false},
-                 {access_type::kList, "user2", true},      
{access_type::kMetadata, "user2", false},
-                 {access_type::kControl, "user2", false},  
{access_type::kRead, "user3", false},
-                 {access_type::kWrite, "user3", false},    
{access_type::kCreate, "user3", false},
-                 {access_type::kDrop, "user3", false},     
{access_type::kList, "user3", true},
-                 {access_type::kMetadata, "user3", false}, 
{access_type::kControl, "user3", false},
-                 {access_type::kRead, "user4", true},      
{access_type::kWrite, "user4", false},
-                 {access_type::kCreate, "user4", false},   
{access_type::kDrop, "user4", false},
-                 {access_type::kList, "user4", true},      
{access_type::kMetadata, "user4", false},
-                 {access_type::kControl, "user4", false}};
+        policy_check_type check_type;
+        policy_check_status expected_result;
+    } tests[] = {
+        // user does not match any 'user_name' in allow_policies.
+        {access_type::kRead, "user", policy_check_type::kAllow, 
policy_check_status::kNotMatched},
+        {access_type::kRead, "user1", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        {access_type::kWrite, "user1", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // user1: 'kCreate' and 'kDrop' do not match any ACLs in 
allow_policies.
+        {access_type::kCreate,
+         "user1",
+         policy_check_type::kAllow,
+         policy_check_status::kNotMatched},
+        {access_type::kDrop, "user1", policy_check_type::kAllow, 
policy_check_status::kNotMatched},
+        {access_type::kList, "user1", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // user1: 'kMetadata' do not match any ACLs in allow_policies.
+        {access_type::kMetadata,
+         "user1",
+         policy_check_type::kAllow,
+         policy_check_status::kNotMatched},
+        {access_type::kControl,
+         "user1",
+         policy_check_type::kAllow,
+         policy_check_status::kNotMatched},
+        {access_type::kRead, "user2", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // user2: in a 'allow_policies' and in 'allow_policies_exclude'
+        {access_type::kWrite, "user2", policy_check_type::kAllow, 
policy_check_status::kPending},
+        // user2: 'kCreate' and 'kDrop' do not match any ACLs in 
allow_policies.
+        {access_type::kCreate,
+         "user2",
+         policy_check_type::kAllow,
+         policy_check_status::kNotMatched},
+        {access_type::kDrop, "user2", policy_check_type::kAllow, 
policy_check_status::kNotMatched},
+        {access_type::kList, "user2", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // user2: 'kMetadata' and 'kControl' do not match any ACLs in 
allow_policies.
+        {access_type::kMetadata,
+         "user2",
+         policy_check_type::kAllow,
+         policy_check_status::kNotMatched},
+        {access_type::kControl,
+         "user2",
+         policy_check_type::kAllow,
+         policy_check_status::kNotMatched},
+        {access_type::kRead, "user3", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        {access_type::kWrite, "user3", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // user3: 'kCreate' and 'kDrop' do not match any ACLs in 
allow_policies.
+        {access_type::kCreate,
+         "user3",
+         policy_check_type::kAllow,
+         policy_check_status::kNotMatched},
+        {access_type::kDrop, "user3", policy_check_type::kAllow, 
policy_check_status::kNotMatched},
+        {access_type::kList, "user3", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // user3: 'kMetadata' and 'kControl' do not match any ACLs in 
allow_policies.
+        {access_type::kMetadata,
+         "user3",
+         policy_check_type::kAllow,
+         policy_check_status::kNotMatched},
+        {access_type::kControl,
+         "user3",
+         policy_check_type::kAllow,
+         policy_check_status::kNotMatched},
+        {access_type::kRead, "user4", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        {access_type::kWrite, "user4", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // user4: 'kCreate' and 'kDrop' do not match any ACLs in 
allow_policies.
+        {access_type::kCreate,
+         "user4",
+         policy_check_type::kAllow,
+         policy_check_status::kNotMatched},
+        {access_type::kDrop, "user4", policy_check_type::kAllow, 
policy_check_status::kNotMatched},
+        {access_type::kList, "user4", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // user4: 'kMetadata' and 'kControl' do not match any ACLs in 
allow_policies.
+        {access_type::kMetadata,
+         "user4",
+         policy_check_type::kAllow,
+         policy_check_status::kNotMatched},
+        {access_type::kControl,
+         "user4",
+         policy_check_type::kAllow,
+         policy_check_status::kNotMatched},
+        // user, user1, user2 do not match any 'user_name' in deny_policies.
+        {access_type::kRead, "user", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kRead, "user1", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kWrite, "user1", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kCreate, "user1", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kDrop, "user1", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kList, "user1", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kMetadata,
+         "user1",
+         policy_check_type::kDeny,
+         policy_check_status::kNotMatched},
+        {access_type::kControl,
+         "user1",
+         policy_check_type::kDeny,
+         policy_check_status::kNotMatched},
+        {access_type::kRead, "user2", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kWrite, "user2", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kCreate, "user2", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kDrop, "user2", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kList, "user2", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kMetadata,
+         "user2",
+         policy_check_type::kDeny,
+         policy_check_status::kNotMatched},
+        {access_type::kControl,
+         "user2",
+         policy_check_type::kDeny,
+         policy_check_status::kNotMatched},
+        {access_type::kRead, "user3", policy_check_type::kDeny, 
policy_check_status::kDenied},
+        {access_type::kWrite, "user3", policy_check_type::kDeny, 
policy_check_status::kDenied},
+        // user3: 'kCreate', 'kDrop', 'kList', 'kMetadata', 'kControl' do not 
match any ACLs in
+        // allow_policies.
+        {access_type::kCreate, "user3", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kDrop, "user3", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kList, "user3", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kMetadata,
+         "user3",
+         policy_check_type::kDeny,
+         policy_check_status::kNotMatched},
+        {access_type::kControl,
+         "user3",
+         policy_check_type::kDeny,
+         policy_check_status::kNotMatched},
+        // user4: in a 'deny_policies' and in 'deny_policies_exclude'
+        {access_type::kRead, "user4", policy_check_type::kDeny, 
policy_check_status::kPending},
+        {access_type::kWrite, "user4", policy_check_type::kDeny, 
policy_check_status::kDenied},
+        // user4: 'kCreate', 'kDrop', 'kList', 'kMetadata', 'kControl' do not 
match any ACLs in
+        // allow_policies.
+        {access_type::kCreate, "user4", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kDrop, "user4", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kList, "user4", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        {access_type::kMetadata,
+         "user4",
+         policy_check_type::kDeny,
+         policy_check_status::kNotMatched},
+        {access_type::kControl,
+         "user4",
+         policy_check_type::kDeny,
+         policy_check_status::kNotMatched}};
     for (const auto &test : tests) {
-        auto actual_result = policy.policies.allowed(test.ac_type, 
test.user_name);
-        EXPECT_EQ(test.expected_result, actual_result);
-        actual_result = policy_serialized.policies.allowed(test.ac_type, 
test.user_name);
-        EXPECT_EQ(test.expected_result, actual_result);
+        policy_check_status actual_result_1 = policy_check_status::kInvalid;
+        policy_check_status actual_result_2 = policy_check_status::kInvalid;
+        switch (test.check_type) {
+        case policy_check_type::kAllow:
+            actual_result_1 = 
policy.policies.policies_check<policy_check_type::kAllow>(
+                test.ac_type, test.user_name);
+            actual_result_2 = 
policy_serialized.policies.policies_check<policy_check_type::kAllow>(
+                test.ac_type, test.user_name);
+            break;
+        case policy_check_type::kDeny:
+            actual_result_1 = 
policy.policies.policies_check<policy_check_type::kDeny>(
+                test.ac_type, test.user_name);
+            actual_result_2 = 
policy_serialized.policies.policies_check<policy_check_type::kDeny>(
+                test.ac_type, test.user_name);
+            break;
+        case policy_check_type::kInvalid:
+        default:
+            break;
+        }
+        EXPECT_EQ(test.expected_result, actual_result_1)
+            << fmt::format("ac_type: {}, user_name: {}, check_type: {}",
+                           enum_to_string(test.ac_type),
+                           test.user_name,
+                           enum_to_string(test.check_type));
+
+        EXPECT_EQ(test.expected_result, actual_result_2)
+            << fmt::format("ac_type: {}, user_name: {}, check_type: {}",
+                           enum_to_string(test.ac_type),
+                           test.user_name,
+                           enum_to_string(test.check_type));
     }
 }
 
@@ -328,40 +475,41 @@ TEST_F(ranger_resource_policy_manager_function_test, 
allowed)
         std::string rpc_code;
         std::string user_name;
         std::string database_name;
-        bool expected_result;
-    } tests[] = {{"TASK_CODE_INVALID", "user1", "database1", false},
-                 {"RPC_CM_CREATE_APP", "user1", "database1", false},
-                 {"RPC_CM_CREATE_APP", "user2", "database1", false},
-                 {"RPC_CM_LIST_APPS", "user1", "database1", true},
-                 {"RPC_CM_LIST_APPS", "user2", "database1", true},
-                 {"RPC_CM_GET_MAX_REPLICA_COUNT", "user1", "database1", true},
-                 {"RPC_CM_GET_MAX_REPLICA_COUNT", "user2", "database1", false},
-                 {"TASK_CODE_INVALID", "user3", "database2", false},
-                 {"RPC_CM_CREATE_APP", "user3", "database2", true},
-                 {"RPC_CM_CREATE_APP", "user4", "database2", true},
-                 {"RPC_CM_START_BACKUP_APP", "user3", "database2", true},
-                 {"RPC_CM_START_BACKUP_APP", "user4", "database2", false},
-                 {"TASK_CODE_INVALID", "user5", "", false},
-                 // Next two case matched to the default database policy and 
"*" database.
-                 {"RPC_CM_CREATE_APP", "user5", "", true},
-                 {"RPC_CM_CREATE_APP", "user6", "", true},
-                 // Next two case matched to the database policy named "*".
-                 {"RPC_CM_CREATE_APP", "user5", "any_database_name", true},
-                 {"RPC_CM_CREATE_APP", "user6", "any_database_name", false},
-                 {"RPC_CM_CREATE_APP", "user6", "database2", false},
-                 {"TASK_CODE_INVALID", "user7", "database3", false},
-                 {"RPC_CM_LIST_NODES", "user7", "database3", true},
-                 {"RPC_CM_LIST_NODES", "user8", "database3", false},
-                 // RPC_CM_LIST_APPS has been removed from global resources.
-                 {"RPC_CM_LIST_APPS", "user7", "database3", false},
-                 {"RPC_CM_LIST_APPS", "user8", "database3", false},
-                 {"TASK_CODE_INVALID", "user9", "database4", false},
-                 {"RPC_CM_LIST_NODES", "user9", "database4", false},
-                 {"RPC_CM_LIST_NODES", "user10", "database4", false},
-                 {"RPC_CM_LIST_APPS", "user9", "database4", false},
-                 {"RPC_CM_LIST_APPS", "user10", "database4", false},
-                 {"RPC_CM_CONTROL_META", "user9", "database4", true},
-                 {"RPC_CM_CONTROL_META", "user10", "database4", false}};
+        access_control_result expected_result;
+    } tests[] = {
+        {"TASK_CODE_INVALID", "user1", "database1", 
access_control_result::kDenied},
+        {"RPC_CM_CREATE_APP", "user1", "database1", 
access_control_result::kDenied},
+        {"RPC_CM_CREATE_APP", "user2", "database1", 
access_control_result::kDenied},
+        {"RPC_CM_LIST_APPS", "user1", "database1", 
access_control_result::kAllowed},
+        {"RPC_CM_LIST_APPS", "user2", "database1", 
access_control_result::kAllowed},
+        {"RPC_CM_GET_MAX_REPLICA_COUNT", "user1", "database1", 
access_control_result::kAllowed},
+        {"RPC_CM_GET_MAX_REPLICA_COUNT", "user2", "database1", 
access_control_result::kDenied},
+        {"TASK_CODE_INVALID", "user3", "database2", 
access_control_result::kDenied},
+        {"RPC_CM_CREATE_APP", "user3", "database2", 
access_control_result::kAllowed},
+        {"RPC_CM_CREATE_APP", "user4", "database2", 
access_control_result::kAllowed},
+        {"RPC_CM_START_BACKUP_APP", "user3", "database2", 
access_control_result::kAllowed},
+        {"RPC_CM_START_BACKUP_APP", "user4", "database2", 
access_control_result::kDenied},
+        {"TASK_CODE_INVALID", "user5", "", access_control_result::kDenied},
+        // Next two case matched to the default database policy and "*" 
database.
+        {"RPC_CM_CREATE_APP", "user5", "", access_control_result::kAllowed},
+        {"RPC_CM_CREATE_APP", "user6", "", access_control_result::kAllowed},
+        // Next two case matched to the database policy named "*".
+        {"RPC_CM_CREATE_APP", "user5", "any_database_name", 
access_control_result::kAllowed},
+        {"RPC_CM_CREATE_APP", "user6", "any_database_name", 
access_control_result::kDenied},
+        {"RPC_CM_CREATE_APP", "user6", "database2", 
access_control_result::kDenied},
+        {"TASK_CODE_INVALID", "user7", "database3", 
access_control_result::kDenied},
+        {"RPC_CM_LIST_NODES", "user7", "database3", 
access_control_result::kAllowed},
+        {"RPC_CM_LIST_NODES", "user8", "database3", 
access_control_result::kDenied},
+        // RPC_CM_LIST_APPS has been removed from global resources.
+        {"RPC_CM_LIST_APPS", "user7", "database3", 
access_control_result::kDenied},
+        {"RPC_CM_LIST_APPS", "user8", "database3", 
access_control_result::kDenied},
+        {"TASK_CODE_INVALID", "user9", "database4", 
access_control_result::kDenied},
+        {"RPC_CM_LIST_NODES", "user9", "database4", 
access_control_result::kDenied},
+        {"RPC_CM_LIST_NODES", "user10", "database4", 
access_control_result::kDenied},
+        {"RPC_CM_LIST_APPS", "user9", "database4", 
access_control_result::kDenied},
+        {"RPC_CM_LIST_APPS", "user10", "database4", 
access_control_result::kDenied},
+        {"RPC_CM_CONTROL_META", "user9", "database4", 
access_control_result::kAllowed},
+        {"RPC_CM_CONTROL_META", "user10", "database4", 
access_control_result::kDenied}};
     for (const auto &test : tests) {
         auto code = task_code::try_get(test.rpc_code, TASK_CODE_INVALID);
         auto actual_result = allowed(code, test.user_name, test.database_name);
diff --git a/src/runtime/test/ranger_resource_policy_test.cpp 
b/src/runtime/test/ranger_resource_policy_test.cpp
index fbf6bf718..fcb924803 100644
--- a/src/runtime/test/ranger_resource_policy_test.cpp
+++ b/src/runtime/test/ranger_resource_policy_test.cpp
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <fmt/core.h>
 #include <gtest/gtest-message.h>
 #include <gtest/gtest-test-part.h>
 #include <gtest/gtest.h>
@@ -57,29 +58,127 @@ TEST(ranger_resource_policy_test, acl_policies_allowed)
 {
     acl_policies policy;
     policy.allow_policies = {{access_type::kRead | access_type::kWrite | 
access_type::kCreate,
-                              {"user1", "user2", "user3", "user4"}}};
-    policy.allow_policies_exclude = {{access_type::kWrite | 
access_type::kCreate, {"user2"}}};
-    policy.deny_policies = {{access_type::kRead | access_type::kWrite, 
{"user3", "user4"}}};
-    policy.deny_policies_exclude = {{access_type::kRead, {"user4"}}};
+                              {"user1", "user2", "user3"}},
+                             {access_type::kRead | access_type::kWrite | 
access_type::kCreate,
+                              {"user4", "user5", "user6"}}};
+    policy.allow_policies_exclude = {{access_type::kWrite | 
access_type::kCreate, {"user2"}},
+                                     {access_type::kWrite | 
access_type::kCreate, {"user5"}}};
+    policy.deny_policies = {{access_type::kRead | access_type::kWrite, 
{"user3", "user4"}},
+                            {access_type::kRead | access_type::kWrite, 
{"user5", "user6"}}};
+    policy.deny_policies_exclude = {{access_type::kRead, {"user4"}},
+                                    {access_type::kWrite, {"user6"}}};
     struct test_case
     {
         access_type ac_type;
         std::string user_name;
-        bool expected_result;
-    } tests[] = {{access_type::kRead, "user", false},      
{access_type::kRead, "user1", true},
-                 {access_type::kWrite, "user1", true},     
{access_type::kCreate, "user1", true},
-                 {access_type::kDrop, "user1", false},     
{access_type::kList, "user1", false},
-                 {access_type::kMetadata, "user1", false}, 
{access_type::kControl, "user1", false},
-                 {access_type::kRead, "user2", true},      
{access_type::kWrite, "user2", false},
-                 {access_type::kCreate, "user2", false},   
{access_type::kDrop, "user2", false},
-                 {access_type::kList, "user2", false},     
{access_type::kMetadata, "user2", false},
-                 {access_type::kControl, "user2", false},  
{access_type::kRead, "user3", false},
-                 {access_type::kCreate, "user3", true},    
{access_type::kList, "user3", false},
-                 {access_type::kRead, "user4", true},      
{access_type::kWrite, "user4", false},
-                 {access_type::kCreate, "user4", true},    
{access_type::kList, "user4", false}};
+        policy_check_type check_type;
+        policy_check_status expected_result;
+    } tests[] = {
+        // not in any "allow_policies"
+        {access_type::kRead, "user", policy_check_type::kAllow, 
policy_check_status::kNotMatched},
+        // not in any "deny_policies"
+        {access_type::kRead, "user", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        // in a 'allow_policies' and not in any 'allow_policies_exclude'
+        {access_type::kRead, "user1", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // not in any 'deny_policies'
+        {access_type::kRead, "user1", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        // not in any "allow_policies"
+        {access_type::kList, "user1", policy_check_type::kAllow, 
policy_check_status::kNotMatched},
+        // not in any "deny_policies"
+        {access_type::kList, "user1", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        // in a 'allow_policies' and not in any 'allow_policies_exclude'
+        {access_type::kRead, "user2", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // not in any "deny_policies"
+        {access_type::kRead, "user2", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        // in a 'allow_policies' and in a 'allow_policies_exclude'
+        {access_type::kWrite, "user2", policy_check_type::kAllow, 
policy_check_status::kPending},
+        // not in any "deny_policies"
+        {access_type::kWrite, "user2", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        // in a 'allow_policies' and in a 'allow_policies_exclude'
+        {access_type::kCreate, "user2", policy_check_type::kAllow, 
policy_check_status::kPending},
+        // not in any "deny_policies"
+        {access_type::kCreate, "user2", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        // in a 'allow_policies' and not in any 'allow_policies_exclude'
+        {access_type::kRead, "user3", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // in a 'deny_policies' and not in any 'deny_policies_exclude'
+        {access_type::kRead, "user3", policy_check_type::kDeny, 
policy_check_status::kDenied},
+        // in a 'allow_policies' and not in any 'allow_policies_exclude'
+        {access_type::kCreate, "user3", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // not in any "deny_policies"
+        {access_type::kCreate, "user3", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        // not in any "allow_policies"
+        {access_type::kList, "user3", policy_check_type::kAllow, 
policy_check_status::kNotMatched},
+        // not in any "deny_policies"
+        {access_type::kList, "user3", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        // in a 'allow_policies' and not in any 'allow_policies_exclude'
+        {access_type::kRead, "user4", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // in a 'deny_policies' and in a 'deny_policies_exclude'
+        {access_type::kRead, "user4", policy_check_type::kDeny, 
policy_check_status::kPending},
+        // in a 'allow_policies' and not in any 'allow_policies_exclude'
+        {access_type::kWrite, "user4", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // in a 'deny_policies' and not in any 'deny_policies_exclude'
+        {access_type::kWrite, "user4", policy_check_type::kDeny, 
policy_check_status::kDenied},
+        // in a 'allow_policies' and not in any 'allow_policies_exclude'
+        {access_type::kCreate, "user4", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // not in any "deny_policies"
+        {access_type::kCreate, "user4", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        // not in any "allow_policies"
+        {access_type::kList, "user4", policy_check_type::kAllow, 
policy_check_status::kNotMatched},
+        // not in any "deny_policies"
+        {access_type::kList, "user4", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        // in a 'allow_policies' and not in any 'allow_policies_exclude'
+        {access_type::kRead, "user5", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // in a 'deny_policies' and  not in any 'deny_policies_exclude'
+        {access_type::kRead, "user5", policy_check_type::kDeny, 
policy_check_status::kDenied},
+        // in a 'allow_policies' and in a 'allow_policies_exclude'
+        {access_type::kWrite, "user5", policy_check_type::kAllow, 
policy_check_status::kPending},
+        // in a 'deny_policies' and not in any 'deny_policies_exclude'
+        {access_type::kWrite, "user5", policy_check_type::kDeny, 
policy_check_status::kDenied},
+        // in a 'allow_policies' and not in any 'allow_policies_exclude'
+        {access_type::kCreate, "user5", policy_check_type::kAllow, 
policy_check_status::kPending},
+        // not in any "deny_policies"
+        {access_type::kCreate, "user5", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        // not in any "allow_policies"
+        {access_type::kList, "user5", policy_check_type::kAllow, 
policy_check_status::kNotMatched},
+        // not in any "deny_policies"
+        {access_type::kList, "user5", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        // in a 'allow_policies' and not in any 'allow_policies_exclude'
+        {access_type::kRead, "user6", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // in a 'deny_policies' and  not in any 'deny_policies_exclude'
+        {access_type::kRead, "user6", policy_check_type::kDeny, 
policy_check_status::kDenied},
+        // in a 'allow_policies' and not in any 'allow_policies_exclude'
+        {access_type::kWrite, "user6", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // in a 'deny_policies' and in a 'deny_policies_exclude'
+        {access_type::kWrite, "user6", policy_check_type::kDeny, 
policy_check_status::kPending},
+        // in a 'allow_policies' and not in any 'allow_policies_exclude'
+        {access_type::kCreate, "user6", policy_check_type::kAllow, 
policy_check_status::kAllowed},
+        // not in any "deny_policies"
+        {access_type::kCreate, "user6", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+        // not in any "allow_policies"
+        {access_type::kList, "user6", policy_check_type::kAllow, 
policy_check_status::kNotMatched},
+        // not in any "deny_policies"
+        {access_type::kList, "user6", policy_check_type::kDeny, 
policy_check_status::kNotMatched},
+    };
     for (const auto &test : tests) {
-        auto actual_result = policy.allowed(test.ac_type, test.user_name);
-        EXPECT_EQ(test.expected_result, actual_result);
+        policy_check_status actual_result = policy_check_status::kInvalid;
+        switch (test.check_type) {
+        case policy_check_type::kAllow:
+            actual_result =
+                policy.policies_check<policy_check_type::kAllow>(test.ac_type, 
test.user_name);
+            break;
+        case policy_check_type::kDeny:
+            actual_result =
+                policy.policies_check<policy_check_type::kDeny>(test.ac_type, 
test.user_name);
+            break;
+        case policy_check_type::kInvalid:
+        default:
+            break;
+        }
+        EXPECT_EQ(test.expected_result, actual_result)
+            << fmt::format("ac_type: {}, user_name: {}, check_type: {}",
+                           enum_to_string(test.ac_type),
+                           test.user_name,
+                           enum_to_string(test.check_type));
     }
 }
 } // namespace ranger


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to