[
https://issues.apache.org/jira/browse/NIFI-3378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16013077#comment-16013077
]
Joseph Witt commented on NIFI-3378:
-----------------------------------
We're probably being overly alarming. The PublishKafka processors are by far
the most used and are quite stable.
> PutKafka retries flowfiles that are too large instead of routing them to
> failure
> --------------------------------------------------------------------------------
>
> Key: NIFI-3378
> URL: https://issues.apache.org/jira/browse/NIFI-3378
> Project: Apache NiFi
> Issue Type: Bug
> Components: Core Framework
> Affects Versions: 1.0.0
> Reporter: Ryan Persaud
>
> When using PutKafka, if the content size exceeds 1048576 bytes, an uncaught
> exception is thrown by org.apache.nifi.stream.io.util.StreamDemarcator.fill()
> (see stack trace below). As a result, the offending flowfile is retried
> repeatedly instead of being routed to failure.
> The exception is thrown because maxRequestSize in PublishingContext is
> hardcoded to 1048576, which is the "kafka default." In fact, I believe
> the default should be 1000000 for Kafka 0.8 (see message.max.bytes at
> https://kafka.apache.org/08/documentation.html#brokerconfigs), but that's
> another issue. If we know that any content larger than maxRequestSize is
> always going to cause an exception, would it make sense to check the flowfile
> size early in PutKafka and avoid many needless function calls, exceptions, and
> retries? For example, something like:
> {code}
> @Override
> protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session)
>         throws ProcessException {
>     boolean processed = false;
>     FlowFile flowFile = session.get();
>     if (flowFile != null) {
>         if (flowFile.getSize() > 1048576) {
>             session.transfer(session.penalize(flowFile), REL_FAILURE);
>         } else {
>             flowFile = this.doRendezvousWithKafka(flowFile, context, session);
>             if (!this.isFailedFlowFile(flowFile)) {
>                 session.getProvenanceReporter().send(flowFile,
>                         context.getProperty(SEED_BROKERS).getValue() + "/"
>                         + context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue());
>                 session.transfer(flowFile, REL_SUCCESS);
>             } else {
>                 session.transfer(session.penalize(flowFile), REL_FAILURE);
>             }
>         }
>         processed = true;
>     }
>     return processed;
> }
> {code}
> Thoughts? A RouteOnAttribute processor that examines the fileSize attribute
> could be used to 'protect' PutKafka, but that seems rather cumbersome.
>
> 2017-01-20 02:48:12,008 ERROR [Timer-Driven Process Thread-6] o.apache.nifi.processors.kafka.PutKafka
> java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
>     at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>     at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>     at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:126) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>     at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:313) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1880) ~[nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>     at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:309) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>     at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:285) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>     at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
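
The early size check proposed in the issue, and the gap between the two limits it mentions, can be illustrated without any NiFi dependencies. The following is only a sketch: the class SizeGuard, the Route enum, and the constant names are hypothetical and not part of NiFi or Kafka, and the 1000000 figure is the Kafka 0.8 message.max.bytes default as the reporter believes it to be.

```java
// Hypothetical, NiFi-independent sketch of the size pre-check discussed in the issue.
final class SizeGuard {
    // Value that the issue says is hardcoded as maxRequestSize (1024 * 1024 bytes).
    static final long NIFI_MAX_REQUEST_SIZE = 1_048_576L;
    // Assumption taken from the issue text: message.max.bytes default for Kafka 0.8.
    static final long KAFKA_08_MESSAGE_MAX_BYTES = 1_000_000L;

    enum Route { PUBLISH, FAILURE }

    // Content strictly larger than the limit goes to failure; anything else is published.
    static Route route(long flowFileSize, long maxRequestSize) {
        return flowFileSize > maxRequestSize ? Route.FAILURE : Route.PUBLISH;
    }

    public static void main(String[] args) {
        long[] sizes = {999_999L, 1_000_001L, 1_048_576L, 1_048_577L};
        for (long size : sizes) {
            System.out.println(size + " bytes: hardcoded limit -> "
                    + route(size, NIFI_MAX_REQUEST_SIZE)
                    + ", cited broker default -> "
                    + route(size, KAFKA_08_MESSAGE_MAX_BYTES));
        }
    }
}
```

Sizes from 1000001 through 1048576 bytes pass a check against the hardcoded value but exceed the broker figure cited in the issue, which suggests that a configurable limit might be preferable to a constant in any real fix.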