This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 183bc5c44 [GOBBLIN-1862] Allow reference count to be 0 but no less 
(#3725)
183bc5c44 is described below

commit 183bc5c44550b8ca888d589e849c3fb7dad8e90f
Author: Andy Jiang <[email protected]>
AuthorDate: Tue Aug 1 16:58:56 2023 -0700

    [GOBBLIN-1862] Allow reference count to be 0 but no less (#3725)
    
    * Remove equal for refCount for when Kafka reports 0 counts
    
    * Add unit test check for validation if refCount is equal to 0
    
    * Remove unused package
    
    * Add new test for classicCompleteness
    
    * Add comment
---
 .../completeness/verifier/KafkaAuditCountVerifier.java        |  7 +++++--
 .../completeness/verifier/KafkaAuditCountVerifierTest.java    | 11 +++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git 
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
 
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
index f43c72250..0b91b9734 100644
--- 
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
+++ 
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -221,8 +221,11 @@ public class KafkaAuditCountVerifier {
         throw new IOException(String.format("Reference tier %s audit count 
cannot be retrieved for dataset %s between %s and %s", refTier, datasetName, 
beginInMillis, endInMillis));
       }
       long refCount = countsByTier.get(refTier);
-      if(refCount <= 0) {
-        throw new IOException(String.format("Reference tier %s count cannot be 
less than or equal to zero", refTier));
+      if (refCount == 0) {
+        // If count in refTier is 0, it will be assumed that the data for that 
hour is completed and move the watermark forward.
+        log.warn(String.format("Reference tier %s audit count is reported to 
be zero", refCount));
+      } else if (refCount < 0) {
+        throw new IOException(String.format("Reference tier %s count cannot be 
less than zero", refTier));
       }
     }
   }
diff --git 
a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
 
b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
index ab667dc1f..d17775353 100644
--- 
a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
+++ 
b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
@@ -134,6 +134,17 @@ public class KafkaAuditCountVerifierTest {
         .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
     Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
         .get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
+
+    // Check validation tiers for exceptions
+    client.setTierCounts(
+        ImmutableMap.of(
+            SOURCE_TIER, 990L,
+            REFERENCE_TIERS, 0L,
+            TOTAL_COUNT_REF_TIER_0, 0L,
+            TOTAL_COUNT_REF_TIER_1, 0L
+        ));
+    Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 
0L).get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
+    Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 
0L).get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
   }
 
   public void testOneCountFailed() throws IOException {

Reply via email to