Hi Guozhang,

I pre-populated the topic with 25 million rows. There is no new writes to 
the topic

Below is the metrics for a run that consumes about 6 million rows from 
topic throughput-test2 at the 60th second.

To isolate the problem, I am going to write a simple non-samza program 
that consumes kafka, deploy it via yarn, and profile its performance.

Thanks,

George

{
   "metrics":{
      "org.apache.samza.system.SystemConsumersMetrics":{
         "kafka-messages-per-poll":117,
         "chose-object":5780074,
         "kafka-ssp-fetches-per-poll":117,
         "deserialization error":0,
         "ssps-needed-by-chooser":1,
         "unprocessed-messages":47501,
         "kafka-throughput-test2-messages-chosen":5780073,
         "chose-null":116,
         "kafka-polls":1215,
         "poll-timeout":0
      },
      "org.apache.samza.metrics.JvmMetrics":{
         "mem-heap-committed-mb":211.9375,
         "threads-runnable":8,
         "threads-new":0,
         "mem-non-heap-committed-mb":39.296875,
         "mem-heap-used-mb":146.62978,
         "mem-non-heap-used-mb":23.491356,
         "threads-terminated":0,
         "concurrentmarksweep-gc-count":7,
         "parnew-gc-time-millis":2809,
         "parnew-gc-count":99,
         "gc-time-millis":3106,
         "concurrentmarksweep-gc-time-millis":297,
         "threads-blocked":0,
         "threads-timed-waiting":7,
         "threads-waiting":4,
         "gc-count":106
      },
      "org.apache.samza.container.SamzaContainerMetrics":{
         "process-null-envelopes":116,
         "process-envelopes":5785521,
         "process-calls":5785637,
         "process-ms":1.0759992493901296,
         "commit-calls":2,
         "commit-ms":1.036664885088188,
         "choose-ms":1.0810021518598216,
         "window-calls":0,
         "send-calls":0,
         "window-ms":1.0081726739312658
      },
      "org.apache.samza.system.chooser.RoundRobinChooserMetrics":{
         "buffered-messages":0
      },
      "org.apache.samza.system.kafka.KafkaSystemConsumerMetrics":{
         "kafka-throughput-test2-0-high-watermark":21003263,
         "buffered-message-count-SystemStreamPartition [kafka, 
throughput-test2, 0]":39108,
         "kafka-throughput-test2-0-offset-change":5866682,
         "blocking-poll-timeout-count-SystemStreamPartition [kafka, 
throughput-test2, 0]":2,
         "kafka-10.0.0.19-9092-topic-partitions":1,
         "blocking-poll-count-SystemStreamPartition [kafka, 
throughput-test2, 0]":0,
         "poll-count":117,
         "no-more-messages-SystemStreamPartition [kafka, throughput-test2, 
0]":false,
         "kafka-throughput-test2-0-bytes-read":336582316,
         "kafka-10.0.0.19-9092-bytes-read":336582316,
 "kafka-throughput-test2-0-messages-behind-high-watermark":15136581,
         "kafka-10.0.0.19-9092-reconnects":0,
         "kafka-throughput-test2-0-messages-read":5866682,
         "kafka-10.0.0.19-9092-messages-read":451,
         "kafka-10.0.0.19-9092-skipped-fetch-requests":409
      },
      "org.apache.samza.system.kafka.KafkaSystemProducerMetrics":{
         "kafka-producer-sends":0,
         "kafka-flush-ms":0.0,
         "kafka-flush-failed":0,
         "kafka-flushes":1,
         "kafka-producer-send-success":0,
         "kafka-producer-retries":0,
         "kafka-producer-send-failed":0
      },
      "org.apache.samza.system.SystemProducersMetrics":{
         "taskname-partition 0-flushes":1,
         "flushes":1,
         "taskname-partition 0-sends":0,
         "serialization error":0,
         "sends":0
      }
   },
   "header":{
      "reset-time":1432237990203,
      "job-id":"1",
      "time":1432238050889,
      "host":"testhost7",
      "container-name":"samza-container-0",
      "source":"samza-container-0",
      "job-name":"container-performance",
      "samza-version":"0.9.0",
      "version":"0.0.1"
   }
}{
   "metrics":{
      "org.apache.samza.container.TaskInstanceMetrics":{
         "process-calls":5785521,
         "messages-sent":0,
         "commit-calls":2,
         "flush-calls":2,
         "kafka-throughput-test2-0-offset":"5785520",
         "window-calls":0,
         "send-calls":0
      }
   },
   "header":{
      "reset-time":1432237990203,
      "job-id":"1",
      "time":1432238050904,
      "host":"testhost7",
      "container-name":"samza-container-0",
      "source":"TaskName-Partition 0",
      "job-name":"container-performance",
      "samza-version":"0.9.0",
      "version":"0.0.1"
   }
}





