Hey Casey, I've opened:
https://issues.apache.org/jira/browse/SAMZA-213

Would you be up for submitting a patch?

Cheers,
Chris

On 3/31/14 10:00 AM, "Chris Riccomini" wrote:

>Hey Casey,
>
>Yikes, nice catch. This does indeed look like a bug.
>
>As you say, I think we just need to move the update call outside of the refresh.maybeCall block.
>
>The essence of the problem you've found is that we're updating the chooser inside the refresh block, even though it doesn't have anything to do with it. We should always update the chooser on every iteration, provided we have messages in the unprocessedMessages buffer, regardless of whether we received messages (or are backing off) on our latest poll to the consumers.
>
>Does moving the update block outside of refresh.maybeCall speed up your processor?
>
>Cheers,
>Chris
>
>On 3/29/14 4:58 PM, "Anh Thu Vu" wrote:
>
>>Ok, I feel like I'm spamming this mailing list too much, but I think I finally figured out what happened.
>>
>>In my case of 2 tasks, the first task writes to a Kafka topic with 2 partitions, and the second task reads from that topic. Most of the messages are sent to the first partition and very few are in the second partition. So the problem happens in the "choose" method of SystemConsumers.
>>
>>As there are very few messages in the 2nd partition, most of the time the number of messages in the MessageChooser's queue is <= 1. After SystemConsumers gets that message back from MessageChooser, it calls refresh.maybeCall(), but this does not invoke refresh.call() (due to the backoff), and thus MessageChooser is not updated. The MessageChooser queue is now empty, so the next time SystemConsumers.choose() is called, it returns a null envelope and causes the blocking.
>>
>>So the problem is that updating MessageChooser is bundled with reading new messages from the consumers.
>>My suggestion is simply to move the update of the MessageChooser queue outside refresh.call(), something like the snippet below:
>>
>>def choose: IncomingMessageEnvelope = {
>>  val envelopeFromChooser = chooser.choose
>>
>>  if (envelopeFromChooser == null) {
>>    debug("Chooser returned null.")
>>    ...
>>    // Allow blocking if the chooser didn't choose a message.
>>    timeout = noNewMessagesTimeout
>>  } else {
>>    debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
>>    // Do something with the envelope from MessageChooser
>>    ...
>>  }
>>
>>  refresh.maybeCall()
>>  // Update MessageChooser here
>>  envelopeFromChooser
>>}
>>
>>Correct me if I'm wrong. :)
>>Cheers,
>>Casey
>>
>>
>>On Sat, Mar 29, 2014 at 8:49 PM, Anh Thu Vu wrote:
>>
>>> I think I know the reason. It is related to the MessageChooser. If I'm not wrong, the default Kafka stream between the 2 tasks has 2 partitions, and the default MessageChooser tries to read from the 2 partitions in a round-robin fashion. As the messages are not distributed evenly between the 2 (coz I did not provide a message key, I guess), MessageChooser keeps waiting on the partition with fewer messages and delays the whole thing.
>>>
>>> Does my guess sound right?
>>> I will have to do something to verify my guess (and fix my problem :) )
>>>
>>> I'll update this once I can verify and fix it.
>>>
>>> Cheers,
>>> Casey
>>>
>>>
>>> On Sat, Mar 29, 2014 at 2:40 PM, Anh Thu Vu wrote:
>>>
>>>> Hmm, when I hack the Samza code and change the default value of noNewMessagesTimeout in org.apache.samza.system.SystemConsumers from 10 to 5, the throughput (as recorded in the second task) is exactly 2 times faster.
>>>>
>>>> I'm not clear how this happens; maybe you have a better idea or a better guess?
>>>>
>>>> Casey
>>>>
>>>>
>>>> On Sat, Mar 29, 2014 at 12:25 PM, Anh Thu Vu wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I tried running kafka-console-consumer.sh to listen to the output of the first task, and the speed was OK (I don't have an exact measurement, but it is much faster than the result from the second task). So my guess is that something got stuck at the consumer in the second task. Is there possibly any synchronization in the consumer?
>>>>>
>>>>> Casey
>>>>>
>>>>>
>>>>> On Sat, Mar 29, 2014 at 2:19 AM, Anh Thu Vu wrote:
>>>>>
>>>>>> Ok, I went home and ran these two, FirstTask and SecondTask, in samza-test/ (see attachment; I was lazy, so I just injected the code into hello-samza) on my MacBook Air.
>>>>>>
>>>>>> What I got is 1000 messages per 10-11 seconds (worse than the result I got when I ran on my office machine this afternoon; must be my lousy Mac). Anyway, please help me see if there is anything wrong with my code or the config.
>>>>>>
>>>>>> Thanks a lot!!!!
>>>>>> Casey
>>>>>>
>>>>>> PS: I tried with sync & batchsize=1 and with async & batchsize=200, but strangely the performance did not seem to differ.
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 28, 2014 at 8:05 PM, Anh Thu Vu wrote:
>>>>>>
>>>>>>> Hi Chris,
>>>>>>>
>>>>>>> The previous message was sent by mistake. Somehow the SEND button got clicked before I could finish writing. Please ignore it and see the one below.
>>>>>>>
>>>>>>> In fact, my code is "a bit" complicated, as it lies in a bigger "platform". What I'm planning to do now is to write 2 pure Samza tasks to test it.
>>>>>>>
>>>>>>> For now, the config is something like this:
>>>>>>> Task 1
>>>>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>>>>> job.name=Dummy_20140328182500-0
>>>>>>>
>>>>>>> task.class=...
>>>>>>> task.inputs=mysystem.Dummy_20140328182500-0
>>>>>>> task.entrance.outputs=kafka:Dummy_20140328182500-0-1-0 (extra property)
>>>>>>>
>>>>>>> systems.mysystem.samza.factory=MySystemFactory
>>>>>>>
>>>>>>> serializers.registry.kryo.class=KryoSerdeFactory
>>>>>>> systems.kafka.samza.msg.serde=kryo
>>>>>>>
>>>>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>>>>> systems.kafka.producer.batch.num.messages=1
>>>>>>> systems.kafka.producer.producer.type=sync
>>>>>>>
>>>>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>>>>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>>>>>
>>>>>>>
>>>>>>> Task 2:
>>>>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>>>>> job.name=Dummy_20140328182500-1
>>>>>>>
>>>>>>> task.class=...
>>>>>>> task.inputs=kafka.Dummy_20140328182500-0-1-0
>>>>>>>
>>>>>>> serializers.registry.kryo.class=KryoSerdeFactory
>>>>>>> systems.kafka.samza.msg.serde=kryo
>>>>>>>
>>>>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>>>>>>> systems.kafka.producer.batch.num.messages=1
>>>>>>> systems.kafka.producer.producer.type=sync
>>>>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>>>>>
>>>>>>>
>>>>>>> For the code, a simplified version is:
>>>>>>> In the custom consumer of MySystem:
>>>>>>>     @Override
>>>>>>>     public void start() {
>>>>>>>         Thread processorPollingThread = new Thread(
>>>>>>>             new Runnable() {
>>>>>>>                 @Override
>>>>>>>                 public void run() {
>>>>>>>                     try {
>>>>>>>                         pollingEntranceProcessor();
>>>>>>>                         setIsAtHead(systemStreamPartition, true);
>>>>>>>                     } catch (InterruptedException e) {
>>>>>>>                         e.printStackTrace();
>>>>>>>                         stop();
>>>>>>>                     }
>>>>>>>                 }
>>>>>>>             }
>>>>>>>         );
>>>>>>>
>>>>>>>         processorPollingThread.start();
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void stop() {
>>>>>>>     }
>>>>>>>
>>>>>>>     private void pollingEntranceProcessor() throws InterruptedException {
>>>>>>>         int messageCnt = 0;
>>>>>>>         long startTime = System.nanoTime();
>>>>>>>         while (!this.entranceProcessor.isFinished()) {
>>>>>>>             messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
>>>>>>>             if (this.entranceProcessor.hasNext() && messageCnt < 10000) {
>>>>>>>                 this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, this.entranceProcessor.nextEvent()));
>>>>>>>             } else {
>>>>>>>                 try {
>>>>>>>                     Thread.sleep(100);
>>>>>>>                 } catch (InterruptedException e) {
>>>>>>>                     break;
>>>>>>>                 }
>>>>>>>             }
>>>>>>>         }
>>>>>>>         // Send last event
>>>>>>>         this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, this.entranceProcessor.nextEvent()));
>>>>>>>     }
>>>>>>>
>>>>>>> In the first Task:
>>>>>>>     @Override
>>>>>>>     public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
>>>>>>>         this.outputStream.setCollector(collector);
>>>>>>>         this.outputStream.put(event); // This will call collector.send()
>>>>>>>     }
>>>>>>>
>>>>>>> In the second Task:
>>>>>>>     public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
>>>>>>>         for (SamzaStream stream : this.outputStreams) {
>>>>>>>             stream.setCollector(collector);
>>>>>>>         }
>>>>>>>         this.processor.process((ContentEvent) envelope.getMessage());
>>>>>>>         // The content of this method is shown below
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>>     public boolean process(ContentEvent event) {
>>>>>>>         counter++;
>>>>>>>
>>>>>>>         if (counter == 1) {
>>>>>>>             sampleStart = System.nanoTime();
>>>>>>>             expStart = sampleStart;
>>>>>>>             logger.info("End processor starts receiving events");
>>>>>>>         }
>>>>>>>
>>>>>>>         sampleEnd = System.nanoTime();
>>>>>>>         if (counter % freq == 0) {
>>>>>>>             long sampleDuration = TimeUnit.SECONDS.convert(sampleEnd - sampleStart, TimeUnit.NANOSECONDS);
>>>>>>>             logger.info("Instances index: {} - {} seconds", counter, sampleDuration);
>>>>>>>             sampleStart = sampleEnd;
>>>>>>>         }
>>>>>>>
>>>>>>>         if (event.isLastEvent()) {
>>>>>>>             long sampleDuration = TimeUnit.SECONDS.convert(sampleEnd - expStart, TimeUnit.NANOSECONDS);
>>>>>>>             logger.info("Total: {} - {} seconds", counter, sampleDuration);
>>>>>>>
>>>>>>>         }
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> As I said, I will write the new Samza tasks and test NOW. Will let you know later.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Casey
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 28, 2014 at 6:40 PM, Chris Riccomini wrote:
>>>>>>>
>>>>>>>> Hey Anh,
>>>>>>>>
>>>>>>>> For a simple read/write StreamTask that has little logic in it, you should be able to get 10,000+ messages/sec per container with a 1 KB message payload when talking to a remote Kafka broker.
>>>>>>>>
>>>>>>>> At first glance, setting a batch size of 1 with a sync producer will definitely slow down your task, especially if num.acks is set to a number other than zero.
>>>>>>>>
>>>>>>>> Could you please post your job config file, and your code (if that's OK)?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Chris
>>>>>>>>
>>>>>>>> On 3/28/14 8:00 AM, "Anh Thu Vu" wrote:
>>>>>>>>
>>>>>>>> >I forgot to clarify this. My application is a simple pipeline of 2 jobs:
>>>>>>>> >The first one reads from a file and writes to a Kafka topic.
>>>>>>>> >The second reads from that Kafka topic.
>>>>>>>> >
>>>>>>>> >The throughput is measured in the second job (I get a timestamp when it receives the 1st, 1000th, ... message).
>>>>>>>> >
>>>>>>>> >Casey
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >On Fri, Mar 28, 2014 at 3:56 PM, Anh Thu Vu wrote:
>>>>>>>> >
>>>>>>>> >> Hi guys,
>>>>>>>> >>
>>>>>>>> >> I'm running my application both locally and on a small cluster of 5 nodes (each with 2GB RAM, 1 core, connected via normal Ethernet, I think), and the observed throughput seems very slow.
>>>>>>>> >>
>>>>>>>> >> Do you have any idea about the expected throughput when running with one 7200 RPM hard drive?
>>>>>>>> >> My estimated throughput is about 1000 messages per second. Each message is slightly more than 1 kB, Kafka batch size = 1, sync producer.
>>>>>>>> >>
>>>>>>>> >> When I try the async producer with different batch sizes, there can be a slight improvement.
>>>>>>>> >>
>>>>>>>> >> The config for the job has only the essential properties.
>>>>>>>> >>
>>>>>>>> >> Any suggestion? Could I have misconfigured something?
>>>>>>>> >>
>>>>>>>> >> Casey
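The ordering problem described in this thread can be reproduced in a small, self-contained Java model. This is a hypothetical sketch, not Samza's SystemConsumers. The following are all simplifying assumptions:

- the class name ChooserRefreshSim;
- a single buffered partition;
- a chooser that holds at most one message;
- a fixed-period backoff, in which refresh.call() runs only on every backoffPeriod-th refresh.maybeCall();
- the 10 ms timeout.

The model compares the current ordering, where the chooser is refilled only inside refresh.call(), with the proposed change of refilling it on every iteration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical toy model of the SAMZA-213 ordering bug; NOT Samza's SystemConsumers.
 * Assumptions: one partition, a chooser holding at most one message, and a backoff
 * that lets refresh.call() run only on every backoffPeriod-th refresh.maybeCall().
 */
public class ChooserRefreshSim {
    // Simulated time charged whenever the chooser returns null (assumed milliseconds).
    static final long NO_NEW_MESSAGES_TIMEOUT_MS = 10;

    private final Deque<Integer> unprocessed = new ArrayDeque<>(); // buffered, not yet in the chooser
    private final boolean updateOutsideRefresh; // false = current ordering, true = proposed fix
    private final int backoffPeriod;
    private final int totalMessages;
    private Integer chooserSlot = null; // the single message the chooser currently holds
    private int maybeCallCount = 0;
    private long blockedMs = 0;

    ChooserRefreshSim(int messages, boolean updateOutsideRefresh, int backoffPeriod) {
        for (int i = 0; i < messages; i++) {
            unprocessed.add(i);
        }
        this.totalMessages = messages;
        this.updateOutsideRefresh = updateOutsideRefresh;
        this.backoffPeriod = backoffPeriod;
        updateChooser(); // prime the chooser so both variants start from the same state
    }

    // Hands the next buffered message to the chooser if its slot is free.
    private void updateChooser() {
        if (chooserSlot == null && !unprocessed.isEmpty()) {
            chooserSlot = unprocessed.poll();
        }
    }

    // Stand-in for refresh.call(): consumer polling is omitted; the current ordering also refills the chooser here.
    private void refreshCall() {
        if (!updateOutsideRefresh) {
            updateChooser();
        }
    }

    // Stand-in for refresh.maybeCall(): the backoff skips all but every backoffPeriod-th invocation.
    private void refreshMaybeCall() {
        maybeCallCount++;
        if (maybeCallCount % backoffPeriod == 0) {
            refreshCall();
        }
    }

    Integer choose() {
        Integer envelope = chooserSlot;
        chooserSlot = null;
        if (envelope == null) {
            blockedMs += NO_NEW_MESSAGES_TIMEOUT_MS; // charge one timeout, as if the loop blocked on its next poll
        }
        refreshMaybeCall();
        if (updateOutsideRefresh) {
            updateChooser(); // proposed fix: refill the chooser on every iteration
        }
        return envelope;
    }

    // Calls choose() until every message has been returned; reports the simulated blocked time.
    long drain() {
        int processed = 0;
        while (processed < totalMessages) {
            if (choose() != null) {
                processed++;
            }
        }
        return blockedMs;
    }

    public static void main(String[] args) {
        long current = new ChooserRefreshSim(1000, false, 2).drain();
        long fixed = new ChooserRefreshSim(1000, true, 2).drain();
        System.out.println("current ordering, blocked ms: " + current); // 9990
        System.out.println("update outside refresh, blocked ms: " + fixed); // 0
    }
}
```

With backoffPeriod = 2 and 1000 buffered messages, the current ordering leaves the chooser empty on every other call. Each message after the first therefore costs one full timeout: 999 × 10 ms = 9990 ms of simulated blocking. Updating outside the refresh block never blocks.

In this model, the blocked time scales linearly with the timeout. That is consistent with the roughly 10 seconds per 1000 messages and the 2x speedup from halving noNewMessagesTimeout reported above. However, the real backoff in SystemConsumers is more involved than a fixed period, so this is an illustration rather than a measurement.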
