Thanks, Andrew.  

I’ve set all of the right broker configs to allow larger messages.  Believe me,
I spent a lot of time banging my head against the wall thinking that the broker
and topic configs were wrong.

PublishKafka uses PublishingContext.  That class has a bean property called
maxRequestSize, which defaults to 1048576.  As far as I can tell, the
setMaxRequestSize() method is never called (except by some test code).
KafkaPublisher.publish() calls PublishingContext.getMaxRequestSize() and passes
the result to the constructor for StreamDemarcator.  The publish method then
calls StreamDemarcator.nextToken(), which in turn calls
StreamDemarcator.fill(), which compares the stream position against the
maxRequestSize and throws the exception with this line:

throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");

This matches what I see in the nifi-app.log file:

2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.kafka.PutKafka
java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
        at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
        at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
        at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
        at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
        at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
        at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
        at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
        at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
        at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]

This occurs with both PublishKafka and PutKafka.  Setting the Max Record Size
property in the PutKafka processor has no effect on this.  Note that the stack
trace above is from the PutKafka processor with Max Record Size set to 10MB.
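A minimal Java sketch of the failure mode described above, under the simplified behavior described in this message. It is NOT the NiFi 0.7.0 source: Context, readWhole() and publish() are hypothetical stand-ins for PublishingContext, StreamDemarcator.fill() and KafkaPublisher.publish(). It shows that if the limit is read only from the context's 1048576-byte default, a Max Record Size value that is never copied into the context has no effect.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical, simplified model of the call chain described above.
// It is NOT the NiFi 0.7.0 source; all names here are illustrative.
public class MaxRequestSizeDemo {

    // Stand-in for PublishingContext: a bean property with a 1 MiB default.
    static class Context {
        private int maxRequestSize = 1048576;

        int getMaxRequestSize() { return maxRequestSize; }

        void setMaxRequestSize(int maxRequestSize) { this.maxRequestSize = maxRequestSize; }
    }

    // Stand-in for StreamDemarcator.fill(): buffers the stream and fails
    // as soon as more than maxDataSize bytes have been read.
    static byte[] readWhole(InputStream in, int maxDataSize) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int n;
        while ((n = in.read(chunk)) != -1) {
            buffer.write(chunk, 0, n);
            if (buffer.size() > maxDataSize) {
                throw new IllegalStateException(
                        "Maximum allowed data size of " + maxDataSize + " exceeded.");
            }
        }
        return buffer.toByteArray();
    }

    // Stand-in for KafkaPublisher.publish(): the limit comes only from the
    // context, so a processor property matters only if it is copied in.
    static int publish(byte[] flowFileContent, Context context) throws IOException {
        byte[] message = readWhole(new ByteArrayInputStream(flowFileContent),
                context.getMaxRequestSize());
        return message.length; // pretend the message was sent
    }

    public static void main(String[] args) throws IOException {
        byte[] twoMiB = new byte[2 * 1024 * 1024];
        int maxRecordSize = 10 * 1024 * 1024; // "Max Record Size" = 10 MB

        Context unwired = new Context(); // setMaxRequestSize() never called
        try {
            publish(twoMiB, unwired);
        } catch (IllegalStateException e) {
            // prints "unwired: Maximum allowed data size of 1048576 exceeded."
            System.out.println("unwired: " + e.getMessage());
        }

        Context wired = new Context();
        wired.setMaxRequestSize(maxRecordSize); // hypothetical fix: propagate the property
        // prints "wired: sent 2097152 bytes"
        System.out.println("wired: sent " + publish(twoMiB, wired) + " bytes");
    }
}
```

In this model the only thing that changes the outcome is whether the property value reaches the object the demarcator reads its limit from; the broker and topic settings never come into play because the exception is thrown before anything is sent.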

I believe that this is a regression from 0.6.0.

Chris McDermott
 
Remote Business Analytics
STaTS/StoreFront Remote
HPE Storage
Hewlett Packard Enterprise
Mobile: +1 978-697-5315
 


On 8/20/16, 3:48 PM, "Andrew Psaltis" wrote:

    Hi Chris,
    Regarding the PutKafka processor: looking at this block [1] of the PutKafka
    code, it has a default size of 1 MB, but it does not restrict the size. The
    DATA_SIZE_VALIDATOR does a sanity check and also enforces that the value
    entered is in the supported format <value> [B|KB|MB|GB|TB].
    Later on in the code, at this block [2], the value is set on the Kafka
    config; again, this does not enforce a maximum value.
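To make the "format check, but no maximum" point concrete, here is a hypothetical Java parser for strings in the <value> [B|KB|MB|GB|TB] format. It is not NiFi's DATA_SIZE_VALIDATOR or its unit-conversion code, and it assumes binary multiples (1 KB = 1024 B), which is what makes "1 MB" come out to the 1048576 bytes seen in the error above.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for strings such as "1 MB" or "10MB"; it is not
// NiFi's DATA_SIZE_VALIDATOR. It ASSUMES binary multiples (1 KB = 1024 B).
public class DataSize {

    private static final Pattern FORMAT =
            Pattern.compile("(\\d+(?:\\.\\d+)?)\\s*(B|KB|MB|GB|TB)", Pattern.CASE_INSENSITIVE);

    static long toBytes(String value) {
        Matcher m = FORMAT.matcher(value.trim());
        if (!m.matches()) {
            // Rejects malformed input only; this is the "sanity check" part.
            throw new IllegalArgumentException("Expected <value> [B|KB|MB|GB|TB], got: " + value);
        }
        double amount = Double.parseDouble(m.group(1));
        int power;
        switch (m.group(2).toUpperCase(Locale.ROOT)) {
            case "B":  power = 0; break;
            case "KB": power = 1; break;
            case "MB": power = 2; break;
            case "GB": power = 3; break;
            default:   power = 4; break; // "TB"
        }
        // No upper bound is enforced: any well-formed size is accepted.
        return (long) (amount * Math.pow(1024, power));
    }

    public static void main(String[] args) {
        System.out.println(toBytes("1 MB"));  // prints 1048576
        System.out.println(toBytes("10MB"));  // prints 10485760
    }
}
```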
    
    Regarding the PublishKafka processor, I do not see where it accepts a
    size or restricts the size at all.
    
    Have you adjusted the 'message.max.bytes' config value for your broker(s)?
    The default value for that is 1 MB [3]. (The URL references Kafka 0.8;
    however, I believe this default has been stable since the early days of the
    project.)
    
    If you really do want to send messages that are larger than 1 MB in size, I
    would highly recommend reading this post [4] from Gwen Shapira.  It does
    a great job of outlining the things you need to take into consideration,
    and it will also point you to the relevant Kafka configs that will need to
    be adjusted if you decide to go this route.
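As an illustration of "the relevant configs", the Java sketch below collects, as plain java.util.Properties, the standard Kafka settings that are commonly raised together for roughly 10 MB messages. The key names are real Kafka configuration keys, but the 10 MB value is only an example, the grouping reflects older (pre-0.10.1) behavior, and nothing here claims which of these PutKafka or PublishKafka actually set; check the Kafka documentation for your version.

```java
import java.util.Properties;

// Illustrative sketch only: Kafka settings commonly raised together for
// ~10 MB messages. The value is an example; verify keys for your version.
public class LargeMessageConfig {

    static final int TEN_MB = 10 * 1024 * 1024;

    // Producer (Java client): largest request the client will send
    // (its default is 1048576).
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("max.request.size", String.valueOf(TEN_MB));
        return p;
    }

    // Broker (server.properties): largest message accepted, plus the replica
    // fetch size, which on older brokers had to be at least as large so
    // followers could replicate the big messages.
    static Properties brokerProps() {
        Properties p = new Properties();
        p.setProperty("message.max.bytes", String.valueOf(TEN_MB));
        p.setProperty("replica.fetch.max.bytes", String.valueOf(TEN_MB));
        return p;
    }

    // Topic-level override of the broker's message.max.bytes.
    static Properties topicProps() {
        Properties p = new Properties();
        p.setProperty("max.message.bytes", String.valueOf(TEN_MB));
        return p;
    }

    // Consumer: with older clients the fetch size had to fit the largest
    // message (the old Scala consumer used fetch.message.max.bytes instead).
    static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("max.partition.fetch.bytes", String.valueOf(TEN_MB));
        return p;
    }

    public static void main(String[] args) {
        System.out.println("producer: " + producerProps());
        System.out.println("broker:   " + brokerProps());
        System.out.println("topic:    " + topicProps());
        System.out.println("consumer: " + consumerProps());
    }
}
```

Keeping the values in one place makes it easier to see that raising only one of them (for example the broker limit) still leaves the producer or consumer side capped.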
    
    
    Thanks,
    Andrew
    
    [1] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
    [2] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
    [3] https://kafka.apache.org/08/configuration.html
    [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
    
    On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
    STaTS/StorefrontRemote) wrote:
    
    > Hi folks,
    >
    > From experimentation and looking at the code, it seems that the max message
    > size that can be sent via the PublishKafka and PutKafka processors in 0.7.0
    > is 1MB.  Can someone please confirm my read on this?
    >
    > Thanks,
    >
    > Chris McDermott
    
    
    -- 
    Thanks,
    Andrew
    
    Subscribe to my book: Streaming Data <http://manning.com/psaltis>
    <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
    Twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
    
