This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 951809d43 [GOBBLIN-1864] Catch exceptions if specific watermark calculation fails (#3728)
951809d43 is described below
commit 951809d4327df564d423ee0f3073ff192f7a9f18
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Aug 1 13:40:54 2023 -0700
[GOBBLIN-1864] Catch exceptions if specific watermark calculation fails (#3728)
* Catch exceptions if specific watermark calculation fails
* Add null handling for watermark map
---
.../verifier/KafkaAuditCountVerifier.java | 12 +++++++----
.../verifier/KafkaAuditCountVerifierTest.java | 24 ++++++++++++++++++++++
.../writer/CompletenessWatermarkUpdater.java | 4 ++--
3 files changed, 34 insertions(+), 6 deletions(-)
diff --git a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
index 77d89ff31..f43c72250 100644
--- a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.completeness.verifier;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -125,10 +126,13 @@ public class KafkaAuditCountVerifier {
countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
Map<CompletenessType, Boolean> result = new HashMap<>();
- result.put(CompletenessType.ClassicCompleteness, calculateCompleteness(datasetName, beginInMillis, endInMillis,
- CompletenessType.ClassicCompleteness, countsByTier) > threshold);
- result.put(CompletenessType.TotalCountCompleteness, calculateCompleteness(datasetName, beginInMillis, endInMillis,
- CompletenessType.TotalCountCompleteness, countsByTier) > threshold);
+ Arrays.stream(CompletenessType.values()).forEach(type -> {
+ try {
+ result.put(type, calculateCompleteness(datasetName, beginInMillis, endInMillis, type, countsByTier) > threshold);
+ } catch (IOException e) {
+ log.error("Failed to calculate completeness for type " + type, e);
+ }
+ });
return result;
}
diff --git a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
index 3e7140f41..ab667dc1f 100644
--- a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
+++ b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
@@ -135,4 +135,28 @@ public class KafkaAuditCountVerifierTest {
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
}
+
+ public void testOneCountFailed() throws IOException {
+ final String topic = "testTopic";
+ State props = new State();
+ props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
+ props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
+ props.setProp(KafkaAuditCountVerifier.TOTAL_COUNT_REFERENCE_TIERS, TOTAL_COUNT_REFERENCE_TIERS);
+ props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
+ props.setProp(KafkaAuditCountVerifier.COMPLETE_ON_NO_COUNTS, true);
+ TestAuditClient client = new TestAuditClient(props);
+ KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props, client);
+
+ // Missing total count tier which will throw exception
+ client.setTierCounts(ImmutableMap.of(
+ SOURCE_TIER, 999L,
+ REFERENCE_TIERS, 1000L
+ ));
+
+ // Classic completeness is still returned, but total is missing
+ Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
+ .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
+ Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
+ .containsKey(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
+ }
}
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
index b022ac339..355e84f72 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/CompletenessWatermarkUpdater.java
@@ -230,7 +230,7 @@ public class CompletenessWatermarkUpdater {
@Override
protected void computeAndUpdateInternal(Map<KafkaAuditCountVerifier.CompletenessType, Boolean> results,
ZonedDateTime timestampDT) {
- if (!results.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness)) {
+ if (!results.getOrDefault(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness, false)) {
return;
}
@@ -261,7 +261,7 @@ public class CompletenessWatermarkUpdater {
@Override
protected void computeAndUpdateInternal(Map<KafkaAuditCountVerifier.CompletenessType, Boolean> results,
ZonedDateTime timestampDT) {
- if (!results.get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness)) {
+ if (!results.getOrDefault(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness, false)) {
return;
}