Hi Guozhang,

That setting was introduced when I was trying to figure out the odd handshakes between Kafka pulls. I turned it off, and throughput increased by about 5%.
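For context, a sketch of the consumer settings that keep long polling enabled (these are illustrative values based on the old consumer's documented defaults, not tuned recommendations for this job):

```properties
# Let the broker hold the fetch request until data arrives or the wait
# expires, instead of busy-polling with fetch.wait.max.ms=1:
systems.kafka.consumer.fetch.min.bytes=1
systems.kafka.consumer.fetch.wait.max.ms=100
```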
The Kafka reader program I ran as a baseline is from the Kafka repository, i.e.,

$KAFKA_BIN/kafka-consumer-perf-test.sh --zookeeper $ZOOKEEPER --topic throughput-test2

That program is where I got the Kafka consumer configs, including

systems.kafka.consumer.socket.receive.buffer.bytes=2200000
systems.kafka.consumer.fetch.message.max.bytes=1100000

and some JVM options in task.opts.

Thanks,

George



From: Guozhang Wang <wangg...@gmail.com>
To: dev@samza.apache.org
Date: 21/05/2015 12:31 AM
Subject: Re: Samza job throughput much lower than Kafka throughput

Hi George,

Is there any reason you need to set the following config?

systems.kafka.consumer.fetch.wait.max.ms=1

This setting basically disables long polling on the consumer, which will then busy-fetch data from the broker. That has a large impact on network latency, especially when the consumer has already caught up with the Kafka broker.

Also, when you say it is "slower than a program reading directly from Kafka", which consumer did your program use to read data from Kafka?

Guozhang

On Wed, May 20, 2015 at 5:01 PM, George Li <g...@ca.ibm.com> wrote:

> Hi Yi,
>
> Thanks for the reply. Below is my job config and code.
>
> When we run this job inside our dev docker container, which has Zookeeper,
> broker, and YARN installed locally, its throughput is at least 50% higher
> than our cluster run's.
> Thanks,
>
> George
>
> Configuration:
>
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=container-performance
>
> # YARN
> yarn.container.count=1
> yarn.container.memory.mb=2548
> yarn.package.path={my package on hdfs}
> yarn.container.retry.count=0
> yarn.am.container.memory.mb=2048
> yarn.am.jmx.enabled=false
>
> # Task
> task.opts=-server -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true
>
> task.class=samza.TestPerformanceTask
> task.inputs=kafka.throughput-test2
> task.log.interval=1000000
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> task.checkpoint.system=kafka
> task.checkpoint.replication.factor=1
>
> # Kafka System (only used for coordinator stream in this test)
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.fetch.threshold=50000
>
> systems.kafka.consumer.zookeeper.connect={zookeeper}
> systems.kafka.producer.bootstrap.servers={broker node}
> systems.kafka.consumer.auto.offset.reset=smallest
> systems.kafka.consumer.socket.receive.buffer.bytes=2200000
> systems.kafka.consumer.fetch.message.max.bytes=1100000
> systems.kafka.consumer.fetch.min.bytes=1
> systems.kafka.consumer.fetch.wait.max.ms=1
>
> # define coordinator system
> job.coordinator.system=kafka
> job.coordinator.replication.factor=1
>
> systems.kafka.streams.throughput-test2.samza.reset.offset=true
> systems.kafka.streams.throughput-test2.samza.offset.default=oldest
>
> Job's code. This is mostly a copy-paste of the one in the repository:
>
> object TestPerformanceTask {
>   // No thread safety is needed for these variables because they're mutated in
>   // the process method, which is single threaded.
>   var messagesProcessed = 0
>   var startTime = 0L
> }
>
> class TestPerformanceTask extends StreamTask with InitableTask with Logging {
>   import TestPerformanceTask._
>
>   /**
>    * How many messages to process before a log message is printed.
>    */
>   var logInterval = 10000
>
>   /**
>    * How many messages to process before shutting down.
>    */
>   var maxMessages = 10000000
>
>   var outputSystemStream: Option[SystemStream] = None
>
>   def init(config: Config, context: TaskContext) {
>     logInterval = config.getInt("task.log.interval", 10000)
>     maxMessages = config.getInt("task.max.messages", 10000000)
>     outputSystemStream = Option(config.get("task.outputs", null)).map(Util.getSystemStreamFromNames(_))
>     println("init!!")
>   }
>
>   def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
>     if (startTime == 0) {
>       startTime = System.currentTimeMillis
>     }
>
>     if (outputSystemStream.isDefined) {
>       collector.send(new OutgoingMessageEnvelope(outputSystemStream.get, envelope.getKey, envelope.getMessage))
>     }
>
>     messagesProcessed += 1
>
>     if (messagesProcessed % logInterval == 0) {
>       val seconds = (System.currentTimeMillis - startTime) / 1000
>       println("Processed %s messages in %s seconds." format (messagesProcessed, seconds))
>     }
>
>     if (messagesProcessed >= maxMessages) {
>       coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
>     }
>   }
> }
>
>
>
> From: Yi Pan <nickpa...@gmail.com>
> To: dev@samza.apache.org
> Date: 20/05/2015 05:03 PM
> Subject: Re: Samza job throughput much lower than Kafka throughput
>
> Hi, George,
>
> Could you share w/ us the code and configuration of your sample test job?
> Thanks!
>
> -Yi
>
> On Wed, May 20, 2015 at 1:19 PM, George Li <g...@ca.ibm.com> wrote:
>
> > Hi,
> >
> > We are evaluating Samza's performance, and our sample job with
> > TestPerformanceTask is much slower than a program reading directly from
> > Kafka.
> >
> > Scenario:
> >
> > * Cluster:
> > 1 master node for Zookeeper and YARN
> > 3 Kafka broker nodes
> > 3 YARN worker nodes
> >
> > * Kafka:
> > The topic has only 1 partition. Average message size is around 100 bytes.
> > On a YARN worker node, running the performance test program from the Kafka
> > repository to read the topic gives a throughput of about 400k messages/sec.
> >
> > * Samza:
> > Running TestPerformanceTask from the Samza repository with no output stream
> > defined gives a throughput of no more than 130k messages/sec.
> >
> > How can I explain/fix this performance difference?
> >
> > What I have done so far:
> >
> > 1. Monitor YARN worker node resource usage.
> > When the job is running, CPU and memory usage never exceed 5% except at
> > the beginning of the run. There is no significant network or disk IO
> > either.
> >
> > 2. Monitor worker node network traffic.
> > Tcpdump shows an interesting pattern. The YARN worker node will fetch a
> > block of data from the Kafka broker, and after that, it will handshake
> > with the same Kafka broker once every 100 ms for 300 ms before fetching
> > the next block.
> >
> > If I increase systems.kafka.samza.fetch.threshold to 500k, i.e., 10x the
> > default setting, this handshake period lasts about 3 seconds. If I set
> > the fetch threshold to 250k, the idle period becomes 1.5 seconds. It
> > seems the Kafka consumer greatly outpaces the process() calls.
> >
> > 3. Check Samza metrics.
> > I do not see any excessive network calls to the master or the Kafka
> > broker, i.e., no more than 30 calls/sec. However, process-ms and
> > choose-ms are approaching 2 ms.
> >
> > Any input would be greatly appreciated.
> >
> > Thanks,
> >
> > George

--
Guozhang
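A back-of-envelope check on the numbers reported above (a hypothetical sketch, not code from either project; the ~167k msgs/sec drain rate is inferred from the reported 500k-threshold / ~3-second idle period): if the consumer refills the buffer almost instantly and the container then drains it at a roughly fixed process() rate, the idle period should scale linearly with fetch.threshold, which matches the observed 0.3 s / 1.5 s / 3 s pattern.

```java
// Hypothetical sanity check: if the idle ("handshake") period is just the
// container draining fetch.threshold buffered messages at a fixed rate,
// the drain time should be threshold / rate.
public class DrainTime {
    static double drainSeconds(long thresholdMsgs, double processRatePerSec) {
        return thresholdMsgs / processRatePerSec;
    }

    public static void main(String[] args) {
        double rate = 167_000.0; // inferred from 500k msgs / ~3 s idle
        for (long threshold : new long[] {50_000, 250_000, 500_000}) {
            System.out.printf("fetch.threshold=%d -> ~%.1f s idle%n",
                    threshold, drainSeconds(threshold, rate));
        }
        // prints ~0.3, ~1.5, and ~3.0 seconds, matching the observations
    }
}
```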