[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411108#comment-15411108 ]
ASF GitHub Bot commented on APEXMALHAR-2169:
--------------------------------------------
Github user siyuanh commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/351#discussion_r73811155
--- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
@@ -834,17 +840,16 @@ private boolean isPartitionRequired(int opid, List<KafkaConsumer.KafkaMeterStats
       return false;
     }
+    if (!isPartitionBasedOnLoad) {
--- End diff --
Why would this line be involved? If there is a new Kafka partition, the method
would already return true at line 829, and the new Kafka partition would be
added to the waiting list.
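
To make the control flow in question concrete, here is a minimal, self-contained sketch. It is not the Malhar source: the class, its fields, and the integer partition ids are hypothetical stand-ins, and what the added `!isPartitionBasedOnLoad` guard actually does is not visible in the diff, so the sketch simply assumes it skips the load-based check. It only illustrates the ordering described above: a newly discovered Kafka partition is queued and reported before any load-related condition is reached.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the repartition check; not the Malhar source.
final class RepartitionCheckSketch {
  // Kafka partition ids already assigned to some operator instance.
  private final Set<Integer> assigned;
  // Newly discovered partition ids waiting to be assigned.
  private final List<Integer> waiting = new ArrayList<>();
  // Assumed meaning: whether load statistics may trigger a repartition.
  private final boolean partitionBasedOnLoad;

  RepartitionCheckSketch(Set<Integer> assigned, boolean partitionBasedOnLoad) {
    this.assigned = new HashSet<>(assigned);
    this.partitionBasedOnLoad = partitionBasedOnLoad;
  }

  boolean isPartitionRequired(Set<Integer> currentKafkaPartitions, boolean loadLimitExceeded) {
    boolean foundNew = false;
    for (Integer id : currentKafkaPartitions) {
      if (!assigned.contains(id) && !waiting.contains(id)) {
        waiting.add(id); // remember the new partition for the next repartition
        foundNew = true;
      }
    }
    if (foundNew) {
      return true; // early return: new partitions alone require a repartition
    }
    if (!partitionBasedOnLoad) {
      return false; // assumption: with load-based partitioning off, stop here
    }
    return loadLimitExceeded;
  }

  List<Integer> waitingPartitions() {
    return new ArrayList<>(waiting);
  }

  public static void main(String[] args) {
    // Topic starts with partitions 0..2 and is later grown to 0..4.
    RepartitionCheckSketch check =
        new RepartitionCheckSketch(new HashSet<>(Arrays.asList(0, 1, 2)), false);
    Set<Integer> afterResize = new HashSet<>(Arrays.asList(0, 1, 2, 3, 4));
    System.out.println(check.isPartitionRequired(afterResize, false));
    System.out.println(check.waitingPartitions());
  }
}
```

In this sketch the guard is never reached when the topic has grown, which is the point of the question above.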
> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
> ----------------------------------------------------------------------------------------------
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
> Issue Type: Bug
> Reporter: Chaitanya
> Assignee: Chaitanya
>
> Dynamic partitioning does not work with the ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce:
> (1) Created a topic with 3 partitions
> (2) Created an application as KAFKA -> Console with below configuration:
> strategy : one_to_many
> initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partitioned the topic to 5 partitions.
> Observations:
> (1) Operator re-partitioning does not happen.
> (2) The operator does not emit messages.
> (3) The following messages appear in the log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: Repartition the operator(s) under 9223372036854775807 msgs/s and 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list after repartition: OperatorMeta{name=Input, operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}
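
A note on the INFO line quoted above: the value 9223372036854775807 is exactly Java's `Long.MAX_VALUE`. That suggests, though the description does not say so, that no message-rate or byte-rate upper bound was configured in the reproduction and the limits were left effectively unbounded. The small check below only confirms the arithmetic; the class and method names are hypothetical and not part of the operator.

```java
// Hypothetical helper; only demonstrates that the logged limit is Long.MAX_VALUE.
final class HardLimitSketch {
  // Treats Long.MAX_VALUE as "no limit configured" (an assumption about intent).
  static boolean isUnbounded(long limit) {
    return limit == Long.MAX_VALUE;
  }

  public static void main(String[] args) {
    long loggedLimit = 9223372036854775807L; // value copied from the INFO log line
    System.out.println(isUnbounded(loggedLimit)); // prints "true"
  }
}
```

If that reading is right, the repartition in the log was not triggered by load, which matches the reproduction steps where only the topic's partition count changed.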