[
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411341#comment-15411341
]
ASF GitHub Bot commented on APEXMALHAR-2169:
--------------------------------------------
Github user chaithu14 commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/351#discussion_r73824988
--- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
@@ -188,6 +188,8 @@
@Min(1)
private int initialPartitionCount = 1;
+ private boolean isPartitionBasedOnLoad = false;
--- End diff --
Can this be achieved as follows? Please correct me if I am wrong.
if (msgRateUpperBound != Long.MAX_VALUE) {
  // Dynamic partitioning based on load is enabled
} else {
  // Dynamic partitioning based on load is disabled
}
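To make the suggestion concrete, here is a minimal sketch of deriving the flag from the configured limit instead of storing a separate boolean. `LoadLimits` is a hypothetical stand-in class, not the actual operator, and also checking `byteRateUpperBound` is an assumption based on the bytes/s limit reported in the issue's log output.

```java
// Hypothetical stand-in for the operator's rate-limit settings; not the actual Malhar class.
public class LoadLimits {
    // Assumption: Long.MAX_VALUE means "no limit configured".
    private long msgRateUpperBound = Long.MAX_VALUE;
    private long byteRateUpperBound = Long.MAX_VALUE;

    public void setMsgRateUpperBound(long msgRateUpperBound) {
        this.msgRateUpperBound = msgRateUpperBound;
    }

    public void setByteRateUpperBound(long byteRateUpperBound) {
        this.byteRateUpperBound = byteRateUpperBound;
    }

    // Derived flag: load-based partitioning counts as enabled once either limit is set.
    public boolean isPartitionBasedOnLoad() {
        return msgRateUpperBound != Long.MAX_VALUE
            || byteRateUpperBound != Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        LoadLimits limits = new LoadLimits();
        System.out.println(limits.isPartitionBasedOnLoad()); // no limit set yet

        limits.setMsgRateUpperBound(10_000L);
        System.out.println(limits.isPartitionBasedOnLoad()); // a message-rate limit is now set
    }
}
```

One trade-off of this approach: a user who explicitly sets a limit of Long.MAX_VALUE cannot be distinguished from one who left it unset, which a dedicated boolean would allow.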
> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY
> partition strategy
> ----------------------------------------------------------------------------------------------
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
> Issue Type: Bug
> Reporter: Chaitanya
> Assignee: Chaitanya
>
> Dynamic partitioning does not work with the ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce:
> (1) Created a topic with 3 partitions
> (2) Created an application as KAFKA -> Console with below configuration:
> strategy : one_to_many
> initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partitioned the topic to 5 partitions.
> Observations:
> (1) Operator re-partitioning did not happen.
> (2) The operator is not emitting messages.
> (3) The following messages appear in the log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]:
> Repartition the operator(s) under 9223372036854775807 msgs/s and
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list
> after repartition: OperatorMeta{name=Input,
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
> attributes={Attribute{defaultValue=1024,
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB,
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}