[
https://issues.apache.org/jira/browse/GOBBLIN-1657?focusedWorklogId=779217&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-779217
]
ASF GitHub Bot logged work on GOBBLIN-1657:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Jun/22 19:23
Start Date: 07/Jun/22 19:23
Worklog Time Spent: 10m
Work Description: vikrambohra commented on code in PR #3517:
URL: https://github.com/apache/gobblin/pull/3517#discussion_r891629386
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -790,28 +789,33 @@ public void flush(String dbName, String tableName) throws IOException {
Transaction transaction = tableMetadata.transaction.get();
Map<String, String> props = tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
+ String topic = props.get(TOPIC_NAME_KEY);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
if (tableMetadata.completenessEnabled) {
- String topicName = props.get(TOPIC_NAME_KEY);
- if(topicName == null) {
- log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s.%s",
- TOPIC_NAME_KEY, dbName, tableName));
- } else {
- long newCompletenessWatermark =
- computeCompletenessWatermark(topicName, tableMetadata.datePartitions, tableMetadata.prevCompletenessWatermark);
- if(newCompletenessWatermark > tableMetadata.prevCompletenessWatermark) {
- log.info(String.format("Updating %s for %s.%s to %s", COMPLETION_WATERMARK_KEY, dbName, tableName, newCompletenessWatermark));
- props.put(COMPLETION_WATERMARK_KEY, String.valueOf(newCompletenessWatermark));
- props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
- tableMetadata.newCompletenessWatermark = newCompletenessWatermark;
- }
- }
+ checkAndUpdateCompletenessWatermark(tableMetadata, topic, tableMetadata.datePartitions, props);
}
}
if (tableMetadata.deleteFiles.isPresent()) {
tableMetadata.deleteFiles.get().commit();
}
+ // Check and update completion watermark when there are no files to be registered, typically for quiet topics
+ // The logic is to check the next window from previous completion watermark and update the watermark if there are no audit counts
Review Comment:
This case should happen only after a long downtime. Even then, the watermark
should catch up quickly, since we flush every minute. And if there is an
append file operation in between, we would catch up more aggressively.
In the regular scenario, we will update the watermark on the next flush once the
current time reaches the next hour after the current watermark.
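The catch-up behavior described in the review comment can be illustrated with a small simulation. This is a minimal sketch, not Gobblin's actual checkAndUpdateCompletenessWatermark: it assumes hourly windows, assumes the window [watermark, watermark + 1h) is only checked once it has fully elapsed, and models the audit lookup as a hypothetical BiPredicate. The class and method names (QuietTopicWatermarkSketch, maybeAdvance) are illustrative only.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BiPredicate;

/**
 * Hypothetical sketch: when no files are registered (quiet topic), advance the
 * completion watermark by at most one hourly window per flush.
 */
public class QuietTopicWatermarkSketch {

  // Assumed window size; the real writer's granularity may differ.
  static final Duration WINDOW = Duration.ofHours(1);

  /**
   * Returns the (possibly advanced) watermark in epoch millis. The next window
   * is only checked once it has fully elapsed; if the audit predicate reports
   * it complete, the watermark moves forward by exactly one window.
   */
  static long maybeAdvance(long watermarkMillis, long nowMillis,
      BiPredicate<Long, Long> isWindowComplete) {
    long windowEnd = watermarkMillis + WINDOW.toMillis();
    if (nowMillis < windowEnd) {
      return watermarkMillis; // next window is still open: nothing to check yet
    }
    return isWindowComplete.test(watermarkMillis, windowEnd) ? windowEnd : watermarkMillis;
  }

  public static void main(String[] args) {
    long watermark = Instant.parse("2022-06-07T00:00:00Z").toEpochMilli();
    long now = Instant.parse("2022-06-07T05:30:00Z").toEpochMilli();
    // Hypothetical audit check that always reports the window as complete.
    BiPredicate<Long, Long> alwaysComplete = (from, to) -> true;

    // Simulate one flush per minute after a ~5.5h downtime.
    int flushes = 0;
    while (true) {
      long next = maybeAdvance(watermark, now, alwaysComplete);
      if (next == watermark) {
        break; // caught up: the following window has not closed yet
      }
      watermark = next;
      flushes++;
      now += Duration.ofMinutes(1).toMillis();
    }
    // Prints "2022-06-07T05:00:00Z after 5 flushes"
    System.out.println(Instant.ofEpochMilli(watermark) + " after " + flushes + " flushes");
  }
}
```

Under these assumptions, a 5.5-hour gap is recovered in five one-minute flushes, which matches the reviewer's point that per-minute flushing lets the watermark catch up quickly even though each flush advances it by only one window.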
Issue Time Tracking
-------------------
Worklog Id: (was: 779217)
Time Spent: 1.5h (was: 1h 20m)
> Update completion watermark on change_property in IcebergMetadataWriter
> ------------------------------------------------------------------------
>
> Key: GOBBLIN-1657
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1657
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Vikram Bohra
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> There are quiet topics where data can be generated quite infrequently, causing
> the completion watermark to lag. Also, a commit operation could miss the
> completion watermark update if there is a lag in Kafka audit.
> To fix the above issues:
> # Added logic to check the audit count on the change_property operation type for the
> next window after the current watermark.
> # If counts match or an empty map is returned, consider the audit count
> complete and update the watermark.
> Added a config in KafkaAuditVerifier to return complete if an empty map is found
> (with no counts from the audit system).
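The completeness rule in the description (counts match, or an empty map is treated as complete when a config allows it) can be sketched as below. This is a minimal illustration under stated assumptions, not the real KafkaAuditVerifier API: the class name, the tier names, the flag returnCompleteOnEmptyCounts, and the "reference tier has at least as many records as the source tier" comparison are all hypothetical; the actual verifier may use different config keys or a ratio threshold.

```java
import java.util.Map;

/**
 * Hypothetical sketch of the GOBBLIN-1657 completeness rule: a window is
 * complete if the audit counts match, or if the audit system returned no
 * counts at all and a (hypothetical) config flag allows treating that as complete.
 */
public class AuditCompletenessSketch {

  private final String srcTier;
  private final String refTier;
  private final boolean returnCompleteOnEmptyCounts; // hypothetical config flag

  AuditCompletenessSketch(String srcTier, String refTier, boolean returnCompleteOnEmptyCounts) {
    this.srcTier = srcTier;
    this.refTier = refTier;
    this.returnCompleteOnEmptyCounts = returnCompleteOnEmptyCounts;
  }

  boolean isComplete(Map<String, Long> countsByTier) {
    if (countsByTier.isEmpty()) {
      // Quiet topic: the audit system reported nothing for this window.
      return returnCompleteOnEmptyCounts;
    }
    Long src = countsByTier.get(srcTier);
    Long ref = countsByTier.get(refTier);
    if (src == null || ref == null) {
      return false; // partial audit data: not enough to decide
    }
    // Assumed "counts match" rule: reference tier has caught up with the source tier.
    return ref.longValue() >= src.longValue();
  }

  public static void main(String[] args) {
    AuditCompletenessSketch lenient = new AuditCompletenessSketch("producer", "consumer", true);
    AuditCompletenessSketch strict = new AuditCompletenessSketch("producer", "consumer", false);
    System.out.println(lenient.isComplete(Map.of())); // true
    System.out.println(strict.isComplete(Map.of()));  // false
    System.out.println(strict.isComplete(Map.of("producer", 100L, "consumer", 100L))); // true
    System.out.println(strict.isComplete(Map.of("producer", 100L, "consumer", 90L)));  // false
  }
}
```

Keeping the empty-map behavior behind a flag means the default can remain conservative (an empty response is not proof of completeness), while deployments with quiet topics can opt in to letting the watermark advance.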
--
This message was sent by Atlassian Jira
(v8.20.7#820007)