[ 
https://issues.apache.org/jira/browse/NIFI-4724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312282#comment-16312282
 ] 

ASF GitHub Bot commented on NIFI-4724:
--------------------------------------

Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/2362
  
    @markap14 Thanks for your feedback. I've added another commit to 
incorporate your suggestions, and also rebased onto the latest master just in case.
    
    I also considered using a BlockingQueue for further optimization. 
However, I think it may be dangerous to reuse the same byte[] object, because 
different FlowFiles may have content of different sizes. For example, if the 
1st FlowFile's content is 'foo bar' and the 2nd FlowFile's is 'baz', then the 
2nd message would be 'baz bar'. In addition, the `ProducerRecord` provided by 
the Kafka client library does not take the `size` and `offset` arguments that 
are usually used to specify a part of a larger byte array. So I think we need 
to create a new byte[] object for each message. I hope my understanding is 
correct. If there is more optimization that can be done, please file a 
separate JIRA, as I think it's out of scope for this ticket.
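
    To illustrate the stale-bytes concern, here is a minimal sketch using only 
the JDK. It is not the code from the pull request: the class and method names 
(`BufferReuseSketch`, `writeIntoShared`, `writeIntoFresh`) are hypothetical, and 
the sketch assumes the whole shared array would be handed over as the message 
value, since no `offset`/`size` can be passed along with it.

```java
import java.nio.charset.StandardCharsets;

public class BufferReuseSketch {

    // Copies the content into a reused buffer and returns what a consumer
    // would see if the WHOLE buffer were sent as the message value.
    // Assumes (for illustration) that the content fits into the buffer.
    static String writeIntoShared(byte[] shared, String content) {
        byte[] src = content.getBytes(StandardCharsets.UTF_8);
        System.arraycopy(src, 0, shared, 0, src.length);
        // Bytes beyond src.length still hold the previous message.
        return new String(shared, StandardCharsets.UTF_8);
    }

    // Allocates a new array sized exactly to the content, so nothing from
    // an earlier message can leak into this one.
    static String writeIntoFresh(String content) {
        byte[] fresh = content.getBytes(StandardCharsets.UTF_8);
        return new String(fresh, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] shared = new byte[7];
        System.out.println(writeIntoShared(shared, "foo bar")); // foo bar
        System.out.println(writeIntoShared(shared, "baz"));     // baz bar
        System.out.println(writeIntoFresh("baz"));              // baz
    }
}
```

    The second call shows the problem: only the first three bytes are 
overwritten, so the tail of the previous content remains in the buffer.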


> Publish kafka processors fails with FlowFileHandlingException if the flow 
> file is empty
> ---------------------------------------------------------------------------------------
>
>                 Key: NIFI-4724
>                 URL: https://issues.apache.org/jira/browse/NIFI-4724
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.1.0
>            Reporter: Mahesh Nayak
>            Assignee: Koji Kawamura
>
> 1. Construct the flow GenerateFlowFile --> PublishKafka --> PutFile
> 2. In GenerateFlowFile set the "File Size" to 0B.
> 3. Start the flow.
> Result : Kafka processor throws below exception
> {code:None}
> 2017-12-27 02:49:21,933 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PublishKafka_0_10[id=95dbc77a-0160-1000-0000-000069761c4e] due to uncaught Exception: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=4d6cb989-b6a7-4129-9dfc-1598ee2b3937,claim=,offset=0,name=7061091998478433,size=0] transfer relationship not specified
> 2017-12-27 02:49:21,933 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask
> org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=4d6cb989-b6a7-4129-9dfc-1598ee2b3937,claim=,offset=0,name=7061091998478433,size=0] transfer relationship not specified
>         at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251)
>         at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321)
>         at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
>         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
