This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4107ae6  [GOBBLIN-1574] Added whitelist for iceberg tables to add new 
partitio… (#3426)
4107ae6 is described below

commit 4107ae649d8314ddc17a47b30110194ab9653dec
Author: vbohra <[email protected]>
AuthorDate: Thu Nov 18 15:54:14 2021 -0800

    [GOBBLIN-1574] Added whitelist for iceberg tables to add new partitio… 
(#3426)
    
    * [GOBBLIN-1574] Added whitelist for iceberg tables to add new partition 
column
    
    * fix to failing test case
    
    * Updated IcebergMetadataWriterTest to blacklist the test table from 
non-completeness tests
    
    * moved dataset name update in tablemetadata
    
    * Added newPartition checks in Table Metadata
    
    * Fixed test case to include new_partition_enabled
    
    Co-authored-by: Vikram Bohra <[email protected]>
---
 .../iceberg/writer/IcebergMetadataWriter.java      | 36 ++++++++++++++--------
 .../writer/IcebergMetadataWriterConfigKeys.java    |  5 +++
 .../iceberg/writer/IcebergMetadataWriterTest.java  |  1 +
 3 files changed, 29 insertions(+), 13 deletions(-)

diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index db01793..df5739e 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -165,6 +165,8 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
   private final DateTimeFormatter HOURLY_DATEPARTITION_FORMAT;
   private final String newPartitionColumn;
   private final String newPartitionColumnType;
+  private final boolean newPartitionEnabled;
+  private final WhitelistBlacklist newPartitionTableWhitelistBlacklist;
   private Optional<KafkaAuditCountVerifier> auditCountVerifier;
   private final String auditCheckGranularity;
 
@@ -227,6 +229,9 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
     this.auditCountVerifier = Optional.fromNullable(this.completenessEnabled ? 
new KafkaAuditCountVerifier(state) : null);
     this.newPartitionColumn = state.getProp(NEW_PARTITION_KEY, 
DEFAULT_NEW_PARTITION);
     this.newPartitionColumnType = state.getProp(NEW_PARTITION_TYPE_KEY, 
DEFAULT_PARTITION_COLUMN_TYPE);
+    this.newPartitionEnabled = 
state.getPropAsBoolean(ICEBERG_NEW_PARTITION_ENABLED, 
DEFAULT_ICEBERG_NEW_PARTITION_ENABLED);
+    this.newPartitionTableWhitelistBlacklist = new 
WhitelistBlacklist(state.getProp(ICEBERG_NEW_PARTITION_WHITELIST, ""),
+        state.getProp(ICEBERG_NEW_PARTITION_BLACKLIST, ""));
     this.auditCheckGranularity = state.getProp(AUDIT_CHECK_GRANULARITY, 
DEFAULT_AUDIT_CHECK_GRANULARITY);
   }
 
@@ -475,6 +480,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
     if(!table.spec().fields().stream().anyMatch(x -> 
x.name().equalsIgnoreCase(fieldName))) {
       table.updateSpec().addField(fieldName).commit();
     }
+    table.refresh();
     return table;
   }
 
@@ -684,7 +690,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
         Collection<HiveSpec> hiveSpecs = newSpecsMap.get(new 
Path(file.getFilePath()).getParent().toString());
         StructLike partition = getIcebergPartitionVal(hiveSpecs, 
file.getFilePath(), partitionSpec);
 
