The JIRA is https://issues.apache.org/jira/browse/NIFI-2614.

Thanks,

Chris McDermott
 
Remote Business Analytics
STaTS/StoreFront Remote
HPE Storage
Hewlett Packard Enterprise
Mobile: +1 978-697-5315
 


On 8/20/16, 6:57 PM, "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" 
<chris.mcderm...@hpe.com> wrote:

    I’ll raise a JIRA, Joe.
    
    Thanks,
    
    Chris McDermott
     
     
    
    
    On 8/20/16, 6:52 PM, "Joe Witt" <joe.w...@gmail.com> wrote:
    
        If no jira is raised sooner I'll raise one and get it sorted.
        
        On Aug 20, 2016 6:40 PM, "Andrew Psaltis" <psaltis.and...@gmail.com> wrote:
        
        > Hi Chris,
        > Sorry for not catching that code path. I am not sure it is actually a
        > regression: I took a look at the 1.0.0-BETA code, and it matches
        > 0.7.0, specifically this comment block:
        >
        > /*
        >  * We're using the default value from Kafka. We are using it to control the
        >  * message size before it goes to to Kafka thus limiting possibility of a
        >  * late failures in Kafka client.
        >  */
        >
        > found at [1] leads me to believe it was intentional and not a regression.
        > Looking at the 0.6.1 release code, it appears that PutKafka used a default
        > of 5 MB [2].
        >
        > I can speculate on the reasoning behind it; however, I will refrain from
        > opining on it, as I was not involved in any of the conversations related to
        > the change and enforcement of the 1 MB max.
        >
        > [1] https://github.com/apache/nifi/blob/rel/nifi-1.0.0-BETA-official/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java#L36-L41
        > [2] https://github.com/apache/nifi/blob/rel/nifi-0.6.1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L169-L176
        >
        > Thanks,
        > Andrew
        >
        > On Sat, Aug 20, 2016 at 6:09 PM, McDermott, Chris Kevin (MSDU -
        > STaTS/StorefrontRemote) <chris.mcderm...@hpe.com> wrote:
        >
        > > Thanks, Andrew.
        > >
        > > I’ve set all of the right broker configs to allow larger messages.
        > > Believe me, I spent a lot of time banging my head against the wall
        > > thinking that the broker and topic configs were wrong.
        > >
        > > PublishKafka uses PublishingContext. That class has a bean property
        > > called maxRequestSize, which defaults to 1048576. As far as I can tell,
        > > the setMaxRequestSize() method is never called (except by some test code).
        > > KafkaPublisher.publish() calls PublishingContext.getMaxRequestSize() and
        > > passes the result to the constructor for StreamDemarcator. The publish
        > > method then calls StreamDemarcator.nextToken(), which in turn calls
        > > StreamDemarcator.fill(), which compares the stream position against
        > > the maxRequestSize and throws the exception with this line:
        > >
        > > throw new IllegalStateException("Maximum allowed data size of " +
        > > this.maxDataSize + " exceeded.");
        > >
        > > This matches what I see in the nifi-app.log file:
        > >
        > > 2016-08-20 22:03:05,470 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.kafka.PutKafka
        > > java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
        > >         at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-0.7.0.jar:0.7.0]
        > >         at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-0.7.0.jar:0.7.0]
        > >         at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:129) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
        > >         at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:315) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
        > >         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-0.7.0.jar:0.7.0]
        > >         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1822) ~[nifi-framework-core-0.7.0.jar:0.7.0]
        > >         at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:311) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
        > >         at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:287) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
        > >         at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-processors-0.7.0.jar:0.7.0]
        > >         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1054) [nifi-framework-core-0.7.0.jar:0.7.0]
        > >         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0.jar:0.7.0]
        > >         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0.jar:0.7.0]
        > >         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:127) [nifi-framework-core-0.7.0.jar:0.7.0]
        > >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
        > >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
        > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
        > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
        > >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
        > >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
        > >         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
        > >
        > > This occurs with both PublishKafka and PutKafka. Setting the Max Record
        > > Size property in the PutKafka processor has no effect on this. Note that
        > > the stack trace above is from the PutKafka processor with Max Record Size
        > > set to 10 MB.
        > >
        > > I believe that this is a regression from 0.6.0.
        > >
        > > Chris McDermott
        > >
        > >
        > >
        > >
        > > On 8/20/16, 3:48 PM, "Andrew Psaltis" <psaltis.and...@gmail.com> wrote:
        > >
        > >     Hi Chris,
        > >     Regarding the PutKafka processor: looking at this block [1] of the
        > >     PutKafka code, it has a default size of 1 MB, but it does not restrict
        > >     the size. The DATA_SIZE_VALIDATOR does a sanity check and also enforces
        > >     that the value entered is in the supported format <value> [B|KB|MB|GB|TB].
        > >     Later on in the code, at this block [2], the value is set on the Kafka
        > >     config; again, this does not enforce a maximum value.
        > >
        > >     Regarding the PublishKafka processor, I do not see where it accepts
        > >     a size or restricts the size at all.
        > >
        > >     Have you adjusted the 'message.max.bytes' config value for your
        > >     broker(s)? The default value for that is 1 MB [3]. (The URL references
        > >     Kafka 0.8; however, I believe this default has been stable since the
        > >     early days of the project.)
        > >
        > >     If you really do want to send messages that are larger than 1 MB in
        > >     size, I would highly recommend reading this post [4] from Gwen Shapira.
        > >     It does a great job of outlining the things you need to take into
        > >     consideration. It will also point you to the relevant Kafka configs
        > >     that will need to be adjusted if you decide to go this route.
        > >
        > >
        > >     Thanks,
        > >     Andrew
        > >
        > >     [1] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L174-L180
        > >     [2] https://github.com/apache/nifi/blob/rel/nifi-0.7.0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java#L495
        > >     [3] https://kafka.apache.org/08/configuration.html
        > >     [4] http://ingest.tips/2015/01/21/handling-large-messages-kafka/
        > >
        > >     On Sat, Aug 20, 2016 at 3:25 PM, McDermott, Chris Kevin (MSDU -
        > >     STaTS/StorefrontRemote) <chris.mcderm...@hpe.com> wrote:
        > >
        > >     > Hi folks,
        > >     >
        > >     > From experimentation and looking at the code, it seems that the max
        > >     > message size that can be sent via the PublishKafka and PutKafka
        > >     > processors in 0.7.0 is 1 MB. Can someone please confirm my read on this?
        > >     >
        > >     > Thanks,
        > >     >
        > >     > Chris McDermott
        > >     >
        > >
        > >
        > >     --
        > >     Thanks,
        > >     Andrew
        > >
        > >     Subscribe to my book: Streaming Data <http://manning.com/psaltis>
        > >     <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
        > >     twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
        > >
        > >
        > >
        >
        >
        >
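Chris's trace of the code path in this thread comes down to one point: whatever limit reaches the demarcator's constructor is the limit that is enforced. Below is a minimal sketch of that failure mode. It assumes a simplified newline-delimited reader, and `BoundedDemarcator`, `DemarcatorDemo` and `fits` are hypothetical names used only for illustration. This is not NiFi's `StreamDemarcator`. If the caller always passes the 1048576-byte default rather than the processor's configured Max Record Size, a 10 MB record fails whatever the property is set to.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical, simplified reader (NOT NiFi's StreamDemarcator). It splits a
// stream on one delimiter byte and refuses any token that grows past maxDataSize.
final class BoundedDemarcator {
    private final InputStream in;
    private final int delimiter;
    private final int maxDataSize;

    BoundedDemarcator(InputStream in, byte delimiter, int maxDataSize) {
        this.in = in;
        this.delimiter = delimiter & 0xFF; // compare as an unsigned byte value
        this.maxDataSize = maxDataSize;
    }

    // Returns the next token, or null once the stream is exhausted.
    byte[] nextToken() throws IOException {
        ByteArrayOutputStream token = new ByteArrayOutputStream();
        boolean sawData = false;
        int b;
        while ((b = in.read()) != -1) {
            sawData = true;
            if (b == delimiter) {
                return token.toByteArray();
            }
            token.write(b);
            if (token.size() > maxDataSize) {
                throw new IllegalStateException(
                        "Maximum allowed data size of " + maxDataSize + " exceeded.");
            }
        }
        return sawData ? token.toByteArray() : null;
    }
}

final class DemarcatorDemo {
    // The maxRequestSize default quoted in the thread.
    static final int DEFAULT_MAX_REQUEST_SIZE = 1048576;

    // True if the first newline-delimited record fits under the given limit.
    static boolean fits(byte[] content, int limit) {
        BoundedDemarcator demarcator =
                new BoundedDemarcator(new ByteArrayInputStream(content), (byte) '\n', limit);
        try {
            demarcator.nextToken();
            return true;
        } catch (IllegalStateException tooLarge) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For a 10 MB record with no newline, `fits(record, DemarcatorDemo.DEFAULT_MAX_REQUEST_SIZE)` returns false and `fits(record, 10 * 1024 * 1024)` returns true. That second result is what one would expect if the configured Max Record Size were passed through instead of the default.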
        
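Andrew notes that DATA_SIZE_VALIDATOR only checks that a property such as Max Record Size is written as `<value> [B|KB|MB|GB|TB]`. Below is a sketch of converting such a value to a byte count. It assumes binary multiples (1 MB = 1024 * 1024 bytes), and `DataSizes` and `toBytes` are hypothetical names, not NiFi's own API. Under that assumption, "1 MB" is the 1048576 that appears in the exception, and Chris's "10 MB" setting would be 10485760.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not NiFi's API): converts "<value> [B|KB|MB|GB|TB]"
// into bytes, assuming binary multiples (1 KB = 1024 B).
final class DataSizes {
    private static final Pattern SIZE = Pattern.compile(
            "\\s*(\\d+(?:\\.\\d+)?)\\s*(B|KB|MB|GB|TB)\\s*", Pattern.CASE_INSENSITIVE);

    private DataSizes() {
    }

    static long toBytes(String text) {
        Matcher m = SIZE.matcher(text);
        if (!m.matches()) {
            throw new IllegalArgumentException("Expected <value> [B|KB|MB|GB|TB], got: " + text);
        }
        double value = Double.parseDouble(m.group(1));
        int power;
        switch (m.group(2).toUpperCase(Locale.ROOT)) {
            case "B":
                power = 0;
                break;
            case "KB":
                power = 1;
                break;
            case "MB":
                power = 2;
                break;
            case "GB":
                power = 3;
                break;
            default: // "TB"
                power = 4;
                break;
        }
        // Round rather than truncate so fractional inputs such as "1.5 KB" land on whole bytes.
        return Math.round(value * Math.pow(1024, power));
    }
}
```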
    
    

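Andrew's question about 'message.max.bytes' refers to the Kafka-side limits. Below is a sketch of the settings usually raised together to allow larger messages. It assumes a 10 MB target and uses the config names of the Kafka 0.8 to 0.10 era: `message.max.bytes` and `replica.fetch.max.bytes` on the broker, `max.request.size` on the Java producer, and `max.partition.fetch.bytes` on the 0.9+ consumer (`fetch.message.max.bytes` on the old 0.8 consumer). `LargeMessageConfig` and the values are illustrative only. Raising these settings does not lift the 1048576-byte check inside NiFi 0.7.0 that Chris traced, which is the issue raised as NIFI-2614.

```java
import java.util.Properties;

// Illustrative values for allowing roughly 10 MB messages end to end.
// LargeMessageConfig is a hypothetical name; the property keys are Kafka's.
final class LargeMessageConfig {
    static final int TEN_MB = 10 * 1024 * 1024;

    private LargeMessageConfig() {
    }

    // Broker (server.properties). A per-topic override uses max.message.bytes.
    static Properties broker() {
        Properties p = new Properties();
        p.setProperty("message.max.bytes", Integer.toString(TEN_MB));
        // Kept >= message.max.bytes so followers can replicate the largest message.
        p.setProperty("replica.fetch.max.bytes", Integer.toString(TEN_MB));
        return p;
    }

    // Java producer: the largest request the client will send.
    static Properties producer() {
        Properties p = new Properties();
        p.setProperty("max.request.size", Integer.toString(TEN_MB));
        return p;
    }

    // 0.9+ consumer; the old 0.8 consumer uses fetch.message.max.bytes instead.
    // In clients of this era the fetch size must cover the largest message.
    static Properties consumer() {
        Properties p = new Properties();
        p.setProperty("max.partition.fetch.bytes", Integer.toString(TEN_MB));
        return p;
    }
}
```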