This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 183bc5c44 [GOBBLIN-1862] Allow reference count to be 0 but no less
(#3725)
183bc5c44 is described below
commit 183bc5c44550b8ca888d589e849c3fb7dad8e90f
Author: Andy Jiang <[email protected]>
AuthorDate: Tue Aug 1 16:58:56 2023 -0700
[GOBBLIN-1862] Allow reference count to be 0 but no less (#3725)
* Stop rejecting refCount == 0, for when Kafka reports 0 counts
* Add unit test check for validation if refCount is equal to 0
* Remove unused package
* Add new test for classicCompleteness
* Add comment
---
.../completeness/verifier/KafkaAuditCountVerifier.java | 7 +++++--
.../completeness/verifier/KafkaAuditCountVerifierTest.java | 11 +++++++++++
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
index f43c72250..0b91b9734 100644
---
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
+++
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -221,8 +221,11 @@ public class KafkaAuditCountVerifier {
throw new IOException(String.format("Reference tier %s audit count
cannot be retrieved for dataset %s between %s and %s", refTier, datasetName,
beginInMillis, endInMillis));
}
long refCount = countsByTier.get(refTier);
- if(refCount <= 0) {
- throw new IOException(String.format("Reference tier %s count cannot be
less than or equal to zero", refTier));
+ if (refCount == 0) {
+ // If count in refTier is 0, it will be assumed that the data for that
hour is complete, and the watermark will be moved forward.
+ log.warn(String.format("Reference tier %s audit count is reported to
be zero", refCount));
+ } else if (refCount < 0) {
+ throw new IOException(String.format("Reference tier %s count cannot be
less than zero", refTier));
}
}
}
diff --git
a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
index ab667dc1f..d17775353 100644
---
a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
+++
b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
@@ -134,6 +134,17 @@ public class KafkaAuditCountVerifierTest {
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
+
+ // Check that zero counts in the reference tiers do not throw and are reported as complete
+ client.setTierCounts(
+ ImmutableMap.of(
+ SOURCE_TIER, 990L,
+ REFERENCE_TIERS, 0L,
+ TOTAL_COUNT_REF_TIER_0, 0L,
+ TOTAL_COUNT_REF_TIER_1, 0L
+ ));
+ Assert.assertTrue(verifier.calculateCompleteness(topic, 0L,
0L).get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
+ Assert.assertTrue(verifier.calculateCompleteness(topic, 0L,
0L).get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
}
public void testOneCountFailed() throws IOException {