I think I know the reason. It is related to the MessageChooser. If I'm not
wrong, the default Kafka stream between the 2 tasks has 2 partitions and
the default MessageChooser tries to read from the 2 partitions in a
round-robin fashion. The messages are not distributed evenly between the
2 partitions (because I did not provide a message key, I guess), so the
MessageChooser keeps waiting on the partition with fewer messages and
delays the whole thing.

Does my guess sound right?
I will have to do something to verify my guess (and fix my problem :) )
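
One way to get a feel for the skew hypothesis without touching Samza is a toy model of key-based partitioning. The sketch below is not Kafka's or Samza's partitioner. It assumes a simple hash-mod scheme, 2 partitions, made-up string keys, and (as a labeled worst case) that every key-less message lands on the same partition. The names `PartitionSkewSketch`, `partitionFor` and `countByPartition` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PartitionSkewSketch {

    // Hypothetical partitioner: hash of the key modulo the partition count.
    // Null keys are all sent to partition 0. This is an assumption that models
    // the worst case for key-less messages; it is not Kafka's actual behaviour.
    static int partitionFor(String key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts how many of the given keys land on each partition.
    static int[] countByPartition(List<String> keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> keyed = new ArrayList<>();
        List<String> unkeyed = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            keyed.add(Integer.toString(i)); // made-up keys, one per message
            unkeyed.add(null);              // no key provided
        }
        System.out.println("keyed:   " + Arrays.toString(countByPartition(keyed, 2)));
        System.out.println("unkeyed: " + Arrays.toString(countByPartition(unkeyed, 2)));
    }
}
```

If the real per-partition counts (for example from the broker's offsets) look like the unkeyed case, that would support the guess; if they are roughly balanced, the delay probably comes from somewhere else.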

I'll update this once I can verify and fix it.

Cheers,
Casey


On Sat, Mar 29, 2014 at 2:40 PM, Anh Thu Vu <[email protected]> wrote:

> Hmm, when I hack the Samza code and change the default value of
> noNewMessagesTimeout in org.apache.samza.system.SystemConsumers from 10
> to 5, the throughput (as recorded in the second task) becomes exactly
> 2 times faster.
>
> It's not clear to me how this happens; maybe you have a better idea or a
> better guess?
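
A back-of-the-envelope reading of this observation, under two unconfirmed assumptions: the timeout is in milliseconds, and the consumer loop ends up paying roughly one full timeout per delivered message. In that case throughput is bounded by about 1000 / timeout messages per second. The class and method names below are hypothetical and the model is only a sketch, not a description of what SystemConsumers actually does.

```java
final class TimeoutBoundSketch {

    // Messages per second if every message waits out one full timeout.
    // Assumption: timeoutMs is in milliseconds and dominates per-message cost.
    static double messagesPerSecond(long timeoutMs) {
        if (timeoutMs <= 0) {
            throw new IllegalArgumentException("timeoutMs must be positive");
        }
        return 1000.0 / timeoutMs;
    }

    // Seconds needed to deliver the given number of messages under the same model.
    static double secondsFor(long messages, long timeoutMs) {
        return messages / messagesPerSecond(timeoutMs);
    }

    public static void main(String[] args) {
        for (long timeoutMs : new long[] {10, 5}) {
            System.out.println("timeout=" + timeoutMs + " ms: "
                    + messagesPerSecond(timeoutMs) + " msg/s, "
                    + secondsFor(1000, timeoutMs) + " s per 1000 messages");
        }
    }
}
```

Under this model a timeout of 10 gives about 10 seconds per 1000 messages, close to the 10-11 seconds reported earlier in the thread, and halving the timeout halves that time, which would be consistent with the exact 2x speedup.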
>
> Casey
>
>
> On Sat, Mar 29, 2014 at 12:25 PM, Anh Thu Vu <[email protected]> wrote:
>
>> Hi,
>>
>> I tried running kafka-console-consumer.sh to listen to the output of the
>> first task and the speed was ok (I don't have an exact measurement but it
>> is much faster than the result from the second task). So my guess is that
>> something got stuck at the consumer in the second task. Is there possibly
>> any synchronization in the consumer?
>>
>> Casey
>>
>>
>> On Sat, Mar 29, 2014 at 2:19 AM, Anh Thu Vu <[email protected]> wrote:
>>
>>> Ok, I went home and ran these two: FirstTask and SecondTask in
>>> samza-test/ (see attachment - I was lazy so I just injected the code into
>>> hello-samza) on my MacBook Air.
>>>
>>> What I got is 1000 messages per 10-11 seconds (worse than the result I
>>> got when I ran on my office machine this afternoon, must be my lousy Mac).
>>> Anyway, please help me to see if there is anything wrong with my code or
>>> the config.
>>>
>>> Thanks a lot!!!!
>>> Casey
>>>
>>> PS: I tried with sync & batchsize=1, with async & batchsize=200 but
>>> strangely the performance did not seem to differ.
>>>
>>>
>>> On Fri, Mar 28, 2014 at 8:05 PM, Anh Thu Vu <[email protected]> wrote:
>>>
>>>> Hi Chris,
>>>>
>>>> The previous message was sent by mistake. Somehow the SEND button got
>>>> clicked before I could finish writing. Please ignore it and see the one
>>>> below
>>>>
>>>> In fact, my code is "a bit" complicated as it lies in a bigger
>>>> "platform". What I'm planning to do now is to write 2 pure samza tasks to
>>>> test it.
>>>>
>>>> For now, the config is something like this:
>>>> Task 1
>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>> job.name=Dummy_20140328182500-0
>>>>
>>>> task.class=...
>>>> task.inputs=mysystem.Dummy_20140328182500-0
>>>> task.entrance.outputs=kafka:Dummy_20140328182500-0-1-0 (extra property)
>>>>
>>>> systems.mysystem.samza.factory=MySystemFactory
>>>>
>>>> serializers.registry.kryo.class=KryoSerdeFactory
>>>> systems.kafka.samza.msg.serde=kryo
>>>>
>>>>
>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>> systems.kafka.producer.batch.num.messages=1
>>>> systems.kafka.producer.producer.type=sync
>>>>
>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>>
>>>>
>>>> Task 2:
>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>> job.name=Dummy_20140328182500-1
>>>>
>>>> task.class=...
>>>> task.inputs=kafka.Dummy_20140328182500-0-1-0
>>>>
>>>> serializers.registry.kryo.class=KryoSerdeFactory
>>>> systems.kafka.samza.msg.serde=kryo
>>>>
>>>>
>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>>>> systems.kafka.producer.batch.num.messages=1
>>>> systems.kafka.producer.producer.type=sync
>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>>
>>>>
>>>> For the code, a simplified one is:
>>>> In the custom consumer of MySystem:
>>>>         @Override
>>>>         public void start() {
>>>>             Thread processorPollingThread = new Thread(
>>>>                     new Runnable() {
>>>>                         @Override
>>>>                         public void run() {
>>>>                             try {
>>>>                                 pollingEntranceProcessor();
>>>>                                 setIsAtHead(systemStreamPartition, true);
>>>>                             } catch (InterruptedException e) {
>>>>                                 // getStackTrace() only returned the trace; print it instead
>>>>                                 e.printStackTrace();
>>>>                                 stop();
>>>>                             }
>>>>                         }
>>>>                     }
>>>>             );
>>>>
>>>>             processorPollingThread.start();
>>>>         }
>>>>
>>>>         @Override
>>>>         public void stop() {
>>>>
>>>>         }
>>>>
>>>>         private void pollingEntranceProcessor() throws InterruptedException {
>>>>             int messageCnt = 0;
>>>>             long startTime = System.nanoTime();
>>>>             while(!this.entranceProcessor.isFinished()) {
>>>>                 messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
>>>>                 if (this.entranceProcessor.hasNext() && messageCnt < 10000) {
>>>>                     this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, this.entranceProcessor.nextEvent()));
>>>>                 } else {
>>>>                     try {
>>>>                         Thread.sleep(100);
>>>>                     } catch (InterruptedException e) {
>>>>                         break;
>>>>                     }
>>>>                 }
>>>>             }
>>>>             // Send last event
>>>>             this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, this.entranceProcessor.nextEvent()));
>>>>         }
>>>>
>>>> In the first Task:
>>>> @Override
>>>>     public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
>>>>
>>>>         this.outputStream.setCollector(collector);
>>>>         this.outputStream.put(event); // This will call collector.send()
>>>>
>>>>     }
>>>>
>>>> In the second Task:
>>>> public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
>>>>         for (SamzaStream stream : this.outputStreams) {
>>>>             stream.setCollector(collector);
>>>>         }
>>>>         // The content of this method is shown below
>>>>         this.processor.process((ContentEvent) envelope.getMessage());
>>>>     }
>>>>
>>>>
>>>> public boolean process(ContentEvent event) {
>>>>         counter++;
>>>>
>>>>         if(counter == 1){
>>>>             sampleStart = System.nanoTime();
>>>>             expStart = sampleStart;
>>>>             logger.info("End processor starts receiving events");
>>>>         }
>>>>
>>>>         sampleEnd = System.nanoTime();
>>>>         if (counter % freq == 0) {
>>>>             long sampleDuration = TimeUnit.SECONDS.convert(sampleEnd - sampleStart, TimeUnit.NANOSECONDS);
>>>>             logger.info("Instances index: {} - {} seconds", counter, sampleDuration);
>>>>             sampleStart = sampleEnd;
>>>>         }
>>>>
>>>>         if(event.isLastEvent()){
>>>>             long sampleDuration = TimeUnit.SECONDS.convert(sampleEnd - expStart, TimeUnit.NANOSECONDS);
>>>>             logger.info("Total: {} - {} seconds", counter, sampleDuration);
>>>>         }
>>>> }
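
The measurement above converts each window to whole seconds, and `TimeUnit.SECONDS.convert` truncates, so a window of 10.9 seconds is logged as 10. A minimal, self-contained sketch of the same sampling idea that reports messages per second from the raw nanosecond difference is shown here. `ThroughputSampler` and its injectable clock are hypothetical names introduced for illustration; they are not part of the code in this thread.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

final class ThroughputSampler {
    private final long freq;               // messages per sampling window
    private final LongSupplier nanoClock;  // injectable so the sampler can be tested
    private long counter = 0;
    private long sampleStart = 0;

    ThroughputSampler(long freq, LongSupplier nanoClock) {
        if (freq <= 0) {
            throw new IllegalArgumentException("freq must be positive");
        }
        this.freq = freq;
        this.nanoClock = nanoClock;
    }

    // Records one message. Returns messages/sec for the window that just
    // closed, or -1.0 while the current window is still open. As in the
    // original snippet, the first window starts at the first message, so it
    // spans freq - 1 intervals; a zero-length window yields Infinity.
    double record() {
        long now = nanoClock.getAsLong();
        if (counter == 0) {
            sampleStart = now;
        }
        counter++;
        if (counter % freq != 0) {
            return -1.0;
        }
        double elapsedNanos = now - sampleStart;
        sampleStart = now;
        return freq * (double) TimeUnit.SECONDS.toNanos(1) / elapsedNanos;
    }

    public static void main(String[] args) {
        ThroughputSampler sampler = new ThroughputSampler(1000, System::nanoTime);
        for (int i = 0; i < 5000; i++) {
            double rate = sampler.record();
            if (rate >= 0) {
                System.out.printf("%.0f messages/sec%n", rate);
            }
        }
    }
}
```

Calling `record()` once per message from the second task's `process` method would give a rate per window without the whole-second rounding.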
>>>>
>>>> As I said, I will write the new samza tasks and test NOW. Will let you
>>>> know later.
>>>>
>>>> Thanks,
>>>> Casey
>>>>
>>>>
>>>> On Fri, Mar 28, 2014 at 6:40 PM, Chris Riccomini <
>>>> [email protected]> wrote:
>>>>
>>>>> Hey Anh,
>>>>>
>>>>> For a simple read/write StreamTask that has little logic in it, you
>>>>> should be able to get 10,000+ messages/sec per-container with a 1kb
>>>>> msg payload when talking to a remote Kafka broker.
>>>>>
>>>>>
>>>>> At first glance, setting a batch size of 1 with a sync producer will
>>>>> definitely slow down your task, especially if num.acks is set to a
>>>>> number other than zero.
>>>>>
>>>>> Could you please post your job config file, and your code (if that's
>>>>> OK)?
>>>>>
>>>>>
>>>>> Cheers,
>>>>> Chris
>>>>>
>>>>> On 3/28/14 8:00 AM, "Anh Thu Vu" <[email protected]> wrote:
>>>>>
>>>>> >I forgot to clarify this. My application is a simple pipeline of 2
>>>>> >jobs:
>>>>> >The first one reads from a file and writes to a Kafka topic.
>>>>> >The second reads from that Kafka topic.
>>>>> >
>>>>> >The throughput is measured in the second job (it takes a timestamp
>>>>> >when it receives the 1st, 1000th, ... message).
>>>>> >
>>>>> >Casey
>>>>> >
>>>>> >
>>>>> >On Fri, Mar 28, 2014 at 3:56 PM, Anh Thu Vu <[email protected]> wrote:
>>>>> >
>>>>> >> Hi guys,
>>>>> >>
>>>>> >> I'm running my application both locally and on a small cluster of 5
>>>>> >> nodes (each with 2GB RAM, 1 core, connected via normal Ethernet - I
>>>>> >> think) and the observed throughput seems very slow.
>>>>> >>
>>>>> >> Do you have any idea about the expected throughput when running with
>>>>> >> one 7200RPM hard drive?
>>>>> >> My estimated throughput is about 1000 messages per second. Each
>>>>> >> message is slightly more than 1kB, kafka batchsize = 1, sync producer.
>>>>> >>
>>>>> >> When I try with an async producer, with different batch sizes, there
>>>>> >> can be a slight improvement.
>>>>> >>
>>>>> >> The config for the job has only the essential properties.
>>>>> >>
>>>>> >> Any suggestion? Could I have misconfigured something?
>>>>> >>
>>>>> >> Casey
>>>>> >>
>>>>>
>>>>>
>>>>
>>>
>>
>
