This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a030866 [GOBBLIN-1406] Make KafkaIngestionHealth check use auto-tuned consumer…
a030866 is described below
commit a030866737eca462c7eec9eb11483ca09ec29e83
Author: suvasude <[email protected]>
AuthorDate: Tue Mar 16 12:47:43 2021 -0700
[GOBBLIN-1406] Make KafkaIngestionHealth check use auto-tuned consumer…
Closes #3240 from sv2000/autotuneConsumeRate
---
.../source/extractor/extract/kafka/KafkaIngestionHealthCheck.java | 5 ++---
.../kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java | 4 ++++
.../extractor/extract/kafka/KafkaIngestionHealthCheckTest.java | 5 +++--
.../workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java | 8 ++++++++
4 files changed, 17 insertions(+), 5 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
index 9737591..377d488 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
@@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.commit.CommitStep;
+import
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
import org.apache.gobblin.util.eventbus.EventBusFactory;
@@ -40,13 +41,11 @@ public class KafkaIngestionHealthCheck implements CommitStep {
public static final String
KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE_KEY =
KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "slidingWindow.size";
public static final String
KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY =
KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "ingestionLatency.minutes";
public static final String
KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION_KEY =
KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "consumptionRate.dropOffFraction";
- public static final String
KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY =
KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "expected.consumptionRateMbps";
public static final String
KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY =
KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "increasingLatencyCheckEnabled";
public static final int
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE = 3;
public static final long
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES= 15;
public static final double
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION = 0.7;
- public static final double
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS = 10.0;
private static final boolean
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED = true;
private final Config config;
@@ -65,7 +64,7 @@ public class KafkaIngestionHealthCheck implements CommitStep {
this.slidingWindowSize = ConfigUtils.getInt(config,
KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE_KEY,
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE);
this.ingestionLatencyThresholdMinutes = ConfigUtils.getLong(config,
KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY,
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES);
this.consumptionRateDropOffFraction = ConfigUtils.getDouble(config,
KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION_KEY,
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION);
- this.expectedConsumptionRate = ConfigUtils.getDouble(config,
KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY,
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS);
+ this.expectedConsumptionRate = ConfigUtils.getDouble(config,
KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY);
this.increasingLatencyCheckEnabled = ConfigUtils.getBoolean(config,
KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY,
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED);
this.ingestionLatencies = EvictingQueue.create(this.slidingWindowSize);
this.consumptionRateMBps = EvictingQueue.create(this.slidingWindowSize);
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index d380dcf..17a7089 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -187,6 +187,10 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
containerCapacity =
getContainerCapacityForTopic(capacitiesByTopic.get(topic),
this.containerCapacityComputationStrategy);
log.info("Container capacity for topic {}: {}", topic,
containerCapacity);
}
+ //Add CONTAINER_CAPACITY into each workunit. Useful when
KafkaIngestionHealthCheck is enabled.
+ for (WorkUnit workUnit: workUnitsForTopic) {
+ workUnit.setProp(CONTAINER_CAPACITY_KEY, containerCapacity);
+ }
double estimatedDataSizeForTopic =
calcTotalEstSizeForTopic(workUnitsForTopic);
int previousSize = mwuGroups.size();
if (estimatedDataSizeForTopic < containerCapacity) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java
index 3268164..4f69c6a 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java
@@ -32,6 +32,7 @@ import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker;
import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
import org.apache.gobblin.util.eventbus.EventBusFactory;
@@ -56,7 +57,7 @@ public class KafkaIngestionHealthCheckTest {
public void testExecuteIncreasingLatencyCheckEnabled()
throws InterruptedException {
this.countDownLatch = new CountDownLatch(1);
- Config config =
ConfigFactory.empty().withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY,
+ Config config =
ConfigFactory.empty().withValue(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
ConfigValueFactory.fromAnyRef(5))
.withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY,
ConfigValueFactory.fromAnyRef(5));
@@ -114,7 +115,7 @@ public class KafkaIngestionHealthCheckTest {
throws InterruptedException {
this.countDownLatch = new CountDownLatch(1);
- Config config =
ConfigFactory.empty().withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY,
+ Config config =
ConfigFactory.empty().withValue(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
ConfigValueFactory.fromAnyRef(5))
.withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY,
ConfigValueFactory.fromAnyRef(5))
.withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY,
ConfigValueFactory.fromAnyRef(false));
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
index eb676d4..b44df38 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
@@ -69,8 +69,10 @@ public class KafkaTopicGroupingWorkUnitPackerTest {
Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME),
"topic1");
Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
0)), 1);
Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
1)), 2);
+
Assert.assertEquals(workUnits.get(0).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY),
2, 0.001);
Assert.assertEquals(workUnits.get(1).getProp(KafkaSource.TOPIC_NAME),
"topic1");
Assert.assertEquals(workUnits.get(1).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
0)), 3);
+
Assert.assertEquals(workUnits.get(1).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY),
2, 0.001);
}
/**
@@ -95,14 +97,20 @@ public class KafkaTopicGroupingWorkUnitPackerTest {
Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME),
"topic1");
Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
0)), 1);
Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
1)), 2);
+
Assert.assertEquals(workUnits.get(0).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY),
2, 0.001);
+
Assert.assertEquals(workUnits.get(1).getProp(KafkaSource.TOPIC_NAME),
"topic1");
Assert.assertEquals(workUnits.get(1).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
0)), 3);
+
Assert.assertEquals(workUnits.get(1).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY),
2, 0.001);
Assert.assertEquals(workUnits.get(2).getProp(KafkaSource.TOPIC_NAME),
"topic2");
Assert.assertEquals(workUnits.get(2).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
0)), 1);
Assert.assertEquals(workUnits.get(2).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
1)), 2);
+
Assert.assertEquals(workUnits.get(2).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY),
2, 0.001);
+
Assert.assertEquals(workUnits.get(3).getProp(KafkaSource.TOPIC_NAME),
"topic2");
Assert.assertEquals(workUnits.get(3).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
0)), 3);
+
Assert.assertEquals(workUnits.get(3).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY),
2, 0.001);
}