[
https://issues.apache.org/jira/browse/NIFI-2614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15429532#comment-15429532
]
Christopher McDermott commented on NIFI-2614:
---------------------------------------------
Here is an email trail from the Dev list. It contains some implementation
details.
On 8/20/16, 6:52 PM, "Joe Witt" <[email protected]> wrote:
If no jira is raised sooner I'll raise one and get it sorted.
On Aug 20, 2016 6:40 PM, "Andrew Psaltis" <[email protected]> wrote:
> Hi Chris,
> Sorry for not catching that code path. I am not sure if it is actually a
> regression as I took a look at the 1.0.0-BETA code and it matches the
> 0.7.0, specifically this comment block:
>
> /*
>  * We're using the default value from Kafka. We are using it to control the
>  * message size before it goes to Kafka, thus limiting the possibility of
>  * late failures in the Kafka client.
>  */
>
> found at [1], which leads me to believe it was intentional and not a regression.
> Looking at the 0.6.1 release code it appears that PutKafka used a default
> of 5 MB [2].
>
> I can speculate on the reasoning behind it, however, I will refrain from
> opining on it as I was not involved in any of the conversations related to
> the change and enforcement of the 1 MB max.
>
> [1]
> https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java#L36-L41
> [2]
> https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176
>
> Thanks,
> Andrew
>
> On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU -
> STaTS/StorefrontRemote) <[email protected]> wrote:
>
> > Thanks, Andrew.
> >
> > I’ve set all of the right broker configs to allow larger messages.
> > Believe me, I spent a lot of time banging my head against the wall
> > thinking that the broker and topic configs were wrong.
> >
> > PublishKafka uses PublishingContext. That class has a bean property
> > called maxRequestSize, which defaults to 1048576. As far as I can tell,
> > the setMaxRequestSize() method is never called (except by some test code.)
> > KafkaPublisher.publish() calls PublishingContext.getMaxRequestSize() and
> > passes the result to the constructor for StreamDemarcator. The publish
> > method then calls StreamDemarcator.nextToken(), which in turn calls
> > StreamDemarcator.fill(), which compares the stream position against
> > maxRequestSize and throws the exception with this line:
> >
> > throw new IllegalStateException("Maximum allowed data size of " +
> >     this.maxDataSize + " exceeded.");
> >
> > Which matches what I see in the nifi-app.log file…
> >
> > 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.kafka.PutKafka
> > java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
> >   at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
> >   at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
> >   at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >   at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >   at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
> >   at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
> >   at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >   at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >   at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
> >   at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
> >   at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
> >   at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
> >   at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
> >   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
> >   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
> >   at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
> >
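The failure mode described above (the demarcator counting buffered bytes against a hard-coded 1048576 cap and throwing before anything reaches the Kafka client) can be sketched as follows. This is a simplified illustration under stated assumptions, not the actual NiFi source; the class name `DemarcatorSketch` and its methods are hypothetical stand-ins for `StreamDemarcator`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Simplified sketch (NOT the NiFi source) of the size guard applied while
// filling the token buffer: every buffered byte counts against maxDataSize,
// so a fixed 1048576 default rejects large messages before the Kafka
// producer ever sees them.
public class DemarcatorSketch {
    private final InputStream is;
    private final int maxDataSize;
    private int bufferedBytes;

    public DemarcatorSketch(InputStream is, int maxDataSize) {
        this.is = is;
        this.maxDataSize = maxDataSize;
    }

    /** Reads the whole stream as one token, enforcing maxDataSize. */
    public byte[] nextToken() throws IOException {
        ByteArrayOutputStream token = new ByteArrayOutputStream();
        int b;
        while ((b = is.read()) != -1) {
            if (++bufferedBytes > maxDataSize) {
                throw new IllegalStateException(
                    "Maximum allowed data size of " + maxDataSize + " exceeded.");
            }
            token.write(b);
        }
        return token.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // A 2 MB payload against the 1 MB default reproduces the message
        // seen in nifi-app.log above.
        byte[] twoMb = new byte[2 * 1024 * 1024];
        DemarcatorSketch d = new DemarcatorSketch(new ByteArrayInputStream(twoMb), 1048576);
        try {
            d.nextToken();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running the sketch prints "Maximum allowed data size of 1048576 exceeded.", matching the exception in the trace.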
> > This occurs using PublishKafka and PutKafka. Setting the Max Record Size
> > property in the PutKafka processor has no effect on this. Note the stack
> > trace above is from the PutKafka processor with Max Record Size set to
> > 10MB.
> >
> > I believe that this is a regression from 0.6.0.
> >
> > Chris McDermott
> >
> > Remote Business Analytics
> > STaTS/StoreFront Remote
> > HPE Storage
> > Hewlett Packard Enterprise
> > Mobile: +1 978-697-5315
> >
> >
> >
> > On 8/20/16, 3:48 PM, "Andrew Psaltis" <[email protected]> wrote:
> >
> > Hi Chris,
> > Regarding the PutKafka processor: looking at this block [1] of the
> > PutKafka code, it has a default size of 1 MB, but it does not restrict
> > the size. The DATA_SIZE_VALIDATOR does a sanity check and also enforces
> > that the value entered is in the correct format: <value> [B|KB|MB|GB|TB].
> > Later on in the code, at this block [2], the value is set on the Kafka
> > config; again, this does not enforce a maximum value.
> >
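The pass-through described above (a processor-side size property flowing into the producer configuration rather than being enforced by NiFi) can be sketched roughly as below. This is an illustrative assumption, not the NiFi source; `ProducerConfigSketch` and `buildProducerProps` are hypothetical names. The producer-side Kafka key is `max.request.size`; the broker independently enforces `message.max.bytes`.

```java
import java.util.Properties;

// Illustrative sketch (NOT the NiFi source): a "Max Record Size"-style
// processor property passed through to the Kafka producer configuration.
public class ProducerConfigSketch {
    public static Properties buildProducerProps(long maxRecordSizeBytes) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        // Producer-side cap on a single request; the broker still applies
        // its own message.max.bytes limit.
        props.put("max.request.size", String.valueOf(maxRecordSizeBytes));
        return props;
    }

    public static void main(String[] args) {
        Properties p = buildProducerProps(10L * 1024 * 1024); // 10 MB
        System.out.println(p.getProperty("max.request.size")); // prints 10485760
    }
}
```

The point of the sketch is that setting the value here only raises the producer's limit; nothing in this path imposes the 1 MB ceiling seen in the stack trace, which comes from the demarcator instead.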
> > In regards to the PublishKafka processor, I do not see where it
> > accepts a size nor restricts the size at all.
> >
> > Have you adjusted the 'message.max.bytes' config value for your
> > broker(s)? The default value for that is 1 MB [3]. (The URL references
> > 0.8 Kafka; however, I believe this default has been stable since the
> > early days of the project.)
> >
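For reference, raising the broker-side limit means editing the broker's server.properties along these lines (the values here are illustrative, not recommendations):

```properties
# Broker-side cap on message size (default ~1 MB); raise it to accept larger records.
message.max.bytes=10485760
# Should be at least message.max.bytes, or replicas cannot fetch large messages.
replica.fetch.max.bytes=10485760
```

Topic-level overrides via max.message.bytes are also possible; either way, the producer's max.request.size must be raised to match, as the post in [4] discusses.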
> > If you really do want to send messages that are larger than 1 MB in
> > size, I would highly recommend reading this post [4] from Gwen Shapira.
> > It does a great job of outlining the things you need to take into
> > consideration. This will also point you to the relevant configs in
> > Kafka that will need to be adjusted if you decide to go this route.
> >
> >
> > Thanks,
> > Andrew
> >
> > [1]
> > https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
> > [2]
> > https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
> > [3] https://kafka.apache.org/08/configuration.html
> > [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
> >
> > On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
> > STaTS/StorefrontRemote) <[email protected]> wrote:
> >
> > > Hi folks,
> > >
> > >
> > >
> > > From experimentation and looking at the code, it seems that the max
> > > message size that can be sent via the PublishKafka and PutKafka
> > > processors in 0.7.0 is 1MB. Can someone please confirm my read on this?
> > >
> > >
> > >
> > > Thanks,
> > >
> > >
> > >
> > > Chris McDermott
> > >
> > >
> > >
> > > Remote Business Analytics
> > >
> > > STaTS/StoreFront Remote
> > >
> > > HPE Storage
> > >
> > > Hewlett Packard Enterprise
> > >
> > > Mobile: +1 978-697-5315
> > >
> > >
> > >
> > >
> >
> >
> > --
> > Thanks,
> > Andrew
> >
> > Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> > <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> > twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
> >
> >
> >
>
>
> --
> Thanks,
> Andrew
>
> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
>
> Cannot send Kafka messages > 1MiB
> ---------------------------------
>
> Key: NIFI-2614
> URL: https://issues.apache.org/jira/browse/NIFI-2614
> Project: Apache NiFi
> Issue Type: Bug
> Affects Versions: 1.0.0, 0.7.0
> Reporter: Christopher McDermott
>
> Neither PutKafka nor PublishKafka can send a message > 1MiB. PublishKafka is
> new in 0.7.0. In 0.6.1, PutKafka could put messages > 1MiB. That processor
> has a Max Record Size property which defines the largest message size. In
> 0.7.0 that property is ignored.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)