This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3cddc9f28c AWS: Add table level S3 tags (#4402)
3cddc9f28c is described below
commit 3cddc9f28c93b9231060ecb6b90e2d524bd5d160
Author: Rajarshi Sarkar <[email protected]>
AuthorDate: Wed Sep 28 21:12:17 2022 +0530
AWS: Add table level S3 tags (#4402)
---
.../iceberg/aws/glue/TestGlueCatalogTable.java | 39 ++++++++++++
.../java/org/apache/iceberg/aws/AwsProperties.java | 69 ++++++++++++++++++++++
.../org/apache/iceberg/aws/glue/GlueCatalog.java | 34 +++++++----
.../iceberg/aws/glue/GlueTableOperations.java | 5 ++
.../apache/iceberg/aws/glue/TestGlueCatalog.java | 38 ++++++++++++
5 files changed, 173 insertions(+), 12 deletions(-)
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java
index b97f356dbf..051d5bff8b 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java
@@ -22,6 +22,7 @@ import static org.apache.iceberg.expressions.Expressions.truncate;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.iceberg.AppendFiles;
@@ -36,6 +37,8 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -57,10 +60,12 @@ import software.amazon.awssdk.services.glue.model.GetTableResponse;
import software.amazon.awssdk.services.glue.model.GetTableVersionsRequest;
import software.amazon.awssdk.services.glue.model.TableInput;
import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.model.Tag;
public class TestGlueCatalogTable extends GlueTestBase {
@@ -550,4 +555,38 @@ public class TestGlueCatalogTable extends GlueTestBase {
Assertions.assertThat(glueCatalog.dropTable(identifier, true)).isTrue();
Assertions.assertThat(glueCatalog.dropNamespace(Namespace.of(namespace))).isTrue();
}
+
+ @Test
+ public void testTableLevelS3Tags() {
+ String testBucketPath = "s3://" + testBucketName + "/" + testPathPrefix;
+ S3FileIO fileIO = new S3FileIO(clientFactory::s3);
+ Map<String, String> properties =
+ ImmutableMap.of(
+ AwsProperties.S3_WRITE_TABLE_TAG_ENABLED,
+ "true",
+ AwsProperties.S3_WRITE_NAMESPACE_TAG_ENABLED,
+ "true");
+ glueCatalog.initialize(
+ catalogName, testBucketPath, new AwsProperties(properties), glue, null, fileIO);
+ String namespace = createNamespace();
+ String tableName = getRandomName();
+ createTable(namespace, tableName);
+
+ // Get metadata object tag from S3
+ GetTableResponse response =
+ glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
+ String metaLocation =
+ response.table().parameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+ String key = metaLocation.split(testBucketName, -1)[1].substring(1);
+ List<Tag> tags =
+ s3.getObjectTagging(
+ GetObjectTaggingRequest.builder().bucket(testBucketName).key(key).build())
+ .tagSet();
+ Map<String, String> tagMap = tags.stream().collect(Collectors.toMap(Tag::key, Tag::value));
+
+ Assert.assertTrue(tagMap.containsKey(AwsProperties.S3_TAG_ICEBERG_TABLE));
+ Assert.assertEquals(tableName, tagMap.get(AwsProperties.S3_TAG_ICEBERG_TABLE));
+ Assert.assertTrue(tagMap.containsKey(AwsProperties.S3_TAG_ICEBERG_NAMESPACE));
+ Assert.assertEquals(namespace, tagMap.get(AwsProperties.S3_TAG_ICEBERG_NAMESPACE));
+ }
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index 639f61d934..293223ab14 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.aws.dynamodb.DynamoDbCatalog;
+import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.exceptions.ValidationException;
@@ -394,6 +395,48 @@ public class AwsProperties implements Serializable {
*/
public static final String S3_WRITE_TAGS_PREFIX = "s3.write.tags.";
+ /**
+ * Used by {@link GlueCatalog} to tag objects when writing. To set, we can pass a catalog
+ * property.
+ *
+ * <p>For more details, see
+ * https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html
+ *
+ * <p>Example: s3.write.table-tag-enabled=true
+ */
+ public static final String S3_WRITE_TABLE_TAG_ENABLED = "s3.write.table-tag-enabled";
+
+ public static final boolean S3_WRITE_TABLE_TAG_ENABLED_DEFAULT = false;
+
+ /**
+ * Used by {@link GlueCatalog} to tag objects when writing. To set, we can pass a catalog
+ * property.
+ *
+ * <p>For more details, see
+ * https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html
+ *
+ * <p>Example: s3.write.namespace-tag-enabled=true
+ */
+ public static final String S3_WRITE_NAMESPACE_TAG_ENABLED = "s3.write.namespace-tag-enabled";
+
+ public static final boolean S3_WRITE_NAMESPACE_TAG_ENABLED_DEFAULT = false;
+
+ /**
+ * Tag name that will be used by {@link #S3_WRITE_TAGS_PREFIX} when {@link
+ * #S3_WRITE_TABLE_TAG_ENABLED} is enabled
+ *
+ * <p>Example: iceberg.table=tableName
+ */
+ public static final String S3_TAG_ICEBERG_TABLE = "iceberg.table";
+
+ /**
+ * Tag name that will be used by {@link #S3_WRITE_TAGS_PREFIX} when {@link
+ * #S3_WRITE_NAMESPACE_TAG_ENABLED} is enabled
+ *
+ * <p>Example: iceberg.namespace=namespaceName
+ */
+ public static final String S3_TAG_ICEBERG_NAMESPACE = "iceberg.namespace";
+
/**
* Used by {@link S3FileIO} to tag objects when deleting. When this config is set, objects are
* tagged with the configured key-value pairs before deletion. This is considered a soft-delete,
@@ -498,6 +541,8 @@ public class AwsProperties implements Serializable {
private ObjectCannedACL s3FileIoAcl;
private boolean isS3ChecksumEnabled;
private final Set<Tag> s3WriteTags;
+ private boolean s3WriteTableTagEnabled;
+ private boolean s3WriteNamespaceTagEnabled;
private final Set<Tag> s3DeleteTags;
private int s3FileIoDeleteThreads;
private boolean isS3DeleteEnabled;
@@ -546,6 +591,8 @@ public class AwsProperties implements Serializable {
this.s3fileIoStagingDirectory = System.getProperty("java.io.tmpdir");
this.isS3ChecksumEnabled = S3_CHECKSUM_ENABLED_DEFAULT;
this.s3WriteTags = Sets.newHashSet();
+ this.s3WriteTableTagEnabled = S3_WRITE_TABLE_TAG_ENABLED_DEFAULT;
+ this.s3WriteNamespaceTagEnabled = S3_WRITE_NAMESPACE_TAG_ENABLED_DEFAULT;
this.s3DeleteTags = Sets.newHashSet();
this.s3FileIoDeleteThreads = Runtime.getRuntime().availableProcessors();
this.isS3DeleteEnabled = S3_DELETE_ENABLED_DEFAULT;
@@ -679,6 +726,12 @@ public class AwsProperties implements Serializable {
"Deletion batch size must be between 1 and %s",
S3FILEIO_DELETE_BATCH_SIZE_MAX));
this.s3WriteTags = toS3Tags(properties, S3_WRITE_TAGS_PREFIX);
+ this.s3WriteTableTagEnabled =
+ PropertyUtil.propertyAsBoolean(
+ properties, S3_WRITE_TABLE_TAG_ENABLED, S3_WRITE_TABLE_TAG_ENABLED_DEFAULT);
+ this.s3WriteNamespaceTagEnabled =
+ PropertyUtil.propertyAsBoolean(
+ properties, S3_WRITE_NAMESPACE_TAG_ENABLED, S3_WRITE_NAMESPACE_TAG_ENABLED_DEFAULT);
this.s3DeleteTags = toS3Tags(properties, S3_DELETE_TAGS_PREFIX);
this.s3FileIoDeleteThreads =
PropertyUtil.propertyAsInt(
@@ -856,6 +909,22 @@ public class AwsProperties implements Serializable {
return s3WriteTags;
}
+ public boolean s3WriteTableTagEnabled() {
+ return s3WriteTableTagEnabled;
+ }
+
+ public void setS3WriteTableTagEnabled(boolean s3WriteTableNameTagEnabled) {
+ this.s3WriteTableTagEnabled = s3WriteTableNameTagEnabled;
+ }
+
+ public boolean s3WriteNamespaceTagEnabled() {
+ return s3WriteNamespaceTagEnabled;
+ }
+
+ public void setS3WriteNamespaceTagEnabled(boolean s3WriteNamespaceTagEnabled) {
+ this.s3WriteNamespaceTagEnabled = s3WriteNamespaceTagEnabled;
+ }
+
public Set<Tag> s3DeleteTags() {
return s3DeleteTags;
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
index 682996a63e..71ccce8c1f 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
@@ -199,20 +199,30 @@ public class GlueCatalog extends BaseMetastoreCatalog
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
if (catalogProperties != null) {
ImmutableMap.Builder<String, String> tableSpecificCatalogPropertiesBuilder =
- ImmutableMap.<String, String>builder()
- .putAll(catalogProperties)
- .put(
- AwsProperties.LAKE_FORMATION_DB_NAME,
- IcebergToGlueConverter.getDatabaseName(
- tableIdentifier, awsProperties.glueCatalogSkipNameValidation()))
- .put(
- AwsProperties.LAKE_FORMATION_TABLE_NAME,
- IcebergToGlueConverter.getTableName(
- tableIdentifier, awsProperties.glueCatalogSkipNameValidation()));
+ ImmutableMap.<String, String>builder().putAll(catalogProperties);
+ boolean skipNameValidation = awsProperties.glueCatalogSkipNameValidation();
- if (awsProperties.glueLakeFormationEnabled()) {
+ if (awsProperties.s3WriteTableTagEnabled()) {
+ tableSpecificCatalogPropertiesBuilder.put(
+ AwsProperties.S3_WRITE_TAGS_PREFIX.concat(AwsProperties.S3_TAG_ICEBERG_TABLE),
+ IcebergToGlueConverter.getTableName(tableIdentifier, skipNameValidation));
+ }
+
+ if (awsProperties.s3WriteNamespaceTagEnabled()) {
tableSpecificCatalogPropertiesBuilder.put(
- AwsProperties.S3_PRELOAD_CLIENT_ENABLED, String.valueOf(true));
+ AwsProperties.S3_WRITE_TAGS_PREFIX.concat(AwsProperties.S3_TAG_ICEBERG_NAMESPACE),
+ IcebergToGlueConverter.getDatabaseName(tableIdentifier, skipNameValidation));
+ }
+
+ if (awsProperties.glueLakeFormationEnabled()) {
+ tableSpecificCatalogPropertiesBuilder
+ .put(
+ AwsProperties.LAKE_FORMATION_DB_NAME,
+ IcebergToGlueConverter.getDatabaseName(tableIdentifier, skipNameValidation))
+ .put(
+ AwsProperties.LAKE_FORMATION_TABLE_NAME,
+ IcebergToGlueConverter.getTableName(tableIdentifier, skipNameValidation))
+ .put(AwsProperties.S3_PRELOAD_CLIENT_ENABLED, String.valueOf(true));
}
// FileIO initialization depends on tableSpecificCatalogProperties, so a new FileIO is
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
index bc04584e0a..3a1334fcc1 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
@@ -355,4 +355,9 @@ class GlueTableOperations extends BaseMetastoreTableOperations {
}
}
}
+
+ @VisibleForTesting
+ Map<String, String> tableCatalogProperties() {
+ return tableCatalogProperties;
+ }
}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java b/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java
index d2f54aafd4..34d54a1ebc 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java
@@ -594,4 +594,42 @@ public class TestGlueCatalog {
ImmutableMap.of());
Assert.assertEquals(glueCatalog.isValidIdentifier(TableIdentifier.parse("db-1.a-1")), true);
}
+
+ @Test
+ public void testTableLevelS3TagProperties() {
+ Map<String, String> properties =
+ ImmutableMap.of(
+ AwsProperties.S3_WRITE_TABLE_TAG_ENABLED,
+ "true",
+ AwsProperties.S3_WRITE_NAMESPACE_TAG_ENABLED,
+ "true");
+ AwsProperties awsProperties = new AwsProperties(properties);
+ glueCatalog.initialize(
+ CATALOG_NAME,
+ WAREHOUSE_PATH,
+ awsProperties,
+ glue,
+ LockManagers.defaultLockManager(),
+ null,
+ properties);
+ GlueTableOperations glueTableOperations =
+ (GlueTableOperations)
+ glueCatalog.newTableOps(TableIdentifier.of(Namespace.of("db"), "table"));
+ Map<String, String> tableCatalogProperties = glueTableOperations.tableCatalogProperties();
+
+ Assert.assertTrue(
+ tableCatalogProperties.containsKey(
+ AwsProperties.S3_WRITE_TAGS_PREFIX.concat(AwsProperties.S3_TAG_ICEBERG_TABLE)));
+ Assert.assertEquals(
+ "table",
+ tableCatalogProperties.get(
+ AwsProperties.S3_WRITE_TAGS_PREFIX.concat(AwsProperties.S3_TAG_ICEBERG_TABLE)));
+ Assert.assertTrue(
+ tableCatalogProperties.containsKey(
+ AwsProperties.S3_WRITE_TAGS_PREFIX.concat(AwsProperties.S3_TAG_ICEBERG_NAMESPACE)));
+ Assert.assertEquals(
+ "db",
+ tableCatalogProperties.get(
+ AwsProperties.S3_WRITE_TAGS_PREFIX.concat(AwsProperties.S3_TAG_ICEBERG_NAMESPACE)));
+ }
}