[
https://issues.apache.org/jira/browse/GOBBLIN-1838?focusedWorklogId=863765&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-863765
]
ASF GitHub Bot logged work on GOBBLIN-1838:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Jun/23 17:53
Start Date: 05/Jun/23 17:53
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1218364957
##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -151,6 +160,49 @@ private double getCompletenessPercentage(String
datasetName, long beginInMillis,
return percent;
}
+ private double getTotalCountCompletenessPercentage(String datasetName, long
beginInMillis, long endInMillis) throws IOException {
+ Preconditions.checkNotNull(this.totalCountRefTiers);
+
+ Map<String, Long> countsByTier = getTierAndCount(datasetName,
beginInMillis, endInMillis);
+ validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier,
this.srcTier, this.totalCountRefTiers);
+ if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
Review Comment:
Same here
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -846,6 +863,18 @@ public void flush(String dbName, String tableName) throws
IOException {
log.info(String.format("Need valid watermark, current watermark is
%s, Not checking kafka audit for %s",
tableMetadata.completionWatermark, topicName));
}
+
+ // Deal with total count completion watermark.
+ if (tableMetadata.totalCountCompletionWatermark >
DEFAULT_COMPLETION_WATERMARK) {
Review Comment:
Same here, a bunch of duplicate code as lines 854 to 862, can we try to
reduce the duplication here?
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -962,13 +1017,27 @@ private long computeCompletenessWatermark(String
catalogDbTableName, String topi
&& TimeIterator.durationBetween(prevWatermarkDT, now, granularity)
> 0) {
long timestampMillis = timestampDT.toInstant().toEpochMilli();
ZonedDateTime auditCountCheckLowerBoundDT =
TimeIterator.dec(timestampDT, granularity, 1);
- if (auditCountVerifier.get().isComplete(topicName,
- auditCountCheckLowerBoundDT.toInstant().toEpochMilli(),
timestampMillis)) {
+ if (!isTotalCountCompletenessWatermark &&
auditCountVerifier.get().isComplete(
+ topicName,
auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
completionWatermark = timestampMillis;
// Also persist the watermark into State object to share this with
other MetadataWriters
// we enforce ourselves to always use lower-cased table name here
String catalogDbTableNameLowerCased =
catalogDbTableName.toLowerCase(Locale.ROOT);
-
this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
catalogDbTableNameLowerCased), completionWatermark);
+ this.state.setProp(
+ String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
catalogDbTableNameLowerCased),
+ completionWatermark);
+ break;
+ }
+
+ if (isTotalCountCompletenessWatermark &&
auditCountVerifier.get().isTotalCountComplete(
+ topicName,
auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+ completionWatermark = timestampMillis;
+ // Also persist the watermark into State object to share this with
other MetadataWriters
+ // we enforce ourselves to always use lower-cased table name here
+ String catalogDbTableNameLowerCased =
catalogDbTableName.toLowerCase(Locale.ROOT);
+ this.state.setProp(
Review Comment:
Should we merge the two if blocks? Seems like the only difference is the
completeness property name, maybe worth considering removing duplicate code
here?
##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -120,27 +140,16 @@ public boolean isComplete(String datasetName, long
beginInMillis, long endInMill
*/
private double getCompletenessPercentage(String datasetName, long
beginInMillis, long endInMillis) throws IOException {
Map<String, Long> countsByTier = getTierAndCount(datasetName,
beginInMillis, endInMillis);
- log.info(String.format("Audit counts map for %s for range [%s,%s]",
datasetName, beginInMillis, endInMillis));
- countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+ validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier,
this.srcTier, this.refTiers);
if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
Review Comment:
If countsByTier is empty, we will not reach this block at all?
Issue Time Tracking
-------------------
Worklog Id: (was: 863765)
Time Spent: 0.5h (was: 20m)
> Introduce total count based completion watermark
> ------------------------------------------------
>
> Key: GOBBLIN-1838
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1838
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Andy Jiang
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Currently the completion watermark is determined according to the
> completeness percentage: "max of srcCount/refCount, for each refTier".
> This change introduces a new "total count based completion watermark", which
> is determined by a new completeness percentage: "srcCount / sum of all
> refCount, for each refTier".
--
This message was sent by Atlassian Jira
(v8.20.10#820010)