This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6830087c2 Fix case when srcTier is 0 and refTier is 0, but non empty map (#3760)
6830087c2 is described below

commit 6830087c230cf0fb529fbe11637def8ce031a18f
Author: Andy Jiang <[email protected]>
AuthorDate: Wed Sep 6 11:43:03 2023 -0700

    Fix case when srcTier is 0 and refTier is 0, but non empty map (#3760)
---
 .../completeness/verifier/KafkaAuditCountVerifier.java  | 17 +++++++++++++++++
 .../verifier/KafkaAuditCountVerifierTest.java           | 16 +++++++++++++++-
 .../iceberg/writer/CompletenessWatermarkUpdater.java    |  8 ++++----
 3 files changed, 36 insertions(+), 5 deletions(-)

diff --git a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
index 0b91b9734..3f10344a2 100644
--- a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -171,6 +171,15 @@ public class KafkaAuditCountVerifier {
     for (String refTier: this.refTiers) {
       long refCount = countsByTier.get(refTier);
       long srcCount = countsByTier.get(this.srcTier);
+
+      /*
+        If we have a case where an audit map is returned, however, one of the source tiers on another fabric is 0,
+        and the reference tiers from Kafka is reported to be 0, we can say that this hour is complete.
+        This needs to be added as a non-zero double value divided by 0 is infinity, but 0 divided by 0 is NaN.
+       */
+      if (srcCount == 0 && refCount == 0) {
+        return 1.0;
+      }
       percent = Double.max(percent, (double) srcCount / (double) refCount);
     }
 
@@ -202,6 +211,14 @@ public class KafkaAuditCountVerifier {
         .stream()
         .mapToLong(countsByTier::get)
         .sum();
+    /*
+      If we have a case where an audit map is returned, however, one of the source tiers on another fabric is 0,
+      and the sum of the reference tiers from Kafka is reported to be 0, we can say that this hour is complete.
+      This needs to be added as a non-zero double value divided by 0 is infinity, but 0 divided by 0 is NaN.
+     */
+    if (srcCount == 0 && totalRefCount == 0) {
+      return 1.0;
+    }
     double percent = Double.max(-1, (double) srcCount / (double) 
totalRefCount);
     if (percent < 0) {
       throw new IOException("Cannot calculate total count completion 
percentage");
diff --git a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
index d17775353..09fa3b0db 100644
--- a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
+++ b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
@@ -135,7 +135,8 @@ public class KafkaAuditCountVerifierTest {
     Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
         .get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
 
-    // Check validation tiers for exceptions
+    // Check validation for exceptions if reference tier is 0 count. Checking for division of x / 0 case where x != 0.
+    // Update watermark if source reports counts but reference counts is 0 due to Kafka not reporting counts
     client.setTierCounts(
         ImmutableMap.of(
             SOURCE_TIER, 990L,
@@ -145,6 +146,19 @@ public class KafkaAuditCountVerifierTest {
         ));
     Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 
0L).get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
     Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 
0L).get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
+
+    // Check validation for exceptions if both source and reference tier is 0 count. Checking for division of 0 / 0 case.
+    // If both source and reference tiers are 0, we assume we are complete and update the watermark
+    // This is to check the case when one source cluster is reporting counts but not the other source cluster. Resulting in a non-empty map but having 0 for srcCount
+    client.setTierCounts(
+        ImmutableMap.of(
+            SOURCE_TIER, 0L,
+            REFERENCE_TIERS, 0L,
+            TOTAL_COUNT_REF_TIER_0, 0L,
+            TOTAL_COUNT_REF_TIER_1, 0L
+        ));
+    Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 
0L).get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
+    Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 
0L).get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
   }
 
   public void testOneCountFailed() throws IOException {
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
index 355e84f72..46e10eca6 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
@@ -242,8 +242,8 @@ public class CompletenessWatermarkUpdater {
           updatedWatermark);
 
       if (updatedWatermark > this.previousWatermark) {
-        log.info(String.format("Updating %s for %s to %s", 
COMPLETION_WATERMARK_KEY,
-            this.tableMetadata.table.get().name(), updatedWatermark));
+        log.info(String.format("Updating %s for %s from %s to %s", 
COMPLETION_WATERMARK_KEY,
+            this.tableMetadata.table.get().name(), this.previousWatermark, 
updatedWatermark));
         this.propsToUpdate.put(COMPLETION_WATERMARK_KEY, 
String.valueOf(updatedWatermark));
         this.propsToUpdate.put(COMPLETION_WATERMARK_TIMEZONE_KEY, 
this.timeZone);
 
@@ -273,8 +273,8 @@ public class CompletenessWatermarkUpdater {
           updatedWatermark);
 
       if (updatedWatermark > previousWatermark) {
-        log.info(String.format("Updating %s for %s to %s", 
TOTAL_COUNT_COMPLETION_WATERMARK_KEY,
-            this.tableMetadata.table.get().name(), updatedWatermark));
+        log.info(String.format("Updating %s for %s from %s to %s", 
TOTAL_COUNT_COMPLETION_WATERMARK_KEY,
+            this.tableMetadata.table.get().name(), previousWatermark, 
updatedWatermark));
         this.propsToUpdate.put(TOTAL_COUNT_COMPLETION_WATERMARK_KEY, 
String.valueOf(updatedWatermark));
         tableMetadata.totalCountCompletionWatermark = updatedWatermark;
       }

Reply via email to