FANNG1 commented on code in PR #5701:
URL: https://github.com/apache/gravitino/pull/5701#discussion_r1865515898


##########
bundles/aliyun-bundle/src/main/java/org/apache/gravitino/oss/credential/OSSTokenProvider.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.oss.credential;
+
+import com.aliyun.credentials.Client;
+import com.aliyun.credentials.models.Config;
+import com.aliyun.credentials.models.CredentialModel;
+import com.aliyun.credentials.utils.AuthConstant;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialContext;
+import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.OSSTokenCredential;
+import org.apache.gravitino.credential.PathBasedCredentialContext;
+import org.apache.gravitino.credential.config.OSSCredentialConfig;
+import org.apache.gravitino.oss.credential.policy.Condition;
+import org.apache.gravitino.oss.credential.policy.Effect;
+import org.apache.gravitino.oss.credential.policy.Policy;
+import org.apache.gravitino.oss.credential.policy.Statement;
+import org.apache.gravitino.oss.credential.policy.StringLike;
+
+/** Generates OSS token to access OSS data. */
+public class OSSTokenProvider implements CredentialProvider {
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private String accessKeyId;
+  private String secretAccessKey;
+  private String roleArn;
+  private String externalID;
+  private int tokenExpireSecs;
+  private String region;
+
+  /**
+   * Initializes the credential provider with catalog properties.
+   *
+   * <p>The properties are parsed through {@link OSSCredentialConfig}; the role ARN, external ID,
+   * token expiration in seconds, access key ID, secret access key and region read from it are
+   * stored on this instance.
+   *
+   * @param properties catalog properties that can be used to configure the provider. The specific
+   *     properties required vary by implementation.
+   */
+  @Override
+  public void initialize(Map<String, String> properties) {
+    OSSCredentialConfig credentialConfig = new OSSCredentialConfig(properties);
+    this.roleArn = credentialConfig.ossRoleArn();
+    this.externalID = credentialConfig.externalID();
+    this.tokenExpireSecs = credentialConfig.tokenExpireInSecs();
+    this.accessKeyId = credentialConfig.accessKeyID();
+    this.secretAccessKey = credentialConfig.secretAccessKey();
+    this.region = credentialConfig.region();
+  }
+
+  /**
+   * Returns the type of credential; it should be identical in Gravitino.
+   *
+   * <p>This provider always reports {@link OSSTokenCredential#OSS_TOKEN_CREDENTIAL_TYPE}.
+   *
+   * @return A string identifying the type of credentials.
+   */
+  @Override
+  public String credentialType() {
+    return OSSTokenCredential.OSS_TOKEN_CREDENTIAL_TYPE;
+  }
+
+  /**
+   * Obtains a credential based on the provided context information.
+   *
+   * <p>Only {@link PathBasedCredentialContext} is handled. Its read paths, write paths and user
+   * name are passed, together with the configured role ARN, to {@code createOSSCredentialModel},
+   * and the resulting access key ID, access key secret, security token and expiration are wrapped
+   * in an {@link OSSTokenCredential}.
+   *
+   * @param context A context object providing necessary information for retrieving credentials.
+   * @return A Credential object containing the authentication information needed to access a
+   *     system or resource. Null is returned if no credential is available, which here means the
+   *     context is not a {@link PathBasedCredentialContext}.
+   */
+  @Nullable
+  @Override
+  public Credential getCredential(CredentialContext context) {
+    if (!(context instanceof PathBasedCredentialContext)) {
+      return null;
+    }
+    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
+    CredentialModel credentialModel =
+        createOSSCredentialModel(
+            roleArn,
+            pathBasedCredentialContext.getReadPaths(),
+            pathBasedCredentialContext.getWritePaths(),
+            pathBasedCredentialContext.getUserName());
+    return new OSSTokenCredential(
+        credentialModel.accessKeyId,
+        credentialModel.accessKeySecret,
+        credentialModel.securityToken,
+        credentialModel.expiration);
+  }
+
+  private CredentialModel createOSSCredentialModel(
+      String roleArn, Set<String> readLocations, Set<String> writeLocations, 
String userName) { // Configures a RAM_ROLE_ARN credential request and returns the credential obtained from a new Client.
+    Config config = new Config();
+    config.setAccessKeyId(accessKeyId);
+    config.setAccessKeySecret(secretAccessKey);
+    config.setType(AuthConstant.RAM_ROLE_ARN);
+    config.setRoleArn(roleArn);
+    config.setRoleSessionName(getRoleName(userName)); // session name is computed by getRoleName from the user name
+    if (StringUtils.isNotBlank(externalID)) { // the external ID is only set when it is not blank
+      config.setExternalId(externalID);
+    }
+    config.setRoleSessionExpiration(tokenExpireSecs);
+    config.setPolicy(createPolicy(readLocations, writeLocations)); // policy produced by createPolicy for the read and write locations
+    return new Client(config).getCredential(); // a new Client is constructed on every call
+  }
+
+  // reference:
+  // 
https://help.aliyun.com/zh/oss/user-guide/tutorial-use-ram-policies-to-control-access-to-oss?spm=a2c4g.11186623.help-menu-31815.d_2_4_6_1.76901df8T7gfl8

Review Comment:
   Can you replace this link with the English version of the page?



##########
bundles/aliyun-bundle/src/main/java/org/apache/gravitino/oss/credential/OSSTokenProvider.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.oss.credential;
+
+import com.aliyun.credentials.Client;
+import com.aliyun.credentials.models.Config;
+import com.aliyun.credentials.models.CredentialModel;
+import com.aliyun.credentials.utils.AuthConstant;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialContext;
+import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.OSSTokenCredential;
+import org.apache.gravitino.credential.PathBasedCredentialContext;
+import org.apache.gravitino.credential.config.OSSCredentialConfig;
+import org.apache.gravitino.oss.credential.policy.Condition;
+import org.apache.gravitino.oss.credential.policy.Effect;
+import org.apache.gravitino.oss.credential.policy.Policy;
+import org.apache.gravitino.oss.credential.policy.Statement;
+import org.apache.gravitino.oss.credential.policy.StringLike;
+
+/** Generates OSS token to access OSS data. */
+public class OSSTokenProvider implements CredentialProvider {
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private String accessKeyId;
+  private String secretAccessKey;
+  private String roleArn;
+  private String externalID;
+  private int tokenExpireSecs;
+  private String region;
+
+  /**
+   * Initializes the credential provider with catalog properties.
+   *
+   * <p>The properties are parsed through {@link OSSCredentialConfig}; the role ARN, external ID,
+   * token expiration in seconds, access key ID, secret access key and region read from it are
+   * stored on this instance.
+   *
+   * @param properties catalog properties that can be used to configure the provider. The specific
+   *     properties required vary by implementation.
+   */
+  @Override
+  public void initialize(Map<String, String> properties) {
+    OSSCredentialConfig credentialConfig = new OSSCredentialConfig(properties);
+    this.roleArn = credentialConfig.ossRoleArn();
+    this.externalID = credentialConfig.externalID();
+    this.tokenExpireSecs = credentialConfig.tokenExpireInSecs();
+    this.accessKeyId = credentialConfig.accessKeyID();
+    this.secretAccessKey = credentialConfig.secretAccessKey();
+    this.region = credentialConfig.region();
+  }
+
+  /**
+   * Returns the type of credential; it should be identical in Gravitino.
+   *
+   * <p>This provider always reports {@link OSSTokenCredential#OSS_TOKEN_CREDENTIAL_TYPE}.
+   *
+   * @return A string identifying the type of credentials.
+   */
+  @Override
+  public String credentialType() {
+    return OSSTokenCredential.OSS_TOKEN_CREDENTIAL_TYPE;
+  }
+
+  /**
+   * Obtains a credential based on the provided context information.
+   *
+   * <p>Only {@link PathBasedCredentialContext} is handled. Its read paths, write paths and user
+   * name are passed, together with the configured role ARN, to {@code createOSSCredentialModel},
+   * and the resulting access key ID, access key secret, security token and expiration are wrapped
+   * in an {@link OSSTokenCredential}.
+   *
+   * @param context A context object providing necessary information for retrieving credentials.
+   * @return A Credential object containing the authentication information needed to access a
+   *     system or resource. Null is returned if no credential is available, which here means the
+   *     context is not a {@link PathBasedCredentialContext}.
+   */
+  @Nullable
+  @Override
+  public Credential getCredential(CredentialContext context) {
+    if (!(context instanceof PathBasedCredentialContext)) {
+      return null;
+    }
+    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
+    CredentialModel credentialModel =
+        createOSSCredentialModel(
+            roleArn,
+            pathBasedCredentialContext.getReadPaths(),
+            pathBasedCredentialContext.getWritePaths(),
+            pathBasedCredentialContext.getUserName());
+    return new OSSTokenCredential(
+        credentialModel.accessKeyId,
+        credentialModel.accessKeySecret,
+        credentialModel.securityToken,
+        credentialModel.expiration);
+  }
+
+  private CredentialModel createOSSCredentialModel(
+      String roleArn, Set<String> readLocations, Set<String> writeLocations, 
String userName) { // Configures a RAM_ROLE_ARN credential request and returns the credential obtained from a new Client.
+    Config config = new Config();
+    config.setAccessKeyId(accessKeyId);
+    config.setAccessKeySecret(secretAccessKey);
+    config.setType(AuthConstant.RAM_ROLE_ARN);
+    config.setRoleArn(roleArn);
+    config.setRoleSessionName(getRoleName(userName)); // session name is computed by getRoleName from the user name
+    if (StringUtils.isNotBlank(externalID)) { // the external ID is only set when it is not blank
+      config.setExternalId(externalID);
+    }
+    config.setRoleSessionExpiration(tokenExpireSecs);
+    config.setPolicy(createPolicy(readLocations, writeLocations)); // policy produced by createPolicy for the read and write locations
+    return new Client(config).getCredential(); // a new Client is constructed on every call
+  }
+
+  // reference:
+  // 
https://help.aliyun.com/zh/oss/user-guide/tutorial-use-ram-policies-to-control-access-to-oss?spm=a2c4g.11186623.help-menu-31815.d_2_4_6_1.76901df8T7gfl8
+  public String createPolicy(Set<String> readLocations, Set<String> 
writeLocations) { // Builds the policy for the given read and write locations and returns it serialized as a JSON string.
+    Policy.Builder policyBuilder = Policy.builder().version("1");
+
+    // Allow GetObject/GetObjectVersion on every location, whether passed as a read or a write location
+    Statement.Builder allowGetObjectStatementBuilder =
+        Statement.builder()
+            .effect(Effect.ALLOW)
+            .addAction("oss:GetObject")
+            .addAction("oss:GetObjectVersion");
+    // Add support for bucket-level policies: statement builders keyed by bucket ARN
+    Map<String, Statement.Builder> bucketListStatementBuilder = new 
HashMap<>();
+    Map<String, Statement.Builder> bucketGetLocationStatementBuilder = new 
HashMap<>();
+
+    String arnPrefix = getArnPrefix();
+    Stream.concat(readLocations.stream(), writeLocations.stream())
+        .distinct() // a location present in both sets is processed once
+        .forEach(
+            location -> {
+              URI uri = URI.create(location);
+              
allowGetObjectStatementBuilder.addResource(getOssUriWithArn(arnPrefix, uri));
+              String bucketArn = arnPrefix + getBucketName(uri);
+              // ListBucket: built once per bucket ARN; NOTE(review): because of computeIfAbsent the condition uses only the first location seen for that bucket - confirm this is intended
+              bucketListStatementBuilder.computeIfAbsent(
+                  bucketArn,
+                  key ->
+                      Statement.builder()
+                          .effect(Effect.ALLOW)
+                          .addAction("oss:ListBucket")
+                          .addResource(key)
+                          .condition(getCondition(uri)));
+              // GetBucketLocation: built once per bucket ARN, without a condition
+              bucketGetLocationStatementBuilder.computeIfAbsent(
+                  bucketArn,
+                  key ->
+                      Statement.builder()
+                          .effect(Effect.ALLOW)
+                          .addAction("oss:GetBucketLocation")
+                          .addResource(key));
+            });
+
+    if (!writeLocations.isEmpty()) { // PutObject/DeleteObject are granted on write locations only
+      Statement.Builder allowPutObjectStatementBuilder =
+          Statement.builder()
+              .effect(Effect.ALLOW)
+              .addAction("oss:PutObject")
+              .addAction("oss:DeleteObject");
+      writeLocations.forEach(
+          location -> {
+            URI uri = URI.create(location);
+            
allowPutObjectStatementBuilder.addResource(getOssUriWithArn(arnPrefix, uri));
+          });
+      policyBuilder.addStatement(allowPutObjectStatementBuilder.build());
+    }
+
+    if (!bucketListStatementBuilder.isEmpty()) {
+      bucketListStatementBuilder
+          .values()
+          .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
+    } else {
+      // No locations were given: add a ListBucket statement with 0 resources
+      policyBuilder.addStatement(
+          
Statement.builder().effect(Effect.ALLOW).addAction("oss:ListBucket").build());
+    }
+    bucketGetLocationStatementBuilder
+        .values()
+        .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
+
+    policyBuilder.addStatement(allowGetObjectStatementBuilder.build());
+    try {
+      return objectMapper.writeValueAsString(policyBuilder.build());
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e); // rethrown unchecked, keeping the original exception as the cause
+    }
+  }
+
+  private Condition getCondition(URI uri) { // Builds a StringLike condition whose prefix is derived from the path of the given URI.
+    return Condition.builder()
+        .stringLike(
+            StringLike.builder()
+                .addPrefix(concatPathWithSep(trimLeadingSlash(uri.getPath()), 
"*", "/")) // presumably yields the path followed by a wildcard - confirm against concatPathWithSep
+                .build())
+        .build();
+  }
+
+  private String getArnPrefix() { // Returns the OSS ARN prefix that bucket names and object paths are appended to.
+    if (StringUtils.isNotEmpty(region)) { // a configured region is embedded in the prefix
+      return "acs:oss:" + region + ":*:";
+    }
+    return "acs:oss:*:*:"; // no region configured: wildcard in the region position
+  }
+
+  private String getBucketName(URI uri) { // The bucket name is taken from the host component of the location URI.
+    return uri.getHost();
+  }
+
+  private String getOssUriWithArn(String arnPrefix, URI uri) {

Review Comment:
   Could you add unit tests (UT) for this class?



##########
bundles/aliyun-bundle/src/main/java/org/apache/gravitino/oss/credential/OSSTokenProvider.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.oss.credential;
+
+import com.aliyun.credentials.Client;
+import com.aliyun.credentials.models.Config;
+import com.aliyun.credentials.models.CredentialModel;
+import com.aliyun.credentials.utils.AuthConstant;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialContext;
+import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.OSSTokenCredential;
+import org.apache.gravitino.credential.PathBasedCredentialContext;
+import org.apache.gravitino.credential.config.OSSCredentialConfig;
+import org.apache.gravitino.oss.credential.policy.Condition;
+import org.apache.gravitino.oss.credential.policy.Effect;
+import org.apache.gravitino.oss.credential.policy.Policy;
+import org.apache.gravitino.oss.credential.policy.Statement;
+import org.apache.gravitino.oss.credential.policy.StringLike;
+
+/** Generates OSS token to access OSS data. */
+public class OSSTokenProvider implements CredentialProvider {
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private String accessKeyId;
+  private String secretAccessKey;
+  private String roleArn;
+  private String externalID;
+  private int tokenExpireSecs;
+  private String region;
+
+  /**
+   * Initializes the credential provider with catalog properties.
+   *
+   * <p>The properties are parsed through {@link OSSCredentialConfig}; the role ARN, external ID,
+   * token expiration in seconds, access key ID, secret access key and region read from it are
+   * stored on this instance.
+   *
+   * @param properties catalog properties that can be used to configure the provider. The specific
+   *     properties required vary by implementation.
+   */
+  @Override
+  public void initialize(Map<String, String> properties) {
+    OSSCredentialConfig credentialConfig = new OSSCredentialConfig(properties);
+    this.roleArn = credentialConfig.ossRoleArn();
+    this.externalID = credentialConfig.externalID();
+    this.tokenExpireSecs = credentialConfig.tokenExpireInSecs();
+    this.accessKeyId = credentialConfig.accessKeyID();
+    this.secretAccessKey = credentialConfig.secretAccessKey();
+    this.region = credentialConfig.region();
+  }
+
+  /**
+   * Returns the type of credential; it should be identical in Gravitino.
+   *
+   * <p>This provider always reports {@link OSSTokenCredential#OSS_TOKEN_CREDENTIAL_TYPE}.
+   *
+   * @return A string identifying the type of credentials.
+   */
+  @Override
+  public String credentialType() {
+    return OSSTokenCredential.OSS_TOKEN_CREDENTIAL_TYPE;
+  }
+
+  /**
+   * Obtains a credential based on the provided context information.
+   *
+   * <p>Only {@link PathBasedCredentialContext} is handled. Its read paths, write paths and user
+   * name are passed, together with the configured role ARN, to {@code createOSSCredentialModel},
+   * and the resulting access key ID, access key secret, security token and expiration are wrapped
+   * in an {@link OSSTokenCredential}.
+   *
+   * @param context A context object providing necessary information for retrieving credentials.
+   * @return A Credential object containing the authentication information needed to access a
+   *     system or resource. Null is returned if no credential is available, which here means the
+   *     context is not a {@link PathBasedCredentialContext}.
+   */
+  @Nullable
+  @Override
+  public Credential getCredential(CredentialContext context) {
+    if (!(context instanceof PathBasedCredentialContext)) {
+      return null;
+    }
+    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
+    CredentialModel credentialModel =
+        createOSSCredentialModel(
+            roleArn,
+            pathBasedCredentialContext.getReadPaths(),
+            pathBasedCredentialContext.getWritePaths(),
+            pathBasedCredentialContext.getUserName());
+    return new OSSTokenCredential(
+        credentialModel.accessKeyId,
+        credentialModel.accessKeySecret,
+        credentialModel.securityToken,
+        credentialModel.expiration);
+  }
+
+  private CredentialModel createOSSCredentialModel(
+      String roleArn, Set<String> readLocations, Set<String> writeLocations, 
String userName) {
+    Config config = new Config();
+    config.setAccessKeyId(accessKeyId);
+    config.setAccessKeySecret(secretAccessKey);
+    config.setType(AuthConstant.RAM_ROLE_ARN);
+    config.setRoleArn(roleArn);
+    config.setRoleSessionName(getRoleName(userName));
+    if (StringUtils.isNotBlank(externalID)) {
+      config.setExternalId(externalID);
+    }
+    config.setRoleSessionExpiration(tokenExpireSecs);
+    config.setPolicy(createPolicy(readLocations, writeLocations));
+    return new Client(config).getCredential();

Review Comment:
   Is a new `Client` created on each call so that `userName` can be set as the 
`RoleSessionName`?



##########
bundles/aliyun-bundle/src/main/java/org/apache/gravitino/oss/credential/OSSTokenProvider.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.oss.credential;
+
+import com.aliyun.credentials.Client;
+import com.aliyun.credentials.models.Config;
+import com.aliyun.credentials.models.CredentialModel;
+import com.aliyun.credentials.utils.AuthConstant;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialContext;
+import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.OSSTokenCredential;
+import org.apache.gravitino.credential.PathBasedCredentialContext;
+import org.apache.gravitino.credential.config.OSSCredentialConfig;
+import org.apache.gravitino.oss.credential.policy.Condition;
+import org.apache.gravitino.oss.credential.policy.Effect;
+import org.apache.gravitino.oss.credential.policy.Policy;
+import org.apache.gravitino.oss.credential.policy.Statement;
+import org.apache.gravitino.oss.credential.policy.StringLike;
+
+/** Generates OSS token to access OSS data. */
+public class OSSTokenProvider implements CredentialProvider {
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private String accessKeyId;
+  private String secretAccessKey;
+  private String roleArn;
+  private String externalID;
+  private int tokenExpireSecs;
+  private String region;
+
+  /**
+   * Initializes the credential provider with catalog properties.
+   *
+   * <p>The properties are parsed through {@link OSSCredentialConfig}; the role ARN, external ID,
+   * token expiration in seconds, access key ID, secret access key and region read from it are
+   * stored on this instance.
+   *
+   * @param properties catalog properties that can be used to configure the provider. The specific
+   *     properties required vary by implementation.
+   */
+  @Override
+  public void initialize(Map<String, String> properties) {
+    OSSCredentialConfig credentialConfig = new OSSCredentialConfig(properties);
+    this.roleArn = credentialConfig.ossRoleArn();
+    this.externalID = credentialConfig.externalID();
+    this.tokenExpireSecs = credentialConfig.tokenExpireInSecs();
+    this.accessKeyId = credentialConfig.accessKeyID();
+    this.secretAccessKey = credentialConfig.secretAccessKey();
+    this.region = credentialConfig.region();
+  }
+
+  /**
+   * Returns the type of credential; it should be identical in Gravitino.
+   *
+   * <p>This provider always reports {@link OSSTokenCredential#OSS_TOKEN_CREDENTIAL_TYPE}.
+   *
+   * @return A string identifying the type of credentials.
+   */
+  @Override
+  public String credentialType() {
+    return OSSTokenCredential.OSS_TOKEN_CREDENTIAL_TYPE;
+  }
+
+  /**
+   * Obtains a credential based on the provided context information.
+   *
+   * <p>Only {@link PathBasedCredentialContext} is handled. Its read paths, write paths and user
+   * name are passed, together with the configured role ARN, to {@code createOSSCredentialModel},
+   * and the resulting access key ID, access key secret, security token and expiration are wrapped
+   * in an {@link OSSTokenCredential}.
+   *
+   * @param context A context object providing necessary information for retrieving credentials.
+   * @return A Credential object containing the authentication information needed to access a
+   *     system or resource. Null is returned if no credential is available, which here means the
+   *     context is not a {@link PathBasedCredentialContext}.
+   */
+  @Nullable
+  @Override
+  public Credential getCredential(CredentialContext context) {
+    if (!(context instanceof PathBasedCredentialContext)) {
+      return null;
+    }
+    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
+    CredentialModel credentialModel =
+        createOSSCredentialModel(
+            roleArn,
+            pathBasedCredentialContext.getReadPaths(),
+            pathBasedCredentialContext.getWritePaths(),
+            pathBasedCredentialContext.getUserName());
+    return new OSSTokenCredential(
+        credentialModel.accessKeyId,
+        credentialModel.accessKeySecret,
+        credentialModel.securityToken,
+        credentialModel.expiration);
+  }
+
+  private CredentialModel createOSSCredentialModel(
+      String roleArn, Set<String> readLocations, Set<String> writeLocations, 
String userName) { // Configures a RAM_ROLE_ARN credential request and returns the credential obtained from a new Client.
+    Config config = new Config();
+    config.setAccessKeyId(accessKeyId);
+    config.setAccessKeySecret(secretAccessKey);
+    config.setType(AuthConstant.RAM_ROLE_ARN);
+    config.setRoleArn(roleArn);
+    config.setRoleSessionName(getRoleName(userName)); // session name is computed by getRoleName from the user name
+    if (StringUtils.isNotBlank(externalID)) { // the external ID is only set when it is not blank
+      config.setExternalId(externalID);
+    }
+    config.setRoleSessionExpiration(tokenExpireSecs);
+    config.setPolicy(createPolicy(readLocations, writeLocations)); // policy produced by createPolicy for the read and write locations
+    return new Client(config).getCredential(); // a new Client is constructed on every call
+  }
+
+  // reference:
+  // 
https://help.aliyun.com/zh/oss/user-guide/tutorial-use-ram-policies-to-control-access-to-oss?spm=a2c4g.11186623.help-menu-31815.d_2_4_6_1.76901df8T7gfl8
+  public String createPolicy(Set<String> readLocations, Set<String> 
writeLocations) { // Builds the policy for the given read and write locations and returns it serialized as a JSON string.
+    Policy.Builder policyBuilder = Policy.builder().version("1");
+
+    // Allow GetObject/GetObjectVersion on every location, whether passed as a read or a write location
+    Statement.Builder allowGetObjectStatementBuilder =
+        Statement.builder()
+            .effect(Effect.ALLOW)
+            .addAction("oss:GetObject")
+            .addAction("oss:GetObjectVersion");
+    // Add support for bucket-level policies: statement builders keyed by bucket ARN
+    Map<String, Statement.Builder> bucketListStatementBuilder = new 
HashMap<>();
+    Map<String, Statement.Builder> bucketGetLocationStatementBuilder = new 
HashMap<>();
+
+    String arnPrefix = getArnPrefix();
+    Stream.concat(readLocations.stream(), writeLocations.stream())
+        .distinct() // a location present in both sets is processed once
+        .forEach(
+            location -> {
+              URI uri = URI.create(location);
+              
allowGetObjectStatementBuilder.addResource(getOssUriWithArn(arnPrefix, uri));
+              String bucketArn = arnPrefix + getBucketName(uri);
+              // ListBucket: built once per bucket ARN; NOTE(review): because of computeIfAbsent the condition uses only the first location seen for that bucket - confirm this is intended
+              bucketListStatementBuilder.computeIfAbsent(
+                  bucketArn,
+                  key ->
+                      Statement.builder()
+                          .effect(Effect.ALLOW)
+                          .addAction("oss:ListBucket")
+                          .addResource(key)
+                          .condition(getCondition(uri)));
+              // GetBucketLocation: built once per bucket ARN, without a condition
+              bucketGetLocationStatementBuilder.computeIfAbsent(
+                  bucketArn,
+                  key ->
+                      Statement.builder()
+                          .effect(Effect.ALLOW)
+                          .addAction("oss:GetBucketLocation")
+                          .addResource(key));
+            });
+
+    if (!writeLocations.isEmpty()) { // PutObject/DeleteObject are granted on write locations only
+      Statement.Builder allowPutObjectStatementBuilder =
+          Statement.builder()
+              .effect(Effect.ALLOW)
+              .addAction("oss:PutObject")
+              .addAction("oss:DeleteObject");
+      writeLocations.forEach(
+          location -> {
+            URI uri = URI.create(location);
+            
allowPutObjectStatementBuilder.addResource(getOssUriWithArn(arnPrefix, uri));
+          });
+      policyBuilder.addStatement(allowPutObjectStatementBuilder.build());
+    }
+
+    if (!bucketListStatementBuilder.isEmpty()) {
+      bucketListStatementBuilder
+          .values()
+          .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
+    } else {
+      // No locations were given: add a ListBucket statement with 0 resources
+      policyBuilder.addStatement(
+          
Statement.builder().effect(Effect.ALLOW).addAction("oss:ListBucket").build());
+    }
+    bucketGetLocationStatementBuilder
+        .values()
+        .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
+
+    policyBuilder.addStatement(allowGetObjectStatementBuilder.build());
+    try {
+      return objectMapper.writeValueAsString(policyBuilder.build());
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e); // rethrown unchecked, keeping the original exception as the cause
+    }
+  }
+
+  private Condition getCondition(URI uri) { // Builds a StringLike condition whose prefix is derived from the path of the given URI.
+    return Condition.builder()
+        .stringLike(
+            StringLike.builder()
+                .addPrefix(concatPathWithSep(trimLeadingSlash(uri.getPath()), 
"*", "/")) // presumably yields the path followed by a wildcard - confirm against concatPathWithSep
+                .build())
+        .build();
+  }
+
+  private String getArnPrefix() {
+    if (StringUtils.isNotEmpty(region)) {

Review Comment:
   Does the test pass for you when the region is set explicitly? I was not 
able to run it successfully.



##########
bundles/aliyun-bundle/src/main/java/org/apache/gravitino/oss/credential/OSSTokenProvider.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.oss.credential;
+
+import com.aliyun.credentials.Client;
+import com.aliyun.credentials.models.Config;
+import com.aliyun.credentials.models.CredentialModel;
+import com.aliyun.credentials.utils.AuthConstant;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialContext;
+import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.OSSTokenCredential;
+import org.apache.gravitino.credential.PathBasedCredentialContext;
+import org.apache.gravitino.credential.config.OSSCredentialConfig;
+import org.apache.gravitino.oss.credential.policy.Condition;
+import org.apache.gravitino.oss.credential.policy.Effect;
+import org.apache.gravitino.oss.credential.policy.Policy;
+import org.apache.gravitino.oss.credential.policy.Statement;
+import org.apache.gravitino.oss.credential.policy.StringLike;
+
+/** Generates OSS token to access OSS data. */
+public class OSSTokenProvider implements CredentialProvider {
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private String accessKeyId;
+  private String secretAccessKey;
+  private String roleArn;
+  private String externalID;
+  private int tokenExpireSecs;
+  private String region;
+
+  /**
+   * Initializes the credential provider with catalog properties.
+   *
+   * @param properties catalog properties that can be used to configure the 
provider. The specific
+   *     properties required vary by implementation.
+   */
+  @Override
+  public void initialize(Map<String, String> properties) {
+    OSSCredentialConfig credentialConfig = new OSSCredentialConfig(properties);
+    this.roleArn = credentialConfig.ossRoleArn();
+    this.externalID = credentialConfig.externalID();
+    this.tokenExpireSecs = credentialConfig.tokenExpireInSecs();
+    this.accessKeyId = credentialConfig.accessKeyID();
+    this.secretAccessKey = credentialConfig.secretAccessKey();
+    this.region = credentialConfig.region();
+  }
+
+  /**
+   * Returns the type of credential, it should be identical in Gravitino.
+   *
+   * @return A string identifying the type of credentials.
+   */
+  @Override
+  public String credentialType() {
+    return OSSTokenCredential.OSS_TOKEN_CREDENTIAL_TYPE;
+  }
+
+  /**
+   * Obtains a credential based on the provided context information.
+   *
+   * @param context A context object providing necessary information for 
retrieving credentials.
+   * @return A Credential object containing the authentication information 
needed to access a system
+   *     or resource. Null will be returned if no credential is available.
+   */
+  @Nullable
+  @Override
+  public Credential getCredential(CredentialContext context) {
+    if (!(context instanceof PathBasedCredentialContext)) {
+      return null;
+    }
+    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
+    CredentialModel credentialModel =
+        createOSSCredentialModel(
+            roleArn,
+            pathBasedCredentialContext.getReadPaths(),
+            pathBasedCredentialContext.getWritePaths(),
+            pathBasedCredentialContext.getUserName());
+    return new OSSTokenCredential(
+        credentialModel.accessKeyId,
+        credentialModel.accessKeySecret,
+        credentialModel.securityToken,
+        credentialModel.expiration);
+  }
+
+  private CredentialModel createOSSCredentialModel(
+      String roleArn, Set<String> readLocations, Set<String> writeLocations, 
String userName) {
+    Config config = new Config();
+    config.setAccessKeyId(accessKeyId);
+    config.setAccessKeySecret(secretAccessKey);
+    config.setType(AuthConstant.RAM_ROLE_ARN);
+    config.setRoleArn(roleArn);
+    config.setRoleSessionName(getRoleName(userName));
+    if (StringUtils.isNotBlank(externalID)) {
+      config.setExternalId(externalID);
+    }
+    config.setRoleSessionExpiration(tokenExpireSecs);
+    config.setPolicy(createPolicy(readLocations, writeLocations));
+    return new Client(config).getCredential();

Review Comment:
   Does this leak the client object?



##########
api/src/main/java/org/apache/gravitino/credential/OSSTokenCredential.java:
##########
@@ -0,0 +1,112 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.credential;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+
+/** OSS token credential. */
+public class OSSTokenCredential implements Credential {
+
+  /** OSS token credential type. */
+  public static final String OSS_TOKEN_CREDENTIAL_TYPE = "oss-token";
+  /** OSS session access key ID used to access OSS data. */
+  public static final String GRAVITINO_OSS_SESSION_ACCESS_KEY_ID = 
"oss-access-key-id";
+  /** OSS session secret access key used to access OSS data. */
+  public static final String GRAVITINO_OSS_SESSION_SECRET_ACCESS_KEY = 
"oss-secret-access-key";
+  /** OSS session token. */
+  public static final String GRAVITINO_OSS_TOKEN = "oss-security-token";

Review Comment:
   Rename `oss-security-token` to `oss-session-token` to keep it consistent with
S3?



##########
common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java:
##########
@@ -62,6 +72,9 @@ public static Map<String, String> 
toIcebergProperties(Credential credential) {
     if (credential instanceof S3TokenCredential || credential instanceof 
S3SecretKeyCredential) {
       return transformProperties(credential.credentialInfo(), 
icebergCredentialPropertyMap);
     }
+    if (credential instanceof OSSTokenCredential) {

Review Comment:
   Could you add a test for transforming `OSSTokenCredential`?



##########
bundles/aliyun-bundle/src/main/java/org/apache/gravitino/oss/credential/policy/Policy.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.oss.credential.policy;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.ArrayList;
+import java.util.List;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class Policy {
+
+  @JsonProperty("Version")
+  private String version;
+
+  @JsonProperty("Statement")
+  private List<Statement> statements;
+
+  private Policy(Builder builder) {
+    this.version = builder.version;
+    this.statements = builder.statements;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private String version;
+    private List<Statement> statements = new ArrayList<>();
+
+    public Builder version(String version) {
+      this.version = version;
+      return this;
+    }
+
+    public Builder addStatement(Statement statement) {
+      this.statements.add(statement);
+      return this;
+    }
+
+    public Policy build() {
+      return new Policy(this);
+    }
+  }
+
+  public String getVersion() {
+    return version;
+  }
+
+  public void setVersion(String version) {
+    this.version = version;
+  }
+
+  public List<Statement> getStatements() {

Review Comment:
   Is `getStatements` used for serialization and deserialization?



##########
bundles/aliyun-bundle/src/main/java/org/apache/gravitino/oss/credential/policy/Condition.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.oss.credential.policy;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class Condition {
+
+  @JsonProperty("StringLike")
+  private StringLike stringLike;
+
+  private Condition(Builder builder) {
+    this.stringLike = builder.stringLike;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private StringLike stringLike;
+
+    public Builder stringLike(StringLike stringLike) {
+      this.stringLike = stringLike;
+      return this;
+    }
+
+    public Condition build() {
+      return new Condition(this);
+    }
+  }
+
+  public StringLike getStringLike() {

Review Comment:
   Are `getStringLike` and `setStringLike` no longer used?



##########
bundles/aliyun-bundle/src/main/java/org/apache/gravitino/oss/credential/policy/Policy.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.oss.credential.policy;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.ArrayList;
+import java.util.List;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class Policy {
+
+  @JsonProperty("Version")
+  private String version;
+
+  @JsonProperty("Statement")
+  private List<Statement> statements;
+
+  private Policy(Builder builder) {
+    this.version = builder.version;
+    this.statements = builder.statements;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private String version;
+    private List<Statement> statements = new ArrayList<>();
+
+    public Builder version(String version) {
+      this.version = version;
+      return this;
+    }
+
+    public Builder addStatement(Statement statement) {
+      this.statements.add(statement);
+      return this;
+    }
+
+    public Policy build() {
+      return new Policy(this);
+    }
+  }
+
+  public String getVersion() {
+    return version;
+  }
+
+  public void setVersion(String version) {
+    this.version = version;
+  }
+
+  public List<Statement> getStatements() {

Review Comment:
   Remove `getStatements` and `setStatements`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to