Rgr that. Thanks, Chris.

On Aug 20, 2016 7:04 PM, "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <chris.mcderm...@hpe.com> wrote:
> Jira is https://issues.apache.org/jira/browse/NIFI-2614.
>
> Thanks,
>
> Chris McDermott
>
> Remote Business Analytics
> STaTS/StoreFront Remote
> HPE Storage
> Hewlett Packard Enterprise
> Mobile: +1 978-697-5315
>
>
> On 8/20/16, 6:57 PM, "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <chris.mcderm...@hpe.com> wrote:
>
> I’ll raise a JIRA, Joe.
>
> Thanks,
>
> Chris McDermott
>
>
> On 8/20/16, 6:52 PM, "Joe Witt" <joe.w...@gmail.com> wrote:
>
> If no jira is raised sooner I'll raise one and get it sorted.
>
> On Aug 20, 2016 6:40 PM, "Andrew Psaltis" <psaltis.and...@gmail.com> wrote:
>
> > Hi Chris,
> > Sorry for not catching that code path. I am not sure it is actually a
> > regression, as I took a look at the 1.0.0-BETA code and it matches the
> > 0.7.0 code; specifically, this comment block:
> >
> > /*
> >  * We're using the default value from Kafka. We are using it to control the
> >  * message size before it goes to to Kafka thus limiting possibility of a
> >  * late failures in Kafka client.
> >  */
> >
> > found at [1] leads me to believe it was intentional and not a regression.
> > Looking at the 0.6.1 release code, it appears that PutKafka used a
> > default of 5 MB [2].
> >
> > I can speculate on the reasoning behind it; however, I will refrain from
> > opining on it, as I was not involved in any of the conversations related
> > to the change and enforcement of the 1 MB max.
> >
> > [1] https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java#L36-L41
> > [2] https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176
> >
> > Thanks,
> > Andrew
> >
> > On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU -
> > STaTS/StorefrontRemote) <chris.mcderm...@hpe.com> wrote:
> >
> > > Thanks, Andrew.
> > >
> > > I’ve set all of the right broker configs to allow larger messages.
> > > Believe me, I spent a lot of time banging my head against the wall
> > > thinking that the broker and topic configs were wrong.
> > >
> > > PublishKafka uses PublishingContext. That class has a bean property
> > > called maxRequestSize, which defaults to 1048576. As far as I can tell,
> > > the setMaxRequestSize() method is never called (except by some test
> > > code). KafkaPublisher.publish() calls PublishingContext.getMaxRequestSize()
> > > and passes the result to the constructor for StreamDemarcator. The
> > > publish method then calls StreamDemarcator.nextToken(), which in turn
> > > calls StreamDemarcator.fill(), which compares the stream position
> > > against the maxRequestSize and throws the exception with this line:
> > >
> > > throw new IllegalStateException("Maximum allowed data size of " +
> > > this.maxDataSize + " exceeded.");
> > >
> > > Which matches what I see in the nifi-app.log file:
> > >
> > > 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.kafka.PutKafka
> > > java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
> > > at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
> > > at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
> > > at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> > > at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> > > at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
> > > at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
> > > at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> > > at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> > > at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> > > at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
> > > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
> > > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
> > > at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
> > > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
> > > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
> > > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
> > > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
> > > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
> > > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
> > > at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
> > >
> > > This occurs using PublishKafka and PutKafka. Setting the Max Record
> > > Size property in the PutKafka processor has no effect on this. Note the
> > > stack trace above is from the PutKafka processor with Max Record Size
> > > set to 10 MB.
> > >
> > > I believe that this is a regression from 0.6.0.
> > >
> > > Chris McDermott
> > >
> > >
> > > On 8/20/16, 3:48 PM, "Andrew Psaltis" <psaltis.and...@gmail.com> wrote:
> > >
> > > Hi Chris,
> > > Regarding the PutKafka processor: looking at this block [1] of the
> > > PutKafka code, it has a default size of 1 MB, but it does not restrict
> > > the size. The DATA_SIZE_VALIDATOR does a sanity check and also enforces
> > > that the value entered is in the correct format, <value> [B|KB|MB|GB|TB].
> > > Later on in the code, at this block [2], the value is set on the Kafka
> > > config; again, this does not enforce a maximum value.
> > >
> > > In regards to the PublishKafka processor, I do not see where it accepts
> > > a size or restricts the size at all.
> > >
> > > Have you adjusted the 'message.max.bytes' config value for your
> > > broker(s)? The default value for that is 1 MB [3]. (The URL references
> > > 0.8 Kafka; however, I believe this default has been stable since the
> > > early days of the project.)
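The 1 MB enforcement traced in this thread reduces to a fill loop that counts bytes against a fixed cap and throws once the cap is passed. A minimal, self-contained sketch of that pattern, assuming illustrative names throughout (this is not the actual NiFi StreamDemarcator code):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Illustrative sketch of a demarcator-style size guard: read a stream and
// fail as soon as the running byte count exceeds a hard cap, which is the
// pattern behind "Maximum allowed data size of 1048576 exceeded".
public class SizeGuardDemo {

    /** Returns the total bytes read, or throws once maxDataSize is exceeded. */
    static int readAll(InputStream in, int maxDataSize) {
        byte[] buf = new byte[8192];
        int total = 0;
        try {
            int n;
            while ((n = in.read(buf)) != -1) {
                total += n;
                if (total > maxDataSize) {
                    throw new IllegalStateException(
                        "Maximum allowed data size of " + maxDataSize + " exceeded.");
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    public static void main(String[] args) {
        // 10 bytes against a 1 MB cap: fine.
        System.out.println(readAll(new ByteArrayInputStream(new byte[10]), 1_048_576));
        // 2 MB against a 1 MB cap: throws IllegalStateException.
        try {
            readAll(new ByteArrayInputStream(new byte[2 * 1_048_576]), 1_048_576);
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

The key point for the bug report above: because the cap is a constructor argument that is never set from processor properties, no processor-level setting can raise it.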
> > > If you really do want to send messages that are larger than 1 MB in
> > > size, I would highly recommend reading this post [4] from Gwen Shapira.
> > > It does a great job of outlining the things you need to take into
> > > consideration, and it will also point you to the relevant configs in
> > > Kafka that will need to be adjusted if you decide to go this route.
> > >
> > > Thanks,
> > > Andrew
> > >
> > > [1] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
> > > [2] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
> > > [3] https://kafka.apache.org/08/configuration.html
> > > [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
> > >
> > > On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
> > > STaTS/StorefrontRemote) <chris.mcderm...@hpe.com> wrote:
> > >
> > > > Hi folks,
> > > >
> > > > From experimentation and looking at the code, it seems that the max
> > > > message size that can be sent via the PublishKafka and PutKafka
> > > > processors in 0.7.0 is 1 MB. Can someone please confirm my read on
> > > > this?
> > > > Thanks,
> > > >
> > > > Chris McDermott
> > > >
> > > > Remote Business Analytics
> > > > STaTS/StoreFront Remote
> > > > HPE Storage
> > > > Hewlett Packard Enterprise
> > > > Mobile: +1 978-697-5315
> > >
> > > --
> > > Thanks,
> > > Andrew
> > >
> > > Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> > > <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> > > twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
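For readers landing on this thread: as Andrew's pointers [3] and [4] note, sending messages over 1 MB requires raising limits on the broker, producer, and consumer together. A sketch of the relevant settings as they would appear in the respective properties files, assuming a 10 MB target purely as an example value (property names are from Kafka's configuration documentation of that era):

```java
import java.util.Properties;

// Example settings for allowing ~10 MB messages end to end through Kafka.
// 10485760 (10 MB) is an illustrative value, not a recommendation.
public class LargeMessageConfig {
    static final int TEN_MB = 10 * 1024 * 1024;

    // Broker-side (server.properties)
    static Properties brokerProps() {
        Properties p = new Properties();
        p.setProperty("message.max.bytes", String.valueOf(TEN_MB));       // per-message cap accepted by the broker
        p.setProperty("replica.fetch.max.bytes", String.valueOf(TEN_MB)); // must be >= message.max.bytes or replication stalls
        return p;
    }

    // Producer-side (new producer API)
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("max.request.size", String.valueOf(TEN_MB));        // producer refuses larger sends otherwise
        return p;
    }

    // Consumer-side (old consumer; newer clients use max.partition.fetch.bytes)
    static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("fetch.message.max.bytes", String.valueOf(TEN_MB)); // must be large enough to fetch the biggest message
        return p;
    }

    public static void main(String[] args) {
        System.out.println(brokerProps());
        System.out.println(producerProps());
        System.out.println(consumerProps());
    }
}
```

Note that in NiFi 0.7.0 none of this helps with the processor-side 1 MB cap described above, which is the subject of NIFI-2614.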