This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 040a8917f9 AWS: Add LakeFormation Integration tests (#4423)
040a8917f9 is described below

commit 040a8917f9a049592fbdc4e20657ee2ebeac6244
Author: Xiaoxuan <[email protected]>
AuthorDate: Mon Jul 25 09:24:39 2022 -0700

    AWS: Add LakeFormation Integration tests (#4423)
---
 .../aws/lakeformation/LakeFormationTestBase.java   | 509 +++++++++++++++++++++
 .../TestLakeFormationDataOperations.java           | 125 +++++
 .../TestLakeFormationMetadataOperations.java       | 292 ++++++++++++
 .../iceberg/aws/glue/GlueTableOperations.java      |  36 +-
 4 files changed, 960 insertions(+), 2 deletions(-)

diff --git 
a/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/LakeFormationTestBase.java
 
b/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/LakeFormationTestBase.java
new file mode 100644
index 0000000000..f023761ae6
--- /dev/null
+++ 
b/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/LakeFormationTestBase.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.lakeformation;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.aws.AssumeRoleAwsClientFactory;
+import org.apache.iceberg.aws.AwsIntegTestUtil;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.glue.GlueCatalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.regions.PartitionMetadata;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.GlueClientBuilder;
+import software.amazon.awssdk.services.iam.IamClient;
+import software.amazon.awssdk.services.iam.model.AttachRolePolicyRequest;
+import software.amazon.awssdk.services.iam.model.CreatePolicyRequest;
+import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
+import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
+import software.amazon.awssdk.services.iam.model.DeletePolicyRequest;
+import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
+import software.amazon.awssdk.services.iam.model.DetachRolePolicyRequest;
+import software.amazon.awssdk.services.iam.model.GetPolicyVersionRequest;
+import software.amazon.awssdk.services.iam.model.GetRolePolicyRequest;
+import software.amazon.awssdk.services.iam.model.NoSuchEntityException;
+import software.amazon.awssdk.services.iam.model.PolicyVersion;
+import software.amazon.awssdk.services.lakeformation.LakeFormationClient;
+import 
software.amazon.awssdk.services.lakeformation.LakeFormationClientBuilder;
+import 
software.amazon.awssdk.services.lakeformation.model.AlreadyExistsException;
+import software.amazon.awssdk.services.lakeformation.model.CatalogResource;
+import software.amazon.awssdk.services.lakeformation.model.DataLakePrincipal;
+import software.amazon.awssdk.services.lakeformation.model.DataLakeSettings;
+import 
software.amazon.awssdk.services.lakeformation.model.DataLocationResource;
+import software.amazon.awssdk.services.lakeformation.model.DatabaseResource;
+import 
software.amazon.awssdk.services.lakeformation.model.DeregisterResourceRequest;
+import 
software.amazon.awssdk.services.lakeformation.model.EntityNotFoundException;
+import 
software.amazon.awssdk.services.lakeformation.model.GetDataLakeSettingsRequest;
+import 
software.amazon.awssdk.services.lakeformation.model.GetDataLakeSettingsResponse;
+import 
software.amazon.awssdk.services.lakeformation.model.GrantPermissionsRequest;
+import software.amazon.awssdk.services.lakeformation.model.Permission;
+import 
software.amazon.awssdk.services.lakeformation.model.PutDataLakeSettingsRequest;
+import 
software.amazon.awssdk.services.lakeformation.model.PutDataLakeSettingsResponse;
+import 
software.amazon.awssdk.services.lakeformation.model.RegisterResourceRequest;
+import software.amazon.awssdk.services.lakeformation.model.Resource;
+import software.amazon.awssdk.services.lakeformation.model.TableResource;
+import software.amazon.awssdk.services.sts.StsClient;
+import 
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+
+@SuppressWarnings({"VisibilityModifier", "HideUtilityClassConstructor"})
+public class LakeFormationTestBase {
+
+  static final Logger LOG = 
LoggerFactory.getLogger(LakeFormationTestBase.class);
+  static final int IAM_PROPAGATION_DELAY = 10000;
+  static final int ASSUME_ROLE_SESSION_DURATION = 3600;
+  static final String LF_REGISTER_PATH_ROLE_PREFIX = "LFRegisterPathRole_";
+  static final String LF_PRIVILEGED_ROLE_PREFIX = "LFPrivilegedRole_";
+  static final String LF_TEST_DB_PREFIX = "lf_test_db";
+  static final String LF_TEST_TABLE_PREFIX = "lf_test_table";
+  static final String TEST_PATH_PREFIX = "iceberg-lf-test/";
+  static final String DEFAULT_IAM_POLICY_VERSION = "v1";
+  static final String LF_AUTHORIZED_CALLER_VALUE = "emr";
+  static final String LF_REGISTER_PATH_ROLE_S3_POLICY_PREFIX = 
"LFRegisterPathRoleS3Policy_";
+  static final String LF_REGISTER_PATH_ROLE_LF_POLICY_PREFIX = 
"LFRegisterPathRoleLfPolicy_";
+  static final String LF_REGISTER_PATH_ROLE_IAM_POLICY_PREFIX = 
"LFRegisterPathRoleIamPolicy_";
+  static final String LF_PRIVILEGED_ROLE_POLICY_PREFIX = 
"LFPrivilegedRoleTestPolicy_";
+
+  static Map<String, String> assumeRoleProperties;
+  static String lfRegisterPathRoleName;
+  static String lfPrivilegedRoleName;
+  static String lfRegisterPathRoleArn;
+  static String lfPrivilegedRoleArn;
+  static String lfRegisterPathRoleS3PolicyName;
+  static String lfRegisterPathRoleLfPolicyName;
+  static String lfRegisterPathRoleIamPolicyName;
+  static String lfPrivilegedRolePolicyName;
+  static DataLakePrincipal principalUnderTest;
+  static String testBucketPath = "s3://" + AwsIntegTestUtil.testBucketName() + 
"/" + TEST_PATH_PREFIX;
+  static Schema schema = new Schema(Types.NestedField.required(1, "c1", 
Types.StringType.get(), "c1"));
+  static PartitionSpec partitionSpec = 
PartitionSpec.builderFor(schema).build();
+
+  static GlueCatalog glueCatalogRegisterPathRole;
+  static GlueCatalog glueCatalogPrivilegedRole;
+  static IamClient iam;
+  static LakeFormationClient lakeformation;
+  static GlueClient glue;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    lfRegisterPathRoleName = LF_REGISTER_PATH_ROLE_PREFIX + 
UUID.randomUUID().toString();
+    lfPrivilegedRoleName = LF_PRIVILEGED_ROLE_PREFIX + 
UUID.randomUUID().toString();
+    lfRegisterPathRoleS3PolicyName = LF_REGISTER_PATH_ROLE_S3_POLICY_PREFIX + 
UUID.randomUUID().toString();
+    lfRegisterPathRoleLfPolicyName = LF_REGISTER_PATH_ROLE_LF_POLICY_PREFIX + 
UUID.randomUUID().toString();
+    lfRegisterPathRoleIamPolicyName = LF_REGISTER_PATH_ROLE_IAM_POLICY_PREFIX 
+ UUID.randomUUID().toString();
+    lfPrivilegedRolePolicyName = LF_PRIVILEGED_ROLE_POLICY_PREFIX + 
UUID.randomUUID().toString();
+
+    iam = IamClient.builder()
+        .region(Region.AWS_GLOBAL)
+        .httpClientBuilder(UrlConnectionHttpClient.builder())
+        .build();
+
+    CreateRoleResponse response = iam.createRole(CreateRoleRequest.builder()
+        .roleName(lfRegisterPathRoleName)
+        .assumeRolePolicyDocument("{" +
+            "\"Version\":\"2012-10-17\"," +
+            "\"Statement\":[{" +
+            "\"Effect\":\"Allow\"," +
+            "\"Principal\":{" +
+            "\"Service\":[\"glue.amazonaws.com\"," +
+            "\"lakeformation.amazonaws.com\"]," +
+            "\"AWS\":\"arn:aws:iam::" + AwsIntegTestUtil.testAccountId() + 
":root\"}," +
+            "\"Action\": [\"sts:AssumeRole\"]}]}")
+        .maxSessionDuration(ASSUME_ROLE_SESSION_DURATION)
+        .build());
+    lfRegisterPathRoleArn = response.role().arn();
+
+    // create and attach test policy to lfRegisterPathRole
+    createAndAttachRolePolicy(createPolicyArn(lfRegisterPathRoleS3PolicyName),
+        lfRegisterPathRoleS3PolicyName, lfRegisterPathRolePolicyDocForS3(), 
lfRegisterPathRoleName);
+    createAndAttachRolePolicy(createPolicyArn(lfRegisterPathRoleLfPolicyName),
+        lfRegisterPathRoleLfPolicyName, 
lfRegisterPathRolePolicyDocForLakeFormation(), lfRegisterPathRoleName);
+    createAndAttachRolePolicy(createPolicyArn(lfRegisterPathRoleIamPolicyName),
+        lfRegisterPathRoleIamPolicyName,
+        lfRegisterPathRolePolicyDocForIam(lfRegisterPathRoleArn), 
lfRegisterPathRoleName);
+    waitForIamConsistency();
+
+    // create lfPrivilegedRole
+    response = iam.createRole(CreateRoleRequest.builder()
+        .roleName(lfPrivilegedRoleName)
+        .assumeRolePolicyDocument("{" +
+            "\"Version\":\"2012-10-17\"," +
+            "\"Statement\":[{" +
+            "\"Effect\":\"Allow\"," +
+            "\"Principal\":{" +
+            "\"AWS\":\"arn:aws:iam::" + AwsIntegTestUtil.testAccountId() + 
":root\"}," +
+            "\"Action\": [\"sts:AssumeRole\"," +
+            "\"sts:TagSession\"]}]}")
+        .maxSessionDuration(ASSUME_ROLE_SESSION_DURATION)
+        .build());
+    lfPrivilegedRoleArn = response.role().arn();
+    principalUnderTest = 
DataLakePrincipal.builder().dataLakePrincipalIdentifier(lfPrivilegedRoleArn).build();
+
+    // create and attach test policy to lfPrivilegedRole
+    createAndAttachRolePolicy(createPolicyArn(lfPrivilegedRolePolicyName),
+        lfPrivilegedRolePolicyName, lfPrivilegedRolePolicyDoc(), 
lfPrivilegedRoleName);
+    waitForIamConsistency();
+
+    // build lf and glue client with lfRegisterPathRole
+    lakeformation = buildLakeFormationClient(lfRegisterPathRoleArn, "test_lf", 
AwsIntegTestUtil.testRegion());
+    glue = buildGlueClient(lfRegisterPathRoleArn, "test_lf", 
AwsIntegTestUtil.testRegion());
+
+    // put lf data lake settings
+    GetDataLakeSettingsResponse getDataLakeSettingsResponse =
+        
lakeformation.getDataLakeSettings(GetDataLakeSettingsRequest.builder().build());
+    PutDataLakeSettingsResponse putDataLakeSettingsResponse
+        = 
lakeformation.putDataLakeSettings(putDataLakeSettingsRequest(lfRegisterPathRoleArn,
+        getDataLakeSettingsResponse.dataLakeSettings(), true));
+
+    // Build test glueCatalog with lfPrivilegedRole
+    glueCatalogPrivilegedRole = new GlueCatalog();
+    assumeRoleProperties = Maps.newHashMap();
+    assumeRoleProperties.put("warehouse", "s3://path");
+    assumeRoleProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_REGION, 
AwsIntegTestUtil.testRegion());
+    assumeRoleProperties.put(AwsProperties.GLUE_LAKEFORMATION_ENABLED, "true");
+    assumeRoleProperties.put(AwsProperties.GLUE_ACCOUNT_ID, 
AwsIntegTestUtil.testAccountId());
+    assumeRoleProperties.put(AwsProperties.HTTP_CLIENT_TYPE, 
AwsProperties.HTTP_CLIENT_TYPE_APACHE);
+    assumeRoleProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_ARN, 
lfPrivilegedRoleArn);
+    assumeRoleProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_TAGS_PREFIX +
+        LakeFormationAwsClientFactory.LF_AUTHORIZED_CALLER, 
LF_AUTHORIZED_CALLER_VALUE);
+    glueCatalogPrivilegedRole.initialize("test_registered", 
assumeRoleProperties);
+
+    // Build test glueCatalog with lfRegisterPathRole
+    assumeRoleProperties.put(AwsProperties.GLUE_LAKEFORMATION_ENABLED, 
"false");
+    assumeRoleProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_ARN, 
lfRegisterPathRoleArn);
+    assumeRoleProperties.remove(AwsProperties.CLIENT_ASSUME_ROLE_TAGS_PREFIX +
+        LakeFormationAwsClientFactory.LF_AUTHORIZED_CALLER);
+    assumeRoleProperties.put(AwsProperties.CLIENT_FACTORY, 
AssumeRoleAwsClientFactory.class.getName());
+    glueCatalogRegisterPathRole = new GlueCatalog();
+    glueCatalogRegisterPathRole.initialize("test_privileged", 
assumeRoleProperties);
+    // register S3 test bucket path
+    deregisterResource(testBucketPath);
+    registerResource(testBucketPath);
+    waitForIamConsistency();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    GetDataLakeSettingsResponse getDataLakeSettingsResponse =
+        
lakeformation.getDataLakeSettings(GetDataLakeSettingsRequest.builder().build());
+    
lakeformation.putDataLakeSettings(putDataLakeSettingsRequest(lfRegisterPathRoleArn,
+        getDataLakeSettingsResponse.dataLakeSettings(), false));
+    detachAndDeleteRolePolicy(createPolicyArn(lfRegisterPathRoleS3PolicyName), 
lfRegisterPathRoleName);
+    detachAndDeleteRolePolicy(createPolicyArn(lfRegisterPathRoleLfPolicyName), 
lfRegisterPathRoleName);
+    
detachAndDeleteRolePolicy(createPolicyArn(lfRegisterPathRoleIamPolicyName), 
lfRegisterPathRoleName);
+    
iam.deleteRole(DeleteRoleRequest.builder().roleName(lfRegisterPathRoleName).build());
+    detachAndDeleteRolePolicy(createPolicyArn(lfPrivilegedRolePolicyName), 
lfPrivilegedRoleName);
+    
iam.deleteRole(DeleteRoleRequest.builder().roleName(lfPrivilegedRoleName).build());
+    deregisterResource(testBucketPath);
+  }
+
+  void grantDatabasePrivileges(String dbName, Permission... permissions) {
+    Resource dbResource = 
Resource.builder().database(DatabaseResource.builder().name(dbName).build()).build();
+    lakeformation.grantPermissions(GrantPermissionsRequest.builder()
+        .principal(principalUnderTest)
+        .resource(dbResource)
+        .permissions(permissions).build());
+  }
+
+  void grantDataPathPrivileges(String resourceLocation) {
+    Resource dataLocationResource = Resource.builder()
+        .dataLocation(DataLocationResource.builder()
+            
.resourceArn(getArnForS3Location(resourceLocation)).build()).build();
+    lakeformation.grantPermissions(GrantPermissionsRequest.builder()
+        .principal(principalUnderTest)
+        .resource(dataLocationResource)
+        .permissions(Permission.DATA_LOCATION_ACCESS).build());
+  }
+
+  void lfRegisterPathRoleCreateDb(String dbName) {
+    glueCatalogRegisterPathRole.createNamespace(Namespace.of(dbName));
+  }
+
+  void lfRegisterPathRoleDeleteDb(String dbName) {
+    glueCatalogRegisterPathRole.dropNamespace(Namespace.of(dbName));
+  }
+
+  void lfRegisterPathRoleCreateTable(String dbName, String tableName) {
+    
glueCatalogRegisterPathRole.createTable(TableIdentifier.of(Namespace.of(dbName),
 tableName),
+        schema, partitionSpec, getTableLocation(tableName), null);
+  }
+
+  void lfRegisterPathRoleDeleteTable(String dbName, String tableName) {
+    
glueCatalogRegisterPathRole.dropTable(TableIdentifier.of(Namespace.of(dbName), 
tableName), false);
+  }
+
+  String getTableLocation(String tableName) {
+    return testBucketPath + tableName;
+  }
+
+  void grantCreateDbPermission() {
+    lakeformation.grantPermissions(GrantPermissionsRequest.builder()
+        .principal(principalUnderTest)
+        .permissions(Permission.CREATE_DATABASE)
+        
.resource(Resource.builder().catalog(CatalogResource.builder().build()).build()).build());
+  }
+
+  void grantTablePrivileges(String dbName, String tableName, Permission... 
tableDdlPrivileges) {
+    Resource tableResource = Resource.builder()
+        
.table(TableResource.builder().databaseName(dbName).name(tableName).build()).build();
+    GrantPermissionsRequest grantDataLakePrivilegesRequest = 
GrantPermissionsRequest.builder()
+        .principal(principalUnderTest)
+        .resource(tableResource)
+        .permissionsWithGrantOption(tableDdlPrivileges)
+        .permissions(tableDdlPrivileges).build();
+    lakeformation.grantPermissions(grantDataLakePrivilegesRequest);
+  }
+
+  String getRandomDbName() {
+    return LF_TEST_DB_PREFIX + UUID.randomUUID().toString().replace("-", "");
+  }
+
+  String getRandomTableName() {
+    return LF_TEST_TABLE_PREFIX  + UUID.randomUUID().toString().replace("-", 
"");
+  }
+
+  private static void waitForIamConsistency() throws Exception {
+    Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date
+  }
+
+  private static LakeFormationClient buildLakeFormationClient(String roleArn, 
String sessionName, String region) {
+    AssumeRoleRequest request = AssumeRoleRequest.builder()
+        .roleArn(roleArn)
+        .roleSessionName(sessionName)
+        .durationSeconds(ASSUME_ROLE_SESSION_DURATION)
+        .build();
+
+    LakeFormationClientBuilder clientBuilder = LakeFormationClient.builder();
+
+    clientBuilder.credentialsProvider(
+        StsAssumeRoleCredentialsProvider.builder()
+            
.stsClient(StsClient.builder().httpClientBuilder(UrlConnectionHttpClient.builder()).build())
+            .refreshRequest(request)
+            .build());
+
+    clientBuilder.region(Region.of(region));
+    clientBuilder.httpClientBuilder(UrlConnectionHttpClient.builder());
+    return clientBuilder.build();
+  }
+
+  private static GlueClient buildGlueClient(String roleArn, String 
sessionName, String region) {
+    AssumeRoleRequest request = AssumeRoleRequest.builder()
+        .roleArn(roleArn)
+        .roleSessionName(sessionName)
+        .durationSeconds(ASSUME_ROLE_SESSION_DURATION)
+        .build();
+
+    GlueClientBuilder clientBuilder = GlueClient.builder();
+
+    clientBuilder.credentialsProvider(
+        StsAssumeRoleCredentialsProvider.builder()
+            
.stsClient(StsClient.builder().httpClientBuilder(UrlConnectionHttpClient.builder()).build())
+            .refreshRequest(request)
+            .build());
+
+    clientBuilder.region(Region.of(region));
+    clientBuilder.httpClientBuilder(UrlConnectionHttpClient.builder());
+    return clientBuilder.build();
+  }
+
+  private static void registerResource(String s3Location) {
+    String arn = getArnForS3Location(s3Location);
+    try {
+      lakeformation.registerResource(RegisterResourceRequest.builder()
+          .resourceArn(arn)
+          .roleArn(lfRegisterPathRoleArn)
+          .useServiceLinkedRole(false).build());
+      // when a resource is registered, LF will update SLR with necessary 
permissions which has a propagation delay
+      waitForIamConsistency();
+    } catch (AlreadyExistsException e) {
+      LOG.warn("Resource {} already registered. Error: {}", arn, e);
+    } catch (Exception e) {
+      // ignore exception
+    }
+  }
+
+  private static void deregisterResource(String s3Location) {
+    String arn = getArnForS3Location(s3Location);
+    try {
+      
lakeformation.deregisterResource(DeregisterResourceRequest.builder().resourceArn(arn).build());
+    } catch (EntityNotFoundException e) {
+      LOG.info("Resource {} not found. Error: {}", arn, e);
+    }
+  }
+
+  private static String createPolicyArn(String policyName) {
+    return String.format("arn:%s:iam::%s:policy/%s",
+        PartitionMetadata.of(Region.of(AwsIntegTestUtil.testRegion())).id(),
+        AwsIntegTestUtil.testAccountId(),
+        policyName);
+  }
+
+  private static void createAndAttachRolePolicy(String policyArn, String 
policyName,
+      String policyDocument, String roleName) {
+    createOrReplacePolicy(policyArn, policyName, policyDocument, roleName);
+    attachRolePolicyIfNotExists(policyArn, policyName, roleName);
+  }
+
+  private static void attachRolePolicyIfNotExists(String policyArn, String 
policyName, String roleName) {
+    try {
+      
iam.getRolePolicy(GetRolePolicyRequest.builder().roleName(roleName).policyName(policyName).build());
+      LOG.info("Policy {} already attached to role {}", policyName, roleName);
+    } catch (NoSuchEntityException e) {
+      LOG.info("Attaching policy {} to role {} {}", policyName, roleName, e);
+      
iam.attachRolePolicy(AttachRolePolicyRequest.builder().roleName(roleName).policyArn(policyArn).build());
+    }
+  }
+
+  private static void createOrReplacePolicy(String policyArn, String 
policyName,
+      String policyDocument, String roleName) {
+    try {
+      PolicyVersion existingPolicy = 
iam.getPolicyVersion(GetPolicyVersionRequest.builder()
+          
.policyArn(policyArn).versionId(DEFAULT_IAM_POLICY_VERSION).build()).policyVersion();
+      String currentDocument = URLDecoder.decode(existingPolicy.document(), 
StandardCharsets.UTF_8.name());
+      if (Objects.equals(currentDocument, policyDocument)) {
+        LOG.info("Policy {} already exists and policy content did not change. 
Nothing to do.", policyArn);
+      } else {
+        LOG.info("Role policy exists but has different policy content. 
Existing content: {}, new content: {}",
+            currentDocument, policyDocument);
+        detachAndDeleteRolePolicy(policyArn, roleName);
+        createPolicy(policyName, policyDocument);
+      }
+    } catch (NoSuchEntityException e) {
+      createPolicy(policyName, policyDocument);
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void createPolicy(String policyName, String policyDocument) {
+    LOG.info("Creating policy {} with version v1", policyName);
+    
iam.createPolicy(CreatePolicyRequest.builder().policyName(policyName).policyDocument(policyDocument).build());
+  }
+
+  private static void detachAndDeleteRolePolicy(String policyArn, String 
roleName) {
+    LOG.info("Detaching role policy {} if attached", policyArn);
+    try {
+      
iam.detachRolePolicy(DetachRolePolicyRequest.builder().policyArn(policyArn).roleName(roleName).build());
+    } catch (NoSuchEntityException ex) {
+      // nothing to do if it doesn't exist
+    }
+
+    LOG.info("Deleting role policy : {} if already exists", policyArn);
+    try {
+      
iam.deletePolicy(DeletePolicyRequest.builder().policyArn(policyArn).build());
+    } catch (NoSuchEntityException e) {
+      // nothing to do if it doesn't exist
+    }
+  }
+
+  private static String lfRegisterPathRolePolicyDocForS3() {
+    return "{" +
+        "\"Version\":\"2012-10-17\"," +
+        "\"Statement\":[{" +
+        "\"Effect\":\"Allow\"," +
+        "\"Action\": [\"s3:*\"]," +
+        "\"Resource\": [\"*\"]}]}";
+  }
+
+  private static String lfRegisterPathRolePolicyDocForLakeFormation() {
+    return "{" +
+        "\"Version\":\"2012-10-17\"," +
+        "\"Statement\":[{" +
+        "\"Sid\":\"policy1\"," +
+        "\"Effect\":\"Allow\"," +
+        "\"Action\":[\"lakeformation:GetDataLakeSettings\"," +
+        "\"lakeformation:PutDataLakeSettings\"," +
+        "\"lakeformation:GrantPermissions\"," +
+        "\"lakeformation:RevokePermissions\"," +
+        "\"lakeformation:RegisterResource\"," +
+        "\"lakeformation:DeregisterResource\"," +
+        "\"lakeformation:GetDataAccess\"," +
+        "\"glue:CreateDatabase\",\"glue:DeleteDatabase\"," +
+        "\"glue:Get*\", \"glue:CreateTable\", \"glue:DeleteTable\", 
\"glue:UpdateTable\"]," +
+        "\"Resource\":[\"*\"]}]}";
+  }
+
+  private static String lfRegisterPathRolePolicyDocForIam(String roleArn) {
+    return "{\n" +
+        "\"Version\":\"2012-10-17\"," +
+        "\"Statement\":{" +
+        "\"Effect\":\"Allow\"," +
+        "\"Action\": [" +
+        "\"iam:PassRole\"," +
+        "\"iam:GetRole\"" +
+        "]," +
+        "\"Resource\": [" +
+        "\"" + roleArn + "\"" +
+        "]}}";
+  }
+
+  private static String lfPrivilegedRolePolicyDoc() {
+    return "{" +
+        "\"Version\":\"2012-10-17\"," +
+        "\"Statement\":[{" +
+        "\"Sid\":\"policy1\"," +
+        "\"Effect\":\"Allow\"," +
+        "\"Action\":[\"glue:CreateDatabase\", \"glue:DeleteDatabase\"," +
+        "\"glue:Get*\", \"glue:UpdateTable\", \"glue:DeleteTable\", 
\"glue:CreateTable\"," +
+        "\"lakeformation:GetDataAccess\"]," +
+        "\"Resource\":[\"*\"]}]}";
+  }
+
+  private static PutDataLakeSettingsRequest putDataLakeSettingsRequest(String 
adminArn,
+      DataLakeSettings dataLakeSettings, boolean add) {
+    List<DataLakePrincipal> dataLakeAdmins =  
Lists.newArrayList(dataLakeSettings.dataLakeAdmins());
+    if (add) {
+      
dataLakeAdmins.add(DataLakePrincipal.builder().dataLakePrincipalIdentifier(adminArn).build());
+    } else {
+      dataLakeAdmins.removeIf(p -> 
p.dataLakePrincipalIdentifier().equals(adminArn));
+    }
+    DataLakeSettings newDataLakeSettings = DataLakeSettings.builder()
+        .dataLakeAdmins(dataLakeAdmins)
+        .allowExternalDataFiltering(true)
+        .externalDataFilteringAllowList(DataLakePrincipal.builder()
+            
.dataLakePrincipalIdentifier(AwsIntegTestUtil.testAccountId()).build())
+        .authorizedSessionTagValueList(LF_AUTHORIZED_CALLER_VALUE)
+        .build();
+
+    return 
PutDataLakeSettingsRequest.builder().dataLakeSettings(newDataLakeSettings).build();
+  }
+
+  private static String getArnForS3Location(String s3Location) {
+    return s3Location.replace("s3://", "arn:aws:s3:::");
+  }
+}
diff --git 
a/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/TestLakeFormationDataOperations.java
 
b/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/TestLakeFormationDataOperations.java
new file mode 100644
index 0000000000..a7680790d9
--- /dev/null
+++ 
b/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/TestLakeFormationDataOperations.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.lakeformation;
+
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import software.amazon.awssdk.services.glue.model.AccessDeniedException;
+import software.amazon.awssdk.services.lakeformation.model.Permission;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+public class TestLakeFormationDataOperations
+    extends LakeFormationTestBase {
+
+  private static String testDbName;
+  private static String testTableName;
+
+  @Before
+  public void before() {
+    testDbName = getRandomDbName();
+    testTableName = getRandomTableName();
+    lfRegisterPathRoleCreateDb(testDbName);
+    lfRegisterPathRoleCreateTable(testDbName, testTableName);
+  }
+
+  @After
+  public void after() {
+    lfRegisterPathRoleDeleteTable(testDbName, testTableName);
+    lfRegisterPathRoleDeleteDb(testDbName);
+  }
+
+  @Test
+  public void testLoadTableWithNoTableAccess() {
+    AssertHelpers.assertThrows("attempt to load a table without SELECT 
permission should fail",
+        AccessDeniedException.class,
+        "Insufficient Lake Formation permission(s)",
+        () -> 
glueCatalogPrivilegedRole.loadTable(TableIdentifier.of(Namespace.of(testDbName),
 testTableName)));
+  }
+
+  @Test
+  public void testLoadTableSuccess() {
+    grantTablePrivileges(testDbName, testTableName, Permission.SELECT);
+    
glueCatalogPrivilegedRole.loadTable(TableIdentifier.of(Namespace.of(testDbName),
 testTableName));
+  }
+
+  @Test
+  public void testUpdateTableWithNoInsertAccess() {
+    grantTablePrivileges(testDbName, testTableName, Permission.SELECT);
+    Table table = 
glueCatalogPrivilegedRole.loadTable(TableIdentifier.of(Namespace.of(testDbName),
 testTableName));
+    DataFile dataFile = DataFiles.builder(partitionSpec)
+        .withPath(getTableLocation(testTableName) + "/path/to/data-a.parquet")
+        .withFileSizeInBytes(10)
+        .withRecordCount(1)
+        .build();
+    AssertHelpers.assertThrows("attempt to insert to a table without INSERT 
permission should fail",
+        S3Exception.class,
+        "Access Denied",
+        () -> table.newAppend().appendFile(dataFile).commit());
+  }
+
+  @Test
+  public void testUpdateTableSuccess() {
+    grantTablePrivileges(testDbName, testTableName, Permission.SELECT, 
Permission.ALTER, Permission.INSERT);
+    grantDataPathPrivileges(getTableLocation(testTableName));
+    Table table = 
glueCatalogPrivilegedRole.loadTable(TableIdentifier.of(Namespace.of(testDbName),
 testTableName));
+    DataFile dataFile = DataFiles.builder(partitionSpec)
+        .withPath(getTableLocation(testTableName) + "/path/to/data-a.parquet")
+        .withFileSizeInBytes(10)
+        .withRecordCount(1)
+        .build();
+    table.newAppend().appendFile(dataFile).commit();
+  }
+
+  @Test
+  public void testDeleteWithNoDataPathAccess() {
+    grantTablePrivileges(testDbName, testTableName, Permission.SELECT, 
Permission.INSERT, Permission.ALTER);
+    Table table = 
glueCatalogPrivilegedRole.loadTable(TableIdentifier.of(Namespace.of(testDbName),
 testTableName));
+    DataFile dataFile = DataFiles.builder(partitionSpec)
+        .withPath(getTableLocation(testTableName) + 
"/path/to/delete-a.parquet")
+        .withFileSizeInBytes(10)
+        .withRecordCount(1)
+        .build();
+    AssertHelpers.assertThrows("attempt to delete without DATA_LOCATION_ACCESS 
permission should fail",
+        ForbiddenException.class,
+        "Glue cannot access the requested resources",
+        () -> table.newDelete().deleteFile(dataFile).commit());
+  }
+
+  @Test
+  public void testDeleteSuccess() {
+    grantTablePrivileges(testDbName, testTableName, Permission.SELECT, 
Permission.ALTER, Permission.INSERT);
+    grantDataPathPrivileges(getTableLocation(testTableName));
+    Table table = 
glueCatalogPrivilegedRole.loadTable(TableIdentifier.of(Namespace.of(testDbName),
 testTableName));
+    DataFile dataFile = DataFiles.builder(partitionSpec)
+        .withPath(getTableLocation(testTableName) + "/path/to/data-a.parquet")
+        .withFileSizeInBytes(10)
+        .withRecordCount(1)
+        .build();
+    table.newDelete().deleteFile(dataFile).commit();
+  }
+}
diff --git 
a/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/TestLakeFormationMetadataOperations.java
 
b/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/TestLakeFormationMetadataOperations.java
new file mode 100644
index 0000000000..f3d60b1182
--- /dev/null
+++ 
b/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/TestLakeFormationMetadataOperations.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.lakeformation;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.glue.model.AccessDeniedException;
+import software.amazon.awssdk.services.lakeformation.model.Permission;
+
+public class TestLakeFormationMetadataOperations
+    extends LakeFormationTestBase {
+  @Test
+  public void testCreateAndDropDatabaseSuccessful() {
+    String testDbName = getRandomDbName();
+
+    grantCreateDbPermission();
+    glueCatalogPrivilegedRole.createNamespace(Namespace.of(testDbName));
+
+    grantDatabasePrivileges(testDbName, Permission.DROP);
+    glueCatalogPrivilegedRole.dropNamespace(Namespace.of(testDbName));
+  }
+
+  @Test
+  public void testCreateDatabaseNoPrivileges() {
+    String testDbName = getRandomDbName();
+    AssertHelpers.assertThrows("attempt to create a database without 
CREATE_DATABASE permission should fail",
+        AccessDeniedException.class,
+        "Insufficient Lake Formation permission(s)",
+        () -> 
glueCatalogPrivilegedRole.createNamespace(Namespace.of(testDbName)));
+  }
+
+  @Test
+  public void testDropDatabaseNoPrivileges() {
+    String testDbName = getRandomDbName();
+    lfRegisterPathRoleCreateDb(testDbName);
+    try {
+      AssertHelpers.assertThrows("attempt to drop a database without DROP 
permission should fail",
+          AccessDeniedException.class,
+          "Insufficient Lake Formation permission(s)",
+          () -> 
glueCatalogPrivilegedRole.dropNamespace(Namespace.of(testDbName)));
+    } finally {
+      lfRegisterPathRoleDeleteDb(testDbName);
+    }
+  }
+
+  @Test
+  public void testShowDatabasesSuccessful() {
+    String testDbName = getRandomDbName();
+    lfRegisterPathRoleCreateDb(testDbName);
+    grantDatabasePrivileges(testDbName, Permission.ALTER);
+    try {
+      List<Namespace> namespaces = glueCatalogPrivilegedRole.listNamespaces();
+      Assert.assertTrue(namespaces.contains(Namespace.of(testDbName)));
+    } finally {
+      lfRegisterPathRoleDeleteDb(testDbName);
+    }
+  }
+
+  @Test
+  public void testCreateTableNoCreateTablePermission() {
+    String testDbName = getRandomDbName();
+    String testTableName = getRandomTableName();
+    grantCreateDbPermission();
+    lfRegisterPathRoleCreateDb(testDbName);
+    String tableLocation = getTableLocation(testTableName);
+    grantDataPathPrivileges(tableLocation);
+    try {
+      AssertHelpers.assertThrows("attempt to create a table without 
CREATE_TABLE permission should fail",
+          AccessDeniedException.class,
+          "Insufficient Lake Formation permission(s)",
+          () -> glueCatalogPrivilegedRole.createTable(
+              TableIdentifier.of(testDbName, testTableName), schema, 
partitionSpec, tableLocation, null));
+    } finally {
+      lfRegisterPathRoleDeleteDb(testDbName);
+    }
+  }
+
+  @Test
+  public void testShowTablesSuccessful() {
+    String testDbName = getRandomDbName();
+    String testTableName = getRandomTableName();
+    lfRegisterPathRoleCreateDb(testDbName);
+    lfRegisterPathRoleCreateTable(testDbName, testTableName);
+    grantTablePrivileges(testDbName, testTableName, Permission.ALTER);
+    try {
+      List<TableIdentifier> tables = 
glueCatalogPrivilegedRole.listTables(Namespace.of(testDbName));
+      
Assert.assertTrue(tables.contains(TableIdentifier.of(Namespace.of(testDbName), 
testTableName)));
+    } finally {
+      lfRegisterPathRoleDeleteTable(testDbName, testTableName);
+      lfRegisterPathRoleDeleteDb(testDbName);
+    }
+  }
+
+  @Test
+  public void testShowTablesNoPrivileges() {
+    String testDbName = getRandomDbName();
+    String testTableName = getRandomTableName();
+    lfRegisterPathRoleCreateDb(testDbName);
+    lfRegisterPathRoleCreateTable(testDbName, testTableName);
+    try {
+      AssertHelpers.assertThrows("attempt to show tables without any 
permissions should fail",
+          AccessDeniedException.class,
+          "Insufficient Lake Formation permission(s)",
+          () -> 
glueCatalogPrivilegedRole.listTables(Namespace.of(testDbName)));
+    } finally {
+      lfRegisterPathRoleDeleteTable(testDbName, testTableName);
+      lfRegisterPathRoleDeleteDb(testDbName);
+    }
+  }
+
+  @Test
+  public void testCreateTableNoDataPathPermission() {
+    String testDbName = getRandomDbName();
+    String testTableName = getRandomTableName();
+    lfRegisterPathRoleCreateDb(testDbName);
+    grantDatabasePrivileges(testDbName, Permission.CREATE_TABLE);
+    try {
+      AssertHelpers.assertThrows("attempt to create a table without 
DATA_LOCATION_ACCESS permission should fail",
+          ForbiddenException.class,
+          "Glue cannot access the requested resources",
+          () -> 
glueCatalogPrivilegedRole.createTable(TableIdentifier.of(testDbName, 
testTableName),
+              schema, partitionSpec, getTableLocation(testTableName), null));
+    } finally {
+      lfRegisterPathRoleDeleteDb(testDbName);
+    }
+  }
+
+  @Test
+  public void testCreateTableSuccess() {
+    String testDbName = getRandomDbName();
+    lfRegisterPathRoleCreateDb(testDbName);
+    String testTableName = getRandomTableName();
+    String tableLocation = getTableLocation(testTableName);
+    grantDataPathPrivileges(tableLocation);
+    grantDatabasePrivileges(testDbName, Permission.CREATE_TABLE);
+    try {
+      glueCatalogPrivilegedRole.createTable(TableIdentifier.of(testDbName, 
testTableName),
+          schema, partitionSpec, tableLocation, null);
+    } finally {
+      grantTablePrivileges(testDbName, testTableName, Permission.DELETE, 
Permission.DROP);
+      glueCatalogPrivilegedRole.dropTable(TableIdentifier.of(testDbName, 
testTableName), false);
+      lfRegisterPathRoleDeleteDb(testDbName);
+    }
+  }
+
+  @Test
+  public void testDropTableSuccessWhenPurgeIsFalse() {
+    String testDbName = getRandomDbName();
+    String testTableName = getRandomTableName();
+    lfRegisterPathRoleCreateDb(testDbName);
+    lfRegisterPathRoleCreateTable(testDbName, testTableName);
+    grantTablePrivileges(testDbName, testTableName, Permission.DROP, 
Permission.SELECT);
+    try {
+      glueCatalogPrivilegedRole.dropTable(TableIdentifier.of(testDbName, 
testTableName), false);
+    } finally {
+      lfRegisterPathRoleDeleteDb(testDbName);
+    }
+  }
+
+  @Test
+  public void testDropTableNoDropPermission() {
+    String testDbName = getRandomDbName();
+    String testTableName = getRandomTableName();
+    lfRegisterPathRoleCreateDb(testDbName);
+    lfRegisterPathRoleCreateTable(testDbName, testTableName);
+    grantTablePrivileges(testDbName, testTableName, Permission.SELECT);
+    try {
+      AssertHelpers.assertThrows("attempt to drop a table without DROP 
permission should fail",
+          AccessDeniedException.class,
+          "Insufficient Lake Formation permission(s)",
+          () -> 
glueCatalogPrivilegedRole.dropTable(TableIdentifier.of(testDbName, 
testTableName), false));
+    } finally {
+      lfRegisterPathRoleDeleteTable(testDbName, testTableName);
+      lfRegisterPathRoleDeleteDb(testDbName);
+    }
+  }
+
+  @Test
+  public void testAlterTableSetPropertiesSuccessful() {
+    String testDbName = getRandomDbName();
+    String testTableName = getRandomTableName();
+    lfRegisterPathRoleCreateDb(testDbName);
+    lfRegisterPathRoleCreateTable(testDbName, testTableName);
+    Map<String, String> properties = Maps.newHashMap();
+    grantTablePrivileges(testDbName, testTableName, Permission.ALTER, 
Permission.INSERT);
+    grantDataPathPrivileges(getTableLocation(testTableName));
+    try {
+      Table table = 
glueCatalogPrivilegedRole.loadTable(TableIdentifier.of(Namespace.of(testDbName),
 testTableName));
+      properties.putAll(table.properties());
+      properties.put(TableProperties.DEFAULT_FILE_FORMAT, 
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+      UpdateProperties updateProperties = table.updateProperties();
+      properties.forEach(updateProperties::set);
+      updateProperties.commit();
+    } finally {
+      lfRegisterPathRoleDeleteTable(testDbName, testTableName);
+      lfRegisterPathRoleDeleteDb(testDbName);
+    }
+  }
+
+  @Test
+  public void testAlterTableSetPropertiesNoDataPathAccess() {
+    String testDbName = getRandomDbName();
+    String testTableName = getRandomTableName();
+    lfRegisterPathRoleCreateDb(testDbName);
+    lfRegisterPathRoleCreateTable(testDbName, testTableName);
+    Map<String, String> properties = Maps.newHashMap();
+    grantTablePrivileges(testDbName, testTableName, Permission.ALTER, 
Permission.INSERT);
+    try {
+      Table table = 
glueCatalogPrivilegedRole.loadTable(TableIdentifier.of(Namespace.of(testDbName),
 testTableName));
+      properties.putAll(table.properties());
+      properties.put(TableProperties.DEFAULT_FILE_FORMAT, 
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+      UpdateProperties updateProperties = table.updateProperties();
+      properties.forEach(updateProperties::set);
+      AssertHelpers.assertThrows("attempt to alter a table without ALTER 
permission should fail",
+          ForbiddenException.class,
+          "Glue cannot access the requested resources",
+          () -> updateProperties.commit());
+    } finally {
+      lfRegisterPathRoleDeleteTable(testDbName, testTableName);
+      lfRegisterPathRoleDeleteDb(testDbName);
+    }
+  }
+
+  @Test
+  public void testAlterTableSetPropertiesNoPrivileges() {
+    String testDbName = getRandomDbName();
+    String testTableName = getRandomTableName();
+    lfRegisterPathRoleCreateDb(testDbName);
+    lfRegisterPathRoleCreateTable(testDbName, testTableName);
+    grantDataPathPrivileges(getTableLocation(testTableName));
+    try {
+      AssertHelpers.assertThrows("attempt to alter a table without ALTER 
permission should fail",
+          AccessDeniedException.class,
+          "Insufficient Lake Formation permission(s)",
+          () -> 
glueCatalogPrivilegedRole.loadTable(TableIdentifier.of(Namespace.of(testDbName),
 testTableName)));
+    } finally {
+      lfRegisterPathRoleDeleteTable(testDbName, testTableName);
+      lfRegisterPathRoleDeleteDb(testDbName);
+    }
+  }
+
+  @Test
+  public void testAlterTableSetPropertiesNoAlterPermission() {
+    String testDbName = getRandomDbName();
+    String testTableName = getRandomTableName();
+    lfRegisterPathRoleCreateDb(testDbName);
+    lfRegisterPathRoleCreateTable(testDbName, testTableName);
+    Map<String, String> properties = Maps.newHashMap();
+    grantTablePrivileges(testDbName, testTableName, Permission.SELECT, 
Permission.INSERT);
+    try {
+      Table table = 
glueCatalogPrivilegedRole.loadTable(TableIdentifier.of(Namespace.of(testDbName),
 testTableName));
+      properties.putAll(table.properties());
+      properties.put(TableProperties.DEFAULT_FILE_FORMAT, 
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+      UpdateProperties updateProperties = table.updateProperties();
+      properties.forEach(updateProperties::set);
+      AssertHelpers.assertThrows("attempt to alter a table without ALTER 
privileges should fail",
+          ForbiddenException.class,
+          "Glue cannot access the requested resources",
+          updateProperties::commit);
+    } finally {
+      lfRegisterPathRoleDeleteTable(testDbName, testTableName);
+      lfRegisterPathRoleDeleteDb(testDbName);
+    }
+  }
+}
diff --git 
a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java 
b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
index 0320498148..2db156b6e3 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
@@ -44,12 +44,15 @@ import 
software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.services.glue.GlueClient;
 import 
software.amazon.awssdk.services.glue.model.ConcurrentModificationException;
 import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
 import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
 import software.amazon.awssdk.services.glue.model.GetTableRequest;
 import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
 import software.amazon.awssdk.services.glue.model.Table;
 import software.amazon.awssdk.services.glue.model.TableInput;
 import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.utils.ImmutableMap;
 
 class GlueTableOperations extends BaseMetastoreTableOperations {
 
@@ -117,10 +120,14 @@ class GlueTableOperations extends 
BaseMetastoreTableOperations {
 
   @Override
   protected void doCommit(TableMetadata base, TableMetadata metadata) {
-    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 
1);
     CommitStatus commitStatus = CommitStatus.FAILURE;
 
+    String newMetadataLocation = null;
+    boolean glueTempTableCreated = false;
     try {
+      glueTempTableCreated = createGlueTempTableIfNecessary(base, 
metadata.location());
+
+      newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
       lock(newMetadataLocation);
       Table glueTable = getGlueTable();
       checkMetadataLocation(glueTable, base);
@@ -171,6 +178,31 @@ class GlueTableOperations extends 
BaseMetastoreTableOperations {
       }
     } finally {
       cleanupMetadataAndUnlock(commitStatus, newMetadataLocation);
+      cleanupGlueTempTableIfNecessary(glueTempTableCreated, commitStatus);
+    }
+  }
+
+  private boolean createGlueTempTableIfNecessary(TableMetadata base, String 
metadataLocation) {
+    if (awsProperties.glueLakeFormationEnabled() && base == null) {
+      // LakeFormation credential require TableArn as input,so creating a 
dummy table
+      // beforehand for create table scenario
+      glue.createTable(CreateTableRequest.builder()
+          .databaseName(databaseName)
+          .tableInput(TableInput.builder()
+              .parameters(ImmutableMap.of(TABLE_TYPE_PROP, 
ICEBERG_TABLE_TYPE_VALUE))
+              .name(tableName)
+              
.storageDescriptor(StorageDescriptor.builder().location(metadataLocation).build())
+              .build())
+          .build());
+      return true;
+    }
+
+    return false;
+  }
+
+  private void cleanupGlueTempTableIfNecessary(boolean glueTempTableCreated, 
CommitStatus commitStatus) {
+    if (glueTempTableCreated && commitStatus != CommitStatus.SUCCESS) {
+      
glue.deleteTable(DeleteTableRequest.builder().databaseName(databaseName).name(tableName).build());
     }
   }
 
@@ -253,7 +285,7 @@ class GlueTableOperations extends 
BaseMetastoreTableOperations {
   @VisibleForTesting
   void cleanupMetadataAndUnlock(CommitStatus commitStatus, String 
metadataLocation) {
     try {
-      if (commitStatus == CommitStatus.FAILURE) {
+      if (commitStatus == CommitStatus.FAILURE && metadataLocation != null && 
!metadataLocation.isEmpty()) {
         // if anything went wrong, clean up the uncommitted metadata file
         io().deleteFile(metadataLocation);
       }

Reply via email to