This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 65ddfedc4 [GOBBLIN-1933] Change the logic in completeness verifier to
support multi reference tier (#3806)
65ddfedc4 is described below
commit 65ddfedc4062992f1e34d104c941dc36d2d9ed8d
Author: Zihan Li <[email protected]>
AuthorDate: Fri Oct 27 19:24:52 2023 -0700
[GOBBLIN-1933] Change the logic in completeness verifier to support multi
reference tier (#3806)
* address comments
* use connectionmanager when httpclient is not closable
* [GOBBLIN-1933] Change the logic in completeness verifier to support multi
reference tier
* add unit test
* fix typo
* change the javadoc
* change the javadoc
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../verifier/KafkaAuditCountVerifier.java | 12 ++--
.../verifier/KafkaAuditCountVerifierTest.java | 69 ++++++++++++++++++----
.../extract/kafka/KafkaIngestionHealthCheck.java | 4 +-
.../packer/KafkaTopicGroupingWorkUnitPacker.java | 6 +-
4 files changed, 73 insertions(+), 18 deletions(-)
diff --git
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
index 3f10344a2..4be9653b0 100644
---
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
+++
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -156,12 +156,13 @@ public class KafkaAuditCountVerifier {
/**
* Compare source tier against reference tiers. For each reference tier,
calculates percentage by srcCount/refCount.
- *
+ * We will return the lowest value; in other words, we will wait
until the src tier catches up to all reference
+ * tiers (up to 99.9%) to mark that hour as completed.
* @param datasetName A dataset short name like 'PageViewEvent'
* @param beginInMillis Unix timestamp in milliseconds
* @param endInMillis Unix timestamp in milliseconds
*
- * @return The highest percentage value
+ * @return The lowest percentage value
*/
private double calculateClassicCompleteness(String datasetName, long
beginInMillis, long endInMillis,
Map<String, Long> countsByTier) throws IOException {
@@ -171,6 +172,7 @@ public class KafkaAuditCountVerifier {
for (String refTier: this.refTiers) {
long refCount = countsByTier.get(refTier);
long srcCount = countsByTier.get(this.srcTier);
+ double tmpPercent;
/*
If we have a case where an audit map is returned, however, one of the
source tiers on another fabric is 0,
@@ -178,9 +180,11 @@ public class KafkaAuditCountVerifier {
This needs to be added as a non-zero double value divided by 0 is
infinity, but 0 divided by 0 is NaN.
*/
if (srcCount == 0 && refCount == 0) {
- return 1.0;
+ tmpPercent = 1;
+ } else {
+ tmpPercent = (double) srcCount / (double) refCount;
}
- percent = Double.max(percent, (double) srcCount / (double) refCount);
+ percent = percent < 0 ? tmpPercent : Double.min(percent, tmpPercent);
}
if (percent < 0) {
diff --git
a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
index 09fa3b0db..e989fa7f8 100644
---
a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
+++
b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
@@ -31,7 +31,9 @@ import org.apache.gobblin.configuration.State;
public class KafkaAuditCountVerifierTest {
public static final String SOURCE_TIER = "gobblin";
- public static final String REFERENCE_TIERS = "producer";
+ public static final String REFERENCE_TIER = "producer";
+ public static final String REFERENCE_TIER_1 = "producer_reference";
+ public static final String REFERENCE_TIERS = REFERENCE_TIER + "," +
REFERENCE_TIER_1;
public static final String TOTAL_COUNT_REF_TIER_0 = "producer_0";
public static final String TOTAL_COUNT_REF_TIER_1 = "producer_1";
@@ -50,7 +52,8 @@ public class KafkaAuditCountVerifierTest {
// All complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 1000L,
- REFERENCE_TIERS, 1000L
+ REFERENCE_TIER, 1000L,
+ REFERENCE_TIER_1, 1000L
));
// Default threshold
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
@@ -59,7 +62,8 @@ public class KafkaAuditCountVerifierTest {
// 99.999 % complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 999L,
- REFERENCE_TIERS, 1000L
+ REFERENCE_TIER, 1000L,
+ REFERENCE_TIER_1, 1000L
));
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
@@ -67,7 +71,8 @@ public class KafkaAuditCountVerifierTest {
// <= 99% complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 990L,
- REFERENCE_TIERS, 1000L
+ REFERENCE_TIER, 1000L,
+ REFERENCE_TIER_1, 1000L
));
Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
@@ -86,7 +91,8 @@ public class KafkaAuditCountVerifierTest {
// All complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 1000L,
- REFERENCE_TIERS, 1000L,
+ REFERENCE_TIER, 1000L,
+ REFERENCE_TIER_1, 1000L,
TOTAL_COUNT_REF_TIER_0, 600L,
TOTAL_COUNT_REF_TIER_1, 400L
));
@@ -97,7 +103,8 @@ public class KafkaAuditCountVerifierTest {
// 99.999 % complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 999L,
- REFERENCE_TIERS, 1000L,
+ REFERENCE_TIER, 1000L,
+ REFERENCE_TIER_1, 1000L,
TOTAL_COUNT_REF_TIER_0, 600L,
TOTAL_COUNT_REF_TIER_1, 400L
));
@@ -107,7 +114,8 @@ public class KafkaAuditCountVerifierTest {
// <= 99% complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 990L,
- REFERENCE_TIERS, 1000L,
+ REFERENCE_TIER, 1000L,
+ REFERENCE_TIER_1, 1000L,
TOTAL_COUNT_REF_TIER_0, 600L,
TOTAL_COUNT_REF_TIER_1, 400L
));
@@ -140,7 +148,8 @@ public class KafkaAuditCountVerifierTest {
client.setTierCounts(
ImmutableMap.of(
SOURCE_TIER, 990L,
- REFERENCE_TIERS, 0L,
+ REFERENCE_TIER, 0L,
+ REFERENCE_TIER_1, 0L,
TOTAL_COUNT_REF_TIER_0, 0L,
TOTAL_COUNT_REF_TIER_1, 0L
));
@@ -153,7 +162,8 @@ public class KafkaAuditCountVerifierTest {
client.setTierCounts(
ImmutableMap.of(
SOURCE_TIER, 0L,
- REFERENCE_TIERS, 0L,
+ REFERENCE_TIER, 0L,
+ REFERENCE_TIER_1, 0L,
TOTAL_COUNT_REF_TIER_0, 0L,
TOTAL_COUNT_REF_TIER_1, 0L
));
@@ -175,7 +185,8 @@ public class KafkaAuditCountVerifierTest {
// Missing total count tier which will throw exception
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 999L,
- REFERENCE_TIERS, 1000L
+ REFERENCE_TIER, 1000L,
+ REFERENCE_TIER_1, 1000L
));
// Classic completeness is still returned, but total is missing
@@ -184,4 +195,42 @@ public class KafkaAuditCountVerifierTest {
Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
.containsKey(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
}
+
+ public void testDifferentValueInReferenceTier() throws IOException {
+ final String topic = "testTopic";
+ State props = new State();
+ props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
+ props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
+ props.setProp(KafkaAuditCountVerifier.TOTAL_COUNT_REFERENCE_TIERS,
TOTAL_COUNT_REFERENCE_TIERS);
+ props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
+ props.setProp(KafkaAuditCountVerifier.COMPLETE_ON_NO_COUNTS, true);
+ TestAuditClient client = new TestAuditClient(props);
+ KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props,
client);
+
+ // Different value in reference tier
+ client.setTierCounts(ImmutableMap.of(
+ SOURCE_TIER, 999L,
+ REFERENCE_TIER, 1000L,
+ REFERENCE_TIER_1, 2000L
+ ));
+
+ // Classic completeness is fail as 999/2000 < 99.9%
+ Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
+ .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
+
+ // Different value in reference tier and one tier has 0 in count
+ client.setTierCounts(ImmutableMap.of(
+ SOURCE_TIER, 999L,
+ REFERENCE_TIER, 0L,
+ REFERENCE_TIER_1, 2000L
+ ));
+
+ // Classic completeness is fail as 999/2000 < 99.9%
+ Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
+ .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
+
+
+
+ }
+
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
index 377d4887f..e4c0dc7c6 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
@@ -137,6 +137,8 @@ public class KafkaIngestionHealthCheck implements
CommitStep {
public void execute() {
this.ingestionLatencies.add(this.statsTracker.getMaxIngestionLatency(TimeUnit.MINUTES));
this.consumptionRateMBps.add(this.statsTracker.getConsumptionRateMBps());
+ double avgConsumptionRate = getMaxConsumptionRate();
+ log.info("Avg. Consumption Rate = {} MBps, Target Consumption rate = {}
MBps", avgConsumptionRate, this.expectedConsumptionRate);
if (ingestionLatencies.size() < this.slidingWindowSize) {
log.info("SUCCESS: Num observations: {} smaller than {}",
ingestionLatencies.size(), this.slidingWindowSize);
return;
@@ -146,8 +148,6 @@ public class KafkaIngestionHealthCheck implements
CommitStep {
log.info("SUCCESS: Ingestion Latencies = {}, Ingestion Latency
Threshold: {}", this.ingestionLatencies.toString(),
this.ingestionLatencyThresholdMinutes);
return;
}
-
- double avgConsumptionRate = getMaxConsumptionRate();
if (avgConsumptionRate > this.consumptionRateDropOffFraction *
this.expectedConsumptionRate) {
log.info("SUCCESS: Avg. Consumption Rate = {} MBps, Target Consumption
rate = {} MBps", avgConsumptionRate, this.expectedConsumptionRate);
return;
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index 62cb18447..1bdc2d8bb 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -70,7 +70,8 @@ import static
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.
@Slf4j
public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
public static final String GOBBLIN_KAFKA_PREFIX = "gobblin.kafka.";
- private static final int DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10;
+ public static final String DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY =
GOBBLIN_KAFKA_PREFIX + "default.num.topic.partitions.per.container";
+ private static final int DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER
= 10;
//A global configuration for container capacity. The container capacity
refers to the peak rate (in MB/s) that a
//single JVM can consume from Kafka for a single topic and controls the
number of partitions of a topic that will be
@@ -198,6 +199,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends
KafkaWorkUnitPacker {
}
//Add CONTAINER_CAPACITY into each workunit. Useful when
KafkaIngestionHealthCheck is enabled.
for (WorkUnit workUnit: workUnitsForTopic) {
+ //todo: check whether it's set already to respect the topic specific
capacity from user input properties
workUnit.setProp(CONTAINER_CAPACITY_KEY, containerCapacity);
}
double estimatedDataSizeForTopic =
calcTotalEstSizeForTopic(workUnitsForTopic);
@@ -293,7 +295,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends
KafkaWorkUnitPacker {
private Double getDefaultWorkUnitSize() {
return
state.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
- KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) /
DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER;
+ KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) /
state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY,
DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER);
}
/**