ZihanLi58 commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1218364957


##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -151,6 +160,49 @@ private double getCompletenessPercentage(String 
datasetName, long beginInMillis,
     return percent;
   }
 
+  private double getTotalCountCompletenessPercentage(String datasetName, long 
beginInMillis, long endInMillis) throws IOException {
+    Preconditions.checkNotNull(this.totalCountRefTiers);
+
+    Map<String, Long> countsByTier = getTierAndCount(datasetName, 
beginInMillis, endInMillis);
+    validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, 
this.srcTier, this.totalCountRefTiers);
+    if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {

Review Comment:
   Same here



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -846,6 +863,18 @@ public void flush(String dbName, String tableName) throws 
IOException {
             log.info(String.format("Need valid watermark, current watermark is 
%s, Not checking kafka audit for %s",
                 tableMetadata.completionWatermark, topicName));
           }
+
+          // Deal with total count completion watermark.
+          if (tableMetadata.totalCountCompletionWatermark > 
DEFAULT_COMPLETION_WATERMARK) {

Review Comment:
   Same here, a bunch of duplicate code as lines 854 to 862, can we try to 
reduce the duplication here?



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -962,13 +1017,27 @@ private long computeCompletenessWatermark(String 
catalogDbTableName, String topi
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) 
> 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
           ZonedDateTime auditCountCheckLowerBoundDT = 
TimeIterator.dec(timestampDT, granularity, 1);
-          if (auditCountVerifier.get().isComplete(topicName,
-              auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), 
timestampMillis)) {
+          if (!isTotalCountCompletenessWatermark && 
auditCountVerifier.get().isComplete(
+              topicName, 
auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
             // Also persist the watermark into State object to share this with 
other MetadataWriters
             // we enforce ourselves to always use lower-cased table name here
             String catalogDbTableNameLowerCased = 
catalogDbTableName.toLowerCase(Locale.ROOT);
-            
this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, 
catalogDbTableNameLowerCased), completionWatermark);
+            this.state.setProp(
+                String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, 
catalogDbTableNameLowerCased),
+                completionWatermark);
+            break;
+          }
+
+          if (isTotalCountCompletenessWatermark && 
auditCountVerifier.get().isTotalCountComplete(
+              topicName, 
auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+            completionWatermark = timestampMillis;
+            // Also persist the watermark into State object to share this with 
other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String catalogDbTableNameLowerCased = 
catalogDbTableName.toLowerCase(Locale.ROOT);
+            this.state.setProp(

Review Comment:
   Should we merge the two if blocks? Seems like the only difference is the 
completeness property name, maybe worth considering removing duplicate code 
here?



##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -120,27 +140,16 @@ public boolean isComplete(String datasetName, long 
beginInMillis, long endInMill
    */
   private double getCompletenessPercentage(String datasetName, long 
beginInMillis, long endInMillis) throws IOException {
     Map<String, Long> countsByTier = getTierAndCount(datasetName, 
beginInMillis, endInMillis);
-    log.info(String.format("Audit counts map for %s for range [%s,%s]", 
datasetName, beginInMillis, endInMillis));
-    countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+    validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, 
this.srcTier, this.refTiers);
     if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {

Review Comment:
   If countsByTier is empty, we will not reach this block at all?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to