From:   Guozhang Wang <wangg...@gmail.com>
To:     dev@samza.apache.org, 
Date:   21/05/2015 05:46 PM
Subject:        Re: Samza job throughput much lower than Kafka throughput



Hi George,

How is the incoming traffic to your source topic? Is it more than 130K?

Guozhang


On Thu, May 21, 2015 at 11:44 AM, George Li <g...@ca.ibm.com> wrote:

> Hi Roger,
>
> These parameters dont seem to affect throughput much, probably because 
my
> test job just reads from kafka and doesnt write to it?
>
> Thanks,
>
> George
>
>
>
> From:   Roger Hoover <roger.hoo...@gmail.com>
> To:     "dev@samza.apache.org" <dev@samza.apache.org>,
> Date:   21/05/2015 12:04 PM
> Subject:        Re: Samza job throughput much lower than Kafka 
throughput
>
>
>
> Oops.  Sent too soon.  I mean:
>
> producer.batch.size=262144
> producer.linger.ms=5
> producer.compression.type=lz4
>
>
> On Thu, May 21, 2015 at 9:00 AM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > Hi George,
> >
> > You might also try tweaking the producer settings.
> >
> >         producer.batch.size=262144
> >         producer.linger.ms=5
> > producer.compression.type: lz4
> >
> > On Wed, May 20, 2015 at 9:30 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> Hi George,
> >>
> >> Is there any reason you need to set the following configs?
> >>
> >> systems.kafka.consumer.fetch.wait.max.ms= 1
> >>
> >> This setting will basically disable long pooling of the consumer 
which
> >> will
> >> then busy fetching data from broker, which has a large impact on
> network
> >> latency especially when the consumer is already caught up with the
> Kafka
> >> broker.
> >>
> >> Also when you say it is "slower than a program reading directly from
> >> Kafka." which consumer did your program use to read data from Kafka?
> >>
> >> Guozhang
> >>
> >>
> >> On Wed, May 20, 2015 at 5:01 PM, George Li <g...@ca.ibm.com> wrote:
> >>
> >> > Hi Yi,
> >> >
> >> > Thanks for the reply. Below is my job config and code.
> >> >
> >> > When we run this job inside our dev docker container, which has
> >> zookeeper,
> >> > broker, and yarn installed locally,  its throughput is at least 50%
> >> higher
> >> > than our cluster run's.
> >> >
> >> > Thanks,
> >> >
> >> > George
> >> >
> >> > Configuration:
> >> >
> >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> >> > job.name=container-performance
> >> >
> >> > # YARN
> >> > yarn.container.count=1
> >> > yarn.container.memory.mb=2548
> >> > yarn.package.path={my package on hdfs}
> >> > yarn.container.retry.count=0
> >> > yarn.am.container.memory.mb=2048
> >> > yarn.am.jmx.enabled=false
> >> >
> >> > # Task
> >> > task.opts=-server -Xmx1024m -XX:+UseParNewGC 
-XX:+UseConcMarkSweepGC
> >> > -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark
> >> > -XX:+DisableExplicitGC -Djava.awt.headless=true
> >> >
> >> > task.class=samza.TestPerformanceTask
> >> > task.inputs=kafka.throughput-test2
> >> > task.log.interval=1000000
> >> > task.checkpoint.factory =
> >> > org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> >> > task.checkpoint.system=kafka
> >> > task.checkpoint.replication.factor=1
> >> >
> >> > # Kafka System (only used for coordinator stream in this test)
> >> >
> >> >
> >>
>
> 
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >> > systems.kafka.samza.fetch.threshold=50000
> >> >
> >> > systems.kafka.consumer.zookeeper.connect= {zookeeper}
> >> > systems.kafka.producer.bootstrap.servers={broker node}
> >> > systems.kafka.consumer.auto.offset.reset=smallest
> >> > systems.kafka.consumer.socket.receive.buffer.bytes= 2200000
> >> > systems.kafka.consumer.fetch.message.max.bytes= 1100000
> >> > systems.kafka.consumer.fetch.min.bytes= 1
> >> > systems.kafka.consumer.fetch.wait.max.ms= 1
> >> >
> >> > #define coordinator system
> >> > job.coordinator.system=kafka
> >> > job.coordinator.replication.factor=1
> >> >
> >> > systems.kafka.streams.throughput-test2.samza.reset.offset=true
> >> > systems.kafka.streams.throughput-test2.samza.offset.default=oldest
> >> > ~
> >> >
> >> > Job's code. This is mostly a copy-paste of the one in the 
repository
> >> >
> >> > object TestPerformanceTask {
> >> >   // No thread safety is needed for these variables because they're
> >> > mutated in
> >> >   //   // the process method, which is single threaded.
> >> >   var messagesProcessed = 0
> >> >   var startTime = 0L
> >> > }
> >> >
> >> > class TestPerformanceTask extends StreamTask with InitableTask with
> >> > Logging {
> >> >   import TestPerformanceTask._
> >> >
> >> >   /**
> >> >    *    * How many messages to process before a log message is
> printed.
> >> >    *       */
> >> >   var logInterval = 10000
> >> >
> >> >   /**
> >> >    *    * How many messages to process before shutting down.
> >> >    *       */
> >> >   var maxMessages = 10000000
> >> >
> >> >
> >> >   var outputSystemStream: Option[SystemStream] = None
> >> >
> >> >   def init(config: Config, context: TaskContext) {
> >> >     logInterval = config.getInt("task.log.interval", 10000)
> >> >     maxMessages = config.getInt("task.max.messages", 10000000)
> >> >     outputSystemStream = Option(config.get("task.outputs",
> >> > null)).map(Util.getSystemStreamFromNames(_))
> >> >     println("init!!")
> >> >   }
> >> >
> >> >   def process(envelope: IncomingMessageEnvelope, collector:
> >> > MessageCollector, coordinator: TaskCoordinator) {
> >> >     if (startTime == 0) {
> >> >       startTime = System.currentTimeMillis
> >> >     }
> >> >
> >> >     if (outputSystemStream.isDefined) {
> >> >       collector.send(new
> OutgoingMessageEnvelope(outputSystemStream.get,
> >> > envelope.getKey, envelope.getMessage))
> >> >     }
> >> >
> >> >     messagesProcessed += 1
> >> >
> >> >     if (messagesProcessed % logInterval == 0) {
> >> >       val seconds = (System.currentTimeMillis - startTime) / 1000
> >> >       println("Processed %s messages in %s seconds." format
> >> > (messagesProcessed, seconds))
> >> >     }
> >> >
> >> >
> >> >     if (messagesProcessed >= maxMessages) {
> >> >       coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
> >> >     }
> >> >   }
> >> > }
> >> >
> >> >
> >> >
> >> > From:   Yi Pan <nickpa...@gmail.com>
> >> > To:     dev@samza.apache.org,
> >> > Date:   20/05/2015 05:03 PM
> >> > Subject:        Re: Samza job throughput much lower than Kafka
> >> throughput
> >> >
> >> >
> >> >
> >> > Hi, George,
> >> >
> >> > Could you share w/ us the code and configuration of your sample 
test
> >> job?
> >> > Thanks!
> >> >
> >> > -Yi
> >> >
> >> > On Wed, May 20, 2015 at 1:19 PM, George Li <g...@ca.ibm.com> wrote:
> >> >
> >> > > Hi,
> >> > >
> >> > > We are evaluating Samza's performance, and our sample job with
> >> > > TestPerformanceTask is much slower than a program reading 
directly
> >> from
> >> > > Kafka.
> >> > >
> >> > > Scenario:
> >> > >
> >> > > * Cluster:
> >> > > 1 master node for Zookeeper and yarn.
> >> > > 3 Kafka broker nodes
> >> > > 3 yarn worker nodes
> >> > >
> >> > > * Kafka:
> >> > > Topic has only 1 partition. Average message size is around 100
> byte.
> >> > > On a yarn worker node, run the performance test program from 
Kafka
> >> > > repository to read the topic. The throughput is about 400k
> >> messages/sec
> >> > >
> >> > > *Samza
> >> > > Run TestPerformanceTask from Samza repository with no output 
stream
> >> > > defined, and the throughput no more than 130k messages/sec
> >> > >
> >> > >
> >> > > How can I explain/fix this performance difference?
> >> > >
> >> > > What I have done so far:
> >> > >
> >> > > 1. Monitor yarn worker node resource usage.
> >> > > When the job is running, cpu and memory usage are never more than
> 5%
> >> > > except at the beginning of the run. No significant network and 
disk
> IO
> >> > > either
> >> > >
> >> > > 2. Monitor worker node network traffic
> >> > > Tcpdump shows an interesting pattern. The yarn worker node will
> fetch
> >> a
> >> > > block of data from the kafka broker, and after that, it will
> handshake
> >> > > with the same kafka broker once every 100 ms for 300 ms before
> >> fetching
> >> > > the next block.
> >> > >
> >> > > If I increase systems.kafka.samza.fetch.threshold to 500k, i.e.,
> 10x
> >> the
> >> > > default settings, this handshake lasts about 3 seconds. If I set
> >> > > fetch.threshold to 250k, this idle period then becomes 1.5 sec. 
It
> >> seems
> >> > > kafka consumer greatly outpaced process() call
> >> > >
> >> > > 3. Check Samza metrics
> >> > > I do not see any excessive network calls to master or kafka 
broker,
> >> > i.e.,
> >> > > no more than 30 calls/sec. However, process-ms and choose-ms are
> >> > > approaching 2ms
> >> > >
> >> > > Any input would be greatly appreciated.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > George
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> >
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>
>


-- 
-- Guozhang

Reply via email to