Keeping only the minimal dependencies in your build.sbt didn't change the error
that I get:

scalaVersion := "2.12.2"


val akkaV = "2.4.17"
val kafkaV = "0.10.0.1"
val reactiveKafkaV = "0.13"


libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % kafkaV,
  "org.apache.kafka" % "kafka_2.11" % kafkaV intransitive,
  "com.typesafe.akka" %% "akka-actor" % akkaV,
  "com.typesafe.akka" %% "akka-stream" % akkaV,
  "com.typesafe.akka" %% "akka-stream-kafka" % reactiveKafkaV
)


fork in run := true


publishMavenStyle in ThisBuild := false



[error] SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[info] [WARN] [06/24/2017 23:15:22.403] [toto-akka.kafka.default-dispatcher-12] [akka://toto/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
[info] [WARN] [06/24/2017 23:15:25.492] [toto-akka.kafka.default-dispatcher-14] [akka://toto/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
[error] Uncaught error from thread [toto-akka.kafka.default-dispatcher-16] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[toto]
[info] [ERROR] [SECURITY][06/24/2017 23:15:26.299] [toto-akka.kafka.default-dispatcher-16] [akka.actor.ActorSystemImpl(toto)] Uncaught error from thread [toto-akka.kafka.default-dispatcher-16] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
[error] java.lang.OutOfMemoryError: Direct buffer memory
[error] at java.nio.Bits.reserveMemory(Bits.java:693)
[error] at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
[error] at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
[error] at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
[error] at sun.nio.ch.IOUtil.read(IOUtil.java:195)

The error occurs when I produce some data to the Kafka topic.
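
One thing worth trying (an assumption on my part, not a confirmed fix): `java.lang.OutOfMemoryError: Direct buffer memory` means the JVM ran out of off-heap NIO direct buffers, which the Kafka client's network layer allocates for fetches, not out of heap. Since you have `fork in run := true`, the forked JVM's direct-memory ceiling can be raised from build.sbt; the sizes below are illustrative guesses to tune:

```scala
// Hypothetical additions to build.sbt — the sizes are guesses, adjust for your machine.
// "Direct buffer memory" is off-heap NIO memory, so raising -Xmx alone won't help.
javaOptions in run ++= Seq(
  "-XX:MaxDirectMemorySize=512m", // ceiling for NIO direct buffers
  "-Xmx1g"                        // heap size, made explicit for the forked JVM
)
```

If the limit itself turns out to be the problem, lowering `ConsumerConfig.FETCH_MAX_BYTES_CONFIG` from 2000000 in your consumer settings could also shrink the direct buffers each fetch allocates.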

On Thursday, June 22, 2017 at 7:03:27 PM UTC+2, Kilic Ali-Firat wrote:
>
> Hi everyone,
>
> I was using the classic kafka-clients from Apache, and I wrote a little
> program to compare consumer throughput using reactive-kafka (the latest
> version).
>
> My current use case is to listen to a Kafka topic, group the data into
> batches, consume them, and then commit. Below is my code:
>
>     implicit val actorSystem = ActorSystem("benchmark-kafka")
>     implicit val actorMaterializer = ActorMaterializer()
>
>     val consumerSettings =
>       ConsumerSettings(actorSystem, new StringDeserializer, new ByteArrayDeserializer)
>         .withBootstrapServers("mybroker:9094")
>         .withGroupId("kafka-producer-bench")
>         .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
>         .withProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "2000000")
>
>     Consumer
>       .committableSource(consumerSettings, Subscriptions.topics("benchmark-producer"))
>       .groupedWithin(5000, 1.seconds)
>       .mapAsync(1) { group =>
>         println(new java.sql.Timestamp(System.currentTimeMillis()) + " : Fetch " + group.size + " records")
>         Future.successful(group)
>       }
>       .map { group =>
>         group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) =>
>           batch.updated(elem.committableOffset)
>         }
>       }
>       .mapAsync(1) { msg =>
>         println(new Timestamp(System.currentTimeMillis()) + " Will commit : " + msg.getOffsets())
>         msg.commitScaladsl()
>       }
>       .runWith(Sink.ignore)
>
>
> To run a little benchmark, I'm using a script that ships with the Kafka
> distribution, called "kafka-producer-perf-test.sh".
>
> There is something I cannot understand: I cannot get the same throughput
> as the Kafka producer.
>
> *Kafka perf trace*
>
> bin/kafka-producer-perf-test.sh --topic benchmark-producer --producer.config config/producer.properties --record-size 1322 --num-records 300000 --throughput 10000
> [2017-06-22 18:29:47,554] WARN The configuration 'key.deserializer' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
> [2017-06-22 18:29:47,555] WARN The configuration 'value.deserializer' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
> 28153 records sent, 5630,6 records/sec (7,10 MB/sec), 1187,7 ms avg latency, 2187,0 max latency.
> 29748 records sent, 5949,6 records/sec (7,50 MB/sec), 3279,2 ms avg latency, 4076,0 max latency.
> 30528 records sent, 6099,5 records/sec (7,69 MB/sec), 3946,9 ms avg latency, 4043,0 max latency.
> 28476 records sent, 5694,1 records/sec (7,18 MB/sec), 4278,0 ms avg latency, 4425,0 max latency.
> 29112 records sent, 5811,9 records/sec (7,33 MB/sec), 4168,8 ms avg latency, 4303,0 max latency.
> 28404 records sent, 5680,8 records/sec (7,16 MB/sec), 4486,7 ms avg latency, 4603,0 max latency.
> 29220 records sent, 5837,0 records/sec (7,36 MB/sec), 4081,9 ms avg latency, 4243,0 max latency.
> 28728 records sent, 5745,6 records/sec (7,24 MB/sec), 4381,9 ms avg latency, 4477,0 max latency.
> 29088 records sent, 5816,4 records/sec (7,33 MB/sec), 4089,1 ms avg latency, 4238,0 max latency.
> 28080 records sent, 5614,9 records/sec (7,08 MB/sec), 4472,6 ms avg latency, 4627,0 max latency.
> 300000 records sent, 5798,446016 records/sec (7,31 MB/sec), 3852,98 ms avg latency, 4627,00 ms max latency, 4103 ms 50th, 4585 ms 95th, 4615 ms 99th, 4623 ms 99.9th.
>
>
> *Reactive Kafka stream trace*
>
> [info] 2017-06-22 18:38:28.456 : Fetch 12 records
> [info] 2017-06-22 18:38:28.46 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22970806}
> [info] 2017-06-22 18:38:29.456 : Fetch 372 records
> [info] 2017-06-22 18:38:29.459 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22971178}
> [info] 2017-06-22 18:38:30.456 : Fetch 773 records
> [info] 2017-06-22 18:38:30.458 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22971951}
> [info] 2017-06-22 18:38:31.456 : Fetch 773 records
> [info] 2017-06-22 18:38:31.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22972724}
> [info] 2017-06-22 18:38:32.456 : Fetch 773 records
> [info] 2017-06-22 18:38:32.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22973497}
> [info] 2017-06-22 18:38:33.456 : Fetch 1546 records
> [info] 2017-06-22 18:38:33.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22975043}
> [info] 2017-06-22 18:38:34.456 : Fetch 1546 records
> [info] 2017-06-22 18:38:34.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22976589}
> [info] 2017-06-22 18:38:35.456 : Fetch 1546 records
> [info] 2017-06-22 18:38:35.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22978135}
> [info] 2017-06-22 18:38:36.456 : Fetch 2319 records
> [info] 2017-06-22 18:38:36.458 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22980454}
> [info] 2017-06-22 18:38:37.456 : Fetch 1546 records
> [info] 2017-06-22 18:38:37.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22982000}
> [info] 2017-06-22 18:38:38.455 : Fetch 2383 records
> [info] 2017-06-22 18:38:38.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22984383}
> [info] 2017-06-22 18:38:39.456 : Fetch 1546 records
> [info] 2017-06-22 18:38:39.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22985929}
> [info] 2017-06-22 18:38:40.456 : Fetch 2319 records
> [info] 2017-06-22 18:38:40.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22988248}
> [info] 2017-06-22 18:38:41.456 : Fetch 1546 records
> [info] 2017-06-22 18:38:41.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22989794}
> [info] 2017-06-22 18:38:42.456 : Fetch 1546 records
> [info] 2017-06-22 18:38:42.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22991340}
> [info] 2017-06-22 18:38:43.455 : Fetch 2319 records
> [info] 2017-06-22 18:38:43.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22993659}
> [info] 2017-06-22 18:38:44.456 : Fetch 1546 records
> [info] 2017-06-22 18:38:44.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22995205}
> [info] 2017-06-22 18:38:45.456 : Fetch 2319 records
> [info] 2017-06-22 18:38:45.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22997524}
> [info] 2017-06-22 18:38:46.456 : Fetch 1546 records
> [info] 2017-06-22 18:38:46.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22999070}
> [info] 2017-06-22 18:38:47.456 : Fetch 1546 records
> [info] 2017-06-22 18:38:47.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23000616}
> [info] 2017-06-22 18:38:48.456 : Fetch 2319 records
> [info] 2017-06-22 18:38:48.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23002935}
> [info] 2017-06-22 18:38:49.456 : Fetch 1546 records
> [info] 2017-06-22 18:38:49.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23004481}
> [info] 2017-06-22 18:38:50.456 : Fetch 2319 records
> [info] 2017-06-22 18:38:50.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23006800}
> [info] 2017-06-22 18:38:51.456 : Fetch 1546 records
> [info] 2017-06-22 18:38:51.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23008346}
> [info] 2017-06-22 18:38:52.455 : Fetch 1546 records
> [info] 2017-06-22 18:38:52.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23009892}
> [info] 2017-06-22 18:38:53.455 : Fetch 2046 records
>
> I cannot understand the throughput difference between the Akka Streams
> Kafka consumer and the Kafka benchmark. Maybe my processing of the input
> source is the cause of this latency?
>
> If you have any advice to increase the throughput of my consumer, I'm
> listening :)
>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
