Hi Chris,

Yep, now my processors are much faster.
I'll submit the patch soon :)

Casey

On Mon, Mar 31, 2014 at 7:10 PM, Chris Riccomini <[email protected]> wrote:

> Hey Casey,
>
> I've opened:
>
> https://issues.apache.org/jira/browse/SAMZA-213
>
> Would you be up for submitting a patch?
>
> Cheers,
> Chris
>
> On 3/31/14 10:00 AM, "Chris Riccomini" <[email protected]> wrote:
>
> >Hey Casey,
> >
> >Yikes, nice catch. This does indeed look like a bug.
> >
> >As you say, I think we just need to move the update call outside of the
> >refresh.maybeCall block.
> >
> >The essence of the problem you've found is that we're updating the chooser
> >inside the refresh block, even though it doesn't have anything to do with
> >it. We should always update the chooser on every iteration, provided we
> >have messages in the unprocessedMessages buffer, regardless of whether we
> >received messages (or are backing off) on our latest poll to the
> >consumers.
> >
> >Does moving the update block outside of refresh.maybeCall speed up
> >your processor?
> >
> >Cheers,
> >Chris
> >
> >On 3/29/14 4:58 PM, "Anh Thu Vu" <[email protected]> wrote:
> >
> >>Ok, I feel like I'm spamming this mailing list too much, but I think I
> >>finally figured out what happened.
> >>
> >>In my case of 2 tasks, the first task writes to a Kafka topic with 2
> >>partitions, and the second task reads from that topic.
> >>Most of the messages are sent to the first partition and very few to the
> >>second partition. So the problem happens in the "choose" method of
> >>SystemConsumers.
> >>
> >>As there are very few messages in the 2nd partition, most of the time the
> >>number of messages in the queue of MessageChooser is <= 1.
> >>After SystemConsumers gets back that message from MessageChooser, it calls
> >>refresh.maybeCall(), but this does not invoke refresh.call() (due to the
> >>backoff) and thus we did not update MessageChooser.
> >>The MessageChooser queue is now empty, and the next time
> >>SystemConsumers.choose() is called, it will return a null envelope and
> >>cause the blocking.
> >>
> >>So the problem is that the updating of MessageChooser is coupled with the
> >>reading of new messages from the consumers. My suggestion is simply to
> >>move the update of the MessageChooser queue outside refresh.call(),
> >>something like the snippet below:
> >>
> >>def choose: IncomingMessageEnvelope = {
> >>  val envelopeFromChooser = chooser.choose
> >>
> >>  if (envelopeFromChooser == null) {
> >>    debug("Chooser returned null.")
> >>    ...
> >>    // Allow blocking if the chooser didn't choose a message.
> >>    timeout = noNewMessagesTimeout
> >>  } else {
> >>    debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
> >>    // Do something with the envelope from MessageChooser
> >>    ...
> >>  }
> >>
> >>  refresh.maybeCall()
> >>  // Update MessageChooser here
> >>  envelopeFromChooser
> >>}
> >>
> >>Correct me if I'm wrong. :)
> >>Cheers,
> >>Casey
> >>
> >>
> >>On Sat, Mar 29, 2014 at 8:49 PM, Anh Thu Vu <[email protected]> wrote:
> >>
> >>> I think I know the reason. It is related to the MessageChooser. If I'm
> >>> not wrong, the default Kafka stream between the 2 tasks has 2 partitions
> >>> and the default MessageChooser tries to read from the 2 partitions in a
> >>> round-robin fashion. As the messages are not distributed evenly between
> >>> the 2 (because I did not provide a message key, I guess), the
> >>> MessageChooser keeps waiting on the partition with fewer messages and
> >>> delays the whole thing.
> >>>
> >>> Does my guess sound right?
> >>> I will have to do something to verify my guess (and fix my problem :) )
> >>>
> >>> I'll update this once I can verify and fix it.
> >>>
> >>> Cheers,
> >>> Casey
> >>>
> >>>
> >>> On Sat, Mar 29, 2014 at 2:40 PM, Anh Thu Vu <[email protected]> wrote:
> >>>
> >>>> Hmm, when I hack the samza code and change the default value of
> >>>> noNewMessagesTimeout in org.apache.samza.system.SystemConsumers from 10
> >>>> to 5, the throughput (as recorded in the second Task) goes exactly 2
> >>>> times faster.
> >>>>
> >>>> I'm not clear how this happens; maybe you have a better idea or a
> >>>> better guess?
> >>>>
> >>>> Casey
> >>>>
> >>>>
> >>>> On Sat, Mar 29, 2014 at 12:25 PM, Anh Thu Vu <[email protected]> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> I tried running kafka-console-consumer.sh to listen to the output of
> >>>>> the first task and the speed was OK (I don't have an exact measurement,
> >>>>> but it is much faster than the result from the second task). So my
> >>>>> guess is that something got stuck at the consumer in the second task.
> >>>>> Is there possibly any synchronization in the consumer?
> >>>>>
> >>>>> Casey
> >>>>>
> >>>>>
> >>>>> On Sat, Mar 29, 2014 at 2:19 AM, Anh Thu Vu <[email protected]> wrote:
> >>>>>
> >>>>>> Ok, I went home and ran these two: FirstTask and SecondTask in
> >>>>>> samza-test/ (see attachment - I was lazy so I just injected the code
> >>>>>> into hello-samza) on my MacBook Air.
> >>>>>>
> >>>>>> What I got is 1000 messages per 10-11 seconds (worse than the result
> >>>>>> I got when I ran on my office machine this afternoon; must be my lousy
> >>>>>> Mac). Anyway, please help me to see if there is anything wrong with my
> >>>>>> code or the config.
> >>>>>>
> >>>>>> Thanks a lot!!!!
> >>>>>> Casey
> >>>>>>
> >>>>>> PS: I tried with sync & batchsize=1, and with async & batchsize=200,
> >>>>>> but strangely the performance did not seem to differ.
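[Editor's note: the starvation Casey diagnoses above can be illustrated with a toy model. This is plain Java, not Samza's actual classes: `ChooserStarvationSketch`, `runLoop`, and `refreshEveryN` are made-up names, and `refreshEveryN` merely stands in for refresh.maybeCall() firing only occasionally because of the backoff. In the real run loop, every null choose blocks for noNewMessagesTimeout (10 ms by default), which is roughly consistent with the observed ~100 messages/sec (1000 messages per 10-11 seconds).]

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model (NOT Samza's real classes) of why updating the chooser only
// inside the throttled refresh starves it when one partition is nearly empty.
public class ChooserStarvationSketch {

    // Runs `iterations` choose() calls; returns how many returned null.
    static int runLoop(int iterations, int refreshEveryN, boolean updateEveryIteration) {
        Queue<String> unprocessed = new ArrayDeque<>(); // messages already fetched from consumers
        for (int i = 0; i < 100; i++) unprocessed.add("msg-" + i);

        Queue<String> chooser = new ArrayDeque<>();     // stands in for MessageChooser's buffer
        int nullChooses = 0;

        for (int i = 0; i < iterations; i++) {
            String envelope = chooser.poll();           // chooser.choose
            if (envelope == null) nullChooses++;        // real loop would block noNewMessagesTimeout ms

            // refresh.maybeCall: mostly skipped due to backoff
            boolean refreshed = (i % refreshEveryN == 0);
            if (refreshed || updateEveryIteration) {
                // feed the chooser from the unprocessed buffer
                if (chooser.isEmpty() && !unprocessed.isEmpty()) chooser.add(unprocessed.poll());
            }
        }
        return nullChooses;
    }

    public static void main(String[] args) {
        int starved = runLoop(50, 10, false); // update only when refresh fires -> 45 nulls
        int fixed   = runLoop(50, 10, true);  // update on every iteration -> 1 null
        System.out.println("null chooses, update inside refresh only: " + starved);
        System.out.println("null chooses, update every iteration:     " + fixed);
    }
}
```

With the update gated behind the refresh, almost every choose comes back null and pays the timeout; moving the update out of the refresh block (the proposed patch) makes every iteration after the first succeed.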
> >>>>>>
> >>>>>>
> >>>>>> On Fri, Mar 28, 2014 at 8:05 PM, Anh Thu Vu <[email protected]> wrote:
> >>>>>>
> >>>>>>> Hi Chris,
> >>>>>>>
> >>>>>>> The previous message was sent by mistake. Somehow the SEND button got
> >>>>>>> clicked before I could finish writing. Please ignore it and see the
> >>>>>>> one below.
> >>>>>>>
> >>>>>>> In fact, my code is "a bit" complicated as it lives in a bigger
> >>>>>>> "platform". What I'm planning to do now is to write 2 pure samza
> >>>>>>> tasks to test it.
> >>>>>>>
> >>>>>>> For now, the config is something like this:
> >>>>>>>
> >>>>>>> Task 1:
> >>>>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
> >>>>>>> job.name=Dummy_20140328182500-0
> >>>>>>>
> >>>>>>> task.class=...
> >>>>>>> task.inputs=mysystem.Dummy_20140328182500-0
> >>>>>>> task.entrance.outputs=kafka:Dummy_20140328182500-0-1-0 (extra property)
> >>>>>>>
> >>>>>>> systems.mysystem.samza.factory=MySystemFactory
> >>>>>>>
> >>>>>>> serializers.registry.kryo.class=KryoSerdeFactory
> >>>>>>> systems.kafka.samza.msg.serde=kryo
> >>>>>>>
> >>>>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >>>>>>> systems.kafka.producer.batch.num.messages=1
> >>>>>>> systems.kafka.producer.producer.type=sync
> >>>>>>>
> >>>>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
> >>>>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
> >>>>>>>
> >>>>>>>
> >>>>>>> Task 2:
> >>>>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
> >>>>>>> job.name=Dummy_20140328182500-1
> >>>>>>>
> >>>>>>> task.class=...
> >>>>>>> task.inputs=kafka.Dummy_20140328182500-0-1-0
> >>>>>>>
> >>>>>>> serializers.registry.kryo.class=KryoSerdeFactory
> >>>>>>> systems.kafka.samza.msg.serde=kryo
> >>>>>>>
> >>>>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >>>>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
> >>>>>>> systems.kafka.producer.batch.num.messages=1
> >>>>>>> systems.kafka.producer.producer.type=sync
> >>>>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
> >>>>>>>
> >>>>>>>
> >>>>>>> For the code, a simplified version is:
> >>>>>>>
> >>>>>>> In the custom consumer of MySystem:
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public void start() {
> >>>>>>>     Thread processorPollingThread = new Thread(
> >>>>>>>         new Runnable() {
> >>>>>>>             @Override
> >>>>>>>             public void run() {
> >>>>>>>                 try {
> >>>>>>>                     pollingEntranceProcessor();
> >>>>>>>                     setIsAtHead(systemStreamPartition, true);
> >>>>>>>                 } catch (InterruptedException e) {
> >>>>>>>                     e.getStackTrace();
> >>>>>>>                     stop();
> >>>>>>>                 }
> >>>>>>>             }
> >>>>>>>         }
> >>>>>>>     );
> >>>>>>>
> >>>>>>>     processorPollingThread.start();
> >>>>>>> }
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public void stop() {
> >>>>>>> }
> >>>>>>>
> >>>>>>> private void pollingEntranceProcessor() throws InterruptedException {
> >>>>>>>     int messageCnt = 0;
> >>>>>>>     long startTime = System.nanoTime();
> >>>>>>>     while (!this.entranceProcessor.isFinished()) {
> >>>>>>>         messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
> >>>>>>>         if (this.entranceProcessor.hasNext() && messageCnt < 10000) {
> >>>>>>>             this.put(systemStreamPartition, new IncomingMessageEnvelope(
> >>>>>>>                 systemStreamPartition, null, null, this.entranceProcessor.nextEvent()));
> >>>>>>>         } else {
> >>>>>>>             try {
> >>>>>>>                 Thread.sleep(100);
> >>>>>>>             } catch (InterruptedException e) {
> >>>>>>>                 break;
> >>>>>>>             }
> >>>>>>>         }
> >>>>>>>     }
> >>>>>>>     // Send last event
> >>>>>>>     this.put(systemStreamPartition, new IncomingMessageEnvelope(
> >>>>>>>         systemStreamPartition, null, null, this.entranceProcessor.nextEvent()));
> >>>>>>> }
> >>>>>>>
> >>>>>>> In the first Task:
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public void process(IncomingMessageEnvelope envelope,
> >>>>>>>         MessageCollector collector, TaskCoordinator coordinator) throws Exception {
> >>>>>>>     this.outputStream.setCollector(collector);
> >>>>>>>     this.outputStream.put(event); // This will call collector.send()
> >>>>>>> }
> >>>>>>>
> >>>>>>> In the second Task:
> >>>>>>>
> >>>>>>> public void process(IncomingMessageEnvelope envelope, MessageCollector
> >>>>>>>         collector, TaskCoordinator coordinator) throws Exception {
> >>>>>>>     for (SamzaStream stream : this.outputStreams) {
> >>>>>>>         stream.setCollector(collector);
> >>>>>>>     }
> >>>>>>>     this.processor.process((ContentEvent) envelope.getMessage());
> >>>>>>>     // The content of this method is shown below
> >>>>>>> }
> >>>>>>>
> >>>>>>> public boolean process(ContentEvent event) {
> >>>>>>>     counter++;
> >>>>>>>
> >>>>>>>     if (counter == 1) {
> >>>>>>>         sampleStart = System.nanoTime();
> >>>>>>>         expStart = sampleStart;
> >>>>>>>         logger.info("End processor starts receiving events");
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     sampleEnd = System.nanoTime();
> >>>>>>>     if (counter % freq == 0) {
> >>>>>>>         long sampleDuration = TimeUnit.SECONDS.convert(sampleEnd - sampleStart,
> >>>>>>>             TimeUnit.NANOSECONDS);
> >>>>>>>         logger.info("Instances index: {} - {} seconds", counter, sampleDuration);
> >>>>>>>         sampleStart = sampleEnd;
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     if (event.isLastEvent()) {
> >>>>>>>         long sampleDuration = TimeUnit.SECONDS.convert(sampleEnd - expStart,
> >>>>>>>             TimeUnit.NANOSECONDS);
> >>>>>>>         logger.info("Total: {} - {} seconds", counter, sampleDuration);
> >>>>>>>     }
> >>>>>>>     return true;
> >>>>>>> }
> >>>>>>>
> >>>>>>> As I said, I will
> >>>>>>> write the new samza tasks and test NOW. Will let you
> >>>>>>> know later.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Casey
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, Mar 28, 2014 at 6:40 PM, Chris Riccomini <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Hey Anh,
> >>>>>>>>
> >>>>>>>> For a simple read/write StreamTask that has little logic in it, you
> >>>>>>>> should be able to get 10,000+ messages/sec per-container with a 1kb
> >>>>>>>> msg payload when talking to a remote Kafka broker.
> >>>>>>>>
> >>>>>>>> At first glance, setting a batch size of 1 with a sync producer will
> >>>>>>>> definitely slow down your task, especially if num.acks is set to a
> >>>>>>>> number other than zero.
> >>>>>>>>
> >>>>>>>> Could you please post your job config file, and your code (if that's
> >>>>>>>> OK)?
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Chris
> >>>>>>>>
> >>>>>>>> On 3/28/14 8:00 AM, "Anh Thu Vu" <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>> >I forgot to clarify this. My application is a simple pipeline of 2
> >>>>>>>> >jobs:
> >>>>>>>> >The first one reads from a file and writes to a kafka topic.
> >>>>>>>> >The second reads from that kafka topic.
> >>>>>>>> >
> >>>>>>>> >The throughput is measured in the second job (get the timestamp
> >>>>>>>> >when it receives the 1st, 1000th, ... message).
> >>>>>>>> >
> >>>>>>>> >Casey
> >>>>>>>> >
> >>>>>>>> >
> >>>>>>>> >On Fri, Mar 28, 2014 at 3:56 PM, Anh Thu Vu <[email protected]> wrote:
> >>>>>>>> >
> >>>>>>>> >> Hi guys,
> >>>>>>>> >>
> >>>>>>>> >> I'm running my application both locally and on a small cluster of
> >>>>>>>> >> 5 nodes (each with 2GB RAM, 1 core, connected via normal Ethernet -
> >>>>>>>> >> I think) and the observed throughput seems very slow.
> >>>>>>>> >>
> >>>>>>>> >> Do you have any idea about an expected throughput when run with
> >>>>>>>> >> one 7200RPM harddrive?
> >>>>>>>> >> My estimated throughput is about 1000 messages per second. Each
> >>>>>>>> >> message is slightly more than 1kB, kafka batchsize = 1, sync
> >>>>>>>> >> producer.
> >>>>>>>> >>
> >>>>>>>> >> When I try with an async producer, with different batch sizes,
> >>>>>>>> >> there can be a slight improvement.
> >>>>>>>> >>
> >>>>>>>> >> The config for the job has only the essential properties.
> >>>>>>>> >>
> >>>>>>>> >> Any suggestion? Could I have misconfigured something?
> >>>>>>>> >>
> >>>>>>>> >> Casey
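[Editor's note: Chris's point about batch size 1 with a sync producer can be sketched as a config fragment. This is a hedged sketch, not a config from the thread: it uses Kafka 0.8-era producer property names passed through Samza's `systems.kafka.producer.*` prefix, the values are illustrative, and `request.required.acks=0` trades durability for speed.]

```properties
# async producer batches sends instead of blocking on every message
systems.kafka.producer.producer.type=async
systems.kafka.producer.batch.num.messages=200
# how long the async producer may buffer before flushing a batch
systems.kafka.producer.queue.buffering.max.ms=100
# 0 = don't wait for broker acks (fastest, but messages can be lost)
systems.kafka.producer.request.required.acks=0
```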
