[
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411342#comment-15411342
]
ASF GitHub Bot commented on APEXMALHAR-2169:
--------------------------------------------
Github user chaithu14 commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/351#discussion_r73824992
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
---
@@ -834,17 +840,16 @@ private boolean isPartitionRequired(int opid,
List<KafkaConsumer.KafkaMeterStats
return false;
}
+ if (!isPartitionBasedOnLoad) {
--- End diff --
Consider the below scenario:
By default, repartitionInterval is 30 sec. So, operator checks whether the
repartition is needed or not at 30, 60, 90, 120, etc. Suppose, Meta-data change
happened at 120.
In this case, operator checks whether repartition required or not at 30,
60, 90 secs. isPartitionRequired returns true based on HardConstraint and
SoftConstraint.
But, if the user wants to disable the dynamic partition based on load then
isPartitionRequied() has to return false at 30, 60, 90 secs.
> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY
> partition strategy
> ----------------------------------------------------------------------------------------------
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
> Issue Type: Bug
> Reporter: Chaitanya
> Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce:
> (1) Created a topic with 3 partitions
> (2) Created an application as KAFKA -> Console with below configuration:
> strategy : one_to_many
> initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partition the topic to 5
> Observations:
> (1) Operator re-partitioning is not happened.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]:
> Repartition the operator(s) under 9223372036854775807 msgs/s and
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list
> after repartition: OperatorMeta{name=Input,
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
> attributes={Attribute{defaultValue=1024,
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB,
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)