[ https://issues.apache.org/jira/browse/NIFI-3378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16013059#comment-16013059 ]

Joseph Witt commented on NIFI-3378:
-----------------------------------

[~rpersaud] this is really easy to work around at the moment: route data in the 
flow by size using RouteOnAttribute with an expression against the fileSize 
attribute.  The PublishKafka processors were built on the newer/better Kafka 
API, and as I recall they handle this specific case more gracefully.  I'd 
recommend not fixing this in PutKafka.
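
For illustration, the workaround would be a RouteOnAttribute processor in front 
of PutKafka with a dynamic property along these lines (the property name 
"too.large" is just an example; fileSize is the standard core attribute and 
gt() is the Expression Language comparison function):

{code}
too.large : ${fileSize:gt(1048576)}
{code}

FlowFiles matching the expression go to the "too.large" relationship, so only 
appropriately sized content ever reaches PutKafka.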

> PutKafka retries flowfiles that are too large instead of routing them to 
> failure
> --------------------------------------------------------------------------------
>
>                 Key: NIFI-3378
>                 URL: https://issues.apache.org/jira/browse/NIFI-3378
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 1.0.0
>            Reporter: Ryan Persaud
>
> When using PutKafka, if the content size exceeds 1048576, an uncaught 
> exception is thrown by org.apache.nifi.stream.io.util.StreamDemarcator.fill() 
> (see stack trace below).  This results in the offending flowfile being 
> retried repeatedly instead of being routed to failure.
> The exception is thrown because maxRequestSize in PublishingContext is 
> hardcoded to 1048576 which is the "kafka default."  In actuality, I believe 
> the default should be 1000000 for Kafka 0.8 (see message.max.bytes at 
> https://kafka.apache.org/08/documentation.html#brokerconfigs), but that's 
> another issue. If we know that any content larger than maxRequestSize is 
> always going to cause an exception, would it make sense to check the fileSize 
> early in PutKafka and avoid many needless function calls, exceptions and 
> retries?  For example, something like:
> {code}
>     @Override
>     protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException {
>         boolean processed = false;
>         FlowFile flowFile = session.get();
>         if (flowFile != null) {
>             if (flowFile.getSize() > 1048576) {
>                 session.transfer(session.penalize(flowFile), REL_FAILURE);
>             } else {
>                 flowFile = this.doRendezvousWithKafka(flowFile, context, session);
>                 if (!this.isFailedFlowFile(flowFile)) {
>                     session.getProvenanceReporter().send(flowFile,
>                             context.getProperty(SEED_BROKERS).getValue() + "/"
>                                     + context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue());
>                     session.transfer(flowFile, REL_SUCCESS);
>                 } else {
>                     session.transfer(session.penalize(flowFile), REL_FAILURE);
>                 }
>             }
>             processed = true;
>         }
>         return processed;
>     }
> {code}
> Thoughts? A RouteOnAttribute processor that examines the fileSize attribute 
> could be used to 'protect' PutKafka, but that seems rather cumbersome.
> 2017-01-20 02:48:12,008 ERROR [Timer-Driven Process Thread-6] o.apache.nifi.processors.kafka.PutKafka
> java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
>         at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:126) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:313) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1880) ~[nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:309) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:285) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)