[ https://issues.apache.org/jira/browse/NIFI-2732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15467584#comment-15467584 ]

ASF GitHub Bot commented on NIFI-2732:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/987#discussion_r77649182
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java ---
    @@ -47,22 +156,244 @@
          * kafka client to collect more data from Kafka before committing the
          * offsets.
          *
    -     * @param offsets offsets
    -     * @throws KafkaException if issue occurs talking to underlying resource.
    +     * if false then we didn't do anything and should probably yield if true
    +     * then we committed new data
    +     *
    +     */
    +    boolean commit() {
    +        if (uncommittedOffsetsMap.isEmpty()) {
    +            resetInternalState();
    +            return false;
    +        }
    +        try {
    +            /**
    +             * Committing the nifi session then the offsets means we have an at
    +             * least once guarantee here. If we reversed the order we'd have at
    +             * most once.
    +             */
    +            final Collection<FlowFile> bundledFlowFiles = getBundles();
    +            if (!bundledFlowFiles.isEmpty()) {
    +                getProcessSession().transfer(getBundles(), REL_SUCCESS);
    --- End diff --
    
    Is there a reason we're calling getBundles() again here, instead of just calling transfer with the bundledFlowFiles variable? getBundles() is fairly expensive to call again needlessly.
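
A minimal, self-contained sketch of the flow the review suggests: build the bundles once, transfer that same collection, commit the session, and only then commit the offsets (the at-least-once ordering described in the diff's comment). FakeSession, getBundles(), commitOffsets() and the string "FlowFiles" are hypothetical stand-ins so the example compiles without NiFi or Kafka; they are not the real ProcessSession or KafkaConsumer APIs.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins that only mimic the shape of the ConsumerLease code
// in the diff; none of these types or methods are NiFi or Kafka APIs.
class CommitSketch {
    // One ordered log so the sequence of side effects is easy to inspect.
    final List<String> events = new ArrayList<>();
    final Map<Integer, Long> uncommittedOffsets = new HashMap<>();
    final FakeSession session = new FakeSession();
    int getBundlesCalls = 0;

    class FakeSession {
        final List<String> transferred = new ArrayList<>();

        void transfer(Collection<String> flowFiles, String relationship) {
            transferred.addAll(flowFiles);
            events.add("transfer:" + relationship);
        }

        void commit() {
            events.add("session-commit");
        }
    }

    // Stands in for the expensive bundle assembly; counts how often it runs.
    Collection<String> getBundles() {
        getBundlesCalls++;
        final List<String> bundles = new ArrayList<>();
        for (Integer partition : uncommittedOffsets.keySet()) {
            bundles.add("bundle-p" + partition);
        }
        return bundles;
    }

    void commitOffsets() {
        events.add("offset-commit");
    }

    /** false: nothing pending (caller may yield); true: new data committed. */
    boolean commit() {
        if (uncommittedOffsets.isEmpty()) {
            return false;
        }
        // Build the bundles once and hand the same collection to transfer().
        final Collection<String> bundledFlowFiles = getBundles();
        if (!bundledFlowFiles.isEmpty()) {
            session.transfer(bundledFlowFiles, "success");
        }
        // Session first, offsets second: a crash between the two commits means
        // the records are read again on restart (at least once), never lost.
        session.commit();
        commitOffsets();
        uncommittedOffsets.clear();
        return true;
    }

    public static void main(String[] args) {
        final CommitSketch lease = new CommitSketch();
        System.out.println(lease.commit());        // false
        lease.uncommittedOffsets.put(0, 42L);
        System.out.println(lease.commit());        // true
        System.out.println(lease.getBundlesCalls); // 1
        System.out.println(lease.events);          // [transfer:success, session-commit, offset-commit]
    }
}
```

Reusing the local variable keeps getBundles() to a single call per commit, and the event log makes the session-before-offsets ordering visible.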


> ConsumeKafka 0.9 and 0.10 not handling partition reassignment case sufficiently
> -------------------------------------------------------------------------------
>
>                 Key: NIFI-2732
>                 URL: https://issues.apache.org/jira/browse/NIFI-2732
>             Project: Apache NiFi
>          Issue Type: Bug
>            Reporter: Joseph Witt
>            Assignee: Joseph Witt
>            Priority: Critical
>             Fix For: 1.1.0
>
>
> The new ConsumeKafka clients handle the threading model of the consumer API
> correctly. However, they do not yet handle partition reassignment
> sufficiently, which means we could see avoidable duplication. By registering
> a partition reassignment listener we can handle it correctly.
> Further, the processor loads subsequent polls of messages into memory rather
> than writing them directly to the process session/disk. Writing them to disk
> would achieve far better performance and efficiency. Early testing easily
> achieves 100 MB/s sustained per thread on a simple laptop setup with default
> settings, which should scale very well on a properly installed content
> repository.
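
One way to handle reassignment, sketched under assumptions: Kafka's KafkaConsumer.subscribe() accepts a ConsumerRebalanceListener, whose onPartitionsRevoked callback runs before partitions move to another group member. Committing already-processed offsets there keeps the new owner from re-reading them. The sketch models that listener with a local interface over plain int partition ids so it compiles without kafka-clients; RebalanceSketch, record() and the log list are hypothetical, and a real implementation would presumably also commit the NiFi session first, per the ordering in the diff.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RebalanceSketch {
    // Local stand-in modeled on Kafka's ConsumerRebalanceListener; partitions
    // are plain ints instead of TopicPartition so no kafka-clients is needed.
    interface RebalanceListener {
        void onPartitionsRevoked(Collection<Integer> partitions);
        void onPartitionsAssigned(Collection<Integer> partitions);
    }

    // Hypothetical lease state: partition -> next offset to commit.
    final Map<Integer, Long> uncommittedOffsets = new HashMap<>();
    final List<String> log = new ArrayList<>();

    // Remember that 'offset' on 'partition' has been handed off downstream.
    void record(int partition, long offset) {
        // Kafka commits the next offset to read, hence offset + 1.
        uncommittedOffsets.merge(partition, offset + 1, Long::max);
    }

    final RebalanceListener listener = new RebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<Integer> partitions) {
            // Runs before the partitions move to another group member: commit
            // what was already processed so the new owner does not re-read it.
            for (Integer partition : partitions) {
                final Long next = uncommittedOffsets.remove(partition);
                if (next != null) {
                    log.add("commit p" + partition + "@" + next);
                }
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<Integer> partitions) {
            // Nothing is pending yet for newly assigned partitions.
            log.add("assigned " + partitions);
        }
    };
}
```

Recording offsets 10 and 12 on partition 0 and then revoking partition 0 logs "commit p0@13" and leaves the pending offsets of other partitions untouched.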



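To illustrate the second point, a sketch of writing polled records straight to an output stream instead of first collecting them in memory. In NiFi the target would presumably be the OutputStream provided by a ProcessSession write callback; here any OutputStream works. The writeRecords() helper and its demarcator parameter are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Iterator;

class StreamingSketch {
    // Hypothetical helper: copy each record to 'out' as soon as it is pulled,
    // separated by 'demarcator', so this helper never accumulates records in
    // memory. Returns the number of records written.
    static int writeRecords(Iterator<byte[]> records, byte[] demarcator, OutputStream out) {
        int count = 0;
        try {
            while (records.hasNext()) {
                final byte[] value = records.next();
                if (count > 0) {
                    out.write(demarcator);
                }
                out.write(value);
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }
}
```

Writing the records "a" and "bc" with a newline demarcator into a ByteArrayOutputStream produces "a\nbc" and returns 2.
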
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
