This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8198404ec Fix bug with total count watermark whitelist (#3724)
8198404ec is described below
commit 8198404ecacc919eefabf22307ab8abf1b97e885
Author: Jack Moseley <[email protected]>
AuthorDate: Wed Jul 26 12:29:09 2023 -0700
Fix bug with total count watermark whitelist (#3724)
---
.../org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 19bc805eb..d614e2524 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -861,7 +861,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
// The logic is to check the window [currentHour-1,currentHour] and update the watermark if there are no audit counts
if(!tableMetadata.appendFiles.isPresent() &&
!tableMetadata.deleteFiles.isPresent()
&& tableMetadata.completenessEnabled) {
- updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props);
+ updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
+ tableMetadata.totalCountCompletenessEnabled);
}
//Set high waterMark
@@ -925,14 +926,14 @@ public class IcebergMetadataWriter implements MetadataWriter {
}
private void updateWatermarkWithNoFilesRegistered(String topicName,
TableMetadata tableMetadata,
- Map<String, String> propsToUpdate) {
+ Map<String, String> propsToUpdate, boolean includeTotalCountCompletionWatermark) {
if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
log.info(String.format("Checking kafka audit for %s on change_property ", topicName));
SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
ZonedDateTime dtAtBeginningOfHour =
ZonedDateTime.now(ZoneId.of(this.timeZone)).truncatedTo(ChronoUnit.HOURS);
timestamps.add(dtAtBeginningOfHour);
- getWatermarkUpdater(topicName, tableMetadata, propsToUpdate).run(timestamps, true);
+ getWatermarkUpdater(topicName, tableMetadata, propsToUpdate).run(timestamps, includeTotalCountCompletionWatermark);
} else {
log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
tableMetadata.completionWatermark, topicName));