[ 
https://issues.apache.org/jira/browse/NIFI-3739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15991051#comment-15991051
 ] 

ASF GitHub Bot commented on NIFI-3739:
--------------------------------------

Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1695#discussion_r114151491
  
    --- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
 ---
    @@ -392,13 +394,27 @@ private void writeDemarcatedData(final ProcessSession 
session, final List<Consum
             bundleMap.put(topicPartition, tracker);
         }
     
    +    private void rollback(final TopicPartition topicPartition) {
    +        final OffsetAndMetadata offsetAndMetadata = 
kafkaConsumer.committed(topicPartition);
    --- End diff --
    
    instead of asking the kafka client where we were as far as committed 
offsets we should first check the uncommittedOffsets map and if nothing then 
ask the kafkaclient otherwise use the last uncommittedOffset for this 
topic/partition pair.


> Create Processors for publishing records to and consuming records from Kafka
> ----------------------------------------------------------------------------
>
>                 Key: NIFI-3739
>                 URL: https://issues.apache.org/jira/browse/NIFI-3739
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>             Fix For: 1.2.0
>
>
> With the new record readers & writers that have been added in now, it would 
> be good to allow records to be pushed to and pulled from kafka. Currently, we 
> support demarcated data but sometimes we can't correctly demarcate data in a 
> way that keeps the format valid (json is a good example). We should have 
> processors that use the record readers and writers for this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to