This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d389be9 [GOBBLIN-1360] Provide option to specify minimum number of
containers per topic in Gobblin Kafka[]
d389be9 is described below
commit d389be9618f2a5b28aace8930544427093ce62e3
Author: suvasude <[email protected]>
AuthorDate: Thu Jan 14 10:56:41 2021 -0800
[GOBBLIN-1360] Provide option to specify minimum number of containers per
topic in Gobblin Kafka[]
Closes #3201 from sv2000/minContainersPerTopic
---
.../packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java | 10 +++++++---
.../ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java | 11 +++++++++++
2 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
index 6113a79..98ec5be 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
@@ -113,13 +113,17 @@ public class ProduceRateAndLagBasedWorkUnitSizeEstimator implements KafkaWorkUni
maxProduceRate =
workUnit.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.DEFAULT_WORKUNIT_SIZE_KEY);
}
+ double minWorkUnitSize =
workUnit.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.MIN_WORKUNIT_SIZE_KEY,
0.0);
+
//Compute the target consume rate in MB/s.
double targetConsumeRate =
((double) (offsetLag * avgRecordSize) / (catchUpSlaInHours * 3600 *
ONE_MEGA_BYTE)) + (maxProduceRate
* produceRateScalingFactor);
- log.debug("TopicPartiton: {}, Max produce rate: {}, Offset lag: {}, Avg
Record size: {}, Target Consume Rate: {}",
- topic + ":" + partition, maxProduceRate, offsetLag, avgRecordSize,
targetConsumeRate);
- return targetConsumeRate;
+
+ log.debug("TopicPartiton: {}, Max produce rate: {}, Offset lag: {}, Avg
Record size: {}, Target Consume Rate: {}, Min Workunit size: {}",
+ topic + ":" + partition, maxProduceRate, offsetLag, avgRecordSize,
targetConsumeRate, minWorkUnitSize);
+ //Return the target consumption rate to catch up with incoming traffic and
current lag, as the workunit size.
+ return Math.max(targetConsumeRate, minWorkUnitSize);
}
/**
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java
index d199295..be68ef7 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimatorTest.java
@@ -81,6 +81,7 @@ public class ProduceRateAndLagBasedWorkUnitSizeEstimatorTest {
//WorkUnit with Kafka watermark and previous avg produce rates
watermark = new KafkaStreamingExtractor.KafkaWatermark(new
KafkaPartition.Builder().withTopicName(TEST_TOPIC).withId(0).build(), new
LongWatermark(0L));
+ workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.MIN_WORKUNIT_SIZE_KEY,
2.0);
watermark.setAvgRecordSize(AVG_RECORD_SIZE);
watermark.setAvgProduceRates(avgProduceRates);
workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PARTITION_WATERMARK,
GSON.toJson(watermark));
@@ -90,5 +91,15 @@ public class ProduceRateAndLagBasedWorkUnitSizeEstimatorTest {
workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PACKING_START_TIME_MILLIS,
format.parse(BINPACKING_TIME_2).getTime());
workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.DEFAULT_WORKUNIT_SIZE_KEY,
2.0);
Assert.assertEquals(new
Double(this.estimator.calcEstimatedSize(workUnit)).longValue(), 4L);
+
+ //Create a new workunit with minimum workunit size = 5.0
+ workUnit = WorkUnit.createEmpty();
+ workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PARTITION_WATERMARK,
GSON.toJson(watermark));
+
workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.DEFAULT_WORKUNIT_SIZE_KEY,
1.0);
+ workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.MIN_WORKUNIT_SIZE_KEY,
5.0);
+ workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY,
Long.toString(6 * 3600 * 1024));
+
workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.PACKING_START_TIME_MILLIS,
format.parse(BINPACKING_TIME_2).getTime());
+
workUnit.setProp(KafkaTopicGroupingWorkUnitPacker.DEFAULT_WORKUNIT_SIZE_KEY,
2.0);
+ Assert.assertEquals(new
Double(this.estimator.calcEstimatedSize(workUnit)).longValue(), 5L);
}
}
\ No newline at end of file