[ 
https://issues.apache.org/jira/browse/GOBBLIN-1657?focusedWorklogId=779199&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-779199
 ]

ASF GitHub Bot logged work on GOBBLIN-1657:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Jun/22 17:35
            Start Date: 07/Jun/22 17:35
    Worklog Time Spent: 10m 
      Work Description: sv2000 commented on code in PR #3517:
URL: https://github.com/apache/gobblin/pull/3517#discussion_r891531637


##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -790,28 +789,33 @@ public void flush(String dbName, String tableName) throws 
IOException {
         Transaction transaction = tableMetadata.transaction.get();
         Map<String, String> props = tableMetadata.newProperties.or(
             
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
+        String topic = props.get(TOPIC_NAME_KEY);
         if (tableMetadata.appendFiles.isPresent()) {
           tableMetadata.appendFiles.get().commit();
           if (tableMetadata.completenessEnabled) {
-            String topicName = props.get(TOPIC_NAME_KEY);
-            if(topicName == null) {
-              log.error(String.format("Not performing audit check. %s is null. 
Please set as table property of %s.%s",
-                  TOPIC_NAME_KEY, dbName, tableName));
-            } else {
-              long newCompletenessWatermark =
-                  computeCompletenessWatermark(topicName, 
tableMetadata.datePartitions, tableMetadata.prevCompletenessWatermark);
-              if(newCompletenessWatermark > 
tableMetadata.prevCompletenessWatermark) {
-                log.info(String.format("Updating %s for %s.%s to %s", 
COMPLETION_WATERMARK_KEY, dbName, tableName, newCompletenessWatermark));
-                props.put(COMPLETION_WATERMARK_KEY, 
String.valueOf(newCompletenessWatermark));
-                props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
-                tableMetadata.newCompletenessWatermark = 
newCompletenessWatermark;
-              }
-            }
+            checkAndUpdateCompletenessWatermark(tableMetadata, topic, 
tableMetadata.datePartitions, props);
           }
         }
         if (tableMetadata.deleteFiles.isPresent()) {
           tableMetadata.deleteFiles.get().commit();
         }
+        // Check and update completion watermark when there are no files to be 
registered, typically for quiet topics
+        // The logic is to check the next window from previous completion 
watermark and update the watermark if there are no audit counts

Review Comment:
   So what happens if the current time is 08:00 and current watermark is 02:00? 
Will we only check for the completeness for [02:00, 03:00]? Shouldn't we check 
for completeness of the entire range [02:00, 08:00]? 





Issue Time Tracking
-------------------

    Worklog Id:     (was: 779199)
    Time Spent: 1h 20m  (was: 1h 10m)

> Update completion watermark on change_property in IcebergMetadataWriter 
> ------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1657
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1657
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vikram Bohra
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> There are quiet topics where data can be generated quiet infrequently causing 
> the completion watermark to lag. Also, a commit operation could miss 
> completion watermark update if there is a lag in kafka audit.
> To fix above issues:
>  # Added logic to check audit count on change_property operation type for the 
> next window of current watermark 
>  # If counts match or an empty map is returned then consider audit count as 
> complete and update the watermark.
> Added a config in KafkaAuditVerifier to return complete if found an empty map 
> (with no counts from audit system) 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to