This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
    new 951809d43 [GOBBLIN-1864] Catch exceptions if specific watermark calculation fails (#3728)
951809d43 is described below

commit 951809d4327df564d423ee0f3073ff192f7a9f18
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Aug 1 13:40:54 2023 -0700

    [GOBBLIN-1864] Catch exceptions if specific watermark calculation fails (#3728)
    
    * Catch exceptions if specific watermark calculation fails
    
    * Add null handling for watermark map
---
 .../verifier/KafkaAuditCountVerifier.java          | 12 +++++++----
 .../verifier/KafkaAuditCountVerifierTest.java      | 24 ++++++++++++++++++++++
 .../writer/CompletenessWatermarkUpdater.java       |  4 ++--
 3 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
index 77d89ff31..f43c72250 100644
--- a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.completeness.verifier;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -125,10 +126,13 @@ public class KafkaAuditCountVerifier {
     countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
 
     Map<CompletenessType, Boolean> result = new HashMap<>();
-    result.put(CompletenessType.ClassicCompleteness, 
calculateCompleteness(datasetName, beginInMillis, endInMillis,
-        CompletenessType.ClassicCompleteness, countsByTier) > threshold);
-    result.put(CompletenessType.TotalCountCompleteness, 
calculateCompleteness(datasetName, beginInMillis, endInMillis,
-        CompletenessType.TotalCountCompleteness, countsByTier) > threshold);
+    Arrays.stream(CompletenessType.values()).forEach(type -> {
+      try {
+        result.put(type, calculateCompleteness(datasetName, beginInMillis, 
endInMillis, type, countsByTier) > threshold);
+      } catch (IOException e) {
+        log.error("Failed to calculate completeness for type " + type, e);
+      }
+    });
     return result;
   }
 
diff --git a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
index 3e7140f41..ab667dc1f 100644
--- a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
+++ b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
@@ -135,4 +135,28 @@ public class KafkaAuditCountVerifierTest {
     Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
         .get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
   }
+
+  public void testOneCountFailed() throws IOException {
+    final String topic = "testTopic";
+    State props = new State();
+    props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
+    props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
+    props.setProp(KafkaAuditCountVerifier.TOTAL_COUNT_REFERENCE_TIERS, 
TOTAL_COUNT_REFERENCE_TIERS);
+    props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
+    props.setProp(KafkaAuditCountVerifier.COMPLETE_ON_NO_COUNTS, true);
+    TestAuditClient client = new TestAuditClient(props);
+    KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props, 
client);
+
+    // Missing total count tier which will throw exception
+    client.setTierCounts(ImmutableMap.of(
+        SOURCE_TIER, 999L,
+        REFERENCE_TIERS, 1000L
+    ));
+
+    // Classic completeness is still returned, but total is missing
+    Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
+        .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
+    Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
+        
.containsKey(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
+  }
 }
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
index b022ac339..355e84f72 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
@@ -230,7 +230,7 @@ public class CompletenessWatermarkUpdater {
     @Override
     protected void 
computeAndUpdateInternal(Map<KafkaAuditCountVerifier.CompletenessType, Boolean> 
results,
         ZonedDateTime timestampDT) {
-      if 
(!results.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness)) {
+      if 
(!results.getOrDefault(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness,
 false)) {
         return;
       }
 
@@ -261,7 +261,7 @@ public class CompletenessWatermarkUpdater {
     @Override
     protected void 
computeAndUpdateInternal(Map<KafkaAuditCountVerifier.CompletenessType, Boolean> 
results,
         ZonedDateTime timestampDT) {
-      if 
(!results.get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness)) 
{
+      if 
(!results.getOrDefault(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness,
 false)) {
         return;
       }
 

Reply via email to