yuqi1129 commented on code in PR #6100:
URL: https://github.com/apache/gravitino/pull/6100#discussion_r1903401304


##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHDFSPlugin.java:
##########
@@ -118,27 +127,372 @@ public List<String> policyResourceDefinesRule() {
     return ImmutableList.of(RangerDefines.PolicyResource.PATH.getName());
   }
 
+  /**
+   * Find the managed policy for the ranger securable object.
+   *
+   * @param authzMetadataObject The ranger securable object to find the 
managed policy.
+   * @return The managed policy for the metadata object.
+   */
+  public RangerPolicy findManagedPolicy(AuthorizationMetadataObject 
authzMetadataObject)
+      throws AuthorizationPluginException {
+    List<RangerPolicy> policies = wildcardSearchPolies(authzMetadataObject);
+    if (!policies.isEmpty()) {
+      /**
+       * Because Ranger doesn't support the precise search, Ranger will return 
the policy meets the
+       * wildcard(*,?) conditions, If you use `/a/b` condition to search 
policy, the Ranger will
+       * match `/a/b1`, `/a/b2`, `/a/b*`, So we need to manually precisely 
filter this research
+       * results.
+       */
+      List<String> nsMetadataObj = authzMetadataObject.names();
+      PathBasedMetadataObject pathAuthzMetadataObject =
+          (PathBasedMetadataObject) authzMetadataObject;
+      Map<String, String> preciseFilters = new HashMap<>();
+      for (int i = 0; i < nsMetadataObj.size() && i < 
policyResourceDefinesRule().size(); i++) {
+        preciseFilters.put(policyResourceDefinesRule().get(i), 
pathAuthzMetadataObject.path());
+      }
+      policies =
+          policies.stream()
+              .filter(
+                  policy ->
+                      policy.getResources().entrySet().stream()
+                          .allMatch(
+                              entry ->
+                                  preciseFilters.containsKey(entry.getKey())
+                                      && entry.getValue().getValues().size() 
== 1
+                                      && entry
+                                          .getValue()
+                                          .getValues()
+                                          
.contains(preciseFilters.get(entry.getKey()))))
+              .collect(Collectors.toList());
+    }
+    // Only return the policies that are managed by Gravitino.
+    if (policies.size() > 1) {
+      throw new AuthorizationPluginException("Each metadata object can have at 
most one policy.");
+    }
+
+    if (policies.isEmpty()) {
+      return null;
+    }
+
+    RangerPolicy policy = policies.get(0);
+    // Delegating Gravitino management policies cannot contain duplicate 
privilege
+    policy.getPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    policy.getDenyPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    
policy.getRowFilterPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    
policy.getDataMaskPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+
+    return policy;
+  }
+
+  @Override
+  /** Wildcard search the Ranger policies in the different Ranger service. */
+  protected List<RangerPolicy> wildcardSearchPolies(
+      AuthorizationMetadataObject authzMetadataObject) {
+    Preconditions.checkArgument(authzMetadataObject instanceof 
PathBasedMetadataObject);
+    PathBasedMetadataObject pathBasedMetadataObject = 
(PathBasedMetadataObject) authzMetadataObject;
+    List<String> resourceDefines = policyResourceDefinesRule();
+    Map<String, String> searchFilters = new HashMap<>();
+    searchFilters.put(SearchFilter.SERVICE_NAME, rangerServiceName);
+    resourceDefines.stream()
+        .forEach(
+            resourceDefine -> {
+              searchFilters.put(
+                  SearchFilter.RESOURCE_PREFIX + resourceDefine, 
pathBasedMetadataObject.path());
+            });
+    try {
+      List<RangerPolicy> policies = rangerClient.findPolicies(searchFilters);
+      return policies;

Review Comment:
   Return the result directly: `return rangerClient.findPolicies(searchFilters);`



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHDFSPlugin.java:
##########
@@ -118,27 +127,372 @@ public List<String> policyResourceDefinesRule() {
     return ImmutableList.of(RangerDefines.PolicyResource.PATH.getName());
   }
 
+  /**
+   * Find the managed policy for the ranger securable object.
+   *
+   * @param authzMetadataObject The ranger securable object to find the 
managed policy.
+   * @return The managed policy for the metadata object.
+   */
+  public RangerPolicy findManagedPolicy(AuthorizationMetadataObject 
authzMetadataObject)
+      throws AuthorizationPluginException {
+    List<RangerPolicy> policies = wildcardSearchPolies(authzMetadataObject);
+    if (!policies.isEmpty()) {
+      /**
+       * Because Ranger doesn't support the precise search, Ranger will return 
the policy meets the
+       * wildcard(*,?) conditions, If you use `/a/b` condition to search 
policy, the Ranger will
+       * match `/a/b1`, `/a/b2`, `/a/b*`, So we need to manually precisely 
filter this research
+       * results.
+       */
+      List<String> nsMetadataObj = authzMetadataObject.names();
+      PathBasedMetadataObject pathAuthzMetadataObject =
+          (PathBasedMetadataObject) authzMetadataObject;
+      Map<String, String> preciseFilters = new HashMap<>();
+      for (int i = 0; i < nsMetadataObj.size() && i < 
policyResourceDefinesRule().size(); i++) {
+        preciseFilters.put(policyResourceDefinesRule().get(i), 
pathAuthzMetadataObject.path());
+      }
+      policies =
+          policies.stream()
+              .filter(
+                  policy ->
+                      policy.getResources().entrySet().stream()
+                          .allMatch(
+                              entry ->
+                                  preciseFilters.containsKey(entry.getKey())
+                                      && entry.getValue().getValues().size() 
== 1
+                                      && entry
+                                          .getValue()
+                                          .getValues()
+                                          
.contains(preciseFilters.get(entry.getKey()))))
+              .collect(Collectors.toList());
+    }
+    // Only return the policies that are managed by Gravitino.
+    if (policies.size() > 1) {
+      throw new AuthorizationPluginException("Each metadata object can have at 
most one policy.");
+    }
+
+    if (policies.isEmpty()) {
+      return null;
+    }
+
+    RangerPolicy policy = policies.get(0);
+    // Delegating Gravitino management policies cannot contain duplicate 
privilege
+    policy.getPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    policy.getDenyPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    
policy.getRowFilterPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    
policy.getDataMaskPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+
+    return policy;
+  }
+
+  @Override
+  /** Wildcard search the Ranger policies in the different Ranger service. */
+  protected List<RangerPolicy> wildcardSearchPolies(
+      AuthorizationMetadataObject authzMetadataObject) {
+    Preconditions.checkArgument(authzMetadataObject instanceof 
PathBasedMetadataObject);
+    PathBasedMetadataObject pathBasedMetadataObject = 
(PathBasedMetadataObject) authzMetadataObject;
+    List<String> resourceDefines = policyResourceDefinesRule();
+    Map<String, String> searchFilters = new HashMap<>();
+    searchFilters.put(SearchFilter.SERVICE_NAME, rangerServiceName);
+    resourceDefines.stream()
+        .forEach(
+            resourceDefine -> {
+              searchFilters.put(
+                  SearchFilter.RESOURCE_PREFIX + resourceDefine, 
pathBasedMetadataObject.path());
+            });
+    try {
+      List<RangerPolicy> policies = rangerClient.findPolicies(searchFilters);
+      return policies;
+    } catch (RangerServiceException e) {
+      throw new AuthorizationPluginException(e, "Failed to find the policies 
in the Ranger");
+    }
+  }
+
+  /**
+   * IF rename the SCHEMA, Need to rename these the relevant policies, 
`{schema}`, `{schema}.*`,
+   * `{schema}.*.*` <br>
+   * IF rename the TABLE, Need to rename these the relevant policies, 
`{schema}.*`, `{schema}.*.*`
+   * <br>
+   */
+  @Override
+  protected void doRenameMetadataObject(
+      AuthorizationMetadataObject authzMetadataObject,
+      AuthorizationMetadataObject newAuthzMetadataObject) {
+    List<Map<String, String>> loop;
+    if (newAuthzMetadataObject.type().equals(SCHEMA)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(TABLE)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(COLUMN)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(2), 
newAuthzMetadataObject.names().get(2)));
+    } else if (newAuthzMetadataObject.type().equals(PATH)) {
+      // do nothing when fileset is renamed

Review Comment:
   If the path of a `Hive` table changes, do we need to revoke the 
privileges from the old path?



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHDFSPlugin.java:
##########
@@ -118,27 +127,372 @@ public List<String> policyResourceDefinesRule() {
     return ImmutableList.of(RangerDefines.PolicyResource.PATH.getName());
   }
 
+  /**
+   * Find the managed policy for the ranger securable object.
+   *
+   * @param authzMetadataObject The ranger securable object to find the 
managed policy.
+   * @return The managed policy for the metadata object.
+   */
+  public RangerPolicy findManagedPolicy(AuthorizationMetadataObject 
authzMetadataObject)
+      throws AuthorizationPluginException {
+    List<RangerPolicy> policies = wildcardSearchPolies(authzMetadataObject);
+    if (!policies.isEmpty()) {
+      /**
+       * Because Ranger doesn't support the precise search, Ranger will return 
the policy meets the
+       * wildcard(*,?) conditions, If you use `/a/b` condition to search 
policy, the Ranger will
+       * match `/a/b1`, `/a/b2`, `/a/b*`, So we need to manually precisely 
filter this research
+       * results.
+       */
+      List<String> nsMetadataObj = authzMetadataObject.names();
+      PathBasedMetadataObject pathAuthzMetadataObject =
+          (PathBasedMetadataObject) authzMetadataObject;
+      Map<String, String> preciseFilters = new HashMap<>();
+      for (int i = 0; i < nsMetadataObj.size() && i < 
policyResourceDefinesRule().size(); i++) {
+        preciseFilters.put(policyResourceDefinesRule().get(i), 
pathAuthzMetadataObject.path());
+      }
+      policies =
+          policies.stream()
+              .filter(
+                  policy ->
+                      policy.getResources().entrySet().stream()
+                          .allMatch(
+                              entry ->
+                                  preciseFilters.containsKey(entry.getKey())
+                                      && entry.getValue().getValues().size() 
== 1
+                                      && entry
+                                          .getValue()
+                                          .getValues()
+                                          
.contains(preciseFilters.get(entry.getKey()))))
+              .collect(Collectors.toList());
+    }
+    // Only return the policies that are managed by Gravitino.
+    if (policies.size() > 1) {
+      throw new AuthorizationPluginException("Each metadata object can have at 
most one policy.");
+    }
+
+    if (policies.isEmpty()) {
+      return null;
+    }
+
+    RangerPolicy policy = policies.get(0);
+    // Delegating Gravitino management policies cannot contain duplicate 
privilege
+    policy.getPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    policy.getDenyPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    
policy.getRowFilterPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    
policy.getDataMaskPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+
+    return policy;
+  }
+
+  @Override
+  /** Wildcard search the Ranger policies in the different Ranger service. */
+  protected List<RangerPolicy> wildcardSearchPolies(
+      AuthorizationMetadataObject authzMetadataObject) {
+    Preconditions.checkArgument(authzMetadataObject instanceof 
PathBasedMetadataObject);
+    PathBasedMetadataObject pathBasedMetadataObject = 
(PathBasedMetadataObject) authzMetadataObject;
+    List<String> resourceDefines = policyResourceDefinesRule();
+    Map<String, String> searchFilters = new HashMap<>();
+    searchFilters.put(SearchFilter.SERVICE_NAME, rangerServiceName);
+    resourceDefines.stream()
+        .forEach(
+            resourceDefine -> {
+              searchFilters.put(
+                  SearchFilter.RESOURCE_PREFIX + resourceDefine, 
pathBasedMetadataObject.path());
+            });
+    try {
+      List<RangerPolicy> policies = rangerClient.findPolicies(searchFilters);
+      return policies;
+    } catch (RangerServiceException e) {
+      throw new AuthorizationPluginException(e, "Failed to find the policies 
in the Ranger");
+    }
+  }
+
+  /**
+   * IF rename the SCHEMA, Need to rename these the relevant policies, 
`{schema}`, `{schema}.*`,
+   * `{schema}.*.*` <br>
+   * IF rename the TABLE, Need to rename these the relevant policies, 
`{schema}.*`, `{schema}.*.*`
+   * <br>
+   */
+  @Override
+  protected void doRenameMetadataObject(
+      AuthorizationMetadataObject authzMetadataObject,
+      AuthorizationMetadataObject newAuthzMetadataObject) {
+    List<Map<String, String>> loop;
+    if (newAuthzMetadataObject.type().equals(SCHEMA)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(TABLE)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(COLUMN)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(2), 
newAuthzMetadataObject.names().get(2)));
+    } else if (newAuthzMetadataObject.type().equals(PATH)) {
+      // do nothing when fileset is renamed
+      return;
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported metadata object type: " + authzMetadataObject.type());
+    }
+
+    List<String> oldMetadataNames = new ArrayList<>();
+    List<String> newMetadataNames = new ArrayList<>();
+    for (int index = 0; index < loop.size(); index++) {
+      
oldMetadataNames.add(loop.get(index).keySet().stream().findFirst().get());
+      
newMetadataNames.add(loop.get(index).values().stream().findFirst().get());
+
+      AuthorizationMetadataObject.Type type =
+          (index == 0
+              ? RangerHadoopSQLMetadataObject.Type.SCHEMA
+              : (index == 1
+                  ? RangerHadoopSQLMetadataObject.Type.TABLE
+                  : RangerHadoopSQLMetadataObject.Type.COLUMN));

Review Comment:
   This code is not easy to understand.



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHDFSPlugin.java:
##########
@@ -118,27 +127,372 @@ public List<String> policyResourceDefinesRule() {
     return ImmutableList.of(RangerDefines.PolicyResource.PATH.getName());
   }
 
+  /**
+   * Find the managed policy for the ranger securable object.
+   *
+   * @param authzMetadataObject The ranger securable object to find the 
managed policy.
+   * @return The managed policy for the metadata object.
+   */
+  public RangerPolicy findManagedPolicy(AuthorizationMetadataObject 
authzMetadataObject)
+      throws AuthorizationPluginException {
+    List<RangerPolicy> policies = wildcardSearchPolies(authzMetadataObject);
+    if (!policies.isEmpty()) {
+      /**
+       * Because Ranger doesn't support the precise search, Ranger will return 
the policy meets the
+       * wildcard(*,?) conditions, If you use `/a/b` condition to search 
policy, the Ranger will
+       * match `/a/b1`, `/a/b2`, `/a/b*`, So we need to manually precisely 
filter this research
+       * results.
+       */
+      List<String> nsMetadataObj = authzMetadataObject.names();
+      PathBasedMetadataObject pathAuthzMetadataObject =
+          (PathBasedMetadataObject) authzMetadataObject;
+      Map<String, String> preciseFilters = new HashMap<>();
+      for (int i = 0; i < nsMetadataObj.size() && i < 
policyResourceDefinesRule().size(); i++) {
+        preciseFilters.put(policyResourceDefinesRule().get(i), 
pathAuthzMetadataObject.path());
+      }
+      policies =
+          policies.stream()
+              .filter(
+                  policy ->
+                      policy.getResources().entrySet().stream()
+                          .allMatch(
+                              entry ->
+                                  preciseFilters.containsKey(entry.getKey())
+                                      && entry.getValue().getValues().size() 
== 1
+                                      && entry
+                                          .getValue()
+                                          .getValues()
+                                          
.contains(preciseFilters.get(entry.getKey()))))
+              .collect(Collectors.toList());
+    }
+    // Only return the policies that are managed by Gravitino.
+    if (policies.size() > 1) {
+      throw new AuthorizationPluginException("Each metadata object can have at 
most one policy.");
+    }
+
+    if (policies.isEmpty()) {
+      return null;
+    }
+
+    RangerPolicy policy = policies.get(0);
+    // Delegating Gravitino management policies cannot contain duplicate 
privilege
+    policy.getPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    policy.getDenyPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    
policy.getRowFilterPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    
policy.getDataMaskPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+
+    return policy;
+  }
+
+  @Override
+  /** Wildcard search the Ranger policies in the different Ranger service. */
+  protected List<RangerPolicy> wildcardSearchPolies(
+      AuthorizationMetadataObject authzMetadataObject) {
+    Preconditions.checkArgument(authzMetadataObject instanceof 
PathBasedMetadataObject);
+    PathBasedMetadataObject pathBasedMetadataObject = 
(PathBasedMetadataObject) authzMetadataObject;
+    List<String> resourceDefines = policyResourceDefinesRule();
+    Map<String, String> searchFilters = new HashMap<>();
+    searchFilters.put(SearchFilter.SERVICE_NAME, rangerServiceName);
+    resourceDefines.stream()
+        .forEach(
+            resourceDefine -> {
+              searchFilters.put(
+                  SearchFilter.RESOURCE_PREFIX + resourceDefine, 
pathBasedMetadataObject.path());
+            });
+    try {
+      List<RangerPolicy> policies = rangerClient.findPolicies(searchFilters);
+      return policies;
+    } catch (RangerServiceException e) {
+      throw new AuthorizationPluginException(e, "Failed to find the policies 
in the Ranger");
+    }
+  }
+
+  /**
+   * IF rename the SCHEMA, Need to rename these the relevant policies, 
`{schema}`, `{schema}.*`,
+   * `{schema}.*.*` <br>
+   * IF rename the TABLE, Need to rename these the relevant policies, 
`{schema}.*`, `{schema}.*.*`
+   * <br>
+   */
+  @Override
+  protected void doRenameMetadataObject(
+      AuthorizationMetadataObject authzMetadataObject,
+      AuthorizationMetadataObject newAuthzMetadataObject) {
+    List<Map<String, String>> loop;
+    if (newAuthzMetadataObject.type().equals(SCHEMA)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(TABLE)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(COLUMN)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(2), 
newAuthzMetadataObject.names().get(2)));
+    } else if (newAuthzMetadataObject.type().equals(PATH)) {
+      // do nothing when fileset is renamed
+      return;
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported metadata object type: " + authzMetadataObject.type());
+    }
+
+    List<String> oldMetadataNames = new ArrayList<>();
+    List<String> newMetadataNames = new ArrayList<>();
+    for (int index = 0; index < loop.size(); index++) {
+      
oldMetadataNames.add(loop.get(index).keySet().stream().findFirst().get());
+      
newMetadataNames.add(loop.get(index).values().stream().findFirst().get());
+
+      AuthorizationMetadataObject.Type type =
+          (index == 0
+              ? RangerHadoopSQLMetadataObject.Type.SCHEMA
+              : (index == 1
+                  ? RangerHadoopSQLMetadataObject.Type.TABLE
+                  : RangerHadoopSQLMetadataObject.Type.COLUMN));
+      AuthorizationMetadataObject oldHadoopSQLMetadataObject =
+          new RangerHadoopSQLMetadataObject(
+              AuthorizationMetadataObject.getParentFullName(oldMetadataNames),
+              AuthorizationMetadataObject.getLastName(oldMetadataNames),
+              type);
+      AuthorizationMetadataObject newHadoopSQLMetadataObject =
+          new RangerHadoopSQLMetadataObject(
+              AuthorizationMetadataObject.getParentFullName(newMetadataNames),
+              AuthorizationMetadataObject.getLastName(newMetadataNames),
+              type);
+      updatePolicyByMetadataObject(
+          MetadataObject.Type.SCHEMA, oldHadoopSQLMetadataObject, 
newHadoopSQLMetadataObject);
+    }
+  }
+

Review Comment:
   Why is the operation type here always `MetadataObject.Type.SCHEMA`?



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHDFSPlugin.java:
##########
@@ -118,27 +127,372 @@ public List<String> policyResourceDefinesRule() {
     return ImmutableList.of(RangerDefines.PolicyResource.PATH.getName());
   }
 
+  /**
+   * Find the managed policy for the ranger securable object.
+   *
+   * @param authzMetadataObject The ranger securable object to find the 
managed policy.
+   * @return The managed policy for the metadata object.
+   */
+  public RangerPolicy findManagedPolicy(AuthorizationMetadataObject 
authzMetadataObject)
+      throws AuthorizationPluginException {
+    List<RangerPolicy> policies = wildcardSearchPolies(authzMetadataObject);
+    if (!policies.isEmpty()) {
+      /**
+       * Because Ranger doesn't support the precise search, Ranger will return 
the policy meets the
+       * wildcard(*,?) conditions, If you use `/a/b` condition to search 
policy, the Ranger will
+       * match `/a/b1`, `/a/b2`, `/a/b*`, So we need to manually precisely 
filter this research
+       * results.
+       */
+      List<String> nsMetadataObj = authzMetadataObject.names();
+      PathBasedMetadataObject pathAuthzMetadataObject =
+          (PathBasedMetadataObject) authzMetadataObject;
+      Map<String, String> preciseFilters = new HashMap<>();
+      for (int i = 0; i < nsMetadataObj.size() && i < 
policyResourceDefinesRule().size(); i++) {
+        preciseFilters.put(policyResourceDefinesRule().get(i), 
pathAuthzMetadataObject.path());
+      }
+      policies =
+          policies.stream()
+              .filter(
+                  policy ->
+                      policy.getResources().entrySet().stream()
+                          .allMatch(
+                              entry ->
+                                  preciseFilters.containsKey(entry.getKey())
+                                      && entry.getValue().getValues().size() 
== 1
+                                      && entry
+                                          .getValue()
+                                          .getValues()
+                                          
.contains(preciseFilters.get(entry.getKey()))))
+              .collect(Collectors.toList());
+    }
+    // Only return the policies that are managed by Gravitino.
+    if (policies.size() > 1) {
+      throw new AuthorizationPluginException("Each metadata object can have at 
most one policy.");
+    }
+
+    if (policies.isEmpty()) {
+      return null;
+    }
+
+    RangerPolicy policy = policies.get(0);
+    // Delegating Gravitino management policies cannot contain duplicate 
privilege
+    policy.getPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    policy.getDenyPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    
policy.getRowFilterPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    
policy.getDataMaskPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+
+    return policy;
+  }
+
+  @Override
+  /** Wildcard search the Ranger policies in the different Ranger service. */
+  protected List<RangerPolicy> wildcardSearchPolies(
+      AuthorizationMetadataObject authzMetadataObject) {
+    Preconditions.checkArgument(authzMetadataObject instanceof 
PathBasedMetadataObject);
+    PathBasedMetadataObject pathBasedMetadataObject = 
(PathBasedMetadataObject) authzMetadataObject;
+    List<String> resourceDefines = policyResourceDefinesRule();
+    Map<String, String> searchFilters = new HashMap<>();
+    searchFilters.put(SearchFilter.SERVICE_NAME, rangerServiceName);
+    resourceDefines.stream()
+        .forEach(
+            resourceDefine -> {
+              searchFilters.put(
+                  SearchFilter.RESOURCE_PREFIX + resourceDefine, 
pathBasedMetadataObject.path());
+            });
+    try {
+      List<RangerPolicy> policies = rangerClient.findPolicies(searchFilters);
+      return policies;
+    } catch (RangerServiceException e) {
+      throw new AuthorizationPluginException(e, "Failed to find the policies 
in the Ranger");
+    }
+  }
+
+  /**
+   * IF rename the SCHEMA, Need to rename these the relevant policies, 
`{schema}`, `{schema}.*`,
+   * `{schema}.*.*` <br>
+   * IF rename the TABLE, Need to rename these the relevant policies, 
`{schema}.*`, `{schema}.*.*`
+   * <br>
+   */
+  @Override
+  protected void doRenameMetadataObject(
+      AuthorizationMetadataObject authzMetadataObject,
+      AuthorizationMetadataObject newAuthzMetadataObject) {
+    List<Map<String, String>> loop;

Review Comment:
   `loop` is not a very meaningful name.



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHDFSPlugin.java:
##########
@@ -118,27 +127,372 @@ public List<String> policyResourceDefinesRule() {
     return ImmutableList.of(RangerDefines.PolicyResource.PATH.getName());
   }
 
+  /**
+   * Find the managed policy for the ranger securable object.
+   *
+   * @param authzMetadataObject The ranger securable object to find the 
managed policy.
+   * @return The managed policy for the metadata object.
+   */
+  public RangerPolicy findManagedPolicy(AuthorizationMetadataObject 
authzMetadataObject)
+      throws AuthorizationPluginException {
+    List<RangerPolicy> policies = wildcardSearchPolies(authzMetadataObject);
+    if (!policies.isEmpty()) {
+      /**
+       * Because Ranger doesn't support the precise search, Ranger will return 
the policy meets the
+       * wildcard(*,?) conditions, If you use `/a/b` condition to search 
policy, the Ranger will
+       * match `/a/b1`, `/a/b2`, `/a/b*`, So we need to manually precisely 
filter this research
+       * results.
+       */
+      List<String> nsMetadataObj = authzMetadataObject.names();
+      PathBasedMetadataObject pathAuthzMetadataObject =
+          (PathBasedMetadataObject) authzMetadataObject;
+      Map<String, String> preciseFilters = new HashMap<>();
+      for (int i = 0; i < nsMetadataObj.size() && i < 
policyResourceDefinesRule().size(); i++) {
+        preciseFilters.put(policyResourceDefinesRule().get(i), 
pathAuthzMetadataObject.path());
+      }
+      policies =
+          policies.stream()
+              .filter(
+                  policy ->
+                      policy.getResources().entrySet().stream()
+                          .allMatch(
+                              entry ->
+                                  preciseFilters.containsKey(entry.getKey())
+                                      && entry.getValue().getValues().size() 
== 1
+                                      && entry
+                                          .getValue()
+                                          .getValues()
+                                          
.contains(preciseFilters.get(entry.getKey()))))
+              .collect(Collectors.toList());
+    }
+    // Only return the policies that are managed by Gravitino.
+    if (policies.size() > 1) {
+      throw new AuthorizationPluginException("Each metadata object can have at 
most one policy.");
+    }
+
+    if (policies.isEmpty()) {
+      return null;
+    }
+
+    RangerPolicy policy = policies.get(0);
+    // Delegating Gravitino management policies cannot contain duplicate 
privilege
+    policy.getPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    policy.getDenyPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    
policy.getRowFilterPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    
policy.getDataMaskPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+
+    return policy;
+  }
+
+  @Override
+  /** Wildcard search the Ranger policies in the different Ranger service. */
+  protected List<RangerPolicy> wildcardSearchPolies(
+      AuthorizationMetadataObject authzMetadataObject) {
+    Preconditions.checkArgument(authzMetadataObject instanceof 
PathBasedMetadataObject);
+    PathBasedMetadataObject pathBasedMetadataObject = 
(PathBasedMetadataObject) authzMetadataObject;
+    List<String> resourceDefines = policyResourceDefinesRule();
+    Map<String, String> searchFilters = new HashMap<>();
+    searchFilters.put(SearchFilter.SERVICE_NAME, rangerServiceName);
+    resourceDefines.stream()
+        .forEach(
+            resourceDefine -> {
+              searchFilters.put(
+                  SearchFilter.RESOURCE_PREFIX + resourceDefine, 
pathBasedMetadataObject.path());
+            });
+    try {
+      List<RangerPolicy> policies = rangerClient.findPolicies(searchFilters);
+      return policies;
+    } catch (RangerServiceException e) {
+      throw new AuthorizationPluginException(e, "Failed to find the policies 
in the Ranger");
+    }
+  }
+
+  /**
+   * IF rename the SCHEMA, Need to rename these the relevant policies, 
`{schema}`, `{schema}.*`,
+   * `{schema}.*.*` <br>
+   * IF rename the TABLE, Need to rename these the relevant policies, 
`{schema}.*`, `{schema}.*.*`
+   * <br>
+   */
+  @Override
+  protected void doRenameMetadataObject(
+      AuthorizationMetadataObject authzMetadataObject,
+      AuthorizationMetadataObject newAuthzMetadataObject) {
+    List<Map<String, String>> loop;
+    if (newAuthzMetadataObject.type().equals(SCHEMA)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(TABLE)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(COLUMN)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(2), 
newAuthzMetadataObject.names().get(2)));
+    } else if (newAuthzMetadataObject.type().equals(PATH)) {
+      // do nothing when fileset is renamed
+      return;
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported metadata object type: " + authzMetadataObject.type());
+    }
+
+    List<String> oldMetadataNames = new ArrayList<>();
+    List<String> newMetadataNames = new ArrayList<>();
+    for (int index = 0; index < loop.size(); index++) {
+      
oldMetadataNames.add(loop.get(index).keySet().stream().findFirst().get());
+      
newMetadataNames.add(loop.get(index).values().stream().findFirst().get());
+
+      AuthorizationMetadataObject.Type type =
+          (index == 0
+              ? RangerHadoopSQLMetadataObject.Type.SCHEMA
+              : (index == 1
+                  ? RangerHadoopSQLMetadataObject.Type.TABLE
+                  : RangerHadoopSQLMetadataObject.Type.COLUMN));
+      AuthorizationMetadataObject oldHadoopSQLMetadataObject =
+          new RangerHadoopSQLMetadataObject(
+              AuthorizationMetadataObject.getParentFullName(oldMetadataNames),
+              AuthorizationMetadataObject.getLastName(oldMetadataNames),
+              type);
+      AuthorizationMetadataObject newHadoopSQLMetadataObject =
+          new RangerHadoopSQLMetadataObject(
+              AuthorizationMetadataObject.getParentFullName(newMetadataNames),
+              AuthorizationMetadataObject.getLastName(newMetadataNames),
+              type);
+      updatePolicyByMetadataObject(
+          MetadataObject.Type.SCHEMA, oldHadoopSQLMetadataObject, 
newHadoopSQLMetadataObject);
+    }
+  }
+
+  @Override
+  protected void updatePolicyByMetadataObject(
+      MetadataObject.Type operationType,
+      AuthorizationMetadataObject oldAuthzMetaobject,
+      AuthorizationMetadataObject newAuthzMetaobject) {
+    List<RangerPolicy> oldPolicies = wildcardSearchPolies(oldAuthzMetaobject);
+    List<RangerPolicy> existNewPolicies = 
wildcardSearchPolies(newAuthzMetaobject);
+    if (oldPolicies.isEmpty()) {
+      LOG.warn("Cannot find the Ranger policy for the metadata object({})!", 
oldAuthzMetaobject);

Review Comment:
   If the old policies are empty, then according to the code path below, I
believe you can return early here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to