[
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411388#comment-15411388
]
Siyuan Hua commented on APEXMALHAR-2169:
----------------------------------------
[~chaithu]
Then I think the problem is in the softConstraint and hardConstraint code. It
should never return true, because the default limit is Long.MAX_VALUE.
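
A minimal sketch of why a check against the default limit should never fire,
assuming the constraint boils down to a plain "observed rate > limit"
comparison on long values. The class and method names are hypothetical and are
not taken from the operator code:

```java
// Hypothetical illustration only; not the actual softConstraint/hardConstraint code.
class RateLimitCheck {

  // Assumed shape of the check: true when the observed rate is above the limit.
  static boolean exceedsLimit(long observedRate, long limit) {
    return observedRate > limit;
  }

  public static void main(String[] args) {
    // With the default limit, no long value can be strictly greater.
    System.out.println(exceedsLimit(Long.MAX_VALUE, Long.MAX_VALUE)); // false
    // With an explicitly configured limit, the check can fire.
    System.out.println(exceedsLimit(5000L, 1000L)); // true
  }
}
```

If the real check returns true with the default limit, the comparison is
presumably doing something other than this, and that is the place to look.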
There is something in the backlog that I didn't track in Jira (my bad). But
since you have hit the issue here, can you please do some refactoring?
We want to simplify the operator code rather than make it more and more
complicated. The Kafka input operator has been around for a while, and I have
not seen any requirement or request for dynamic partitioning based on throughput.
Can we take away the hardConstraint and softConstraint condition checks and
deprecate the two upper-bound properties? Dynamic partitioning should then, by
default, happen only when the Kafka partitions change.
And for the ONE_TO_MANY partition strategy, the number of operator partitions
should stay unchanged for the whole application, at the specified
initialPartitionCount. I think there is still a bug there: if a new Kafka
partition is added, we always start a new operator partition. That is not correct.
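
A rough sketch of the intended ONE_TO_MANY behavior, assuming a simple
round-robin spread of Kafka partition ids over a fixed number of operator
partitions. The class and method names are hypothetical and this is not the
operator's actual assignment logic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Malhar.
class OneToManyAssignment {

  // Spreads Kafka partition ids 0..kafkaPartitionCount-1 round-robin over a
  // fixed number of operator partitions. The operator count never grows.
  static Map<Integer, List<Integer>> assign(int kafkaPartitionCount, int initialPartitionCount) {
    if (kafkaPartitionCount < 0 || initialPartitionCount <= 0) {
      throw new IllegalArgumentException("counts must be non-negative / positive");
    }
    Map<Integer, List<Integer>> assignment = new TreeMap<>();
    for (int op = 0; op < initialPartitionCount; op++) {
      assignment.put(op, new ArrayList<>());
    }
    for (int kp = 0; kp < kafkaPartitionCount; kp++) {
      assignment.get(kp % initialPartitionCount).add(kp);
    }
    return assignment;
  }

  public static void main(String[] args) {
    // Topic with 3 partitions, initialPartitionCount = 2.
    System.out.println(assign(3, 2)); // {0=[0, 2], 1=[1]}
    // Topic repartitioned to 5: still 2 operator partitions.
    System.out.println(assign(5, 2)); // {0=[0, 2, 4], 1=[1, 3]}
  }
}
```

The point is only that a Kafka partition change redistributes Kafka partitions
among the existing operator partitions instead of adding operator partitions.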
And you can create another ticket to move all throughput-based repartitioning
to a separate Partitioner, so the operator code stays simple and easy to
understand and debug.
> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY
> partition strategy
> ----------------------------------------------------------------------------------------------
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
> Issue Type: Bug
> Reporter: Chaitanya
> Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce:
> (1) Created a topic with 3 partitions
> (2) Created an application as KAFKA -> Console with below configuration:
> strategy : one_to_many
> initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, repartitioned the topic to 5 partitions.
> Observations:
> (1) Operator repartitioning did not happen.
> (2) The operator is not emitting messages.
> (3) The following messages appear in the log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]:
> Repartition the operator(s) under 9223372036854775807 msgs/s and
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list
> after repartition: OperatorMeta{name=Input,
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
> attributes={Attribute{defaultValue=1024,
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB,
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}