Hi Chris,

Sorry for not catching that code path. I am not sure it is actually a regression, as I took a look at the 1.0.0-BETA code and it matches the 0.7.0 code. Specifically, this comment block:
/*
 * We're using the default value from Kafka. We are using it to control the
 * message size before it goes to to Kafka thus limiting possibility of a
 * late failures in Kafka client.
 */

found at [1] leads me to believe it was intentional and not a regression. Looking at the 0.6.1 release code, it appears that PutKafka used a default of 5 MB [2]. I could speculate on the reasoning behind it; however, I will refrain from opining on it, as I was not involved in any of the conversations related to the change and enforcement of the 1 MB max.

[1] https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java#L36-L41
[2] https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176

Thanks,
Andrew

On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <chris.mcderm...@hpe.com> wrote:

> Thanks, Andrew.
>
> I've set all of the right broker configs to allow larger messages. Believe
> me, I spent a lot of time banging my head against the wall thinking that
> the broker and topic configs were wrong.
>
> PublishKafka uses PublishingContext. That class has a bean property called
> maxRequestSize, which defaults to 1048576. As far as I can tell, the
> setMaxRequestSize() method is never called (except by some test code).
> KafkaPublisher.publish() calls PublishingContext.getMaxRequestSize() and
> passes the result to the constructor for StreamDemarcator. The publish
> method then calls StreamDemarcator.nextToken(), which in turn calls
> StreamDemarcator.fill(), which compares the stream position against the
> maxRequestSize and throws the exception with this line.
>
> throw new IllegalStateException("Maximum allowed data size of " +
>     this.maxDataSize + " exceeded.");
>
> Which matches what I see in the nifi-app.log file…
>
> 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.kafka.PutKafka
> java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
>     at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
>     at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
>     at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>     at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
>     at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>     at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>     at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>
> This occurs using PublishKafka and PutKafka. Setting the Max Record Size
> property in the PutKafka processor has no effect on this. Note the stack
> trace above is from the PutKafka processor with Max Record Size set to 10 MB.
>
> I believe that this is a regression from 0.6.0.
>
> Chris McDermott
>
> Remote Business Analytics
> STaTS/StoreFront Remote
> HPE Storage
> Hewlett Packard Enterprise
> Mobile: +1 978-697-5315
>
>
> On 8/20/16, 3:48 PM, "Andrew Psaltis" <psaltis.and...@gmail.com> wrote:
>
> Hi Chris,
> Regarding the PutKafka processor: looking at this block [1] of the PutKafka
> code, it has a default size of 1 MB, but it does not restrict the size. The
> DATA_SIZE_VALIDATOR does a sanity check and also enforces that the value
> entered is in the correct format: <value> [B|KB|MB|GB|TB].
> Later on in the code, at this block [2], the value is set on the Kafka
> config; again, this does not enforce a maximum value.
>
> In regards to the PublishKafka processor, I do not see where it accepts a
> size or restricts the size at all.
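To make the enforcement path Chris traced concrete, here is a minimal, hypothetical Java sketch. This is not the actual NiFi StreamDemarcator source; the class and method names are invented for illustration. It shows the kind of check fill() performs: buffer bytes from a stream and fail fast once the buffered size exceeds a configured maximum, producing the exact failure seen in the nifi-app.log excerpt above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/**
 * Hypothetical sketch (names invented; NOT the NiFi source) of a
 * demarcator-style size check: read from a stream into a bounded
 * buffer and throw once the data exceeds maxDataSize.
 */
public class SizeCheckSketch {

    public static byte[] readWithLimit(InputStream in, int maxDataSize) {
        // One extra byte so we can detect "strictly greater than the limit".
        byte[] buffer = new byte[maxDataSize + 1];
        int total = 0;
        try {
            int read;
            while ((read = in.read(buffer, total, buffer.length - total)) != -1) {
                total += read;
                if (total > maxDataSize) {
                    // Same message shape as the nifi-app.log excerpt above.
                    throw new IllegalStateException(
                            "Maximum allowed data size of " + maxDataSize + " exceeded.");
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] result = new byte[total];
        System.arraycopy(buffer, 0, result, 0, total);
        return result;
    }
}
```

With a limit of 1048576, a 1 MB payload passes and 1 MB + 1 byte throws. If, as Chris observes, the limit handed to the demarcator always comes from PublishingContext's hard-coded 1048576 default, that would explain why changing the processor's Max Record Size property appears to have no effect.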
>
> Have you adjusted the 'message.max.bytes' config value for your broker(s)?
> The default value for that is 1 MB [3]. (The URL references 0.8 Kafka;
> however, I believe this default has been stable since the early days of
> the project.)
>
> If you really do want to send messages that are larger than 1 MB in size,
> I would highly recommend reading this post [4] from Gwen Shapira. It does
> a great job of outlining the things you need to take into consideration.
> It will also point you to the relevant configs in Kafka that will need to
> be adjusted if you decide to go this route.
>
> Thanks,
> Andrew
>
> [1] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
> [2] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
> [3] https://kafka.apache.org/08/configuration.html
> [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
>
> On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
> STaTS/StorefrontRemote) <chris.mcderm...@hpe.com> wrote:
>
> > Hi folks,
> >
> > From experimentation and looking at the code, it seems that the max
> > message size that can be sent via the PublishKafka and PutKafka
> > processors in 0.7.0 is 1 MB. Can someone please confirm my read on this?
> >
> > Thanks,
> >
> > Chris McDermott
> >
> > Remote Business Analytics
> > STaTS/StoreFront Remote
> > HPE Storage
> > Hewlett Packard Enterprise
> > Mobile: +1 978-697-5315
>
> --
> Thanks,
> Andrew
>
> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>

--
Thanks,
Andrew

Subscribe to my book: Streaming Data <http://manning.com/psaltis>
<https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
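A footnote for readers who reach this thread with the same problem: the broker-side knobs Andrew mentions live in each broker's server.properties. The sketch below uses illustrative 10 MB values, not recommendations; see the Kafka configuration docs [3] and Gwen Shapira's post [4] before changing these.

```properties
# server.properties (per broker) -- illustrative values only
# Largest message the broker will accept:
message.max.bytes=10485760
# Must be >= message.max.bytes, or replicas cannot fetch large messages:
replica.fetch.max.bytes=10485760
```

Consumers must also be able to fetch messages this large (in the 0.8-era high-level consumer, via fetch.message.max.bytes), and topic-level overrides can set max.message.bytes per topic.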