I think I know the reason. It is related to the MessageChooser. If I'm not wrong, the default Kafka stream between the two tasks has 2 partitions, and the default MessageChooser tries to read from the two partitions in a round-robin fashion. Since the messages are not distributed evenly between them (because I did not provide a message key, I guess), the MessageChooser keeps waiting on the partition with fewer messages and delays the whole thing.
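A quick way to sanity-check the shape of this theory is a toy simulation. This is not the actual chooser code from Samza — the class name and the strict two-partition alternation are just my assumptions about its behavior:

```java
// A toy model of the suspected behavior: a chooser that strictly alternates
// between two partitions. Once the partition with fewer messages runs dry,
// every other poll lands on an empty partition, and in the real consumer each
// such poll would block for noNewMessagesTimeout before any message flows.
public class RoundRobinChooserDemo {

    /** Drains two partitions of the given sizes in strict round-robin order
     *  and counts how many polls hit an empty partition along the way. */
    static int idlePolls(int messagesInA, int messagesInB) {
        int[] remaining = {messagesInA, messagesInB};
        int idle = 0;
        int next = 0; // index of the partition to poll next
        while (remaining[0] > 0 || remaining[1] > 0) {
            if (remaining[next] > 0) {
                remaining[next]--; // message available: deliver it
            } else {
                idle++;            // empty partition: would wait out the timeout
            }
            next = 1 - next;       // strict round-robin between the two
        }
        return idle;
    }

    public static void main(String[] args) {
        // Uneven split: after the small partition drains, roughly every
        // second poll is a wasted wait on the empty partition.
        System.out.println(idlePolls(100, 900)); // prints 800
        // Even split: no idle polls at all.
        System.out.println(idlePolls(500, 500)); // prints 0
    }
}
```

If each of those idle polls costs the 10 ms noNewMessagesTimeout, an uneven split would add seconds of pure waiting, which would also be consistent with the ~2x speedup I saw when halving the timeout.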
Does my guess sound right? I will have to do something to verify my guess
(and fix my problem :) ). I'll update this thread once I can verify and fix it.

Cheers,
Casey

On Sat, Mar 29, 2014 at 2:40 PM, Anh Thu Vu <[email protected]> wrote:

> Hmm, when I hacked the Samza code and changed the default value of
> noNewMessagesTimeout in org.apache.samza.system.SystemConsumers from 10 to
> 5, the throughput (as recorded in the second task) goes exactly 2 times
> faster.
>
> I'm not clear how this happens; maybe you have a better idea or a better
> guess?
>
> Casey
>
>
> On Sat, Mar 29, 2014 at 12:25 PM, Anh Thu Vu <[email protected]> wrote:
>
>> Hi,
>>
>> I tried running kafka-console-consumer.sh to listen to the output of the
>> first task and the speed was OK (I don't have an exact measurement, but
>> it is much faster than the result from the second task). So my guess is
>> that something got stuck at the consumer in the second task. Is there
>> possibly any synchronization in the consumer?
>>
>> Casey
>>
>>
>> On Sat, Mar 29, 2014 at 2:19 AM, Anh Thu Vu <[email protected]> wrote:
>>
>>> Ok, I went home and ran these two: FirstTask and SecondTask in
>>> samza-test/ (see attachment - I was lazy so I just injected the code
>>> into hello-samza) on my MacBook Air.
>>>
>>> What I got is 1000 messages per 10-11 seconds (worse than the result I
>>> got when I ran on my office machine this afternoon; must be my lousy
>>> Mac). Anyway, please help me see if there is anything wrong with my
>>> code or the config.
>>>
>>> Thanks a lot!!!!
>>> Casey
>>>
>>> PS: I tried with sync & batchsize=1 and with async & batchsize=200,
>>> but strangely the performance did not seem to differ.
>>>
>>>
>>> On Fri, Mar 28, 2014 at 8:05 PM, Anh Thu Vu <[email protected]> wrote:
>>>
>>>> Hi Chris,
>>>>
>>>> The previous message was sent by mistake. Somehow the SEND button got
>>>> clicked before I could finish writing. Please ignore it and see the
>>>> one below.
>>>>
>>>> In fact, my code is "a bit" complicated as it lies in a bigger
>>>> "platform". What I'm planning to do now is to write 2 pure Samza
>>>> tasks to test it.
>>>>
>>>> For now, the config is something like this:
>>>>
>>>> Task 1:
>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>> job.name=Dummy_20140328182500-0
>>>>
>>>> task.class=...
>>>> task.inputs=mysystem.Dummy_20140328182500-0
>>>> task.entrance.outputs=kafka:Dummy_20140328182500-0-1-0 (extra property)
>>>>
>>>> systems.mysystem.samza.factory=MySystemFactory
>>>>
>>>> serializers.registry.kryo.class=KryoSerdeFactory
>>>> systems.kafka.samza.msg.serde=kryo
>>>>
>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>> systems.kafka.producer.batch.num.messages=1
>>>> systems.kafka.producer.producer.type=sync
>>>>
>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>>
>>>> Task 2:
>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>> job.name=Dummy_20140328182500-1
>>>>
>>>> task.class=...
>>>> task.inputs=kafka.Dummy_20140328182500-0-1-0
>>>>
>>>> serializers.registry.kryo.class=KryoSerdeFactory
>>>> systems.kafka.samza.msg.serde=kryo
>>>>
>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>>>> systems.kafka.producer.batch.num.messages=1
>>>> systems.kafka.producer.producer.type=sync
>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>>
>>>> For the code, a simplified version is:
>>>>
>>>> In the custom consumer of MySystem:
>>>>
>>>> @Override
>>>> public void start() {
>>>>     Thread processorPollingThread = new Thread(new Runnable() {
>>>>         @Override
>>>>         public void run() {
>>>>             try {
>>>>                 pollingEntranceProcessor();
>>>>                 setIsAtHead(systemStreamPartition, true);
>>>>             } catch (InterruptedException e) {
>>>>                 e.printStackTrace();
>>>>                 stop();
>>>>             }
>>>>         }
>>>>     });
>>>>     processorPollingThread.start();
>>>> }
>>>>
>>>> @Override
>>>> public void stop() {
>>>> }
>>>>
>>>> private void pollingEntranceProcessor() throws InterruptedException {
>>>>     int messageCnt = 0;
>>>>     long startTime = System.nanoTime();
>>>>     while (!this.entranceProcessor.isFinished()) {
>>>>         messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
>>>>         if (this.entranceProcessor.hasNext() && messageCnt < 10000) {
>>>>             this.put(systemStreamPartition,
>>>>                 new IncomingMessageEnvelope(systemStreamPartition, null,
>>>>                     null, this.entranceProcessor.nextEvent()));
>>>>         } else {
>>>>             try {
>>>>                 Thread.sleep(100);
>>>>             } catch (InterruptedException e) {
>>>>                 break;
>>>>             }
>>>>         }
>>>>     }
>>>>     // Send the last event
>>>>     this.put(systemStreamPartition,
>>>>         new IncomingMessageEnvelope(systemStreamPartition, null,
>>>>             null, this.entranceProcessor.nextEvent()));
>>>> }
>>>>
>>>> In the first task:
>>>>
>>>> @Override
>>>> public void process(IncomingMessageEnvelope envelope,
>>>>         MessageCollector collector, TaskCoordinator coordinator) throws Exception {
>>>>     this.outputStream.setCollector(collector);
>>>>     this.outputStream.put(event); // This will call collector.send()
>>>> }
>>>>
>>>> In the second task:
>>>>
>>>> @Override
>>>> public void process(IncomingMessageEnvelope envelope,
>>>>         MessageCollector collector, TaskCoordinator coordinator) throws Exception {
>>>>     for (SamzaStream stream : this.outputStreams) {
>>>>         stream.setCollector(collector);
>>>>     }
>>>>     // The content of this method is shown below
>>>>     this.processor.process((ContentEvent) envelope.getMessage());
>>>> }
>>>>
>>>> public boolean process(ContentEvent event) {
>>>>     counter++;
>>>>
>>>>     if (counter == 1) {
>>>>         sampleStart = System.nanoTime();
>>>>         expStart = sampleStart;
>>>>         logger.info("End processor starts receiving events");
>>>>     }
>>>>
>>>>     sampleEnd = System.nanoTime();
>>>>     if (counter % freq == 0) {
>>>>         long sampleDuration = TimeUnit.SECONDS.convert(
>>>>             sampleEnd - sampleStart, TimeUnit.NANOSECONDS);
>>>>         logger.info("Instances index: {} - {} seconds", counter, sampleDuration);
>>>>         sampleStart = sampleEnd;
>>>>     }
>>>>
>>>>     if (event.isLastEvent()) {
>>>>         long sampleDuration = TimeUnit.SECONDS.convert(
>>>>             sampleEnd - expStart, TimeUnit.NANOSECONDS);
>>>>         logger.info("Total: {} - {} seconds", counter, sampleDuration);
>>>>     }
>>>>     return true;
>>>> }
>>>>
>>>> As I said, I will write the new Samza tasks and test NOW. Will let you
>>>> know later.
>>>>
>>>> Thanks,
>>>> Casey
>>>>
>>>>
>>>> On Fri, Mar 28, 2014 at 6:40 PM, Chris Riccomini <[email protected]> wrote:
>>>>
>>>>> Hey Anh,
>>>>>
>>>>> For a simple read/write StreamTask that has little logic in it, you
>>>>> should be able to get 10,000+ messages/sec per container with a 1 kB
>>>>> message payload when talking to a remote Kafka broker.
>>>>>
>>>>> At first glance, setting a batch size of 1 with a sync producer will
>>>>> definitely slow down your task, especially if num.acks is set to a
>>>>> number other than zero.
>>>>>
>>>>> Could you please post your job config file, and your code (if that's OK)?
>>>>>
>>>>> Cheers,
>>>>> Chris
>>>>>
>>>>> On 3/28/14 8:00 AM, "Anh Thu Vu" <[email protected]> wrote:
>>>>>
>>>>> > I forgot to clarify this. My application is a simple pipeline of 2 jobs:
>>>>> > The first one reads from a file and writes to a Kafka topic.
>>>>> > The second reads from that Kafka topic.
>>>>> >
>>>>> > The throughput is measured in the second job (get a timestamp when
>>>>> > receiving the 1st, 1000th, ... message).
>>>>> >
>>>>> > Casey
>>>>> >
>>>>> >
>>>>> > On Fri, Mar 28, 2014 at 3:56 PM, Anh Thu Vu <[email protected]> wrote:
>>>>> >
>>>>> >> Hi guys,
>>>>> >>
>>>>> >> I'm running my application both locally and on a small cluster of 5
>>>>> >> nodes (each with 2GB RAM, 1 core, connected via normal Ethernet - I
>>>>> >> think), and the observed throughput seems very slow.
>>>>> >>
>>>>> >> Do you have any idea about an expected throughput when run with one
>>>>> >> 7200RPM hard drive?
>>>>> >> My measured throughput is about 1000 messages per second. Each
>>>>> >> message is slightly more than 1 kB, Kafka batch size = 1, sync
>>>>> >> producer.
>>>>> >>
>>>>> >> When I try an async producer with different batch sizes, there can
>>>>> >> be a slight improvement.
>>>>> >>
>>>>> >> The config for the job has only the essential properties.
>>>>> >>
>>>>> >> Any suggestion? Could I have misconfigured something?
>>>>> >>
>>>>> >> Casey
>>>>> >>
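PS: While I test the chooser theory, I'll also try switching the producer settings Chris flagged to something like the following. These are the 0.8-era Kafka producer properties that Samza passes through; the batch and ack values below are just guesses to experiment with, not recommendations from this thread:

```properties
systems.kafka.producer.producer.type=async
systems.kafka.producer.batch.num.messages=200
systems.kafka.producer.queue.buffering.max.ms=100
systems.kafka.producer.request.required.acks=1
```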
