Oops.  Sent too soon.  I mean:

producer.batch.size=262144
producer.linger.ms=5
producer.compression.type=lz4


On Thu, May 21, 2015 at 9:00 AM, Roger Hoover <roger.hoo...@gmail.com>
wrote:

> Hi George,
>
> You might also try tweaking the producer settings.
>
>         producer.batch.size=262144
>         producer.linger.ms=5
> producer.compression.type: lz4
>
> On Wed, May 20, 2015 at 9:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hi George,
>>
>> Is there any reason you need to set the following configs?
>>
>> systems.kafka.consumer.fetch.wait.max.ms= 1
>>
>> This setting will basically disable long pooling of the consumer which
>> will
>> then busy fetching data from broker, which has a large impact on network
>> latency especially when the consumer is already caught up with the Kafka
>> broker.
>>
>> Also when you say it is "slower than a program reading directly from
>> Kafka." which consumer did your program use to read data from Kafka?
>>
>> Guozhang
>>
>>
>> On Wed, May 20, 2015 at 5:01 PM, George Li <g...@ca.ibm.com> wrote:
>>
>> > Hi Yi,
>> >
>> > Thanks for the reply. Below is my job config and code.
>> >
>> > When we run this job inside our dev docker container, which has
>> zookeeper,
>> > broker, and yarn installed locally,  its throughput is at least 50%
>> higher
>> > than our cluster run's.
>> >
>> > Thanks,
>> >
>> > George
>> >
>> > Configuration:
>> >
>> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> > job.name=container-performance
>> >
>> > # YARN
>> > yarn.container.count=1
>> > yarn.container.memory.mb=2548
>> > yarn.package.path={my package on hdfs}
>> > yarn.container.retry.count=0
>> > yarn.am.container.memory.mb=2048
>> > yarn.am.jmx.enabled=false
>> >
>> > # Task
>> > task.opts=-server -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
>> > -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark
>> > -XX:+DisableExplicitGC -Djava.awt.headless=true
>> >
>> > task.class=samza.TestPerformanceTask
>> > task.inputs=kafka.throughput-test2
>> > task.log.interval=1000000
>> > task.checkpoint.factory =
>> > org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>> > task.checkpoint.system=kafka
>> > task.checkpoint.replication.factor=1
>> >
>> > # Kafka System (only used for coordinator stream in this test)
>> >
>> >
>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> > systems.kafka.samza.fetch.threshold=50000
>> >
>> > systems.kafka.consumer.zookeeper.connect= {zookeeper}
>> > systems.kafka.producer.bootstrap.servers={broker node}
>> > systems.kafka.consumer.auto.offset.reset=smallest
>> > systems.kafka.consumer.socket.receive.buffer.bytes= 2200000
>> > systems.kafka.consumer.fetch.message.max.bytes= 1100000
>> > systems.kafka.consumer.fetch.min.bytes= 1
>> > systems.kafka.consumer.fetch.wait.max.ms= 1
>> >
>> > #define coordinator system
>> > job.coordinator.system=kafka
>> > job.coordinator.replication.factor=1
>> >
>> > systems.kafka.streams.throughput-test2.samza.reset.offset=true
>> > systems.kafka.streams.throughput-test2.samza.offset.default=oldest
>> > ~
>> >
>> > Job's code. This is mostly a copy-paste of the one in the repository
>> >
>> > object TestPerformanceTask {
>> >   // No thread safety is needed for these variables because they're
>> > mutated in
>> >   //   // the process method, which is single threaded.
>> >   var messagesProcessed = 0
>> >   var startTime = 0L
>> > }
>> >
>> > class TestPerformanceTask extends StreamTask with InitableTask with
>> > Logging {
>> >   import TestPerformanceTask._
>> >
>> >   /**
>> >    *    * How many messages to process before a log message is printed.
>> >    *       */
>> >   var logInterval = 10000
>> >
>> >   /**
>> >    *    * How many messages to process before shutting down.
>> >    *       */
>> >   var maxMessages = 10000000
>> >
>> >
>> >   var outputSystemStream: Option[SystemStream] = None
>> >
>> >   def init(config: Config, context: TaskContext) {
>> >     logInterval = config.getInt("task.log.interval", 10000)
>> >     maxMessages = config.getInt("task.max.messages", 10000000)
>> >     outputSystemStream = Option(config.get("task.outputs",
>> > null)).map(Util.getSystemStreamFromNames(_))
>> >     println("init!!")
>> >   }
>> >
>> >   def process(envelope: IncomingMessageEnvelope, collector:
>> > MessageCollector, coordinator: TaskCoordinator) {
>> >     if (startTime == 0) {
>> >       startTime = System.currentTimeMillis
>> >     }
>> >
>> >     if (outputSystemStream.isDefined) {
>> >       collector.send(new OutgoingMessageEnvelope(outputSystemStream.get,
>> > envelope.getKey, envelope.getMessage))
>> >     }
>> >
>> >     messagesProcessed += 1
>> >
>> >     if (messagesProcessed % logInterval == 0) {
>> >       val seconds = (System.currentTimeMillis - startTime) / 1000
>> >       println("Processed %s messages in %s seconds." format
>> > (messagesProcessed, seconds))
>> >     }
>> >
>> >
>> >     if (messagesProcessed >= maxMessages) {
>> >       coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
>> >     }
>> >   }
>> > }
>> >
>> >
>> >
>> > From:   Yi Pan <nickpa...@gmail.com>
>> > To:     dev@samza.apache.org,
>> > Date:   20/05/2015 05:03 PM
>> > Subject:        Re: Samza job throughput much lower than Kafka
>> throughput
>> >
>> >
>> >
>> > Hi, George,
>> >
>> > Could you share w/ us the code and configuration of your sample test
>> job?
>> > Thanks!
>> >
>> > -Yi
>> >
>> > On Wed, May 20, 2015 at 1:19 PM, George Li <g...@ca.ibm.com> wrote:
>> >
>> > > Hi,
>> > >
>> > > We are evaluating Samza's performance, and our sample job with
>> > > TestPerformanceTask is much slower than a program reading directly
>> from
>> > > Kafka.
>> > >
>> > > Scenario:
>> > >
>> > > * Cluster:
>> > > 1 master node for Zookeeper and yarn.
>> > > 3 Kafka broker nodes
>> > > 3 yarn worker nodes
>> > >
>> > > * Kafka:
>> > > Topic has only 1 partition. Average message size is around 100 byte.
>> > > On a yarn worker node, run the performance test program from Kafka
>> > > repository to read the topic. The throughput is about 400k
>> messages/sec
>> > >
>> > > *Samza
>> > > Run TestPerformanceTask from Samza repository with no output stream
>> > > defined, and the throughput no more than 130k messages/sec
>> > >
>> > >
>> > > How can I explain/fix this performance difference?
>> > >
>> > > What I have done so far:
>> > >
>> > > 1. Monitor yarn worker node resource usage.
>> > > When the job is running, cpu and memory usage are never more than 5%
>> > > except at the beginning of the run. No significant network and disk IO
>> > > either
>> > >
>> > > 2. Monitor worker node network traffic
>> > > Tcpdump shows an interesting pattern. The yarn worker node will fetch
>> a
>> > > block of data from the kafka broker, and after that, it will handshake
>> > > with the same kafka broker once every 100 ms for 300 ms before
>> fetching
>> > > the next block.
>> > >
>> > > If I increase systems.kafka.samza.fetch.threshold to 500k, i.e., 10x
>> the
>> > > default settings, this handshake lasts about 3 seconds. If I set
>> > > fetch.threshold to 250k, this idle period then becomes 1.5 sec. It
>> seems
>> > > kafka consumer greatly outpaced process() call
>> > >
>> > > 3. Check Samza metrics
>> > > I do not see any excessive network calls to master or kafka broker,
>> > i.e.,
>> > > no more than 30 calls/sec. However, process-ms and choose-ms are
>> > > approaching 2ms
>> > >
>> > > Any input would be greatly appreciated.
>> > >
>> > > Thanks,
>> > >
>> > > George
>> > >
>> > >
>> > >
>> > >
>> > >
>> >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>
>

Reply via email to