wsarecv commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1222735916


##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -120,27 +140,16 @@ public boolean isComplete(String datasetName, long 
beginInMillis, long endInMill
    */
   private double getCompletenessPercentage(String datasetName, long 
beginInMillis, long endInMillis) throws IOException {
     Map<String, Long> countsByTier = getTierAndCount(datasetName, 
beginInMillis, endInMillis);
-    log.info(String.format("Audit counts map for %s for range [%s,%s]", 
datasetName, beginInMillis, endInMillis));
-    countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+    validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, 
this.srcTier, this.refTiers);
     if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {

Review Comment:
   Good catch. Fixed it and added a new unit test.



##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -151,6 +160,49 @@ private double getCompletenessPercentage(String 
datasetName, long beginInMillis,
     return percent;
   }
 
+  private double getTotalCountCompletenessPercentage(String datasetName, long 
beginInMillis, long endInMillis) throws IOException {
+    Preconditions.checkNotNull(this.totalCountRefTiers);
+
+    Map<String, Long> countsByTier = getTierAndCount(datasetName, 
beginInMillis, endInMillis);
+    validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, 
this.srcTier, this.totalCountRefTiers);
+    if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {

Review Comment:
   Refactored.



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -962,13 +1017,27 @@ private long computeCompletenessWatermark(String 
catalogDbTableName, String topi
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) 
> 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
           ZonedDateTime auditCountCheckLowerBoundDT = 
TimeIterator.dec(timestampDT, granularity, 1);
-          if (auditCountVerifier.get().isComplete(topicName,
-              auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), 
timestampMillis)) {
+          if (!isTotalCountCompletenessWatermark && 
auditCountVerifier.get().isComplete(
+              topicName, 
auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
             // Also persist the watermark into State object to share this with 
other MetadataWriters
             // we enforce ourselves to always use lower-cased table name here
             String catalogDbTableNameLowerCased = 
catalogDbTableName.toLowerCase(Locale.ROOT);
-            
this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, 
catalogDbTableNameLowerCased), completionWatermark);
+            this.state.setProp(
+                String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, 
catalogDbTableNameLowerCased),
+                completionWatermark);
+            break;
+          }
+
+          if (isTotalCountCompletenessWatermark && 
auditCountVerifier.get().isTotalCountComplete(
+              topicName, 
auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+            completionWatermark = timestampMillis;
+            // Also persist the watermark into State object to share this with 
other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String catalogDbTableNameLowerCased = 
catalogDbTableName.toLowerCase(Locale.ROOT);
+            this.state.setProp(

Review Comment:
   Refactored.



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -846,6 +863,18 @@ public void flush(String dbName, String tableName) throws 
IOException {
             log.info(String.format("Need valid watermark, current watermark is 
%s, Not checking kafka audit for %s",
                 tableMetadata.completionWatermark, topicName));
           }
+
+          // Deal with total count completion watermark.
+          if (tableMetadata.totalCountCompletionWatermark > 
DEFAULT_COMPLETION_WATERMARK) {

Review Comment:
   Refactored.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to