Hey Casey,

I've opened:

  https://issues.apache.org/jira/browse/SAMZA-213


Would you be up for submitting a patch?

Cheers,
Chris

On 3/31/14 10:00 AM, "Chris Riccomini" wrote:

>Hey Casey,
>
>Yikes, nice catch. This does indeed look like a bug.
>
>As you say, I think we just need to move the update call outside of the
>refresh.maybeCall block.
>
>The essence of the problem you've found is that we're updating the chooser
>inside the refresh block, even though it doesn't have anything to do with
>it. We should always update the chooser on every iteration, provided we
>have messages in the unprocessedMessages buffer, regardless of whether we
>received messages (or are backing off) on our latest poll to the
>consumers.
>
>Does moving the update block outside of refresh.maybeCall speed up
>your processor?
>
>Cheers,
>Chris
>
>On 3/29/14 4:58 PM, "Anh Thu Vu" wrote:
>
>>Ok, I feel like I'm spamming this mailing list too much, but I think I
>>finally figured out what happened.
>>
>>For my case of 2 tasks, the first task writes to a Kafka topic with 2
>>partitions, and the second task reads from that topic.
>>Most of the messages are sent to the first partition and very few go to
>>the second partition. So the problem happens in the "choose" method of
>>SystemConsumers.
>>
>>As there are very few messages in the 2nd partition, most of the time the
>>MessageChooser's queue holds at most 1 message.
>>After SystemConsumers gets that message back from MessageChooser, it calls
>>refresh.maybeCall(), but this does not invoke refresh.call() (due to the
>>backoff), and thus we do not update MessageChooser. The MessageChooser
>>queue is now empty, so the next time SystemConsumers.choose() is called it
>>returns a null envelope, which lets the next poll block for up to
>>noNewMessagesTimeout.
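To make the mechanism concrete, here is a hypothetical Java model (not Samza's actual SystemConsumers code; every name below is illustrative). It assumes a chooser that holds at most one buffered envelope, a refresh that only fires on every 4th call to stand in for the backoff, and a flag that either ties the chooser update to the refresh (the behavior described above) or runs it on every call (the proposed fix). Each null it counts corresponds to a choose() call that, in the real loop, would let the poll block for noNewMessagesTimeout.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the choose/refresh interaction; names are illustrative, not Samza's API.
class ChooserLoopSketch {
    private final Deque<Integer> unprocessed = new ArrayDeque<>(); // envelopes already fetched from consumers
    private Integer chooserSlot;                                    // the chooser holds at most one envelope
    private final boolean updateEveryCall;
    private final int refreshEvery; // refresh fires only on every Nth call, standing in for the backoff
    private int calls;

    ChooserLoopSketch(int buffered, boolean updateEveryCall, int refreshEvery) {
        for (int i = 0; i < buffered; i++) {
            unprocessed.add(i);
        }
        this.updateEveryCall = updateEveryCall;
        this.refreshEvery = refreshEvery;
        updateChooser();
    }

    // Hand the next buffered envelope to the chooser if its slot is empty.
    private void updateChooser() {
        if (chooserSlot == null && !unprocessed.isEmpty()) {
            chooserSlot = unprocessed.poll();
        }
    }

    private void maybeRefresh() {
        calls++;
        if (calls % refreshEvery == 0) {
            // Polling the consumers for new messages would happen here.
            if (!updateEveryCall) {
                updateChooser(); // coupled: the chooser is only refilled when refresh actually runs
            }
        }
    }

    Integer choose() {
        Integer envelope = chooserSlot;
        chooserSlot = null;
        maybeRefresh();
        if (updateEveryCall) {
            updateChooser(); // decoupled: refill on every call while the buffer is non-empty
        }
        return envelope;
    }

    // Counts how many choose() calls return null before all buffered envelopes are delivered.
    static int nullsWhileDraining(int buffered, boolean updateEveryCall, int refreshEvery) {
        ChooserLoopSketch loop = new ChooserLoopSketch(buffered, updateEveryCall, refreshEvery);
        int delivered = 0;
        int nulls = 0;
        while (delivered < buffered) {
            if (loop.choose() == null) {
                nulls++;
            } else {
                delivered++;
            }
        }
        return nulls;
    }

    public static void main(String[] args) {
        System.out.println("update inside refresh: " + nullsWhileDraining(100, false, 4) + " null choices");
        System.out.println("update on every call:  " + nullsWhileDraining(100, true, 4) + " null choices");
    }
}
```

With 100 buffered envelopes and a refresh on every 4th call, the coupled version returns null 297 times before the buffer drains, while the decoupled version never does, even though messages were waiting the whole time.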
>>
>>So the problem is that updating the MessageChooser is bundled with reading
>>new messages from the consumers. My suggestion is simply to move the
>>update of the MessageChooser queue outside refresh.call(), something like
>>the snippet below:
>>
>>def choose: IncomingMessageEnvelope = {
>>  val envelopeFromChooser = chooser.choose
>>
>>  if (envelopeFromChooser == null) {
>>    debug("Chooser returned null.")
>>    ...
>>    // Allow blocking if the chooser didn't choose a message.
>>    timeout = noNewMessagesTimeout
>>  } else {
>>    debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
>>    // Do something with the envelope from MessageChooser
>>    ...
>>  }
>>
>>  refresh.maybeCall()
>>  // Update MessageChooser here
>>  envelopeFromChooser
>>}
>>
>>Correct me if I'm wrong. :)
>>Cheers,
>>Casey
>>
>>
>>
>>On Sat, Mar 29, 2014 at 8:49 PM, Anh Thu Vu wrote:
>>
>>> I think I know the reason. It is related to the MessageChooser. If I'm not
>>> wrong, the default Kafka stream between the 2 tasks has 2 partitions, and
>>> the default MessageChooser tries to read from the 2 partitions in a
>>> round-robin fashion. As the messages are not distributed evenly between
>>> the 2 (because I did not provide a message key, I guess), the
>>> MessageChooser keeps waiting on the partition with fewer messages and
>>> delays the whole thing.
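Casey's guess hinges on how messages are spread across partitions. As a hedged sketch of keyed partitioning: the rule below, abs(key.hashCode()) % numPartitions with abs done by masking off the sign bit, is assumed to mirror Kafka 0.8's default partitioner for non-null keys. The keys "msg-0" through "msg-9999" are hypothetical, and the sketch does not model what the producer does with null keys.

```java
import java.util.Arrays;

// Sketch of hash-mod partitioning for keyed messages (assumed to mirror Kafka 0.8's default partitioner).
class KeyPartitioningSketch {
    static int partitionFor(Object key, int numPartitions) {
        // Mask off the sign bit so negative hash codes still map to a valid partition.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Counts how many of the hypothetical keys "msg-0" .. "msg-(messages-1)" land in each partition.
    static int[] countPerPartition(int messages, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (int i = 0; i < messages; i++) {
            counts[partitionFor("msg-" + i, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countPerPartition(10000, 2)));
    }
}
```

For these particular keys it prints [5000, 5000]; the exact split depends on the keys, but distinct keys let the hash spread messages across both partitions.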
>>>
>>> Does my guess sound right?
>>> I will have to do something to verify my guess (and fix my problem :) ).
>>>
>>> I'll update this thread once I can verify and fix it.
>>>
>>> Cheers,
>>> Casey
>>>
>>>
>>> On Sat, Mar 29, 2014 at 2:40 PM, Anh Thu Vu wrote:
>>>
>>>> Hmm, when I hack the Samza code and change the default value of
>>>> noNewMessagesTimeout in org.apache.samza.system.SystemConsumers from 10
>>>> to 5, the throughput (as recorded in the second task) becomes exactly 2
>>>> times faster.
>>>>
>>>> I'm not clear on how this happens. Maybe you have a better idea or a
>>>> better guess?
>>>>
>>>> Casey
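A back-of-envelope check, assuming noNewMessagesTimeout is in milliseconds and that the second task waits out roughly one full timeout per message it receives (the behavior diagnosed later in this thread). Under that assumption, throughput is set by the timeout alone, which is consistent with both the 1000 messages per 10-11 seconds reported in the thread and the exact 2x speedup when the timeout is halved.

```latex
\text{throughput} \approx \frac{1\ \text{message}}{t_{\text{timeout}}}, \qquad
t_{\text{timeout}} = 10\ \text{ms} \Rightarrow 100\ \text{messages/s}\ (1000\ \text{messages} \approx 10\ \text{s}), \qquad
t_{\text{timeout}} = 5\ \text{ms} \Rightarrow 200\ \text{messages/s}\ (1000\ \text{messages} \approx 5\ \text{s}).
```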
>>>>
>>>>
>>>> On Sat, Mar 29, 2014 at 12:25 PM, Anh Thu Vu wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I tried running kafka-console-consumer.sh to listen to the output of the
>>>>> first task, and the speed was OK (I don't have an exact measurement, but
>>>>> it is much faster than the result from the second task). So my guess is
>>>>> that something gets stuck in the consumer of the second task. Is there
>>>>> possibly any synchronization in the consumer?
>>>>>
>>>>> Casey
>>>>>
>>>>>
>>>>> On Sat, Mar 29, 2014 at 2:19 AM, Anh Thu Vu wrote:
>>>>>
>>>>>> Ok, I went home and ran these two, FirstTask and SecondTask, in
>>>>>> samza-test/ (see attachment; I was lazy, so I just injected the code
>>>>>> into hello-samza) on my MacBook Air.
>>>>>>
>>>>>> What I got is 1000 messages per 10-11 seconds (worse than the result I
>>>>>> got on my office machine this afternoon; must be my lousy Mac).
>>>>>> Anyway, please help me check whether there is anything wrong with my
>>>>>> code or the config.
>>>>>>
>>>>>> Thanks a lot!!!!
>>>>>> Casey
>>>>>>
>>>>>> PS: I tried both sync with batchsize=1 and async with batchsize=200, but
>>>>>> strangely the performance did not seem to differ.
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 28, 2014 at 8:05 PM, Anh Thu Vu wrote:
>>>>>>
>>>>>>> Hi Chris,
>>>>>>>
>>>>>>> The previous message was sent by mistake. Somehow the SEND button got
>>>>>>> clicked before I could finish writing. Please ignore it and see the
>>>>>>> one below.
>>>>>>>
>>>>>>> In fact, my code is "a bit" complicated, as it lives inside a bigger
>>>>>>> "platform". What I'm planning to do now is write two pure Samza tasks
>>>>>>> to test it.
>>>>>>>
>>>>>>> For now, the config is something like this:
>>>>>>> Task 1:
>>>>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>>>>> job.name=Dummy_20140328182500-0
>>>>>>>
>>>>>>> task.class=...
>>>>>>> task.inputs=mysystem.Dummy_20140328182500-0
>>>>>>> # extra (custom) property:
>>>>>>> task.entrance.outputs=kafka:Dummy_20140328182500-0-1-0
>>>>>>>
>>>>>>> systems.mysystem.samza.factory=MySystemFactory
>>>>>>>
>>>>>>> serializers.registry.kryo.class=KryoSerdeFactory
>>>>>>> systems.kafka.samza.msg.serde=kryo
>>>>>>>
>>>>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>>>>> systems.kafka.producer.batch.num.messages=1
>>>>>>> systems.kafka.producer.producer.type=sync
>>>>>>>
>>>>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>>>>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>>>>>
>>>>>>> Task 2:
>>>>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>>>>> job.name=Dummy_20140328182500-1
>>>>>>>
>>>>>>> task.class=...
>>>>>>> task.inputs=kafka.Dummy_20140328182500-0-1-0
>>>>>>>
>>>>>>> serializers.registry.kryo.class=KryoSerdeFactory
>>>>>>> systems.kafka.samza.msg.serde=kryo
>>>>>>>
>>>>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>>>>>>> systems.kafka.producer.batch.num.messages=1
>>>>>>> systems.kafka.producer.producer.type=sync
>>>>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>>>>>
>>>>>>>
>>>>>>> For the code, a simplified version is:
>>>>>>> In the custom consumer of MySystem:
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void start() {
>>>>>>>         Thread processorPollingThread = new Thread(new Runnable() {
>>>>>>>             @Override
>>>>>>>             public void run() {
>>>>>>>                 try {
>>>>>>>                     pollingEntranceProcessor();
>>>>>>>                     setIsAtHead(systemStreamPartition, true);
>>>>>>>                 } catch (InterruptedException e) {
>>>>>>>                     // getStackTrace() alone does nothing; print the trace instead
>>>>>>>                     e.printStackTrace();
>>>>>>>                     stop();
>>>>>>>                 }
>>>>>>>             }
>>>>>>>         });
>>>>>>>
>>>>>>>         processorPollingThread.start();
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void stop() {
>>>>>>>
>>>>>>>     }
>>>>>>>
>>>>>>>     private void pollingEntranceProcessor() throws InterruptedException {
>>>>>>>         int messageCnt = 0;
>>>>>>>         long startTime = System.nanoTime();
>>>>>>>         while (!this.entranceProcessor.isFinished()) {
>>>>>>>             messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
>>>>>>>             // Enqueue only while the buffer holds fewer than 10000 envelopes
>>>>>>>             if (this.entranceProcessor.hasNext() && messageCnt < 10000) {
>>>>>>>                 this.put(systemStreamPartition, new IncomingMessageEnvelope(
>>>>>>>                         systemStreamPartition, null, null, this.entranceProcessor.nextEvent()));
>>>>>>>             } else {
>>>>>>>                 // Buffer full or no event ready: back off for 100 ms
>>>>>>>                 try {
>>>>>>>                     Thread.sleep(100);
>>>>>>>                 } catch (InterruptedException e) {
>>>>>>>                     break;
>>>>>>>                 }
>>>>>>>             }
>>>>>>>         }
>>>>>>>         // Send last event
>>>>>>>         this.put(systemStreamPartition, new IncomingMessageEnvelope(
>>>>>>>                 systemStreamPartition, null, null, this.entranceProcessor.nextEvent()));
>>>>>>>     }
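The polling loop above caps the buffer at 10000 envelopes by sleeping 100 ms whenever it is full (or the source has nothing ready). As a hedged alternative for the full-buffer case, a bounded java.util.concurrent.BlockingQueue applies backpressure directly: put() blocks until space frees up, so the producer resumes as soon as the consumer takes an element instead of after a fixed sleep. This is a standalone sketch of that technique with hypothetical names, not Samza's BlockingEnvelopeMap; whether it fits a custom consumer depends on how that class exposes its queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a consumer's envelope buffer; not Samza's BlockingEnvelopeMap.
class BoundedFeedSketch {
    static final int CAPACITY = 10000; // same cap as the "messageCnt < 10000" check above

    // Producer thread: put() blocks while the queue is full, so backpressure needs no sleep loop.
    static Thread startProducer(BlockingQueue<Integer> queue, int total) {
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status instead of swallowing it
            }
        });
        producer.start();
        return producer;
    }

    // Consumer side: take() blocks until an element is available; returns the sum of all values received.
    static long drain(int total) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(CAPACITY);
        Thread producer = startProducer(queue, total);
        long sum = 0;
        try {
            for (int i = 0; i < total; i++) {
                sum += queue.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(drain(50000));
    }
}
```

Draining 50000 values through a queue of capacity 10000 forces the producer to block repeatedly, yet every value arrives exactly once.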
>>>>>>>
>>>>>>> In the first task:
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void process(IncomingMessageEnvelope envelope,
>>>>>>>             MessageCollector collector, TaskCoordinator coordinator) throws Exception {
>>>>>>>         // Assumed: the message is the ContentEvent put by MySystem's consumer
>>>>>>>         ContentEvent event = (ContentEvent) envelope.getMessage();
>>>>>>>         this.outputStream.setCollector(collector);
>>>>>>>         this.outputStream.put(event); // This will call collector.send()
>>>>>>>     }
>>>>>>>
>>>>>>> In the second task:
>>>>>>>
>>>>>>>     public void process(IncomingMessageEnvelope envelope,
>>>>>>>             MessageCollector collector, TaskCoordinator coordinator) throws Exception {
>>>>>>>         for (SamzaStream stream : this.outputStreams) {
>>>>>>>             stream.setCollector(collector);
>>>>>>>         }
>>>>>>>         // The content of this method is shown below
>>>>>>>         this.processor.process((ContentEvent) envelope.getMessage());
>>>>>>>     }
>>>>>>>
>>>>>>>     public boolean process(ContentEvent event) {
>>>>>>>         counter++;
>>>>>>>
>>>>>>>         if (counter == 1) {
>>>>>>>             sampleStart = System.nanoTime();
>>>>>>>             expStart = sampleStart;
>>>>>>>             logger.info("End processor starts receiving events");
>>>>>>>         }
>>>>>>>
>>>>>>>         sampleEnd = System.nanoTime();
>>>>>>>         // Log the elapsed time for every block of freq events
>>>>>>>         if (counter % freq == 0) {
>>>>>>>             long sampleDuration = TimeUnit.SECONDS.convert(sampleEnd - sampleStart, TimeUnit.NANOSECONDS);
>>>>>>>             logger.info("Instances index: {} - {} seconds", counter, sampleDuration);
>>>>>>>             sampleStart = sampleEnd;
>>>>>>>         }
>>>>>>>
>>>>>>>         if (event.isLastEvent()) {
>>>>>>>             long sampleDuration = TimeUnit.SECONDS.convert(sampleEnd - expStart, TimeUnit.NANOSECONDS);
>>>>>>>             logger.info("Total: {} - {} seconds", counter, sampleDuration);
>>>>>>>         }
>>>>>>>         return true; // assumed: the simplified snippet omitted the return value
>>>>>>>     }
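The measurement above converts with TimeUnit.SECONDS.convert, which truncates, so a 1.9 s window is logged as 1 second and any sub-second window as 0. A hedged sketch of a sampler that keeps fractional seconds and reports messages/sec per window of freq events follows; the class and method names are hypothetical, and a caller would pass System.nanoTime() as the timestamp.

```java
// Hypothetical helper: computes messages/sec per window of `freq` events with sub-second resolution.
class ThroughputSampler {
    private final int freq;
    private long count;
    private long windowStartCount;
    private long windowStartNanos;

    ThroughputSampler(int freq) {
        this.freq = freq;
    }

    // Records one event observed at nowNanos; returns messages/sec when a window closes, otherwise -1.
    double record(long nowNanos) {
        count++;
        if (count == 1) {
            windowStartCount = count;
            windowStartNanos = nowNanos;
            return -1;
        }
        if (count % freq != 0) {
            return -1;
        }
        long eventsInWindow = count - windowStartCount;                   // arrivals after the window opened
        double seconds = (nowNanos - windowStartNanos) / 1_000_000_000.0; // keep fractional seconds
        windowStartCount = count;
        windowStartNanos = nowNanos;
        return eventsInWindow / seconds;
    }

    public static void main(String[] args) {
        ThroughputSampler sampler = new ThroughputSampler(1000);
        // Simulated arrivals every 10 ms, i.e. about 100 messages/s.
        for (int i = 0; i < 3000; i++) {
            double rate = sampler.record(i * 10_000_000L);
            if (rate >= 0) {
                System.out.printf("after %d events: %.1f messages/s%n", i + 1, rate);
            }
        }
    }
}
```

Counting arrivals since the window opened (rather than assuming freq of them) keeps the first window, which starts at event 1, from being overstated.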
>>>>>>>
>>>>>>> As I said, I will write the new Samza tasks and test them now. I'll let
>>>>>>> you know how it goes.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Casey
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 28, 2014 at 6:40 PM, Chris Riccomini wrote:
>>>>>>>
>>>>>>>> Hey Anh,
>>>>>>>>
>>>>>>>> For a simple read/write StreamTask that has little logic in it, you
>>>>>>>> should be able to get 10,000+ messages/sec per container with a 1 kB
>>>>>>>> message payload when talking to a remote Kafka broker.
>>>>>>>>
>>>>>>>>
>>>>>>>> At first glance, setting a batch size of 1 with a sync producer will
>>>>>>>> definitely slow down your task, especially if num.acks (the producer's
>>>>>>>> request.required.acks setting) is set to a number other than zero.
>>>>>>>>
>>>>>>>> Could you please post your job config file and your code (if that's
>>>>>>>> OK)?
>>>>>>>>
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Chris
>>>>>>>>
>>>>>>>> On 3/28/14 8:00 AM, "Anh Thu Vu" wrote:
>>>>>>>>
>>>>>>>> >I forgot to clarify this. My application is a simple pipeline of 2
>>>>>>>> >jobs:
>>>>>>>> >The first one reads from a file and writes to a Kafka topic.
>>>>>>>> >The second reads from that Kafka topic.
>>>>>>>> >
>>>>>>>> >The throughput is measured in the second job (I take a timestamp when
>>>>>>>> >it receives the 1st, 1000th,... message).
>>>>>>>> >
>>>>>>>> >Casey
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >On Fri, Mar 28, 2014 at 3:56 PM, Anh Thu Vu wrote:
>>>>>>>> >
>>>>>>>> >> Hi guys,
>>>>>>>> >>
>>>>>>>> >> I'm running my application both locally and on a small cluster of 5
>>>>>>>> >> nodes (each with 2GB RAM, 1 core, connected via normal Ethernet, I
>>>>>>>> >> think), and the observed throughput seems very slow.
>>>>>>>> >>
>>>>>>>> >> Do you have any idea about the expected throughput when running with
>>>>>>>> >> one 7200RPM hard drive?
>>>>>>>> >> My estimated throughput is about 1000 messages per second. Each
>>>>>>>> >> message is slightly more than 1kB, kafka batchsize = 1, sync producer.
>>>>>>>> >>
>>>>>>>> >> When I try the async producer with different batch sizes, there can
>>>>>>>> >> be a slight improvement.
>>>>>>>> >>
>>>>>>>> >> The config for the job has only the essential properties.
>>>>>>>> >>
>>>>>>>> >> Any suggestions? Could I have misconfigured something?
>>>>>>>> >>
>>>>>>>> >> Casey
>>>>>>>> >>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>
