Hi Chris,
Sorry for not catching that code path. I am not sure it is actually a
regression: I took a look at the 1.0.0-BETA code and it matches 0.7.0.
Specifically, this comment block, found at [1], leads me to believe the
limit was intentional and not a regression:

/*
 * We're using the default value from Kafka. We are using it to control the
 * message size before it goes to to Kafka thus limiting possibility of a
 * late failures in Kafka client.
 */

Looking at the 0.6.1 release code, it appears that PutKafka used a default
of 5 MB [2].
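
To make the behavior you described concrete, here is a minimal sketch of a
reader with a fixed size cap. It is not the NiFi code: CappedReader and its
members are hypothetical names I made up for illustration, and only the
exception message and the 1048576 default are taken from this thread. It
shows why a limit configured on the processor has no effect if the value is
never passed through to the reader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical stand-in for a size-capped stream reader; not NiFi's StreamDemarcator. */
public class CappedReader {
    // Mirrors the 1048576-byte default mentioned in this thread.
    public static final int DEFAULT_MAX_DATA_SIZE = 1024 * 1024;

    private final InputStream in;
    private final int maxDataSize;

    public CappedReader(InputStream in, int maxDataSize) {
        this.in = in;
        this.maxDataSize = maxDataSize;
    }

    /** Reads the whole stream, failing once more than maxDataSize bytes have been seen. */
    public byte[] readAll() throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        int total = 0;
        int n;
        while ((n = in.read(buf)) != -1) {
            total += n;
            if (total > maxDataSize) {
                throw new IllegalStateException(
                        "Maximum allowed data size of " + maxDataSize + " exceeded.");
            }
            out.write(buf, 0, n);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[2 * 1024 * 1024]; // 2 MB, larger than the default cap

        // The configured limit is never handed to the reader, so the default applies.
        try {
            new CappedReader(new ByteArrayInputStream(payload), DEFAULT_MAX_DATA_SIZE).readAll();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // Passing the configured limit through lets the same payload succeed.
        int configured = 10 * 1024 * 1024;
        byte[] read = new CappedReader(new ByteArrayInputStream(payload), configured).readAll();
        System.out.println(read.length);
    }
}
```

If the real code behaves like the first call in main, that would explain why
setting Max Record Size to 10 MB made no difference for you.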

I can speculate on the reasoning behind it, however, I will refrain from
opining on it as I was not involved in any of the conversations related to
the change and enforcement of the 1 MB max.
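
Whatever the reasoning, the 1 MB figure does line up with the Kafka
producer's own default for max.request.size (1048576 bytes). For reference,
below is a rough sketch of the Kafka settings that usually come up when
raising the message size. It uses plain java.util.Properties so it runs
without the Kafka client jar. The class name, the helper methods, and the
10 MB value are illustrative assumptions on my part; the property keys are
standard Kafka config names, but please check them against the docs for the
Kafka version you run.

```java
import java.util.Properties;

/** Illustrative only: collects the size-related Kafka settings in one place. */
public class LargeMessageSettings {

    /** Producer-side limit; Kafka's default for max.request.size is 1048576 bytes. */
    public static Properties producerProps(int maxBytes) {
        Properties p = new Properties();
        p.setProperty("max.request.size", Integer.toString(maxBytes));
        return p;
    }

    /** Broker-side limits, as they would appear in server.properties. */
    public static Properties brokerProps(int maxBytes) {
        Properties p = new Properties();
        // Largest message the broker will accept.
        p.setProperty("message.max.bytes", Integer.toString(maxBytes));
        // Replicas must be able to fetch messages of that size as well.
        p.setProperty("replica.fetch.max.bytes", Integer.toString(maxBytes));
        return p;
    }

    public static void main(String[] args) {
        int tenMb = 10 * 1024 * 1024; // illustrative value, not a recommendation
        System.out.println(producerProps(tenMb));
        System.out.println(brokerProps(tenMb));
    }
}
```

None of this helps, of course, if the processor enforces its own 1 MB cap
before the data ever reaches the Kafka client.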

[1]
https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java#L36-L41
[2]
https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176

Thanks,
Andrew

On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU -
STaTS/StorefrontRemote) <chris.mcderm...@hpe.com> wrote:

> Thanks, Andrew.
>
> I’ve set all of the right broker configs to allow larger messages.
> Believe me I spent a lot of time banging my head against the wall thinking
> that the broker and topic configs were wrong.
>
> PublishKafka uses PublishingContext.  That class has a bean property
> called maxRequestSize, which defaults to 1048576.  As far as I can tell the
> setMaxRequestSize() method is never called (except by some test code).
> KafkaPublisher.publish() calls PublishingContext.getMaxRequestSize() and
> passes the result to the constructor for StreamDemarcator.  The publish
> method then calls StreamDemarcator.nextToken(), which in turn calls
> StreamDemarcator.fill(), which compares the stream position against
> maxRequestSize and throws the exception with this line:
>
> throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");
>
> Which matches what I see in the nifi-app.log file…
>
> 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.kafka.PutKafka
> java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
>         at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
>         at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
>         at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>         at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
>         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
>         at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>         at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>         at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
>         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
>         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>
> This occurs with both PublishKafka and PutKafka.  Setting the Max Record
> Size property in the PutKafka processor has no effect on this.  Note that
> the stack trace above is from the PutKafka processor with Max Record Size
> set to 10MB.
>
> I believe that this is a regression from 0.6.0.
>
> Chris McDermott
>
> Remote Business Analytics
> STaTS/StoreFront Remote
> HPE Storage
> Hewlett Packard Enterprise
> Mobile: +1 978-697-5315
>
>
>
> On 8/20/16, 3:48 PM, "Andrew Psaltis" <psaltis.and...@gmail.com> wrote:
>
>     Hi Chris,
>     Regarding the PutKafka processor: looking at this block [1] of the
>     PutKafka code, it has a default size of 1 MB, but it does not restrict
>     the size. The DATA_SIZE_VALIDATOR does a sanity check and also enforces
>     that the value entered is in the correct format, <value> [B|KB|MB|GB|TB].
>     Later on in the code, at this block [2], the value is set on the Kafka
>     config; again, this does not enforce a maximum value.
>
>     Regarding the PublishKafka processor, I do not see where it accepts a
>     size or restricts the size at all.
>
>     Have you adjusted the 'message.max.bytes' config value for your
>     broker(s)? The default value for that is 1 MB [3]. (The URL references
>     the Kafka 0.8 documentation; however, I believe this default has been
>     stable since the early days of the project.)
>
>     If you really do want to send messages that are larger than 1 MB in
>     size, I would highly recommend reading this post [4] from Gwen Shapira.
>     It does a great job of outlining the things you need to take into
>     consideration. It will also point you to the relevant configs in Kafka
>     that will need to be adjusted if you decide to go this route.
>
>
>     Thanks,
>     Andrew
>
>     [1]
>     https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
>     [2]
>     https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
>     [3] https://kafka.apache.org/08/configuration.html
>     [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
>
>     On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
>     STaTS/StorefrontRemote) <chris.mcderm...@hpe.com> wrote:
>
>     > Hi folks,
>     >
>     > From experimentation and looking at the code it seems that the max
>     > message size that can be sent via the PublishKafka and PutKafka
>     > processors in 0.7.0 is 1MB.  Can someone please confirm my read on this?
>     >
>     > Thanks,
>     >
>     > Chris McDermott
>     >
>     > Remote Business Analytics
>     > STaTS/StoreFront Remote
>     > HPE Storage
>     > Hewlett Packard Enterprise
>     > Mobile: +1 978-697-5315
>
>
>     --
>     Thanks,
>     Andrew
>
>     Subscribe to my book: Streaming Data <http://manning.com/psaltis>
>     <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>     twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
>
>
>

