ZihanLi58 commented on a change in pull request #3426:
URL: https://github.com/apache/gobblin/pull/3426#discussion_r751810712



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -1025,11 +1031,13 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope, Map<Stri
                 Optional.of(currentOffset - 1);
           }
           tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName());

Review comment:
       We want to set these only when the low watermark is not set (which means this is the first time we process this table since the last failure or restart), so can we move these lines inside the block at line 1028?
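A minimal sketch of the suggested restructuring, assuming a simplified per-table state in which an empty low watermark marks the first record seen since startup or the last failure. `TableState`, `FirstSeenInitSketch`, `onRecord`, and `setupRuns` are hypothetical stand-ins, not the actual `IcebergMetadataWriter` types; only the `Optional.of(currentOffset - 1)` initialization mirrors the hunk above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for the writer's per-table metadata.
final class TableState {
  Optional<Long> lowWatermark = Optional.empty();
  String datasetName;
  int setupRuns = 0; // how many times the one-time setup block ran
}

// Hypothetical writer showing only the "first record since restart" branch.
final class FirstSeenInitSketch {
  final Map<String, TableState> tables = new HashMap<>();

  void onRecord(String tableId, String nativeDatasetName, long currentOffset) {
    TableState state = tables.computeIfAbsent(tableId, t -> new TableState());
    if (!state.lowWatermark.isPresent()) {
      // No low watermark yet: this is the first record for the table since
      // startup or the last failure, so do the one-time setup here.
      state.lowWatermark = Optional.of(currentOffset - 1);
      state.datasetName = nativeDatasetName;
      state.setupRuns++;
    }
    // Per-record work would continue here, outside the one-time block.
  }
}
```

With this shape, later records for the same table skip the setup entirely, so the dataset name is written once per table per writer lifetime rather than on every envelope.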

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -1078,6 +1086,10 @@ public void close() throws IOException {
     long newCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
     SortedSet<ZonedDateTime> datePartitions = new TreeSet<>(Collections.reverseOrder());
 
+    @Setter
+    String dbName;

Review comment:
       Why are these two fields needed here?

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -284,9 +287,8 @@ private Long getAndPersistCurrentWatermark(TableIdentifier tid, String topicPart
   * information increases the memory footprints, therefore we would like to flush them eagerly).
   */
  public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap,
-      Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException {
-    TableIdentifier tid = TableIdentifier.of(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName());
-    TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
+      Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec, TableMetadata tableMetadata) throws IOException {

Review comment:
       Could we pass tid directly here instead of tableMetadata?
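One way to read this suggestion, sketched under assumptions: the caller passes only the table identifier, and `write` resolves the metadata itself with `computeIfAbsent`, as the removed lines did, so the lookup stays in one place and the database and table names can be read from the identifier instead of being duplicated as fields. `TableId` is a hypothetical stand-in for Iceberg's `TableIdentifier` (which exposes `namespace()` and `name()`); `TableMeta` and `PassTidSketch` are likewise hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for org.apache.iceberg.catalog.TableIdentifier.
final class TableId {
  final String db;
  final String table;

  TableId(String db, String table) {
    this.db = db;
    this.table = table;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TableId)) return false;
    TableId other = (TableId) o;
    return db.equals(other.db) && table.equals(other.table);
  }

  @Override
  public int hashCode() {
    return Objects.hash(db, table);
  }
}

// Hypothetical per-table state.
final class TableMeta {
  int writes = 0;
}

final class PassTidSketch {
  final Map<TableId, TableMeta> tableMetadataMap = new HashMap<>();

  // The caller supplies only the identifier; write() owns the metadata lookup.
  void write(TableId tid) {
    TableMeta meta = tableMetadataMap.computeIfAbsent(tid, t -> new TableMeta());
    meta.writes++;
    // Names needed elsewhere (e.g. an accept/reject check) come from tid.db and tid.table.
  }
}
```

Because the identifier has value equality, repeated calls with equal identifiers share one metadata entry.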

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -684,7 +687,8 @@ protected void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<
         Collection<HiveSpec> hiveSpecs = newSpecsMap.get(new Path(file.getFilePath()).getParent().toString());
         StructLike partition = getIcebergPartitionVal(hiveSpecs, file.getFilePath(), partitionSpec);
 
-        if(tableMetadata.completenessEnabled && gmce.getOperationType() == OperationType.add_files) {
+        if(this.newPartitionTableWhitelistBlacklist.acceptTable(tableMetadata.dbName, tableMetadata.tableName)
+            && gmce.getOperationType() == OperationType.add_files) {
          tableMetadata.prevCompletenessWatermark = Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,

Review comment:
       What will happen if it's not an add_files operation? In particular, what will happen for compaction?
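To make the question concrete: with the guard in this hunk, only `add_files` events on accepted tables update the completeness state, so any other operation (for example, a compaction that rewrites files) falls through and leaves it untouched. A minimal sketch, assuming a simplified operation enum and a single watermark field; `Op`, `CompletenessGuardSketch`, and `tableAccepted` (standing in for the `acceptTable(...)` call) are hypothetical and not Gobblin's actual types.

```java
// Hypothetical, simplified operation types; the real GMCE enum may differ.
enum Op { ADD_FILES, REWRITE_FILES, DROP_FILES }

final class CompletenessGuardSketch {
  long completenessWatermark = -1L; // hypothetical "unset" default
  final boolean tableAccepted;      // stands in for the accept/reject check

  CompletenessGuardSketch(boolean tableAccepted) {
    this.tableAccepted = tableAccepted;
  }

  // Same shape as the guard: accepted table AND add_files, otherwise no update.
  void onEvent(Op op, long candidateWatermark) {
    if (tableAccepted && op == Op.ADD_FILES) {
      completenessWatermark = candidateWatermark;
    }
    // Any other operation (e.g. REWRITE_FILES from compaction) leaves
    // completenessWatermark unchanged.
  }
}
```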




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