-        if(tableMetadata.completenessEnabled && gmce.getOperationType() == 
OperationType.add_files) {
+        if(tableMetadata.newPartitionColumnEnabled && gmce.getOperationType() 
== OperationType.add_files) {
           tableMetadata.prevCompletenessWatermark = 
Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
               String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
           // Assumes first partition value to be partitioned by date
@@ -705,6 +711,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
   /**
    * 1. Add "late" partition column to iceberg table if not exists
    * 2. compute "late" partition value based on datepartition and completion 
watermark
+   * 3. Default to late=0 if completion watermark check is disabled
    * @param table
    * @param tableMetadata
    * @param hivePartition
@@ -713,10 +720,9 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
    */
   private StructLike addLatePartitionValueToIcebergTable(Table table, 
TableMetadata tableMetadata, HivePartition hivePartition, String datepartition) 
{
     table = addPartitionToIcebergTable(table, newPartitionColumn, 
newPartitionColumnType);
-    table.refresh();
     PartitionSpec partitionSpec = table.spec();
     long prevCompletenessWatermark = tableMetadata.prevCompletenessWatermark;
-    int late = isLate(datepartition, prevCompletenessWatermark);
+    int late = !tableMetadata.completenessEnabled ? 0 : isLate(datepartition, 
prevCompletenessWatermark);
     List<String> partitionValues = new ArrayList<>(hivePartition.getValues());
     partitionValues.add(String.valueOf(late));
     return IcebergUtils.getPartition(partitionSpec.partitionType(), 
partitionValues);
@@ -785,7 +791,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
             
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
         if (tableMetadata.appendFiles.isPresent()) {
           tableMetadata.appendFiles.get().commit();
-          if(tableMetadata.completenessEnabled) {
+          if (tableMetadata.completenessEnabled) {
             String topicName = props.get(TOPIC_NAME_KEY);
             if(topicName == null) {
               log.error(String.format("Not performing audit check. %s is null. 
Please set as table property of %s.%s",
@@ -1011,8 +1017,10 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
       GenericRecord genericRecord = recordEnvelope.getRecord();
       GobblinMetadataChangeEvent gmce =
           (GobblinMetadataChangeEvent) 
SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
-      if (whitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(), 
tableSpec.getTable().getTableName())) {
-        TableIdentifier tid = 
TableIdentifier.of(tableSpec.getTable().getDbName(), 
tableSpec.getTable().getTableName());
+      String dbName = tableSpec.getTable().getDbName();
+      String tableName = tableSpec.getTable().getTableName();
+      if (whitelistBlacklist.acceptTable(dbName, tableName)) {
+        TableIdentifier tid = TableIdentifier.of(dbName, tableName);
         String topicPartition = tableTopicPartitionMap.computeIfAbsent(tid,
             t -> recordEnvelope.getWatermark().getSource());
         Long currentWatermark = getAndPersistCurrentWatermark(tid, 
topicPartition);
@@ -1021,12 +1029,14 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
         if (currentOffset > currentWatermark) {
           if (!tableMetadataMap.computeIfAbsent(tid, t -> new 
TableMetadata()).lowWatermark.isPresent()) {
             //This means we haven't register this table or met some error 
before, we need to reset the low watermark
-            tableMetadataMap.computeIfAbsent(tid, t -> new 
TableMetadata()).lowWatermark =
-                Optional.of(currentOffset - 1);
-          }
-          
tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName());
-          if(this.completenessEnabled && 
this.completenessWhitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(),
 tableSpec.getTable().getTableName())) {
-            tableMetadataMap.get(tid).setCompletenessEnabled(true);
+            tableMetadataMap.get(tid).lowWatermark = Optional.of(currentOffset 
- 1);
+            
tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName());
+            if (this.newPartitionEnabled && 
this.newPartitionTableWhitelistBlacklist.acceptTable(dbName, tableName)) {
+              tableMetadataMap.get(tid).newPartitionColumnEnabled = true;
+              if (this.completenessEnabled && 
this.completenessWhitelistBlacklist.acceptTable(dbName, tableName)) {
+                tableMetadataMap.get(tid).completenessEnabled = true;
+              }
+            }
           }
 
           write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
@@ -1080,8 +1090,8 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
 
     @Setter
     String datasetName;
-    @Setter
     boolean completenessEnabled;
+    boolean newPartitionColumnEnabled;
 
     Cache<CharSequence, String> addedFiles = CacheBuilder.newBuilder()
         .expireAfterAccess(conf.getInt(ADDED_FILES_CACHE_EXPIRING_TIME, 
DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME),
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
index 65c4e57..c846270 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
@@ -36,5 +36,10 @@ public class IcebergMetadataWriterConfigKeys {
   public static final String TOPIC_NAME_KEY = "topic.name";
   public static final String AUDIT_CHECK_GRANULARITY = 
"iceberg.completeness.audit.check.granularity";
   public static final String DEFAULT_AUDIT_CHECK_GRANULARITY = "HOUR";
+  public static final String ICEBERG_NEW_PARTITION_ENABLED = 
"iceberg.new.partition.enabled";
+  public static final boolean DEFAULT_ICEBERG_NEW_PARTITION_ENABLED = false;
+  public static final String ICEBERG_NEW_PARTITION_WHITELIST = 
"iceberg.new.partition.whitelist";
+  public static final String ICEBERG_NEW_PARTITION_BLACKLIST = 
"iceberg.new.partition.blacklist";
+
 
 }
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index a1d9b18..44c5514 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -179,6 +179,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
 
   private State getStateWithCompletenessConfig() {
     State state = getState();
+    state.setProp(ICEBERG_NEW_PARTITION_ENABLED, true);
     state.setProp(ICEBERG_COMPLETENESS_ENABLED, true);
     state.setProp(NEW_PARTITION_KEY, "late");
     state.setProp(NEW_PARTITION_TYPE_KEY, "int");

Reply via email to