This is an automated email from the ASF dual-hosted git repository. lesun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/gobblin.git
commit c9885d81a63747228bcd1b6a403dd017109703dc
Author: suvasude <[email protected]>
AuthorDate: Tue Jun 1 10:52:25 2021 -0700

    GOBBLIN-1455: Limit gobblin.kafka.minContainersForTopic config to the number of partitions of the topic
---
 .../kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index f01b648..3b93ab2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -282,7 +282,8 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
    * @return the minimum workunit size.
    */
   private Double getMinWorkUnitSize(WorkUnit workUnit) {
-    int minContainersForTopic = workUnit.getPropAsInt(MIN_CONTAINERS_FOR_TOPIC, -1);
+    int minContainersForTopic = Math.min(workUnit.getPropAsInt(MIN_CONTAINERS_FOR_TOPIC, -1),
+        workUnit.getPropAsInt(KafkaSource.NUM_TOPIC_PARTITIONS));
     if (minContainersForTopic == -1) {
       //No minimum configured? Return lower bound for workunit size to be 0.
       return 0.0;
