This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8198404ec Fix bug with total count watermark whitelist (#3724)
8198404ec is described below

commit 8198404ecacc919eefabf22307ab8abf1b97e885
Author: Jack Moseley <[email protected]>
AuthorDate: Wed Jul 26 12:29:09 2023 -0700

    Fix bug with total count watermark whitelist (#3724)
---
 .../org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java   | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 19bc805eb..d614e2524 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -861,7 +861,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
         // The logic is to check the window [currentHour-1,currentHour] and update the watermark if there are no audit counts
         if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
             && tableMetadata.completenessEnabled) {
-          updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props);
+          updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
+              tableMetadata.totalCountCompletenessEnabled);
         }
 
         //Set high waterMark
@@ -925,14 +926,14 @@ public class IcebergMetadataWriter implements MetadataWriter {
   }
 
   private void updateWatermarkWithNoFilesRegistered(String topicName, TableMetadata tableMetadata,
-      Map<String, String> propsToUpdate) {
+      Map<String, String> propsToUpdate, boolean includeTotalCountCompletionWatermark) {
     if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
       log.info(String.format("Checking kafka audit for %s on change_property ", topicName));
       SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
       ZonedDateTime dtAtBeginningOfHour = ZonedDateTime.now(ZoneId.of(this.timeZone)).truncatedTo(ChronoUnit.HOURS);
       timestamps.add(dtAtBeginningOfHour);
 
-      getWatermarkUpdater(topicName, tableMetadata, propsToUpdate).run(timestamps, true);
+      getWatermarkUpdater(topicName, tableMetadata, propsToUpdate).run(timestamps, includeTotalCountCompletionWatermark);
     } else {
       log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
           tableMetadata.completionWatermark, topicName));

Reply via email to