[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411388#comment-15411388
 ] 

Siyuan Hua commented on APEXMALHAR-2169:
----------------------------------------

[~chaithu]  
Then I think the problem is in the softConstraint and hardConstraint code. 
Those checks should never return true, because the default limit is 
Long.MAX_VALUE.  
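
To illustrate the point about the default limit, here is a minimal sketch, 
not the actual Malhar code. The class name and the exceedsLimit method are 
hypothetical, and it assumes the constraint is a strict greater-than 
comparison of an observed rate against the configured upper bound. Under that 
assumption no long value can exceed Long.MAX_VALUE, so the check cannot fire 
with the default:

```java
public class ThroughputLimitSketch {

    // Hypothetical stand-in for a soft/hard constraint check.
    // Assumption: the real check compares an observed rate to an upper bound.
    public static boolean exceedsLimit(long observedRate, long limit) {
        return observedRate > limit;
    }

    public static void main(String[] args) {
        long defaultLimit = Long.MAX_VALUE;

        // No long can be greater than Long.MAX_VALUE, so both print false.
        System.out.println(exceedsLimit(1_000_000L, defaultLimit));
        System.out.println(exceedsLimit(Long.MAX_VALUE, defaultLimit));

        // With an explicitly configured bound the check can trigger: prints true.
        System.out.println(exceedsLimit(11L, 10L));
    }
}
```

If the operator still repartitions with the default limits, the cause is 
presumably somewhere other than a plain comparison like this one.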

There is something in the backlog that I didn't track in Jira (my bad). But 
since you have hit this issue, can you please do some refactoring here? 
We want to simplify the operator code instead of making it more and more 
complicated. The Kafka input operator has been around for a while, and I 
haven't seen any requirement or request for dynamic partitioning based on 
throughput.
Can we take away the hardConstraint and softConstraint condition checks and 
deprecate the 2 upper-bound properties? Then dynamic partitioning by default 
should only happen when the Kafka partitions change. 
And for the ONE_TO_MANY partition strategy, the number of operator partitions 
should stay unchanged for the whole life of the application, at the specified 
initialPartitionCount. I think there is still a bug there: if a new Kafka 
partition is added, we always start a new operator partition. That is not 
correct.
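
The intended ONE_TO_MANY behavior could be sketched roughly as follows. This 
is an illustration only, not the operator's implementation: the class and 
method names are hypothetical, Kafka partitions are represented by plain 
integer ids, and round-robin distribution is an assumed policy. The point is 
that the number of operator partitions stays fixed while new Kafka partitions 
are spread over the existing ones:

```java
import java.util.ArrayList;
import java.util.List;

public class FixedCountAssignmentSketch {

    // Distributes Kafka partition ids 0..kafkaPartitionCount-1 round-robin
    // over a fixed number of operator partitions (hypothetical helper).
    public static List<List<Integer>> assign(int kafkaPartitionCount,
                                             int operatorPartitionCount) {
        if (operatorPartitionCount < 1) {
            throw new IllegalArgumentException("need at least one operator partition");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < operatorPartitionCount; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int k = 0; k < kafkaPartitionCount; k++) {
            assignment.get(k % operatorPartitionCount).add(k);
        }
        return assignment;
    }

    public static void main(String[] args) {
        int initialPartitionCount = 2;

        // Topic starts with 3 Kafka partitions: prints [[0, 2], [1]]
        System.out.println(assign(3, initialPartitionCount));

        // Topic grows to 5 Kafka partitions, operator count stays at 2:
        // prints [[0, 2, 4], [1, 3]]
        System.out.println(assign(5, initialPartitionCount));
    }
}
```

With the numbers from this issue (3 Kafka partitions growing to 5, 
initialPartitionCount of 2), the result still has exactly 2 operator 
partitions after the topic is re-partitioned.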

And you can create another ticket to move all throughput-based repartitioning 
to a separate Partitioner, so that the operator code stays simple and easy to 
understand and debug.



> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY 
> partition strategy
> ----------------------------------------------------------------------------------------------
>
>                 Key: APEXMALHAR-2169
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Chaitanya
>            Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as  KAFKA -> Console with below configuration:
>        strategy : one_to_many
>        initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partitioned the topic to 5 partitions
> Observations:
> (1) Operator re-partitioning does not happen.
> (2) Operator is not emitting the messages.
> (3) The following messages appear in the log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: 
> Repartition the operator(s) under 9223372036854775807 msgs/s and 
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list 
> after repartition: OperatorMeta{name=Input, 
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
>  attributes={Attribute{defaultValue=1024, 
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
