[
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396882#comment-15396882
]
Chaitanya commented on APEXMALHAR-2169:
---------------------------------------
Siyuan,
I am looking at dynamic partitioning triggered by a metadata change. Before
any metadata change, the operator is repartitioned based on throughput, and
the issue is in that throughput-based partitioning.
The issue is at line 625: the latest stats may not contain counters. If the
latest stats do not contain counters, definePartitions() returns an empty
partition list. (Refer to the warning messages in the JIRA description.)
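
To illustrate the failure mode, here is a minimal, self-contained sketch. It is not the Malhar code at line 625 and it does not use the Apex Partitioner API. The class PartitionGuardSketch, the String partition IDs, the counters map and the split rule are all hypothetical stand-ins. The one point it shows is that when the latest stats carry no counters, the method can return the existing partitions unchanged instead of an empty list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for illustration only; not AbstractKafkaInputOperator.
public class PartitionGuardSketch {

  /**
   * current        - IDs of the partitions that exist now (illustrative Strings)
   * latestCounters - msgs/s per partition from the latest stats; may be null or empty
   * limit          - assumed per-partition msgs/s hard limit
   */
  public static List<String> definePartitions(List<String> current,
                                              Map<String, Long> latestCounters,
                                              long limit) {
    // Guard: no counters in the latest stats, so there is nothing to decide on.
    // Keep the existing partitions rather than returning an empty list.
    if (latestCounters == null || latestCounters.isEmpty()) {
      return new ArrayList<>(current);
    }

    List<String> result = new ArrayList<>();
    for (String id : current) {
      long rate = latestCounters.getOrDefault(id, 0L);
      if (rate > limit) {
        // Assumed rule: split an overloaded partition into two.
        result.add(id + "-a");
        result.add(id + "-b");
      } else {
        result.add(id);
      }
    }
    return result;
  }
}
```

With this guard, a stats batch without counters leaves the partitioning untouched, so the platform would not see an empty list after repartition.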
> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY
> partition strategy
> ----------------------------------------------------------------------------------------------
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
> Issue Type: Bug
> Reporter: Chaitanya
> Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce:
> (1) Created a topic with 3 partitions
> (2) Created an application as KAFKA -> Console with the configuration below:
> strategy : one_to_many
> initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partitioned the topic to 5 partitions
> Observations:
> (1) Operator re-partitioning does not happen.
> (2) The operator does not emit messages.
> (3) The following messages appear in the log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]:
> Repartition the operator(s) under 9223372036854775807 msgs/s and
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list
> after repartition: OperatorMeta{name=Input,
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
> attributes={Attribute{defaultValue=1024,
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB,
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}