I’ll raise a JIRA, Joe.

Thanks,
Chris McDermott

Remote Business Analytics
STaTS/StoreFront Remote
HPE Storage
Hewlett Packard Enterprise
Mobile: +1 978-697-5315

On 8/20/16, 6:52 PM, "Joe Witt" <joe.w...@gmail.com> wrote:

If no JIRA is raised sooner, I'll raise one and get it sorted.

On Aug 20, 2016 6:40 PM, "Andrew Psaltis" <psaltis.and...@gmail.com> wrote:

> Hi Chris,
>
> Sorry for not catching that code path. I am not sure it is actually a
> regression: I looked at the 1.0.0-BETA code, and it matches 0.7.0,
> specifically this comment block:
>
> /*
>  * We're using the default value from Kafka. We are using it to control the
>  * message size before it goes to Kafka, thus limiting the possibility of
>  * late failures in the Kafka client.
>  */
>
> That comment, found at [1], leads me to believe it was intentional and not
> a regression. Looking at the 0.6.1 release code, it appears that PutKafka
> used a default of 5 MB [2].
>
> I can speculate on the reasoning behind it; however, I will refrain from
> opining on it, as I was not involved in any of the conversations related to
> the change and the enforcement of the 1 MB max.
>
> [1] https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java#L36-L41
> [2] https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176
>
> Thanks,
> Andrew
>
> On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <chris.mcderm...@hpe.com> wrote:
>
> > Thanks, Andrew.
> >
> > I’ve set all of the right broker configs to allow larger messages.
> > Believe me, I spent a lot of time banging my head against the wall thinking
> > that the broker and topic configs were wrong.
> >
> > PublishKafka uses PublishingContext.
> > That class has a bean property called maxRequestSize, which defaults
> > to 1048576. As far as I can tell, the setMaxRequestSize() method is never
> > called (except by some test code). KafkaPublisher.publish() calls
> > PublishingContext.getMaxRequestSize() and passes the result to the
> > constructor for StreamDemarcator. The publish method then calls
> > StreamDemarcator.nextToken(), which in turn calls StreamDemarcator.fill(),
> > which compares the stream position against the maxRequestSize and throws
> > the exception with this line:
> >
> > throw new IllegalStateException("Maximum allowed data size of " +
> >         this.maxDataSize + " exceeded.");
> >
> > This matches what I see in the nifi-app.log file:
> >
> > 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.kafka.PutKafka
> > java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
> >     at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
> >     at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
> >     at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >     at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
> >     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
> >     at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >     at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >     at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
> >     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
> >     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
> >     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
> >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
> >     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
> >
> > This occurs with both PublishKafka and PutKafka. Setting the Max Record Size
> > property in the PutKafka processor has no effect on this. Note that the stack
> > trace above is from the PutKafka processor with Max Record Size set to 10 MB.
> >
> > I believe that this is a regression from 0.6.0.
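The size check Chris traced can be illustrated with a small, self-contained Java sketch. The class below, BoundedDelimitedReader, is hypothetical and is not NiFi's StreamDemarcator; it only reproduces the behaviour described above, where a fixed byte limit (1048576, the PublishingContext default cited in this thread) is enforced while a message is being read. Because the check happens before anything reaches a Kafka producer, no broker, topic, or processor setting can lift it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for the check described in the thread.
// It is NOT NiFi's StreamDemarcator; it only mimics failing once a single
// delimited message grows past a fixed byte limit.
class BoundedDelimitedReader {
    private final InputStream in;
    private final int delimiter;
    private final int maxDataSize;
    private boolean endOfStream;

    BoundedDelimitedReader(InputStream in, byte delimiter, int maxDataSize) {
        this.in = in;
        this.delimiter = delimiter & 0xFF; // compare in InputStream.read()'s 0-255 range
        this.maxDataSize = maxDataSize;
    }

    // Returns the next delimited message, or null once the stream is exhausted.
    byte[] nextToken() {
        if (endOfStream) {
            return null;
        }
        ByteArrayOutputStream token = new ByteArrayOutputStream();
        try {
            int b;
            while ((b = in.read()) != -1) {
                if (b == delimiter) {
                    return token.toByteArray();
                }
                token.write(b);
                // The limit is enforced while reading, before any producer call,
                // so broker-side limits never come into play.
                if (token.size() > maxDataSize) {
                    throw new IllegalStateException(
                            "Maximum allowed data size of " + maxDataSize + " exceeded.");
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        endOfStream = true;
        return token.size() > 0 ? token.toByteArray() : null;
    }
}

class DemarcatorDemo {
    public static void main(String[] args) {
        // Two small messages split on '\n' pass through unchanged.
        BoundedDelimitedReader small = new BoundedDelimitedReader(
                new ByteArrayInputStream("first\nsecond".getBytes(StandardCharsets.UTF_8)),
                (byte) '\n', 1048576);
        System.out.println(new String(small.nextToken(), StandardCharsets.UTF_8)); // first
        System.out.println(new String(small.nextToken(), StandardCharsets.UTF_8)); // second

        // A single 2 MB message with no delimiter trips the 1048576-byte limit.
        BoundedDelimitedReader large = new BoundedDelimitedReader(
                new ByteArrayInputStream(new byte[2 * 1024 * 1024]), (byte) '\n', 1048576);
        try {
            large.nextToken();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Maximum allowed data size of 1048576 exceeded.
        }
    }
}
```

The limit is a constructor argument here to make the point visible: in the code path Chris describes, the equivalent value comes from a default that nothing in the processor overrides.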
> >
> > Chris McDermott
> >
> > On 8/20/16, 3:48 PM, "Andrew Psaltis" <psaltis.and...@gmail.com> wrote:
> >
> > Hi Chris,
> >
> > Regarding the PutKafka processor: looking at this block [1] of the PutKafka
> > code, it has a default size of 1 MB, but it does not restrict the size. The
> > DATA_SIZE_VALIDATOR does a sanity check and also enforces that the value
> > entered is in the correct format, <value> [B|KB|MB|GB|TB].
> > Later in the code, at this block [2], the value is set on the Kafka
> > config; again, this does not enforce a maximum value.
> >
> > Regarding the PublishKafka processor, I do not see where it accepts a
> > size or restricts the size at all.
> >
> > Have you adjusted the 'message.max.bytes' config value for your broker(s)?
> > The default value for that is 1 MB [3]. (The URL references Kafka 0.8;
> > however, I believe this default has been stable since the early days of
> > the project.)
> >
> > If you really do want to send messages larger than 1 MB, I would highly
> > recommend reading this post [4] from Gwen Shapira. It does a great job of
> > outlining the things you need to take into consideration, and it will
> > also point you to the relevant Kafka configs that will need to be
> > adjusted if you decide to go this route.
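Andrew's description of the <value> [B|KB|MB|GB|TB] format, together with Chris's Max Record Size of 10 MB, can be made concrete with a little arithmetic. The DataSizes class below is a hypothetical parser for that format, not NiFi's DATA_SIZE_VALIDATOR, and it assumes binary multipliers (1 KB = 1024 B), which is consistent with the 1048576-byte (1 MB) figure in the log. It shows that the configured value is ten times the limit actually enforced, so messages between 1 MB and 10 MB pass the property's format check yet still fail.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not NiFi's DATA_SIZE_VALIDATOR: it parses strings in the
// "<value> [B|KB|MB|GB|TB]" format and assumes binary multipliers (1 KB = 1024 B).
class DataSizes {
    private static final Pattern SIZE = Pattern.compile(
            "\\s*(\\d+(?:\\.\\d+)?)\\s*(B|KB|MB|GB|TB)\\s*", Pattern.CASE_INSENSITIVE);

    static long toBytes(String text) {
        Matcher m = SIZE.matcher(text);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a valid data size: " + text);
        }
        double value = Double.parseDouble(m.group(1));
        int power;
        switch (m.group(2).toUpperCase(Locale.ROOT)) {
            case "B":  power = 0; break;
            case "KB": power = 1; break;
            case "MB": power = 2; break;
            case "GB": power = 3; break;
            default:   power = 4; break; // "TB", the only remaining alternative
        }
        return (long) (value * Math.pow(1024, power));
    }
}

class MaxRecordSizeDemo {
    public static void main(String[] args) {
        long configured = DataSizes.toBytes("10 MB"); // the Max Record Size Chris set
        long enforced = 1048576L;                     // the limit reported in nifi-app.log
        System.out.println(configured + " bytes configured vs " + enforced + " bytes enforced");
        System.out.println("Setting exceeds the enforced limit: " + (configured > enforced));
    }
}
```

A format check like this says nothing about an upper bound, which matches Andrew's reading that the validator only sanity-checks the value.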
> >
> > Thanks,
> > Andrew
> >
> > [1] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
> > [2] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
> > [3] https://kafka.apache.org/08/configuration.html
> > [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
> >
> > On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <chris.mcderm...@hpe.com> wrote:
> >
> > > Hi folks,
> > >
> > > From experimentation and looking at the code, it seems that the max
> > > message size that can be sent via the PublishKafka and PutKafka
> > > processors in 0.7.0 is 1 MB. Can someone please confirm my read on this?
> > >
> > > Thanks,
> > >
> > > Chris McDermott
> >
> > --
> > Thanks,
> > Andrew
> >
> > Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> > <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> > twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
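For reference, the Kafka-side settings that Andrew's suggestion about 'message.max.bytes' and the post at [4] point to can be collected as plain java.util.Properties. The property names are standard Kafka configuration keys; the 10 MB ceiling is only an assumed example value, and the grouping into broker, producer, and consumer sets is an illustration, not a recommended setup. As Chris found, raising these alone does not help in NiFi 0.7.0, because the 1048576-byte check happens inside the processor before the producer is involved.

```java
import java.util.Properties;

// Illustrative grouping of standard Kafka settings involved in allowing larger
// messages. The 10 MB value is an assumed example, not a recommendation.
class LargeMessageSettings {
    static final int MAX_MESSAGE_BYTES = 10 * 1024 * 1024;

    // Broker side (server.properties): the largest message the broker accepts,
    // and the follower fetch size, which should be at least as large so that
    // replicas can copy such messages. Per-topic override: max.message.bytes.
    static Properties broker() {
        Properties p = new Properties();
        p.setProperty("message.max.bytes", Integer.toString(MAX_MESSAGE_BYTES));
        p.setProperty("replica.fetch.max.bytes", Integer.toString(MAX_MESSAGE_BYTES));
        return p;
    }

    // Producer side: upper bound on the size of a single request.
    static Properties producer() {
        Properties p = new Properties();
        p.setProperty("max.request.size", Integer.toString(MAX_MESSAGE_BYTES));
        return p;
    }

    // Consumer side: the Java consumer's per-partition fetch size; older
    // (0.8-era) Scala consumers use fetch.message.max.bytes instead.
    static Properties consumer() {
        Properties p = new Properties();
        p.setProperty("max.partition.fetch.bytes", Integer.toString(MAX_MESSAGE_BYTES));
        return p;
    }

    public static void main(String[] args) {
        broker().list(System.out);
        producer().list(System.out);
        consumer().list(System.out);
    }
}
```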