This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 203f5e646 [GOBBLIN-1810] Support general iceberg catalog (support 
configurable behavior for metadata retention policy) (#3680)
203f5e646 is described below

commit 203f5e64696a3a3aa0f89d6e029e2a35929c16fa
Author: Zihan Li <[email protected]>
AuthorDate: Fri Apr 21 14:17:36 2023 -0700

    [GOBBLIN-1810] Support general iceberg catalog (support configurable 
behavior for metadata retention policy) (#3680)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
    
    * Revert "[GOBBLIN-1810] Support general iceberg catalog in 
icebergMetadataWriter"
    
    This reverts commit b0844e8d7740b9eaa21132604f532a964c3f9e52.
    
    * [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
    
    * add Javadoc
    
    * support configurable behavior to use custom metadata retention policy in 
iceberg writer
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../apache/gobblin/iceberg/writer/IcebergMetadataWriter.java | 12 ++++++------
 .../iceberg/writer/IcebergMetadataWriterConfigKeys.java      |  2 ++
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 27a2629e5..1e88a83c7 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -855,12 +855,12 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
         props.put(String.format(GMCE_LOW_WATERMARK_KEY, 
tableTopicPartitionMap.get(tid)),
             tableMetadata.lowWatermark.get().toString());
         //Set whether to delete metadata files after commit
-        props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
Boolean.toString(
-            
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
-                
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
-        props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
Integer.toString(
-            conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
-                TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
+        if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY, 
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
+          props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
Boolean.toString(
+              
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
+          props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
Integer.toString(
+              conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
+        }
         //Update schema(commit)
         updateSchema(tableMetadata, props, topicName);
         //Update properties
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
index cd51ca3c6..73c206d5f 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
@@ -41,6 +41,8 @@ public class IcebergMetadataWriterConfigKeys {
   public static final String ICEBERG_NEW_PARTITION_WHITELIST = 
"iceberg.new.partition.whitelist";
   public static final String ICEBERG_NEW_PARTITION_BLACKLIST = 
"iceberg.new.partition.blacklist";
   public static final String STATE_COMPLETION_WATERMARK_KEY_OF_TABLE = 
"completion.watermark.%s";
+  public static final String ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY = 
"iceberg.enable.custom.metadata.retention.policy";
+  public static final boolean 
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY = true;
 
 
 }

Reply via email to