Hmm, when I hack the Samza code and change the default value of
noNewMessagesTimeout in org.apache.samza.system.SystemConsumers from 10
to 5, the throughput (as recorded in the second task) exactly doubles.

I'm not clear on how this happens; maybe you have a better idea, or a
better guess?

Casey


On Sat, Mar 29, 2014 at 12:25 PM, Anh Thu Vu <[email protected]> wrote:

> Hi,
>
> I tried running kafka-console-consumer.sh to listen to the output of
> the first task, and the speed was fine (I don't have an exact
> measurement, but it is much faster than the result from the second
> task). So my guess is that something is getting stuck at the consumer
> in the second task. Is there possibly any synchronization in the
> consumer?
>
> Casey
>
>
> On Sat, Mar 29, 2014 at 2:19 AM, Anh Thu Vu <[email protected]> wrote:
>
>> Ok, I went home and ran these two, FirstTask and SecondTask, in
>> samza-test/ (see attachment; I was lazy, so I just injected the code
>> into hello-samza) on my MacBook Air.
>>
>> What I got is 1000 messages per 10-11 seconds (worse than the result I
>> got when I ran on my office machine this afternoon; must be my lousy
>> Mac). Anyway, please help me see if there is anything wrong with my
>> code or the config.
>>
>> Thanks a lot!!!!
>> Casey
>>
>> PS: I tried with sync & batch size = 1 and with async & batch size =
>> 200, but strangely the performance did not seem to differ.
>>
>>
>> On Fri, Mar 28, 2014 at 8:05 PM, Anh Thu Vu <[email protected]> wrote:
>>
>>> Hi Chris,
>>>
>>> The previous message was sent by mistake; somehow the SEND button got
>>> clicked before I could finish writing. Please ignore it and see the
>>> one below.
>>>
>>> In fact, my code is "a bit" complicated, as it lives inside a bigger
>>> "platform". What I'm planning to do now is to write two pure Samza
>>> tasks to test this.
>>>
>>> For now, the config is something like this:
>>> Task 1
>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>> job.name=Dummy_20140328182500-0
>>>
>>> task.class=...
>>> task.inputs=mysystem.Dummy_20140328182500-0
>>> task.entrance.outputs=kafka:Dummy_20140328182500-0-1-0 (extra property)
>>>
>>> systems.mysystem.samza.factory=MySystemFactory
>>>
>>> serializers.registry.kryo.class=KryoSerdeFactory
>>> systems.kafka.samza.msg.serde=kryo
>>>
>>>
>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>> systems.kafka.producer.batch.num.messages=1
>>> systems.kafka.producer.producer.type=sync
>>>
>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>
>>>
>>> Task 2:
>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>> job.name=Dummy_20140328182500-1
>>>
>>> task.class=...
>>> task.inputs=kafka.Dummy_20140328182500-0-1-0
>>>
>>> serializers.registry.kryo.class=KryoSerdeFactory
>>> systems.kafka.samza.msg.serde=kryo
>>>
>>>
>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>>> systems.kafka.producer.batch.num.messages=1
>>> systems.kafka.producer.producer.type=sync
>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>
>>>
>>> For the code, a simplified version is:
>>> In the custom consumer of MySystem:
>>>         @Override
>>>         public void start() {
>>>             Thread processorPollingThread = new Thread(
>>>                     new Runnable() {
>>>                         @Override
>>>                         public void run() {
>>>                             try {
>>>                                 pollingEntranceProcessor();
>>>                                 setIsAtHead(systemStreamPartition, true);
>>>                             } catch (InterruptedException e) {
>>>                                 e.printStackTrace();
>>>                                 stop();
>>>                             }
>>>                         }
>>>                     }
>>>             );
>>>
>>>             processorPollingThread.start();
>>>         }
>>>
>>>         @Override
>>>         public void stop() {
>>>         }
>>>
>>>         private void pollingEntranceProcessor() throws InterruptedException {
>>>             int messageCnt = 0;
>>>             while (!this.entranceProcessor.isFinished()) {
>>>                 messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
>>>                 if (this.entranceProcessor.hasNext() && messageCnt < 10000) {
>>>                     this.put(systemStreamPartition, new IncomingMessageEnvelope(
>>>                             systemStreamPartition, null, null,
>>>                             this.entranceProcessor.nextEvent()));
>>>                 } else {
>>>                     // Queue is full or no event is ready yet: back off
>>>                     try {
>>>                         Thread.sleep(100);
>>>                     } catch (InterruptedException e) {
>>>                         break;
>>>                     }
>>>                 }
>>>             }
>>>             // Send the last event
>>>             this.put(systemStreamPartition, new IncomingMessageEnvelope(
>>>                     systemStreamPartition, null, null,
>>>                     this.entranceProcessor.nextEvent()));
>>>         }
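>>>
>>> (Side thought: that Thread.sleep(100) runs whenever the queue holds
>>> 10000 messages or no event is ready, so every stall costs up to 100 ms.
>>> A sketch of the same loop with a much shorter back-off, reusing the
>>> entranceProcessor and the 10000 cap from above, just as an idea to try:)
>>>
>>>         private void pollingEntranceProcessor() {
>>>             while (!entranceProcessor.isFinished()) {
>>>                 if (entranceProcessor.hasNext()
>>>                         && getNumMessagesInQueue(systemStreamPartition) < 10000) {
>>>                     put(systemStreamPartition, new IncomingMessageEnvelope(
>>>                             systemStreamPartition, null, null,
>>>                             entranceProcessor.nextEvent()));
>>>                 } else {
>>>                     try {
>>>                         Thread.sleep(1); // 1 ms back-off instead of 100 ms
>>>                     } catch (InterruptedException e) {
>>>                         Thread.currentThread().interrupt();
>>>                         return;
>>>                     }
>>>                 }
>>>             }
>>>         }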
>>>
>>> In the first Task:
>>>     @Override
>>>     public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
>>>             TaskCoordinator coordinator) throws Exception {
>>>         this.outputStream.setCollector(collector);
>>>         this.outputStream.put(envelope.getMessage()); // This will call collector.send()
>>>     }
>>>
>>> In the second Task:
>>>     @Override
>>>     public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
>>>             TaskCoordinator coordinator) throws Exception {
>>>         for (SamzaStream stream : this.outputStreams) {
>>>             stream.setCollector(collector);
>>>         }
>>>         this.processor.process((ContentEvent) envelope.getMessage()); // The content of this method is shown below
>>>     }
>>>
>>>
>>>     public boolean process(ContentEvent event) {
>>>         counter++;
>>>
>>>         if (counter == 1) {
>>>             sampleStart = System.nanoTime();
>>>             expStart = sampleStart;
>>>             logger.info("End processor starts receiving events");
>>>         }
>>>
>>>         sampleEnd = System.nanoTime();
>>>         if (counter % freq == 0) {
>>>             long sampleDuration = TimeUnit.SECONDS.convert(sampleEnd - sampleStart, TimeUnit.NANOSECONDS);
>>>             logger.info("Instances index: {} - {} seconds", counter, sampleDuration);
>>>             sampleStart = sampleEnd;
>>>         }
>>>
>>>         if (event.isLastEvent()) {
>>>             long sampleDuration = TimeUnit.SECONDS.convert(sampleEnd - expStart, TimeUnit.NANOSECONDS);
>>>             logger.info("Total: {} - {} seconds", counter, sampleDuration);
>>>         }
>>>
>>>         return true; // report success (assumed semantics of the boolean return)
>>>     }
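>>>
>>> (If it helps to read the log as a rate rather than seconds per batch,
>>> the per-sample branch could log something like this, using the same
>>> freq and timestamps as above:)
>>>
>>>             double secs = (sampleEnd - sampleStart) / 1e9;
>>>             logger.info("Throughput: {} msgs/sec over the last {} messages",
>>>                     secs > 0 ? freq / secs : 0.0, freq);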
>>>
>>> As I said, I will write the new Samza tasks and test NOW. Will let
>>> you know later.
>>>
>>> Thanks,
>>> Casey
>>>
>>>
>>> On Fri, Mar 28, 2014 at 6:40 PM, Chris Riccomini <[email protected]> wrote:
>>>
>>>> Hey Anh,
>>>>
>>>> For a simple read/write StreamTask that has little logic in it, you
>>>> should be able to get 10,000+ messages/sec per container with a 1kb
>>>> msg payload when talking to a remote Kafka broker.
>>>>
>>>>
>>>> At first glance, setting a batch size of 1 with a sync producer will
>>>> definitely slow down your task, especially if num.acks is set to a
>>>> number other than zero.
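>>>>
>>>> For comparison, an async setup might look roughly like this (the
>>>> producer.* keys are passed through to the Kafka 0.8 producer, so
>>>> treat the exact property names as a sketch, not a verified config):
>>>>
>>>> systems.kafka.producer.producer.type=async
>>>> systems.kafka.producer.batch.num.messages=200
>>>> systems.kafka.producer.request.required.acks=1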
>>>>
>>>> Could you please post your job config file, and your code (if that's
>>>> OK)?
>>>>
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On 3/28/14 8:00 AM, "Anh Thu Vu" <[email protected]> wrote:
>>>>
>>>> >I forgot to clarify this. My application is a simple pipeline of 2
>>>> >jobs: the first one reads from a file and writes to a Kafka topic,
>>>> >and the second reads from that Kafka topic.
>>>> >
>>>> >Throughput is measured in the second job (by taking a timestamp when
>>>> >the 1st, 1000th, ... message is received).
>>>> >Casey
>>>> >
>>>> >
>>>> >On Fri, Mar 28, 2014 at 3:56 PM, Anh Thu Vu <[email protected]> wrote:
>>>> >
>>>> >> Hi guys,
>>>> >>
>>>> >> I'm running my application both locally and on a small cluster of
>>>> >> 5 nodes (each with 2GB RAM, 1 core, connected via normal Ethernet,
>>>> >> I think), and the observed throughput seems very slow.
>>>> >>
>>>> >> Do you have any idea of the expected throughput when running with
>>>> >> one 7200RPM hard drive?
>>>> >> My estimated throughput is about 1000 messages per second. Each
>>>> >> message is slightly more than 1kB, Kafka batch size = 1, sync
>>>> >> producer.
>>>> >>
>>>> >> When I try the async producer with different batch sizes, there
>>>> >> can be a slight improvement.
>>>> >>
>>>> >> The config for the job has only the essential properties.
>>>> >>
>>>> >> Any suggestions? Could I have misconfigured something?
>>>> >>
>>>> >> Casey
>>>> >>
>>>>
>>>>
>>>
>>
>
