Hmm, when I hack the Samza code and change the default value of noNewMessagesTimeout
in org.apache.samza.system.SystemConsumers from 10 to 5, the throughput (as
measured in the second task) exactly doubles.
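My rough mental model (which may well be wrong): if each poll returns only a
small batch of messages and then blocks for the full noNewMessagesTimeout
(assuming that value is in milliseconds), the timeout dominates the loop, so
throughput scales roughly as batch / timeout, and halving 10 ms to 5 ms doubles
it. A toy sketch of that arithmetic (the 100 messages per poll is an assumed
number, not something I measured):

    // Toy model only - assumes every poll returns a fixed batch and then
    // blocks for the full noNewMessagesTimeout; the real SystemConsumers
    // behavior is more subtle.
    public class PollTimeoutModel {
        public static void main(String[] args) {
            int assumedBatchPerPoll = 100; // hypothetical, not measured
            for (long timeoutMs : new long[] {10, 5}) {
                long msgsPerSec = (1000 / timeoutMs) * assumedBatchPerPoll;
                System.out.printf("timeout=%dms -> ~%d msgs/sec%n",
                    timeoutMs, msgsPerSec);
            }
        }
    }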
I'm not clear how this happens; maybe you have a better idea or a better
guess?

Casey

On Sat, Mar 29, 2014 at 12:25 PM, Anh Thu Vu <[email protected]> wrote:

> Hi,
>
> I tried running kafka-console-consumer.sh to listen to the output of the
> first task, and the speed was OK (I don't have an exact measurement, but
> it is much faster than the result from the second task). So my guess is
> that something gets stuck at the consumer in the second task. Is there
> possibly any synchronization in the consumer?
>
> Casey
>
>
> On Sat, Mar 29, 2014 at 2:19 AM, Anh Thu Vu <[email protected]> wrote:
>
>> OK, I went home and ran these two: FirstTask and SecondTask in
>> samza-test/ (see attachment - I was lazy, so I just injected the code
>> into hello-samza) on my MacBook Air.
>>
>> What I got is 1000 messages per 10-11 seconds (worse than the result I
>> got when I ran on my office machine this afternoon; must be my lousy
>> Mac). Anyway, please help me see if there is anything wrong with my
>> code or the config.
>>
>> Thanks a lot!!!!
>> Casey
>>
>> PS: I tried with sync & batch size = 1, and with async & batch size =
>> 200, but strangely the performance did not seem to differ.
>>
>>
>> On Fri, Mar 28, 2014 at 8:05 PM, Anh Thu Vu <[email protected]> wrote:
>>
>>> Hi Chris,
>>>
>>> The previous message was sent by mistake. Somehow the SEND button got
>>> clicked before I could finish writing. Please ignore it and see the
>>> one below.
>>>
>>> In fact, my code is "a bit" complicated, as it lives inside a bigger
>>> "platform". What I'm planning to do now is to write 2 pure Samza tasks
>>> to test it.
>>>
>>> For now, the config is something like this:
>>>
>>> Task 1:
>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>> job.name=Dummy_20140328182500-0
>>>
>>> task.class=...
>>> task.inputs=mysystem.Dummy_20140328182500-0
>>> task.entrance.outputs=kafka:Dummy_20140328182500-0-1-0 (extra property)
>>>
>>> systems.mysystem.samza.factory=MySystemFactory
>>>
>>> serializers.registry.kryo.class=KryoSerdeFactory
>>> systems.kafka.samza.msg.serde=kryo
>>>
>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>> systems.kafka.producer.batch.num.messages=1
>>> systems.kafka.producer.producer.type=sync
>>>
>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>
>>>
>>> Task 2:
>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>> job.name=Dummy_20140328182500-1
>>>
>>> task.class=...
>>> task.inputs=kafka.Dummy_20140328182500-0-1-0
>>>
>>> serializers.registry.kryo.class=KryoSerdeFactory
>>> systems.kafka.samza.msg.serde=kryo
>>>
>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>>> systems.kafka.producer.batch.num.messages=1
>>> systems.kafka.producer.producer.type=sync
>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>
>>>
>>> For the code, a simplified version is below.
>>>
>>> In the custom consumer of MySystem:
>>>
>>> @Override
>>> public void start() {
>>>     Thread processorPollingThread = new Thread(new Runnable() {
>>>         @Override
>>>         public void run() {
>>>             try {
>>>                 pollingEntranceProcessor();
>>>                 setIsAtHead(systemStreamPartition, true);
>>>             } catch (InterruptedException e) {
>>>                 e.getStackTrace();
>>>                 stop();
>>>             }
>>>         }
>>>     });
>>>
>>>     processorPollingThread.start();
>>> }
>>>
>>> @Override
>>> public void stop() {
>>> }
>>>
>>> private void pollingEntranceProcessor() throws InterruptedException {
>>>     int messageCnt = 0;
>>>     long startTime = System.nanoTime();
>>>     while (!this.entranceProcessor.isFinished()) {
>>>         messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
>>>         if (this.entranceProcessor.hasNext() && messageCnt < 10000) {
>>>             this.put(systemStreamPartition,
>>>                 new IncomingMessageEnvelope(systemStreamPartition, null,
>>>                     null, this.entranceProcessor.nextEvent()));
>>>         } else {
>>>             try {
>>>                 Thread.sleep(100);
>>>             } catch (InterruptedException e) {
>>>                 break;
>>>             }
>>>         }
>>>     }
>>>     // Send the last event
>>>     this.put(systemStreamPartition,
>>>         new IncomingMessageEnvelope(systemStreamPartition, null, null,
>>>             this.entranceProcessor.nextEvent()));
>>> }
>>>
>>> In the first Task:
>>>
>>> @Override
>>> public void process(IncomingMessageEnvelope envelope,
>>>         MessageCollector collector, TaskCoordinator coordinator)
>>>         throws Exception {
>>>     this.outputStream.setCollector(collector);
>>>     this.outputStream.put(event); // This will call collector.send()
>>> }
>>>
>>> In the second Task:
>>>
>>> @Override
>>> public void process(IncomingMessageEnvelope envelope,
>>>         MessageCollector collector, TaskCoordinator coordinator)
>>>         throws Exception {
>>>     for (SamzaStream stream : this.outputStreams) {
>>>         stream.setCollector(collector);
>>>     }
>>>     // The content of this method is shown below
>>>     this.processor.process((ContentEvent) envelope.getMessage());
>>> }
>>>
>>> public boolean process(ContentEvent event) {
>>>     counter++;
>>>
>>>     if (counter == 1) {
>>>         sampleStart = System.nanoTime();
>>>         expStart = sampleStart;
>>>         logger.info("End processor starts receiving events");
>>>     }
>>>
>>>     sampleEnd = System.nanoTime();
>>>     if (counter % freq == 0) {
>>>         long sampleDuration = TimeUnit.SECONDS.convert(
>>>             sampleEnd - sampleStart, TimeUnit.NANOSECONDS);
>>>         logger.info("Instances index: {} - {} seconds", counter,
>>>             sampleDuration);
>>>         sampleStart = sampleEnd;
>>>     }
>>>
>>>     if (event.isLastEvent()) {
>>>         long sampleDuration = TimeUnit.SECONDS.convert(
>>>             sampleEnd - expStart, TimeUnit.NANOSECONDS);
>>>         logger.info("Total: {} - {} seconds", counter, sampleDuration);
>>>     }
>>>
>>>     return true;
>>> }
>>>
>>> As I said, I will write the new Samza tasks and test NOW. Will let you
>>> know later.
>>>
>>> Thanks,
>>> Casey
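One thing I notice rereading my consumer loop quoted above: whenever the queue
is at its 10,000-message cap (or no event is ready yet), the polling thread
sleeps a flat 100 ms before re-checking, so progress can stall in 100 ms steps.
A minimal sketch of the same producer/consumer handoff built on a bounded
java.util.concurrent.BlockingQueue instead, where put() blocks only as long as
the queue is actually full (the class and method names here are made up for
illustration; the 10000 cap mirrors my code above):

    import java.util.Iterator;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch: bounded handoff without the fixed 100 ms sleep.
    public class BoundedFeed<E> {
        // Same 10000 cap as the polling loop above.
        private final BlockingQueue<E> queue = new LinkedBlockingQueue<>(10000);

        // Producer side: put() blocks while the queue is full and wakes
        // as soon as the consumer drains an element.
        public void feed(Iterator<E> events) throws InterruptedException {
            while (events.hasNext()) {
                queue.put(events.next());
            }
        }

        // Consumer side: take() blocks while the queue is empty.
        public E take() throws InterruptedException {
            return queue.take();
        }
    }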
>>>
>>> On Fri, Mar 28, 2014 at 6:40 PM, Chris Riccomini
>>> <[email protected]> wrote:
>>>
>>>> Hey Anh,
>>>>
>>>> For a simple read/write StreamTask that has little logic in it, you
>>>> should be able to get 10,000+ messages/sec per container with a 1 kB
>>>> message payload when talking to a remote Kafka broker.
>>>>
>>>> At first glance, setting a batch size of 1 with a sync producer will
>>>> definitely slow down your task, especially if num.acks is set to a
>>>> number other than zero.
>>>>
>>>> Could you please post your job config file, and your code (if that's
>>>> OK)?
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On 3/28/14 8:00 AM, "Anh Thu Vu" <[email protected]> wrote:
>>>>
>>>> > I forgot to clarify this. My application is a simple pipeline of 2
>>>> > jobs:
>>>> > The first one reads from a file and writes to a Kafka topic.
>>>> > The second one reads from that Kafka topic.
>>>> >
>>>> > The throughput is measured in the second job (by taking a timestamp
>>>> > on receiving the 1st, 1000th, ... message).
>>>> >
>>>> > Casey
>>>> >
>>>> >
>>>> > On Fri, Mar 28, 2014 at 3:56 PM, Anh Thu Vu <[email protected]>
>>>> > wrote:
>>>> >
>>>> >> Hi guys,
>>>> >>
>>>> >> I'm running my application both locally and on a small cluster of
>>>> >> 5 nodes (each with 2 GB RAM, 1 core, connected via normal Ethernet
>>>> >> - I think), and the observed throughput seems very slow.
>>>> >>
>>>> >> Do you have any idea of the expected throughput when running with
>>>> >> one 7200 RPM hard drive?
>>>> >> My estimated throughput is about 1000 messages per second. Each
>>>> >> message is slightly more than 1 kB; Kafka batch size = 1, sync
>>>> >> producer.
>>>> >>
>>>> >> When I try an async producer with different batch sizes, there is
>>>> >> only a slight improvement.
>>>> >>
>>>> >> The config for the job has only the essential properties.
>>>> >>
>>>> >> Any suggestions? Could I have misconfigured something?
>>>> >>
>>>> >> Casey
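For reference, if Chris's batching advice applies here, it would translate to
something like the following in the job configs above (a sketch only - the
property names are from the 0.8-era Kafka producer that these configs already
pass through, and the values are illustrative, not tuned):

    systems.kafka.producer.producer.type=async
    systems.kafka.producer.batch.num.messages=200
    # 0 = don't wait for broker acknowledgement (fastest, least durable).
    # I believe this is the 0.8 producer property behind Chris's "num.acks":
    systems.kafka.producer.request.required.acks=0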
