[ https://issues.apache.org/jira/browse/GOBBLIN-1657?focusedWorklogId=778797&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-778797 ]
ASF GitHub Bot logged work on GOBBLIN-1657:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Jun/22 00:32
Start Date: 07/Jun/22 00:32
Worklog Time Spent: 10m
Work Description: sv2000 commented on code in PR #3517:
URL: https://github.com/apache/gobblin/pull/3517#discussion_r890666692
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -790,28 +789,33 @@ public void flush(String dbName, String tableName) throws IOException {
Transaction transaction = tableMetadata.transaction.get();
Map<String, String> props = tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
+ String topic = props.get(TOPIC_NAME_KEY);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
if (tableMetadata.completenessEnabled) {
- String topicName = props.get(TOPIC_NAME_KEY);
- if(topicName == null) {
- log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s.%s",
- TOPIC_NAME_KEY, dbName, tableName));
- } else {
- long newCompletenessWatermark =
- computeCompletenessWatermark(topicName, tableMetadata.datePartitions, tableMetadata.prevCompletenessWatermark);
- if(newCompletenessWatermark > tableMetadata.prevCompletenessWatermark) {
- log.info(String.format("Updating %s for %s.%s to %s", COMPLETION_WATERMARK_KEY, dbName, tableName, newCompletenessWatermark));
- props.put(COMPLETION_WATERMARK_KEY, String.valueOf(newCompletenessWatermark));
- props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
- tableMetadata.newCompletenessWatermark = newCompletenessWatermark;
- }
- }
+ checkAndUpdateCompletenessWatermark(tableMetadata, topic, tableMetadata.datePartitions, props);
}
}
if (tableMetadata.deleteFiles.isPresent()) {
tableMetadata.deleteFiles.get().commit();
}
+ // Check and update completion watermark when there are no files to be registered, typically for quiet topics
+ // The logic is to check the next window from previous completion watermark and update the watermark if there are no audit counts
Review Comment:
Is the next window the starting time for completeness checks or will we only
check for the completeness of the next window?
##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -45,6 +45,8 @@ public class KafkaAuditCountVerifier {
public static final String REFERENCE_TIERS = COMPLETENESS_PREFIX + "reference.tiers";
public static final String THRESHOLD = COMPLETENESS_PREFIX + "threshold";
private static final double DEFAULT_THRESHOLD = 0.999;
+ public static final String COMPLETE_ON_NO_COUNTS = COMPLETENESS_PREFIX + "complete.on.no.counts";
Review Comment:
Do we need a separate config? Shouldn't we always move the completion watermark
for quiet topics?
Issue Time Tracking
-------------------
Worklog Id: (was: 778797)
Time Spent: 0.5h (was: 20m)
> Update completion watermark on change_property in IcebergMetadataWriter
> ------------------------------------------------------------------------
>
> Key: GOBBLIN-1657
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1657
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Vikram Bohra
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> There are quiet topics where data is generated quite infrequently, causing
> the completion watermark to lag. Also, a commit operation could miss a
> completion watermark update if there is a lag in Kafka audit.
> To fix the above issues:
> # Added logic to check the audit count on the change_property operation type for the
> next window from the current watermark
> # If counts match or an empty map is returned, then consider the audit count
> complete and update the watermark.
> Added a config in KafkaAuditCountVerifier to return complete if an empty map is found
> (with no counts from the audit system)
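
The decision described in the issue can be sketched roughly as follows. This is a minimal illustration, not the code from PR #3517: the class QuietTopicWatermarkSketch, the methods isComplete and nextWatermark, the tier names, and the fixed window length are all hypothetical. Only the 0.999 threshold and the idea of a "complete on no counts" switch come from the diff above.

```java
import java.util.Map;

// Hypothetical sketch of the quiet-topic completeness decision; not the Gobblin implementation.
final class QuietTopicWatermarkSketch {
  // Same value as DEFAULT_THRESHOLD in the KafkaAuditCountVerifier hunk.
  static final double THRESHOLD = 0.999;

  /**
   * Decides whether one audit window counts as complete.
   * countsByTier maps a tier name to its audit count for the window (assumed shape).
   * completeOnNoCounts mirrors the intent of the new complete.on.no.counts config.
   */
  static boolean isComplete(Map<String, Long> countsByTier, String srcTier,
      String refTier, boolean completeOnNoCounts) {
    if (countsByTier.isEmpty()) {
      // Quiet topic: the audit system returned no counts for this window.
      return completeOnNoCounts;
    }
    Long src = countsByTier.get(srcTier);
    Long ref = countsByTier.get(refTier);
    if (src == null || ref == null || ref.longValue() == 0L) {
      // Counts for one side are missing, so completeness cannot be established.
      return false;
    }
    return (double) src / ref >= THRESHOLD;
  }

  /** Advances the watermark by one window only when that window is complete. */
  static long nextWatermark(long prevWatermark, long windowMillis, boolean windowComplete) {
    return windowComplete ? prevWatermark + windowMillis : prevWatermark;
  }
}
```

With completeOnNoCounts set to false, an empty map leaves the watermark where it was, which is the behavior a quiet topic would see without the new config.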
--
This message was sent by Atlassian Jira
(v8.20.7#820007)