Ryan Persaud created NIFI-3378:
----------------------------------

             Summary: PutKafka retries flowfiles that are too large instead of routing them to failure
                 Key: NIFI-3378
                 URL: https://issues.apache.org/jira/browse/NIFI-3378
             Project: Apache NiFi
          Issue Type: Bug
          Components: Core Framework
    Affects Versions: 1.0.0
            Reporter: Ryan Persaud


When PutKafka processes a flowfile whose content is larger than 1048576 bytes, 
org.apache.nifi.stream.io.util.StreamDemarcator.fill() throws an uncaught 
IllegalStateException (see stack trace below).  As a result, the offending 
flowfile is retried repeatedly instead of being routed to failure.
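
To make the failure mode concrete, here is a minimal standalone sketch. It is 
not NiFi code: OversizeDemo, fill() and publishOrFail() are hypothetical names, 
and fill() only imitates a size-limited read that throws the same exception 
type seen in the stack trace. The assumption is that the limit is compared 
against the number of bytes buffered so far.

```java
import java.io.ByteArrayInputStream;

final class OversizeDemo {
    // Hypothetical stand-in for a size-limited read: consumes the input in
    // chunks and throws once more than maxSize bytes have been buffered.
    static int fill(ByteArrayInputStream in, int maxSize) {
        byte[] chunk = new byte[4096];
        int total = 0;
        int n;
        while ((n = in.read(chunk, 0, chunk.length)) != -1) {
            total += n;
            if (total > maxSize) {
                throw new IllegalStateException(
                        "Maximum allowed data size of " + maxSize + " exceeded.");
            }
        }
        return total;
    }

    // Models the desired handling: an oversize payload becomes a "failure"
    // decision instead of an exception escaping to the caller.
    static String publishOrFail(byte[] content, int maxSize) {
        try {
            fill(new ByteArrayInputStream(content), maxSize);
            return "success";
        } catch (IllegalStateException e) {
            return "failure";
        }
    }

    public static void main(String[] args) {
        int limit = 1048576; // value quoted in the report
        System.out.println(publishOrFail(new byte[limit], limit));
        System.out.println(publishOrFail(new byte[limit + 1], limit));
    }
}
```

In this model, content exactly at the limit passes and one extra byte is 
enough to trip the check, which would match the "exceeds 1048576" wording above.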

The exception is thrown because maxRequestSize in PublishingContext is 
hardcoded to 1048576, the "kafka default."  I believe the actual default for 
Kafka 0.8 is 1000000 (see message.max.bytes at 
https://kafka.apache.org/08/documentation.html#brokerconfigs), but that's a 
separate issue.  If we know that any content larger than maxRequestSize will 
always cause an exception, would it make sense to check the flowfile size early 
in PutKafka and avoid the needless function calls, exceptions, and retries?  
For example, something like:

    @Override
    protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException {
        boolean processed = false;
        FlowFile flowFile = session.get();
        if (flowFile != null) {
            if (flowFile.getSize() > 1048576) {
                // Too large to publish: fail fast instead of letting StreamDemarcator.fill() throw.
                session.transfer(session.penalize(flowFile), REL_FAILURE);
            } else {
                flowFile = this.doRendezvousWithKafka(flowFile, context, session);
                if (!this.isFailedFlowFile(flowFile)) {
                    session.getProvenanceReporter().send(flowFile,
                            context.getProperty(SEED_BROKERS).getValue() + "/"
                                    + context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue());
                    session.transfer(flowFile, REL_SUCCESS);
                } else {
                    session.transfer(session.penalize(flowFile), REL_FAILURE);
                }
            }
            processed = true;
        }
        return processed;
    }

Thoughts? A RouteOnAttribute processor that examines the fileSize attribute 
could be used to 'protect' PutKafka, but that seems rather cumbersome.
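
The size check discussed above could also be isolated from the processor and 
take the limit as a parameter rather than a literal. The sketch below is only 
an illustration: SizePreCheck, Route and route() are hypothetical names, not 
part of NiFi, and the default constant merely repeats the value quoted in this 
report. It also assumes the limit applies to the whole flowfile; if a message 
delimiter is configured, the limit may apply per message instead, in which case 
a whole-file check could be too strict.

```java
final class SizePreCheck {
    // Assumed to mirror the hardcoded maxRequestSize quoted in the report.
    static final long DEFAULT_MAX_REQUEST_SIZE = 1_048_576L;

    // Where a flowfile would go based on its size alone.
    enum Route { PUBLISH, FAILURE }

    // Returns FAILURE when the content is strictly larger than the limit,
    // so content exactly at the limit is still published.
    static Route route(long flowFileSize, long maxRequestSize) {
        if (maxRequestSize <= 0) {
            throw new IllegalArgumentException("maxRequestSize must be positive");
        }
        return flowFileSize > maxRequestSize ? Route.FAILURE : Route.PUBLISH;
    }

    public static void main(String[] args) {
        System.out.println(route(1_048_576L, DEFAULT_MAX_REQUEST_SIZE));
        System.out.println(route(1_048_577L, DEFAULT_MAX_REQUEST_SIZE));
        // The same check with the 1000000 figure mentioned for Kafka 0.8.
        System.out.println(route(1_000_001L, 1_000_000L));
    }
}
```

Passing the limit in keeps the 1048576 versus 1000000 question separate from 
the routing decision itself.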

2017-01-20 02:48:12,008 ERROR [Timer-Driven Process Thread-6] o.apache.nifi.processors.kafka.PutKafka 
java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
        at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
        at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
        at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:126) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
        at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:313) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
        at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1880) ~[nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
        at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
        at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:309) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
        at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:285) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
        at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]


