This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a030866  [GOBBLIN-1406] Make KafkaIngestionHealth check use auto-tuned consumer…
a030866 is described below

commit a030866737eca462c7eec9eb11483ca09ec29e83
Author: suvasude <[email protected]>
AuthorDate: Tue Mar 16 12:47:43 2021 -0700

    [GOBBLIN-1406] Make KafkaIngestionHealth check use auto-tuned consumer…
    
    Closes #3240 from sv2000/autotuneConsumeRate
---
 .../source/extractor/extract/kafka/KafkaIngestionHealthCheck.java | 5 ++---
 .../kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java   | 4 ++++
 .../extractor/extract/kafka/KafkaIngestionHealthCheckTest.java    | 5 +++--
 .../workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java     | 8 ++++++++
 4 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
index 9737591..377d488 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
@@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.commit.CommitStep;
+import 
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
 import org.apache.gobblin.util.eventbus.EventBusFactory;
@@ -40,13 +41,11 @@ public class KafkaIngestionHealthCheck implements 
CommitStep {
   public static final String 
KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE_KEY = 
KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "slidingWindow.size";
   public static final String 
KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY = 
KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "ingestionLatency.minutes";
   public static final String 
KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION_KEY = 
KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "consumptionRate.dropOffFraction";
-  public static final String 
KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY = 
KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "expected.consumptionRateMbps";
   public static final String 
KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY = 
KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "increasingLatencyCheckEnabled";
 
   public static final int 
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE = 3;
   public static final long 
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES= 15;
   public static final double 
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION = 0.7;
-  public static final double 
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS = 10.0;
   private static final boolean 
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED = true;
 
   private final Config config;
@@ -65,7 +64,7 @@ public class KafkaIngestionHealthCheck implements CommitStep {
     this.slidingWindowSize = ConfigUtils.getInt(config, 
KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE_KEY, 
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE);
     this.ingestionLatencyThresholdMinutes = ConfigUtils.getLong(config, 
KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY, 
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES);
     this.consumptionRateDropOffFraction = ConfigUtils.getDouble(config, 
KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION_KEY, 
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION);
-    this.expectedConsumptionRate = ConfigUtils.getDouble(config, 
KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY, 
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS);
+    this.expectedConsumptionRate = ConfigUtils.getDouble(config, 
KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY, 
KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY);
     this.increasingLatencyCheckEnabled = ConfigUtils.getBoolean(config, 
KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY, 
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED);
     this.ingestionLatencies = EvictingQueue.create(this.slidingWindowSize);
     this.consumptionRateMBps = EvictingQueue.create(this.slidingWindowSize);
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index d380dcf..17a7089 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -187,6 +187,10 @@ public class KafkaTopicGroupingWorkUnitPacker extends 
KafkaWorkUnitPacker {
         containerCapacity = 
getContainerCapacityForTopic(capacitiesByTopic.get(topic), 
this.containerCapacityComputationStrategy);
         log.info("Container capacity for topic {}: {}", topic, 
containerCapacity);
       }
+      //Add CONTAINER_CAPACITY into each workunit. Useful when KafkaIngestionHealthCheck is enabled.
+      for (WorkUnit workUnit: workUnitsForTopic) {
+        workUnit.setProp(CONTAINER_CAPACITY_KEY, containerCapacity);
+      }
       double estimatedDataSizeForTopic = 
calcTotalEstSizeForTopic(workUnitsForTopic);
       int previousSize = mwuGroups.size();
       if (estimatedDataSizeForTopic < containerCapacity) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java
index 3268164..4f69c6a 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java
@@ -32,6 +32,7 @@ import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import 
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker;
 import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
 import org.apache.gobblin.util.eventbus.EventBusFactory;
 
@@ -56,7 +57,7 @@ public class KafkaIngestionHealthCheckTest {
   public void testExecuteIncreasingLatencyCheckEnabled()
       throws InterruptedException {
     this.countDownLatch = new CountDownLatch(1);
-    Config config = 
ConfigFactory.empty().withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY,
+    Config config = 
ConfigFactory.empty().withValue(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
         ConfigValueFactory.fromAnyRef(5))
         
.withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY,
 ConfigValueFactory.fromAnyRef(5));
 
@@ -114,7 +115,7 @@ public class KafkaIngestionHealthCheckTest {
       throws InterruptedException {
     this.countDownLatch = new CountDownLatch(1);
 
-    Config config = 
ConfigFactory.empty().withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY,
+    Config config = 
ConfigFactory.empty().withValue(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
         ConfigValueFactory.fromAnyRef(5))
         
.withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY,
 ConfigValueFactory.fromAnyRef(5))
         
.withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY,
 ConfigValueFactory.fromAnyRef(false));
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
index eb676d4..b44df38 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
@@ -69,8 +69,10 @@ public class KafkaTopicGroupingWorkUnitPackerTest {
     Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME), 
"topic1");
     
Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
 0)), 1);
     
Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
 1)), 2);
+    
Assert.assertEquals(workUnits.get(0).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY),
 2, 0.001);
     Assert.assertEquals(workUnits.get(1).getProp(KafkaSource.TOPIC_NAME), 
"topic1");
     
Assert.assertEquals(workUnits.get(1).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
 0)), 3);
+    
Assert.assertEquals(workUnits.get(1).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY),
 2, 0.001);
   }
 
   /**
@@ -95,14 +97,20 @@ public class KafkaTopicGroupingWorkUnitPackerTest {
     Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME), 
"topic1");
     
Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
 0)), 1);
     
Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
 1)), 2);
+    
Assert.assertEquals(workUnits.get(0).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY),
 2, 0.001);
+
     Assert.assertEquals(workUnits.get(1).getProp(KafkaSource.TOPIC_NAME), 
"topic1");
     
Assert.assertEquals(workUnits.get(1).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
 0)), 3);
+    
Assert.assertEquals(workUnits.get(1).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY),
 2, 0.001);
 
     Assert.assertEquals(workUnits.get(2).getProp(KafkaSource.TOPIC_NAME), 
"topic2");
     
Assert.assertEquals(workUnits.get(2).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
 0)), 1);
     
Assert.assertEquals(workUnits.get(2).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
 1)), 2);
+    
Assert.assertEquals(workUnits.get(2).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY),
 2, 0.001);
+
     Assert.assertEquals(workUnits.get(3).getProp(KafkaSource.TOPIC_NAME), 
"topic2");
     
Assert.assertEquals(workUnits.get(3).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
 0)), 3);
+    
Assert.assertEquals(workUnits.get(3).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY),
 2, 0.001);
   }
 
 

Reply via email to