[
https://issues.apache.org/jira/browse/GOBBLIN-1838?focusedWorklogId=865124&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865124
]
ASF GitHub Bot logged work on GOBBLIN-1838:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Jun/23 08:40
Start Date: 13/Jun/23 08:40
Worklog Time Spent: 10m
Work Description: wsarecv commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1227756280
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -836,15 +851,10 @@ public void flush(String dbName, String tableName) throws IOException {
// The logic is to check the window [currentHour-1,currentHour] and update the watermark if there are no audit counts
if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
&& tableMetadata.completenessEnabled) {
- if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
- log.info(String.format("Checking kafka audit for %s on change_property ", topicName));
- SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
- ZonedDateTime dtAtBeginningOfHour = ZonedDateTime.now(ZoneId.of(this.timeZone)).truncatedTo(ChronoUnit.HOURS);
- timestamps.add(dtAtBeginningOfHour);
- checkAndUpdateCompletenessWatermark(tableMetadata, topicName, timestamps, props);
- } else {
- log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
- tableMetadata.completionWatermark, topicName));
+ updateWatermarkWithEmptyFilesRegistered(topicName, tableMetadata, props, false);
+
+ if (tableMetadata.totalCountCompletenessEnabled) {
+ updateWatermarkWithEmptyFilesRegistered(topicName, tableMetadata, props, true);
Review Comment:
Fixed
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -891,94 +901,45 @@ public void flush(String dbName, String tableName) throws IOException {
}
}
- @Override
- public void reset(String dbName, String tableName) throws IOException {
- this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
+ private AbstractCompletenessWatermarkUpdater getWatermarkUpdater(String topicName, TableMetadata tableMetadata,
+ Map<String, String> propsToUpdate, boolean isTotalCountCompleteness) {
+ return isTotalCountCompleteness
Review Comment:
Fixed.
Issue Time Tracking
-------------------
Worklog Id: (was: 865124)
Time Spent: 1h 10m (was: 1h)
> Introduce total count based completion watermark
> ------------------------------------------------
>
> Key: GOBBLIN-1838
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1838
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Andy Jiang
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Currently, the completion watermark is determined by the completeness
> percentage "max of srcCount/refCount over all refTiers".
> This change introduces a new "total count based completion watermark", which
> is determined by a new completeness percentage: "srcCount / sum of refCount
> over all refTiers".
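
The difference between the two completeness percentages described in the issue can be sketched as follows. This is a minimal illustration, not the code from PR #3701: the class name, method names, and the map from tier name to count are hypothetical, and treating a zero or missing reference count as contributing nothing is an assumption.

```java
import java.util.Map;

// Hypothetical sketch; these names do not come from the Gobblin codebase.
public class CompletenessSketch {

    // Classic percentage: the largest srcCount/refCount ratio across reference tiers.
    // Assumption: tiers with a non-positive refCount are skipped.
    static double classicCompleteness(long srcCount, Map<String, Long> refCounts) {
        double best = 0.0;
        for (long refCount : refCounts.values()) {
            if (refCount > 0) {
                best = Math.max(best, (double) srcCount / refCount);
            }
        }
        return best;
    }

    // Total-count percentage: srcCount divided by the sum of all reference tier counts.
    // Assumption: an empty or all-zero set of reference counts yields 0.0.
    static double totalCountCompleteness(long srcCount, Map<String, Long> refCounts) {
        long total = 0L;
        for (long refCount : refCounts.values()) {
            total += refCount;
        }
        return total > 0 ? (double) srcCount / total : 0.0;
    }
}
```

For example, with srcCount = 90 and two reference tiers counting 100 and 50, the classic percentage is max(0.9, 1.8) = 1.8, while the total-count percentage is 90 / 150 = 0.6.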