This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 65ddfedc4 [GOBBLIN-1933] Change the logic in completeness verifier to 
support multi reference tier (#3806)
65ddfedc4 is described below

commit 65ddfedc4062992f1e34d104c941dc36d2d9ed8d
Author: Zihan Li <[email protected]>
AuthorDate: Fri Oct 27 19:24:52 2023 -0700

    [GOBBLIN-1933] Change the logic in completeness verifier to support multi 
reference tier (#3806)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1933] Change the logic in completeness verifier to support multi 
reference tier
    
    * add unit test
    
    * fix typo
    
    * change the javadoc
    
    * change the javadoc
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../verifier/KafkaAuditCountVerifier.java          | 12 ++--
 .../verifier/KafkaAuditCountVerifierTest.java      | 69 ++++++++++++++++++----
 .../extract/kafka/KafkaIngestionHealthCheck.java   |  4 +-
 .../packer/KafkaTopicGroupingWorkUnitPacker.java   |  6 +-
 4 files changed, 73 insertions(+), 18 deletions(-)

diff --git 
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
 
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
index 3f10344a2..4be9653b0 100644
--- 
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
+++ 
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -156,12 +156,13 @@ public class KafkaAuditCountVerifier {
 
   /**
    * Compare source tier against reference tiers. For each reference tier, 
calculates percentage by srcCount/refCount.
-   *
+   * We will return the lowest value, which, in other words, we will wait 
until src tier catches up to all reference
+   * tiers (upto 99.9%) to mark that hour as completed.
    * @param datasetName A dataset short name like 'PageViewEvent'
    * @param beginInMillis Unix timestamp in milliseconds
    * @param endInMillis Unix timestamp in milliseconds
    *
-   * @return The highest percentage value
+   * @return The lowest percentage value
    */
   private double calculateClassicCompleteness(String datasetName, long 
beginInMillis, long endInMillis,
       Map<String, Long> countsByTier) throws IOException {
@@ -171,6 +172,7 @@ public class KafkaAuditCountVerifier {
     for (String refTier: this.refTiers) {
       long refCount = countsByTier.get(refTier);
       long srcCount = countsByTier.get(this.srcTier);
+      double tmpPercent;
 
       /*
         If we have a case where an audit map is returned, however, one of the 
source tiers on another fabric is 0,
@@ -178,9 +180,11 @@ public class KafkaAuditCountVerifier {
         This needs to be added as a non-zero double value divided by 0 is 
infinity, but 0 divided by 0 is NaN.
        */
       if (srcCount == 0 && refCount == 0) {
-        return 1.0;
+        tmpPercent = 1;
+      } else {
+        tmpPercent =  (double) srcCount / (double) refCount;
       }
-      percent = Double.max(percent, (double) srcCount / (double) refCount);
+      percent = percent < 0 ? tmpPercent : Double.min(percent, tmpPercent);
     }
 
     if (percent < 0) {
diff --git 
a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
 
b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
index 09fa3b0db..e989fa7f8 100644
--- 
a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
+++ 
b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
@@ -31,7 +31,9 @@ import org.apache.gobblin.configuration.State;
 public class KafkaAuditCountVerifierTest {
 
   public static final String SOURCE_TIER = "gobblin";
-  public static final String REFERENCE_TIERS = "producer";
+  public static final String REFERENCE_TIER = "producer";
+  public static final String REFERENCE_TIER_1 = "producer_reference";
+  public static final String REFERENCE_TIERS = REFERENCE_TIER + "," + 
REFERENCE_TIER_1;
 
   public static final String TOTAL_COUNT_REF_TIER_0 = "producer_0";
   public static final String TOTAL_COUNT_REF_TIER_1 = "producer_1";
@@ -50,7 +52,8 @@ public class KafkaAuditCountVerifierTest {
     // All complete
     client.setTierCounts(ImmutableMap.of(
         SOURCE_TIER, 1000L,
-        REFERENCE_TIERS,   1000L
+        REFERENCE_TIER, 1000L,
+        REFERENCE_TIER_1, 1000L
     ));
     // Default threshold
     Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
@@ -59,7 +62,8 @@ public class KafkaAuditCountVerifierTest {
     // 99.999 % complete
     client.setTierCounts(ImmutableMap.of(
         SOURCE_TIER, 999L,
-        REFERENCE_TIERS,   1000L
+        REFERENCE_TIER, 1000L,
+        REFERENCE_TIER_1, 1000L
     ));
     Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
         .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
@@ -67,7 +71,8 @@ public class KafkaAuditCountVerifierTest {
     // <= 99% complete
     client.setTierCounts(ImmutableMap.of(
         SOURCE_TIER, 990L,
-        REFERENCE_TIERS,   1000L
+        REFERENCE_TIER, 1000L,
+        REFERENCE_TIER_1, 1000L
     ));
     Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
         .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
@@ -86,7 +91,8 @@ public class KafkaAuditCountVerifierTest {
     // All complete
     client.setTierCounts(ImmutableMap.of(
         SOURCE_TIER, 1000L,
-        REFERENCE_TIERS, 1000L,
+        REFERENCE_TIER, 1000L,
+        REFERENCE_TIER_1, 1000L,
         TOTAL_COUNT_REF_TIER_0, 600L,
         TOTAL_COUNT_REF_TIER_1, 400L
     ));
@@ -97,7 +103,8 @@ public class KafkaAuditCountVerifierTest {
     // 99.999 % complete
     client.setTierCounts(ImmutableMap.of(
         SOURCE_TIER, 999L,
-        REFERENCE_TIERS, 1000L,
+        REFERENCE_TIER, 1000L,
+        REFERENCE_TIER_1, 1000L,
         TOTAL_COUNT_REF_TIER_0, 600L,
         TOTAL_COUNT_REF_TIER_1, 400L
     ));
@@ -107,7 +114,8 @@ public class KafkaAuditCountVerifierTest {
     // <= 99% complete
     client.setTierCounts(ImmutableMap.of(
         SOURCE_TIER, 990L,
-        REFERENCE_TIERS, 1000L,
+        REFERENCE_TIER, 1000L,
+        REFERENCE_TIER_1, 1000L,
         TOTAL_COUNT_REF_TIER_0, 600L,
         TOTAL_COUNT_REF_TIER_1, 400L
     ));
@@ -140,7 +148,8 @@ public class KafkaAuditCountVerifierTest {
     client.setTierCounts(
         ImmutableMap.of(
             SOURCE_TIER, 990L,
-            REFERENCE_TIERS, 0L,
+            REFERENCE_TIER, 0L,
+            REFERENCE_TIER_1, 0L,
             TOTAL_COUNT_REF_TIER_0, 0L,
             TOTAL_COUNT_REF_TIER_1, 0L
         ));
@@ -153,7 +162,8 @@ public class KafkaAuditCountVerifierTest {
     client.setTierCounts(
         ImmutableMap.of(
             SOURCE_TIER, 0L,
-            REFERENCE_TIERS, 0L,
+            REFERENCE_TIER, 0L,
+            REFERENCE_TIER_1, 0L,
             TOTAL_COUNT_REF_TIER_0, 0L,
             TOTAL_COUNT_REF_TIER_1, 0L
         ));
@@ -175,7 +185,8 @@ public class KafkaAuditCountVerifierTest {
     // Missing total count tier which will throw exception
     client.setTierCounts(ImmutableMap.of(
         SOURCE_TIER, 999L,
-        REFERENCE_TIERS, 1000L
+        REFERENCE_TIER, 1000L,
+        REFERENCE_TIER_1, 1000L
     ));
 
     // Classic completeness is still returned, but total is missing
@@ -184,4 +195,42 @@ public class KafkaAuditCountVerifierTest {
     Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
         
.containsKey(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
   }
+
+  public void testDifferentValueInReferenceTier() throws IOException {
+    final String topic = "testTopic";
+    State props = new State();
+    props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
+    props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
+    props.setProp(KafkaAuditCountVerifier.TOTAL_COUNT_REFERENCE_TIERS, 
TOTAL_COUNT_REFERENCE_TIERS);
+    props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
+    props.setProp(KafkaAuditCountVerifier.COMPLETE_ON_NO_COUNTS, true);
+    TestAuditClient client = new TestAuditClient(props);
+    KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props, 
client);
+
+    // Different value in reference tier
+    client.setTierCounts(ImmutableMap.of(
+        SOURCE_TIER, 999L,
+        REFERENCE_TIER, 1000L,
+        REFERENCE_TIER_1, 2000L
+    ));
+
+    // Classic completeness is fail as 999/2000 < 99.9%
+    Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
+        .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
+
+    // Different value in reference tier and one tier has 0 in count
+    client.setTierCounts(ImmutableMap.of(
+        SOURCE_TIER, 999L,
+        REFERENCE_TIER, 0L,
+        REFERENCE_TIER_1, 2000L
+    ));
+
+    // Classic completeness is fail as 999/2000 < 99.9%
+    Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
+        .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
+
+
+
+  }
+
 }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
index 377d4887f..e4c0dc7c6 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
@@ -137,6 +137,8 @@ public class KafkaIngestionHealthCheck implements 
CommitStep {
   public void execute() {
     
this.ingestionLatencies.add(this.statsTracker.getMaxIngestionLatency(TimeUnit.MINUTES));
     this.consumptionRateMBps.add(this.statsTracker.getConsumptionRateMBps());
+    double avgConsumptionRate = getMaxConsumptionRate();
+    log.info("Avg. Consumption Rate = {} MBps, Target Consumption rate = {} 
MBps", avgConsumptionRate, this.expectedConsumptionRate);
     if (ingestionLatencies.size() < this.slidingWindowSize) {
       log.info("SUCCESS: Num observations: {} smaller than {}", 
ingestionLatencies.size(), this.slidingWindowSize);
       return;
@@ -146,8 +148,6 @@ public class KafkaIngestionHealthCheck implements 
CommitStep {
       log.info("SUCCESS: Ingestion Latencies = {}, Ingestion Latency 
Threshold: {}", this.ingestionLatencies.toString(), 
this.ingestionLatencyThresholdMinutes);
       return;
     }
-
-    double avgConsumptionRate = getMaxConsumptionRate();
     if (avgConsumptionRate > this.consumptionRateDropOffFraction * 
this.expectedConsumptionRate) {
       log.info("SUCCESS: Avg. Consumption Rate = {} MBps, Target Consumption 
rate = {} MBps", avgConsumptionRate, this.expectedConsumptionRate);
       return;
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index 62cb18447..1bdc2d8bb 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -70,7 +70,8 @@ import static 
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.
 @Slf4j
 public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
   public static final String GOBBLIN_KAFKA_PREFIX = "gobblin.kafka.";
-  private static final int DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10;
+  public static final String DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY = 
GOBBLIN_KAFKA_PREFIX + "default.num.topic.partitions.per.container";
+  private static final int DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER 
= 10;
 
   //A global configuration for container capacity. The container capacity 
refers to the peak rate (in MB/s) that a
   //single JVM can consume from Kafka for a single topic and controls the 
number of partitions of a topic that will be
@@ -198,6 +199,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends 
KafkaWorkUnitPacker {
       }
       //Add CONTAINER_CAPACITY into each workunit. Useful when 
KafkaIngestionHealthCheck is enabled.
       for (WorkUnit workUnit: workUnitsForTopic) {
+        //todo: check whether it's set already to respect the topic specific 
capacity from user input properties
         workUnit.setProp(CONTAINER_CAPACITY_KEY, containerCapacity);
       }
       double estimatedDataSizeForTopic = 
calcTotalEstSizeForTopic(workUnitsForTopic);
@@ -293,7 +295,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends 
KafkaWorkUnitPacker {
 
   private Double getDefaultWorkUnitSize() {
     return 
state.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
-        KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / 
DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER;
+        KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / 
state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY, 
DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER);
   }
 
   /**

Reply via email to