This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cddc9f28c AWS: Add table level S3 tags (#4402)
3cddc9f28c is described below

commit 3cddc9f28c93b9231060ecb6b90e2d524bd5d160
Author: Rajarshi Sarkar <[email protected]>
AuthorDate: Wed Sep 28 21:12:17 2022 +0530

    AWS: Add table level S3 tags (#4402)
---
 .../iceberg/aws/glue/TestGlueCatalogTable.java     | 39 ++++++++++++
 .../java/org/apache/iceberg/aws/AwsProperties.java | 69 ++++++++++++++++++++++
 .../org/apache/iceberg/aws/glue/GlueCatalog.java   | 34 +++++++----
 .../iceberg/aws/glue/GlueTableOperations.java      |  5 ++
 .../apache/iceberg/aws/glue/TestGlueCatalog.java   | 38 ++++++++++++
 5 files changed, 173 insertions(+), 12 deletions(-)

diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java
index b97f356dbf..051d5bff8b 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java
@@ -22,6 +22,7 @@ import static org.apache.iceberg.expressions.Expressions.truncate;
 
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.iceberg.AppendFiles;
@@ -36,6 +37,8 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.Transaction;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -57,10 +60,12 @@ import software.amazon.awssdk.services.glue.model.GetTableResponse;
 import software.amazon.awssdk.services.glue.model.GetTableVersionsRequest;
 import software.amazon.awssdk.services.glue.model.TableInput;
 import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest;
 import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.model.Tag;
 
 public class TestGlueCatalogTable extends GlueTestBase {
 
@@ -550,4 +555,38 @@ public class TestGlueCatalogTable extends GlueTestBase {
     Assertions.assertThat(glueCatalog.dropTable(identifier, true)).isTrue();
     Assertions.assertThat(glueCatalog.dropNamespace(Namespace.of(namespace))).isTrue();
   }
+
+  @Test
+  public void testTableLevelS3Tags() {
+    String testBucketPath = "s3://" + testBucketName + "/" + testPathPrefix;
+    S3FileIO fileIO = new S3FileIO(clientFactory::s3);
+    Map<String, String> properties =
+        ImmutableMap.of(
+            AwsProperties.S3_WRITE_TABLE_TAG_ENABLED,
+            "true",
+            AwsProperties.S3_WRITE_NAMESPACE_TAG_ENABLED,
+            "true");
+    glueCatalog.initialize(
+        catalogName, testBucketPath, new AwsProperties(properties), glue, null, fileIO);
+    String namespace = createNamespace();
+    String tableName = getRandomName();
+    createTable(namespace, tableName);
+
+    // Get metadata object tag from S3
+    GetTableResponse response =
+        glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
+    String metaLocation =
+        response.table().parameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+    String key = metaLocation.split(testBucketName, -1)[1].substring(1);
+    List<Tag> tags =
+        s3.getObjectTagging(
+                GetObjectTaggingRequest.builder().bucket(testBucketName).key(key).build())
+            .tagSet();
+    Map<String, String> tagMap = tags.stream().collect(Collectors.toMap(Tag::key, Tag::value));
+
+    Assert.assertTrue(tagMap.containsKey(AwsProperties.S3_TAG_ICEBERG_TABLE));
+    Assert.assertEquals(tableName, tagMap.get(AwsProperties.S3_TAG_ICEBERG_TABLE));
+    Assert.assertTrue(tagMap.containsKey(AwsProperties.S3_TAG_ICEBERG_NAMESPACE));
+    Assert.assertEquals(namespace, tagMap.get(AwsProperties.S3_TAG_ICEBERG_NAMESPACE));
+  }
 }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index 639f61d934..293223ab14 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.iceberg.aws.dynamodb.DynamoDbCatalog;
+import org.apache.iceberg.aws.glue.GlueCatalog;
 import org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory;
 import org.apache.iceberg.aws.s3.S3FileIO;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -394,6 +395,48 @@ public class AwsProperties implements Serializable {
    */
   public static final String S3_WRITE_TAGS_PREFIX = "s3.write.tags.";
 
+  /**
+   * Used by {@link GlueCatalog} to tag objects when writing. To set, we can pass a catalog
+   * property.
+   *
+   * <p>For more details, see
+   * https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html
+   *
+   * <p>Example: s3.write.table-tag-enabled=true
+   */
+  public static final String S3_WRITE_TABLE_TAG_ENABLED = "s3.write.table-tag-enabled";
+
+  public static final boolean S3_WRITE_TABLE_TAG_ENABLED_DEFAULT = false;
+
+  /**
+   * Used by {@link GlueCatalog} to tag objects when writing. To set, we can pass a catalog
+   * property.
+   *
+   * <p>For more details, see
+   * https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html
+   *
+   * <p>Example: s3.write.namespace-tag-enabled=true
+   */
+  public static final String S3_WRITE_NAMESPACE_TAG_ENABLED = "s3.write.namespace-tag-enabled";
+
+  public static final boolean S3_WRITE_NAMESPACE_TAG_ENABLED_DEFAULT = false;
+
+  /**
+   * Tag name that will be used by {@link #S3_WRITE_TAGS_PREFIX} when {@link
+   * #S3_WRITE_TABLE_TAG_ENABLED} is enabled
+   *
+   * <p>Example: iceberg.table=tableName
+   */
+  public static final String S3_TAG_ICEBERG_TABLE = "iceberg.table";
+
+  /**
+   * Tag name that will be used by {@link #S3_WRITE_TAGS_PREFIX} when {@link
+   * #S3_WRITE_NAMESPACE_TAG_ENABLED} is enabled
+   *
+   * <p>Example: iceberg.namespace=namespaceName
+   */
+  public static final String S3_TAG_ICEBERG_NAMESPACE = "iceberg.namespace";
+
   /**
    * Used by {@link S3FileIO} to tag objects when deleting. When this config is set, objects are
    * tagged with the configured key-value pairs before deletion. This is considered a soft-delete,
@@ -498,6 +541,8 @@ public class AwsProperties implements Serializable {
   private ObjectCannedACL s3FileIoAcl;
   private boolean isS3ChecksumEnabled;
   private final Set<Tag> s3WriteTags;
+  private boolean s3WriteTableTagEnabled;
+  private boolean s3WriteNamespaceTagEnabled;
   private final Set<Tag> s3DeleteTags;
   private int s3FileIoDeleteThreads;
   private boolean isS3DeleteEnabled;
@@ -546,6 +591,8 @@ public class AwsProperties implements Serializable {
     this.s3fileIoStagingDirectory = System.getProperty("java.io.tmpdir");
     this.isS3ChecksumEnabled = S3_CHECKSUM_ENABLED_DEFAULT;
     this.s3WriteTags = Sets.newHashSet();
+    this.s3WriteTableTagEnabled = S3_WRITE_TABLE_TAG_ENABLED_DEFAULT;
+    this.s3WriteNamespaceTagEnabled = S3_WRITE_NAMESPACE_TAG_ENABLED_DEFAULT;
     this.s3DeleteTags = Sets.newHashSet();
     this.s3FileIoDeleteThreads = Runtime.getRuntime().availableProcessors();
     this.isS3DeleteEnabled = S3_DELETE_ENABLED_DEFAULT;
@@ -679,6 +726,12 @@ public class AwsProperties implements Serializable {
             "Deletion batch size must be between 1 and %s", S3FILEIO_DELETE_BATCH_SIZE_MAX));
 
     this.s3WriteTags = toS3Tags(properties, S3_WRITE_TAGS_PREFIX);
+    this.s3WriteTableTagEnabled =
+        PropertyUtil.propertyAsBoolean(
+            properties, S3_WRITE_TABLE_TAG_ENABLED, S3_WRITE_TABLE_TAG_ENABLED_DEFAULT);
+    this.s3WriteNamespaceTagEnabled =
+        PropertyUtil.propertyAsBoolean(
+            properties, S3_WRITE_NAMESPACE_TAG_ENABLED, S3_WRITE_NAMESPACE_TAG_ENABLED_DEFAULT);
     this.s3DeleteTags = toS3Tags(properties, S3_DELETE_TAGS_PREFIX);
     this.s3FileIoDeleteThreads =
         PropertyUtil.propertyAsInt(
@@ -856,6 +909,22 @@ public class AwsProperties implements Serializable {
     return s3WriteTags;
   }
 
+  public boolean s3WriteTableTagEnabled() {
+    return s3WriteTableTagEnabled;
+  }
+
+  public void setS3WriteTableTagEnabled(boolean s3WriteTableNameTagEnabled) {
+    this.s3WriteTableTagEnabled = s3WriteTableNameTagEnabled;
+  }
+
+  public boolean s3WriteNamespaceTagEnabled() {
+    return s3WriteNamespaceTagEnabled;
+  }
+
+  public void setS3WriteNamespaceTagEnabled(boolean s3WriteNamespaceTagEnabled) {
+    this.s3WriteNamespaceTagEnabled = s3WriteNamespaceTagEnabled;
+  }
+
   public Set<Tag> s3DeleteTags() {
     return s3DeleteTags;
   }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
index 682996a63e..71ccce8c1f 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
@@ -199,20 +199,30 @@ public class GlueCatalog extends BaseMetastoreCatalog
   protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
     if (catalogProperties != null) {
       ImmutableMap.Builder<String, String> tableSpecificCatalogPropertiesBuilder =
-          ImmutableMap.<String, String>builder()
-              .putAll(catalogProperties)
-              .put(
-                  AwsProperties.LAKE_FORMATION_DB_NAME,
-                  IcebergToGlueConverter.getDatabaseName(
-                      tableIdentifier, awsProperties.glueCatalogSkipNameValidation()))
-              .put(
-                  AwsProperties.LAKE_FORMATION_TABLE_NAME,
-                  IcebergToGlueConverter.getTableName(
-                      tableIdentifier, awsProperties.glueCatalogSkipNameValidation()));
+          ImmutableMap.<String, String>builder().putAll(catalogProperties);
+      boolean skipNameValidation = awsProperties.glueCatalogSkipNameValidation();
 
-      if (awsProperties.glueLakeFormationEnabled()) {
+      if (awsProperties.s3WriteTableTagEnabled()) {
+        tableSpecificCatalogPropertiesBuilder.put(
+            AwsProperties.S3_WRITE_TAGS_PREFIX.concat(AwsProperties.S3_TAG_ICEBERG_TABLE),
+            IcebergToGlueConverter.getTableName(tableIdentifier, skipNameValidation));
+      }
+
+      if (awsProperties.s3WriteNamespaceTagEnabled()) {
         tableSpecificCatalogPropertiesBuilder.put(
-            AwsProperties.S3_PRELOAD_CLIENT_ENABLED, String.valueOf(true));
+            AwsProperties.S3_WRITE_TAGS_PREFIX.concat(AwsProperties.S3_TAG_ICEBERG_NAMESPACE),
+            IcebergToGlueConverter.getDatabaseName(tableIdentifier, skipNameValidation));
+      }
+
+      if (awsProperties.glueLakeFormationEnabled()) {
+        tableSpecificCatalogPropertiesBuilder
+            .put(
+                AwsProperties.LAKE_FORMATION_DB_NAME,
+                IcebergToGlueConverter.getDatabaseName(tableIdentifier, skipNameValidation))
+            .put(
+                AwsProperties.LAKE_FORMATION_TABLE_NAME,
+                IcebergToGlueConverter.getTableName(tableIdentifier, skipNameValidation))
+            .put(AwsProperties.S3_PRELOAD_CLIENT_ENABLED, String.valueOf(true));
       }
 
       // FileIO initialization depends on tableSpecificCatalogProperties, so a new FileIO is
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
index bc04584e0a..3a1334fcc1 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
@@ -355,4 +355,9 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
       }
     }
   }
+
+  @VisibleForTesting
+  Map<String, String> tableCatalogProperties() {
+    return tableCatalogProperties;
+  }
 }
diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java b/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java
index d2f54aafd4..34d54a1ebc 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java
@@ -594,4 +594,42 @@ public class TestGlueCatalog {
         ImmutableMap.of());
     Assert.assertEquals(glueCatalog.isValidIdentifier(TableIdentifier.parse("db-1.a-1")), true);
   }
+
+  @Test
+  public void testTableLevelS3TagProperties() {
+    Map<String, String> properties =
+        ImmutableMap.of(
+            AwsProperties.S3_WRITE_TABLE_TAG_ENABLED,
+            "true",
+            AwsProperties.S3_WRITE_NAMESPACE_TAG_ENABLED,
+            "true");
+    AwsProperties awsProperties = new AwsProperties(properties);
+    glueCatalog.initialize(
+        CATALOG_NAME,
+        WAREHOUSE_PATH,
+        awsProperties,
+        glue,
+        LockManagers.defaultLockManager(),
+        null,
+        properties);
+    GlueTableOperations glueTableOperations =
+        (GlueTableOperations)
+            glueCatalog.newTableOps(TableIdentifier.of(Namespace.of("db"), "table"));
+    Map<String, String> tableCatalogProperties = glueTableOperations.tableCatalogProperties();
+
+    Assert.assertTrue(
+        tableCatalogProperties.containsKey(
+            AwsProperties.S3_WRITE_TAGS_PREFIX.concat(AwsProperties.S3_TAG_ICEBERG_TABLE)));
+    Assert.assertEquals(
+        "table",
+        tableCatalogProperties.get(
+            AwsProperties.S3_WRITE_TAGS_PREFIX.concat(AwsProperties.S3_TAG_ICEBERG_TABLE)));
+    Assert.assertTrue(
+        tableCatalogProperties.containsKey(
+            AwsProperties.S3_WRITE_TAGS_PREFIX.concat(AwsProperties.S3_TAG_ICEBERG_NAMESPACE)));
+    Assert.assertEquals(
+        "db",
+        tableCatalogProperties.get(
+            AwsProperties.S3_WRITE_TAGS_PREFIX.concat(AwsProperties.S3_TAG_ICEBERG_NAMESPACE)));
+  }
 }

Reply via email to