Hi Chris,

Yep, now my processors are much faster.

I'll submit the patch soon :)

Casey


On Mon, Mar 31, 2014 at 7:10 PM, Chris Riccomini <[email protected]> wrote:

> Hey Casey,
>
> I've opened:
>
>   https://issues.apache.org/jira/browse/SAMZA-213
>
>
> Would you be up for submitting a patch?
>
> Cheers,
> Chris
>
> On 3/31/14 10:00 AM, "Chris Riccomini" <[email protected]> wrote:
>
> >Hey Casey,
> >
> >Yikes, nice catch. This does indeed look like a bug.
> >
> >As you say, I think we just need to move the update call outside of the
> >refresh.maybeCall block.
> >
> >The essence of the problem you've found is that we're updating the chooser
> >inside the refresh block, even though it doesn't have anything to do with
> >it. We should always update the chooser on every iteration, provided we
> >have messages in the unprocessedMessages buffer, regardless of whether we
> >received messages (or are backing off) on our latest poll to the
> >consumers.
> >
> >Does moving the update block outside of refresh.maybeCall speed up your
> >processor?
> >
> >Cheers,
> >Chris
> >
> >On 3/29/14 4:58 PM, "Anh Thu Vu" <[email protected]> wrote:
> >
> >>Ok, I feel like I'm spamming this mailing list too much, but I think I
> >>finally figured out what happened.
> >>
> >>In my case of 2 tasks, the first task writes to a Kafka topic with 2
> >>partitions, and the second task reads from that topic.
> >>Most of the messages are sent to the first partition and very few go to
> >>the second partition. So the problem happens in the "choose" method of
> >>SystemConsumers.
> >>
> >>As there are very few messages in the 2nd partition, most of the time
> >>the number of messages in the MessageChooser queue is <= 1.
> >>After SystemConsumers gets that message back from the MessageChooser, it
> >>calls refresh.maybeCall(), but this does not invoke refresh.call() (due
> >>to the backoff) and thus the MessageChooser is never updated. The
> >>MessageChooser queue is now empty, so the next time
> >>SystemConsumers.choose() is called it returns a null envelope and causes
> >>the blocking.
> >>
> >>So the problem is that updating the MessageChooser is coupled with
> >>reading new messages from the consumers. My suggestion is to simply move
> >>the update of the MessageChooser queue outside refresh.call(), something
> >>like the snippet below:
> >>
> >>def choose: IncomingMessageEnvelope = {
> >>  val envelopeFromChooser = chooser.choose
> >>
> >>  if (envelopeFromChooser == null) {
> >>    debug("Chooser returned null.")
> >>    ...
> >>    // Allow blocking if the chooser didn't choose a message.
> >>    timeout = noNewMessagesTimeout
> >>  } else {
> >>    debug("Chooser returned an incoming message envelope: %s" format
> >>      envelopeFromChooser)
> >>    // Do something with the envelope from MessageChooser
> >>    ...
> >>  }
> >>
> >>  refresh.maybeCall()
> >>  // Update MessageChooser here, on every iteration, regardless of
> >>  // whether refresh.call() actually ran.
> >>  envelopeFromChooser
> >>}
> >>
> >>Correct me if I'm wrong. :)
> >>Cheers,
> >>Casey
> >>
> >>
> >>
> >>On Sat, Mar 29, 2014 at 8:49 PM, Anh Thu Vu <[email protected]>
> >>wrote:
> >>
> >>> I think I know the reason. It is related to the MessageChooser. If I'm
> >>> not wrong, the default Kafka stream between the 2 tasks has 2
> >>> partitions, and the default MessageChooser tries to read from the 2
> >>> partitions in a round-robin fashion. Since the messages are not
> >>> distributed evenly between the two (because I did not provide a message
> >>> key, I guess), the MessageChooser keeps waiting on the partition with
> >>> fewer messages and delays the whole thing.
> >>>
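> >>> If that's the cause, keying the sends in the first task should spread
> >>> the messages across both partitions. An untested sketch, where the
> >>> event.getId() accessor is hypothetical:
> >>>
> >>> import org.apache.samza.system.OutgoingMessageEnvelope;
> >>> import org.apache.samza.system.SystemStream;
> >>>
> >>> // Inside process(): give the envelope an explicit key so the Kafka
> >>> // producer partitions by hash(key) instead of putting nearly all
> >>> // messages into one partition.
> >>> collector.send(new OutgoingMessageEnvelope(
> >>>     new SystemStream("kafka", "Dummy_20140328182500-0-1-0"),
> >>>     event.getId(), // any stable per-message key works
> >>>     event));
> >>>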
> >>> Does my guess sound right?
> >>> I will have to do something to verify my guess (and fix my problem :) )
> >>>
> >>> I'll update this once I can verify and fix it.
> >>>
> >>> Cheers,
> >>> Casey
> >>>
> >>>
> >>> On Sat, Mar 29, 2014 at 2:40 PM, Anh Thu Vu <[email protected]>
> >>>wrote:
> >>>
> >>>> Hmm, when I hacked the Samza code and changed the default value of
> >>>> noNewMessagesTimeout in org.apache.samza.system.SystemConsumers from
> >>>> 10 to 5, the throughput (as recorded in the second task) became
> >>>> exactly 2 times faster.
> >>>>
> >>>> I'm not clear on how this happens; maybe you have a better idea or a
> >>>> better guess?
> >>>>
> >>>> Casey
> >>>>
> >>>>
> >>>> On Sat, Mar 29, 2014 at 12:25 PM, Anh Thu Vu
> >>>> <[email protected]> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> I tried running kafka-console-consumer.sh to listen to the output of
> >>>>> the first task and the speed was OK (I don't have an exact
> >>>>> measurement, but it is much faster than the result from the second
> >>>>> task). So my guess is that something is getting stuck at the consumer
> >>>>> in the second task. Is there possibly any synchronization in the
> >>>>> consumer?
> >>>>>
> >>>>> Casey
> >>>>>
> >>>>>
> >>>>> On Sat, Mar 29, 2014 at 2:19 AM, Anh Thu Vu
> >>>>> <[email protected]> wrote:
> >>>>>
> >>>>>> Ok, I went home and ran these two: FirstTask and SecondTask in
> >>>>>> samza-test/ (see attachment - I was lazy, so I just injected the
> >>>>>> code into hello-samza) on my MacBook Air.
> >>>>>>
> >>>>>> What I got is 1000 messages per 10-11 seconds (worse than the result
> >>>>>> I got when I ran on my office machine this afternoon; must be my
> >>>>>> lousy Mac). Anyway, please help me see if there is anything wrong
> >>>>>> with my code or the config.
> >>>>>>
> >>>>>> Thanks a lot!!!!
> >>>>>> Casey
> >>>>>>
> >>>>>> PS: I tried with sync & batch size=1, and with async & batch
> >>>>>> size=200, but strangely the performance did not seem to differ.
> >>>>>>
> >>>>>>
> >>>>>> On Fri, Mar 28, 2014 at 8:05 PM, Anh Thu Vu
> >>>>>> <[email protected]> wrote:
> >>>>>>
> >>>>>>> Hi Chris,
> >>>>>>>
> >>>>>>> The previous message was sent by mistake. Somehow the SEND button
> >>>>>>> got clicked before I could finish writing. Please ignore it and see
> >>>>>>> the one below.
> >>>>>>>
> >>>>>>> In fact, my code is "a bit" complicated, as it lives in a bigger
> >>>>>>> "platform". What I'm planning to do now is write 2 pure Samza tasks
> >>>>>>> to test it.
> >>>>>>>
> >>>>>>> For now, the config is something like this:
> >>>>>>> Task 1
> >>>>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
> >>>>>>> job.name=Dummy_20140328182500-0
> >>>>>>>
> >>>>>>> task.class=...
> >>>>>>> task.inputs=mysystem.Dummy_20140328182500-0
> >>>>>>> task.entrance.outputs=kafka:Dummy_20140328182500-0-1-0 (extra property)
> >>>>>>>
> >>>>>>> systems.mysystem.samza.factory=MySystemFactory
> >>>>>>>
> >>>>>>> serializers.registry.kryo.class=KryoSerdeFactory
> >>>>>>> systems.kafka.samza.msg.serde=kryo
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >>>>>>> systems.kafka.producer.batch.num.messages=1
> >>>>>>> systems.kafka.producer.producer.type=sync
> >>>>>>>
> >>>>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
> >>>>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
> >>>>>>>
> >>>>>>>
> >>>>>>> Task 2:
> >>>>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
> >>>>>>> job.name=Dummy_20140328182500-1
> >>>>>>>
> >>>>>>> task.class=...
> >>>>>>> task.inputs=kafka.Dummy_20140328182500-0-1-0
> >>>>>>>
> >>>>>>> serializers.registry.kryo.class=KryoSerdeFactory
> >>>>>>> systems.kafka.samza.msg.serde=kryo
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >>>>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
> >>>>>>> systems.kafka.producer.batch.num.messages=1
> >>>>>>> systems.kafka.producer.producer.type=sync
> >>>>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
> >>>>>>>
> >>>>>>>
> >>>>>>> For the code, a simplified one is:
> >>>>>>> In the custom consumer of MySystem:
> >>>>>>> @Override
> >>>>>>> public void start() {
> >>>>>>>     Thread processorPollingThread = new Thread(
> >>>>>>>             new Runnable() {
> >>>>>>>                 @Override
> >>>>>>>                 public void run() {
> >>>>>>>                     try {
> >>>>>>>                         pollingEntranceProcessor();
> >>>>>>>                         setIsAtHead(systemStreamPartition, true);
> >>>>>>>                     } catch (InterruptedException e) {
> >>>>>>>                         e.printStackTrace();
> >>>>>>>                         stop();
> >>>>>>>                     }
> >>>>>>>                 }
> >>>>>>>             }
> >>>>>>>     );
> >>>>>>>
> >>>>>>>     processorPollingThread.start();
> >>>>>>> }
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public void stop() {
> >>>>>>> }
> >>>>>>>
> >>>>>>> private void pollingEntranceProcessor() throws InterruptedException {
> >>>>>>>     int messageCnt = 0;
> >>>>>>>     long startTime = System.nanoTime();
> >>>>>>>     while (!this.entranceProcessor.isFinished()) {
> >>>>>>>         messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
> >>>>>>>         // Buffer a message unless the queue is already full (10000).
> >>>>>>>         if (this.entranceProcessor.hasNext() && messageCnt < 10000) {
> >>>>>>>             this.put(systemStreamPartition, new IncomingMessageEnvelope(
> >>>>>>>                     systemStreamPartition, null, null,
> >>>>>>>                     this.entranceProcessor.nextEvent()));
> >>>>>>>         } else {
> >>>>>>>             try {
> >>>>>>>                 Thread.sleep(100);
> >>>>>>>             } catch (InterruptedException e) {
> >>>>>>>                 break;
> >>>>>>>             }
> >>>>>>>         }
> >>>>>>>     }
> >>>>>>>     // Send last event
> >>>>>>>     this.put(systemStreamPartition, new IncomingMessageEnvelope(
> >>>>>>>             systemStreamPartition, null, null,
> >>>>>>>             this.entranceProcessor.nextEvent()));
> >>>>>>> }
> >>>>>>>
> >>>>>>> In the first Task:
> >>>>>>> @Override
> >>>>>>> public void process(IncomingMessageEnvelope envelope,
> >>>>>>>         MessageCollector collector, TaskCoordinator coordinator)
> >>>>>>>         throws Exception {
> >>>>>>>     this.outputStream.setCollector(collector);
> >>>>>>>     this.outputStream.put(event); // This will call collector.send()
> >>>>>>> }
> >>>>>>>
> >>>>>>> In the second Task:
> >>>>>>> public void process(IncomingMessageEnvelope envelope,
> >>>>>>>         MessageCollector collector, TaskCoordinator coordinator)
> >>>>>>>         throws Exception {
> >>>>>>>     for (SamzaStream stream : this.outputStreams) {
> >>>>>>>         stream.setCollector(collector);
> >>>>>>>     }
> >>>>>>>     // The content of this method is shown below.
> >>>>>>>     this.processor.process((ContentEvent) envelope.getMessage());
> >>>>>>> }
> >>>>>>>
> >>>>>>>
> >>>>>>> public boolean process(ContentEvent event) {
> >>>>>>>     counter++;
> >>>>>>>
> >>>>>>>     if (counter == 1) {
> >>>>>>>         sampleStart = System.nanoTime();
> >>>>>>>         expStart = sampleStart;
> >>>>>>>         logger.info("End processor starts receiving events");
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     sampleEnd = System.nanoTime();
> >>>>>>>     if (counter % freq == 0) {
> >>>>>>>         long sampleDuration = TimeUnit.SECONDS.convert(
> >>>>>>>                 sampleEnd - sampleStart, TimeUnit.NANOSECONDS);
> >>>>>>>         logger.info("Instances index: {} - {} seconds", counter,
> >>>>>>>                 sampleDuration);
> >>>>>>>         sampleStart = sampleEnd;
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     if (event.isLastEvent()) {
> >>>>>>>         long sampleDuration = TimeUnit.SECONDS.convert(
> >>>>>>>                 sampleEnd - expStart, TimeUnit.NANOSECONDS);
> >>>>>>>         logger.info("Total: {} - {} seconds", counter, sampleDuration);
> >>>>>>>     }
> >>>>>>>     return true; // the method is declared boolean, so return something
> >>>>>>> }
> >>>>>>>
> >>>>>>> As I said, I will write the new Samza tasks and test NOW. Will let
> >>>>>>> you know later.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Casey
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, Mar 28, 2014 at 6:40 PM, Chris Riccomini <
> >>>>>>> [email protected]> wrote:
> >>>>>>>
> >>>>>>>> Hey Anh,
> >>>>>>>>
> >>>>>>>> For a simple read/write StreamTask that has little logic in it,
> >>>>>>>> you should be able to get 10,000+ messages/sec per container with
> >>>>>>>> a 1kb msg payload when talking to a remote Kafka broker.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> At first glance, setting a batch size of 1 with a sync producer
> >>>>>>>> will definitely slow down your task, especially if num.acks is set
> >>>>>>>> to a number other than zero.
> >>>>>>>>
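> >>>>>>>> As a comparison point, an async producer with a larger batch should
> >>>>>>>> be much faster. An untested sketch (these are standard Kafka 0.8
> >>>>>>>> producer properties, which Samza passes through via the
> >>>>>>>> systems.kafka.producer.* prefix):
> >>>>>>>>
> >>>>>>>>   systems.kafka.producer.producer.type=async
> >>>>>>>>   systems.kafka.producer.batch.num.messages=200
> >>>>>>>>   systems.kafka.producer.request.required.acks=0
> >>>>>>>>
> >>>>>>>> (acks=0 means not waiting for broker acknowledgement, which trades
> >>>>>>>> durability for throughput.)
> >>>>>>>>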
> >>>>>>>> Could you please post your job config file, and your code (if
> >>>>>>>> that's OK)?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Chris
> >>>>>>>>
> >>>>>>>> On 3/28/14 8:00 AM, "Anh Thu Vu" <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>> >I forgot to clarify this. My application is a simple pipeline of
> >>>>>>>> >2 jobs:
> >>>>>>>> >The first one reads from a file and writes to a Kafka topic.
> >>>>>>>> >The second reads from that Kafka topic.
> >>>>>>>> >
> >>>>>>>> >The throughput is measured in the second job (taking a timestamp
> >>>>>>>> >when receiving the 1st, 1000th, ... message).
> >>>>>>>> >
> >>>>>>>> >Casey
> >>>>>>>> >
> >>>>>>>> >
> >>>>>>>> >On Fri, Mar 28, 2014 at 3:56 PM, Anh Thu Vu
> >>>>>>>><[email protected]>
> >>>>>>>> wrote:
> >>>>>>>> >
> >>>>>>>> >> Hi guys,
> >>>>>>>> >>
> >>>>>>>> >> I'm running my application both locally and on a small cluster
> >>>>>>>> >> of 5 nodes (each with 2GB RAM, 1 core, connected via normal
> >>>>>>>> >> Ethernet - I think), and the observed throughput seems very
> >>>>>>>> >> slow.
> >>>>>>>> >>
> >>>>>>>> >> Do you have any idea of the expected throughput when running
> >>>>>>>> >> with one 7200RPM hard drive?
> >>>>>>>> >> My estimated throughput is about 1000 messages per second. Each
> >>>>>>>> >> message is slightly more than 1kB, Kafka batch size = 1, sync
> >>>>>>>> >> producer.
> >>>>>>>> >>
> >>>>>>>> >> When I try an async producer with different batch sizes, there
> >>>>>>>> >> can be a slight improvement.
> >>>>>>>> >>
> >>>>>>>> >> The config for the job has only the essential properties.
> >>>>>>>> >>
> >>>>>>>> >> Any suggestions? Could I have misconfigured something?
> >>>>>>>> >>
> >>>>>>>> >> Casey
> >>>>>>>> >>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >
>
>
