This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d389be9  [GOBBLIN-1360] Provide option to specify minimum number of containers per topic in Gobblin Kafka[]
d389be9 is described below

commit d389be9618f2a5b28aace8930544427093ce62e3
Author: suvasude <[email protected]>
AuthorDate: Thu Jan 14 10:56:41 2021 -0800

    [GOBBLIN-1360] Provide option to specify minimum number of containers per topic in Gobblin Kafka[]
    
    Closes #3201 from sv2000/minContainersPerTopic
---
 .../packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java   | 10 +++++++---
 .../ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java      | 11 +++++++++++
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
index 6113a79..98ec5be 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
@@ -113,13 +113,17 @@ public class ProduceRateAndLagBasedWorkUnitSizeEstimator implements KafkaWorkUni
       maxProduceRate = 
workUnit.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.DEFAULT_WORKUNIT_SIZE_KEY);
     }
 
+    double minWorkUnitSize = 
workUnit.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.MIN_WORKUNIT_SIZE_KEY,
 0.0);
+
     //Compute the target consume rate in MB/s.
     double targetConsumeRate =
         ((double) (offsetLag * avgRecordSize) / (catchUpSlaInHours * 3600 * 
ONE_MEGA_BYTE)) + (maxProduceRate
             * produceRateScalingFactor);
-    log.debug("TopicPartiton: {}, Max produce rate: {}, Offset lag: {}, Avg 
Record size: {}, Target Consume Rate: {}",
-        topic + ":" + partition, maxProduceRate, offsetLag, avgRecordSize, 
targetConsumeRate);
-    return targetConsumeRate;
+
+    log.debug("TopicPartiton: {}, Max produce rate: {}, Offset lag: {}, Avg 
Record size: {}, Target Consume Rate: {}, Min Workunit size: {}",
+        topic + ":" + partition, maxProduceRate, offsetLag, avgRecordSize, 
targetConsumeRate, minWorkUnitSize);
+    //Return the target consumption rate to catch up with incoming traffic and 
current lag, as the workunit size.
+    return Math.max(targetConsumeRate, minWorkUnitSize);
   }
 
   /**
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java
index d199295..be68ef7 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java
@@ -81,6 +81,7 @@ public class ProduceRateAndLagBasedWorkUnitSizeEstimatorTest {
 
     //WorkUnit with Kafka watermark and previous avg produce rates
     watermark = new KafkaStreamingExtractor.KafkaWatermark(new 
KafkaPartition.Builder().withTopicName(TEST_TOPIC).withId(0).build(), new 
LongWatermark(0L));
+    workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.MIN_WORKUNIT_SIZE_KEY, 
2.0);
     watermark.setAvgRecordSize(AVG_RECORD_SIZE);
     watermark.setAvgProduceRates(avgProduceRates);
     workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PARTITION_WATERMARK, 
GSON.toJson(watermark));
@@ -90,5 +91,15 @@ public class ProduceRateAndLagBasedWorkUnitSizeEstimatorTest {
     
workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PACKING_START_TIME_MILLIS, 
format.parse(BINPACKING_TIME_2).getTime());
     
workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.DEFAULT_WORKUNIT_SIZE_KEY, 
2.0);
     Assert.assertEquals(new 
Double(this.estimator.calcEstimatedSize(workUnit)).longValue(), 4L);
+
+    //Create a new workunit with minimum workunit size = 5.0
+    workUnit = WorkUnit.createEmpty();
+    workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PARTITION_WATERMARK, 
GSON.toJson(watermark));
+    
workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.DEFAULT_WORKUNIT_SIZE_KEY, 
1.0);
+    workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.MIN_WORKUNIT_SIZE_KEY, 
5.0);
+    workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, 
Long.toString(6 * 3600 * 1024));
+    
workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PACKING_START_TIME_MILLIS, 
format.parse(BINPACKING_TIME_2).getTime());
+    
workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.DEFAULT_WORKUNIT_SIZE_KEY, 
2.0);
+    Assert.assertEquals(new 
Double(this.estimator.calcEstimatedSize(workUnit)).longValue(), 5L);
   }
 }
\ No newline at end of file

Reply via email to