This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6830087c2 Fix case when srcTier is 0 and refTier is 0, but non-empty
map (#3760)
6830087c2 is described below
commit 6830087c230cf0fb529fbe11637def8ce031a18f
Author: Andy Jiang <[email protected]>
AuthorDate: Wed Sep 6 11:43:03 2023 -0700
Fix case when srcTier is 0 and refTier is 0, but non-empty map (#3760)
---
.../completeness/verifier/KafkaAuditCountVerifier.java | 17 +++++++++++++++++
.../verifier/KafkaAuditCountVerifierTest.java | 16 +++++++++++++++-
.../iceberg/writer/CompletenessWatermarkUpdater.java | 8 ++++----
3 files changed, 36 insertions(+), 5 deletions(-)
diff --git
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
index 0b91b9734..3f10344a2 100644
---
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
+++
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -171,6 +171,15 @@ public class KafkaAuditCountVerifier {
for (String refTier: this.refTiers) {
long refCount = countsByTier.get(refTier);
long srcCount = countsByTier.get(this.srcTier);
+
+ /*
+ If we have a case where an audit map is returned, however, one of the
source tiers on another fabric is 0,
+ and the reference tiers from Kafka is reported to be 0, we can say
that this hour is complete.
+ This needs to be added as a non-zero double value divided by 0 is
infinity, but 0 divided by 0 is NaN.
+ */
+ if (srcCount == 0 && refCount == 0) {
+ return 1.0;
+ }
percent = Double.max(percent, (double) srcCount / (double) refCount);
}
@@ -202,6 +211,14 @@ public class KafkaAuditCountVerifier {
.stream()
.mapToLong(countsByTier::get)
.sum();
+ /*
+ If we have a case where an audit map is returned, however, one of the
source tiers on another fabric is 0,
+ and the sum of the reference tiers from Kafka is reported to be 0, we
can say that this hour is complete.
+ This needs to be added as a non-zero double value divided by 0 is
infinity, but 0 divided by 0 is NaN.
+ */
+ if (srcCount == 0 && totalRefCount == 0) {
+ return 1.0;
+ }
double percent = Double.max(-1, (double) srcCount / (double)
totalRefCount);
if (percent < 0) {
throw new IOException("Cannot calculate total count completion
percentage");
diff --git
a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
index d17775353..09fa3b0db 100644
---
a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
+++
b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
@@ -135,7 +135,8 @@ public class KafkaAuditCountVerifierTest {
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
- // Check validation tiers for exceptions
+ // Check validation for exceptions if reference tier is 0 count. Checking
for division of x / 0 case where x != 0.
+ // Update watermark if source reports counts but reference counts is 0 due
to Kafka not reporting counts
client.setTierCounts(
ImmutableMap.of(
SOURCE_TIER, 990L,
@@ -145,6 +146,19 @@ public class KafkaAuditCountVerifierTest {
));
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L,
0L).get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L,
0L).get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
+
+ // Check validation for exceptions if both source and reference tier is 0
count. Checking for division of 0 / 0 case.
+ // If both source and reference tiers are 0, we assume we are complete and
update the watermark
+ // This is to check the case when one source cluster is reporting counts
but not the other source cluster. Resulting in a non-empty map but having 0 for
srcCount
+ client.setTierCounts(
+ ImmutableMap.of(
+ SOURCE_TIER, 0L,
+ REFERENCE_TIERS, 0L,
+ TOTAL_COUNT_REF_TIER_0, 0L,
+ TOTAL_COUNT_REF_TIER_1, 0L
+ ));
+ Assert.assertTrue(verifier.calculateCompleteness(topic, 0L,
0L).get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
+ Assert.assertTrue(verifier.calculateCompleteness(topic, 0L,
0L).get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
}
public void testOneCountFailed() throws IOException {
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
index 355e84f72..46e10eca6 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
@@ -242,8 +242,8 @@ public class CompletenessWatermarkUpdater {
updatedWatermark);
if (updatedWatermark > this.previousWatermark) {
- log.info(String.format("Updating %s for %s to %s",
COMPLETION_WATERMARK_KEY,
- this.tableMetadata.table.get().name(), updatedWatermark));
+ log.info(String.format("Updating %s for %s from %s to %s",
COMPLETION_WATERMARK_KEY,
+ this.tableMetadata.table.get().name(), this.previousWatermark,
updatedWatermark));
this.propsToUpdate.put(COMPLETION_WATERMARK_KEY,
String.valueOf(updatedWatermark));
this.propsToUpdate.put(COMPLETION_WATERMARK_TIMEZONE_KEY,
this.timeZone);
@@ -273,8 +273,8 @@ public class CompletenessWatermarkUpdater {
updatedWatermark);
if (updatedWatermark > previousWatermark) {
- log.info(String.format("Updating %s for %s to %s",
TOTAL_COUNT_COMPLETION_WATERMARK_KEY,
- this.tableMetadata.table.get().name(), updatedWatermark));
+ log.info(String.format("Updating %s for %s from %s to %s",
TOTAL_COUNT_COMPLETION_WATERMARK_KEY,
+ this.tableMetadata.table.get().name(), previousWatermark,
updatedWatermark));
this.propsToUpdate.put(TOTAL_COUNT_COMPLETION_WATERMARK_KEY,
String.valueOf(updatedWatermark));
tableMetadata.totalCountCompletionWatermark = updatedWatermark;
}