[ 
https://issues.apache.org/jira/browse/GOBBLIN-1838?focusedWorklogId=863765&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-863765
 ]

ASF GitHub Bot logged work on GOBBLIN-1838:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Jun/23 17:53
            Start Date: 05/Jun/23 17:53
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1218364957


##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -151,6 +160,49 @@ private double getCompletenessPercentage(String datasetName, long beginInMillis,
     return percent;
   }
 
+  private double getTotalCountCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
+    Preconditions.checkNotNull(this.totalCountRefTiers);
+
+    Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
+    validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, this.srcTier, this.totalCountRefTiers);
+    if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {

Review Comment:
   Same here



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -846,6 +863,18 @@ public void flush(String dbName, String tableName) throws IOException {
             log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
                 tableMetadata.completionWatermark, topicName));
           }
+
+          // Deal with total count completion watermark.
+          if (tableMetadata.totalCountCompletionWatermark > DEFAULT_COMPLETION_WATERMARK) {

Review Comment:
   Same here: this largely duplicates lines 854 to 862. Can we reduce the 
duplication?



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -962,13 +1017,27 @@ private long computeCompletenessWatermark(String catalogDbTableName, String topi
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
           ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
-          if (auditCountVerifier.get().isComplete(topicName,
-              auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+          if (!isTotalCountCompletenessWatermark && auditCountVerifier.get().isComplete(
+              topicName, auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
             // Also persist the watermark into State object to share this with other MetadataWriters
             // we enforce ourselves to always use lower-cased table name here
             String catalogDbTableNameLowerCased = catalogDbTableName.toLowerCase(Locale.ROOT);
-            this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, catalogDbTableNameLowerCased), completionWatermark);
+            this.state.setProp(
+                String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, catalogDbTableNameLowerCased),
+                completionWatermark);
+            break;
+          }
+
+          if (isTotalCountCompletenessWatermark && auditCountVerifier.get().isTotalCountComplete(
+              topicName, auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+            completionWatermark = timestampMillis;
+            // Also persist the watermark into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String catalogDbTableNameLowerCased = catalogDbTableName.toLowerCase(Locale.ROOT);
+            this.state.setProp(

Review Comment:
   Should we merge the two if blocks? The only difference seems to be the 
completeness property name, so it may be worth removing the duplicate code 
here.
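
One way the merge suggested in this comment could look is sketched below. This is only an illustration under stated assumptions, not the change made in the PR: the `Verifier` interface, the `tryAdvance` helper, the `Map`-based state, and both key formats are hypothetical stand-ins (the diff above is cut off before it shows which property key the total-count branch writes). The idea is to select the verifier call and the key with the flag, then share the rest of the body.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class MergedWatermarkCheckSketch {

  /** Hypothetical stand-in for the two verifier methods that appear in the diff. */
  public interface Verifier {
    boolean isComplete(String topic, long beginMillis, long endMillis);
    boolean isTotalCountComplete(String topic, long beginMillis, long endMillis);
  }

  // Hypothetical key formats; the real constants live in IcebergMetadataWriter.
  static final String CLASSIC_KEY_FORMAT = "completionWatermark.%s";
  static final String TOTAL_COUNT_KEY_FORMAT = "totalCountCompletionWatermark.%s";

  /**
   * Runs the completeness check selected by the flag and, on success, stores the
   * watermark under the matching key. Returns true when the watermark advanced.
   */
  public static boolean tryAdvance(Verifier verifier, boolean useTotalCount, String topic,
      String catalogDbTableName, long lowerBoundMillis, long timestampMillis, Map<String, Long> state) {
    boolean complete = useTotalCount
        ? verifier.isTotalCountComplete(topic, lowerBoundMillis, timestampMillis)
        : verifier.isComplete(topic, lowerBoundMillis, timestampMillis);
    if (!complete) {
      return false;
    }
    // Always use the lower-cased table name for the state key, as in the diff.
    String keyFormat = useTotalCount ? TOTAL_COUNT_KEY_FORMAT : CLASSIC_KEY_FORMAT;
    state.put(String.format(keyFormat, catalogDbTableName.toLowerCase(Locale.ROOT)), timestampMillis);
    return true;
  }

  public static void main(String[] args) {
    Verifier alwaysComplete = new Verifier() {
      public boolean isComplete(String t, long b, long e) { return true; }
      public boolean isTotalCountComplete(String t, long b, long e) { return true; }
    };
    Map<String, Long> state = new HashMap<>();
    tryAdvance(alwaysComplete, true, "topic", "Db.Table", 0L, 3600000L, state);
    System.out.println(state);
  }
}
```

With this shape, the loop in computeCompletenessWatermark would need a single if block that breaks when the helper returns true.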



##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -120,27 +140,16 @@ public boolean isComplete(String datasetName, long beginInMillis, long endInMill
    */
   private double getCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
     Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
-    log.info(String.format("Audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
-    countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+    validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, this.srcTier, this.refTiers);
     if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {

Review Comment:
   If countsByTier is empty, do we ever reach this block?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 863765)
    Time Spent: 0.5h  (was: 20m)

> Introduce total count based completion watermark
> ------------------------------------------------
>
>                 Key: GOBBLIN-1838
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1838
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Andy Jiang
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently the completion watermark is determined according to the 
> completeness percentage: "max of srcCount/refCount, for each refTier".
> This change introduces a new "total count based completion watermark", which 
> is determined by a new completeness percentage: "srcCount / sum of all 
> refCount, for each refTier".

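The two percentages in the description can be compared with a small worked example. The sketch below is not the KafkaAuditCountVerifier code: the class name, method names, tier names, and counts are made up for illustration, and skipping zero reference counts (or returning 0.0 when the total is zero) is an assumption rather than documented behaviour.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompletenessSketch {

  /** Existing rule per the description: max of srcCount / refCount over the refTiers. */
  public static double maxRatioPercentage(long srcCount, Map<String, Long> refCounts) {
    double best = 0.0;
    for (long refCount : refCounts.values()) {
      if (refCount > 0) { // assumption: tiers with a zero count are skipped
        best = Math.max(best, (double) srcCount / refCount);
      }
    }
    return best;
  }

  /** New rule per the description: srcCount divided by the sum of all refCounts. */
  public static double totalCountPercentage(long srcCount, Map<String, Long> refCounts) {
    long total = 0;
    for (long refCount : refCounts.values()) {
      total += refCount;
    }
    return total > 0 ? (double) srcCount / total : 0.0; // assumption: 0.0 when there is nothing to compare
  }

  public static void main(String[] args) {
    Map<String, Long> refCounts = new LinkedHashMap<>();
    refCounts.put("refTierA", 600L); // hypothetical tier names and counts
    refCounts.put("refTierB", 400L);
    long srcCount = 900L;

    // max(900/600, 900/400) = 2.25
    System.out.println(maxRatioPercentage(srcCount, refCounts));
    // 900 / (600 + 400) = 0.9
    System.out.println(totalCountPercentage(srcCount, refCounts));
  }
}
```

In this example the source tier looks more than complete against either reference tier alone, but only 90% complete against their combined total, which is the case the new watermark is meant to capture.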


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
