Hi Akka Team,

Do you have any benchmark results to share for the MQTT connector?

I'm asking because I'm facing a problem during load tests.

My current MQTT setup is a cluster of 3 nodes. I created a stream that 
reads data from each MQTT node, using the automatic load dispatcher 
provided by EMQ (Erlang MQTT Broker).

My publishers send about 15,000 msg/sec with a payload of 1.3 KB using 
QoS 1. My consumers keep up with this throughput for 1/2 hours, and then 
their throughput drops for no apparent reason: no exception is raised and 
my MQTT cluster does not crash.

I also checked the network for TCP errors or anything similar, but I 
found nothing on that side either.

The only information I have (see picture) is a CPU peak just before my 
consumers' throughput drops.

As for my code, I'm doing nothing special:

object MainMqttConsumer {

  case object Start

  case class ActorConsumer
  (
    broker : String,
    clientId : String,
    topics : List[String]
  )(implicit val mat : ActorMaterializer) extends Actor with ActorLogging {

    import context.dispatcher

    val connectionSettings = MqttConnectionSettings(
      this.broker,
      this.clientId,
      new MemoryPersistence
    )

    val subscriptions = topics.map(t => t -> MqttQoS.AtLeastOnce).toMap
    val settings = MqttSourceSettings(connectionSettings, subscriptions)
    val mqttSource = MqttSource(settings, bufferSize = 8)

    val STREAMING_BATCH_TOPIC_PREFIX = "streaming_batch"
    val EVENT_BATCH_TOPIC_PREFIX = "event_batch"
    val TOPIC_SEPARATOR = "/"
    val FGS_KEY_SEPARATOR = "_"


    def removeTopicFromKey(key : String) : Option[String] = {
      val chunks = key.split(TOPIC_SEPARATOR)
      if(chunks.size != 3) None
      else Some(chunks(1) + FGS_KEY_SEPARATOR + chunks(2))
    }

    def groupDataByTopics(batch : Seq[MqttMessage]) : Map[String, Array[Array[Byte]]] =
      batch
        .foldLeft(Map[String, Array[Array[Byte]]]()) { case (acc, msg) =>
          val topicName = msg.topic
          val bcpPacket =
            msg
              .payload
              .toByteBuffer
              .order(ByteOrder.LITTLE_ENDIAN)
              .array()


          if (topicName.startsWith(STREAMING_BATCH_TOPIC_PREFIX)) {
            removeTopicFromKey(topicName) match {
              case None => acc
              case Some(key) =>
                val dataKey = key + "_data"
                acc.get(dataKey) match {
                  case None => acc + (dataKey -> Array(bcpPacket))
                  case Some(buffer) => acc + (dataKey -> (buffer :+ bcpPacket))
                }
            }
          } else if (topicName.startsWith(EVENT_BATCH_TOPIC_PREFIX)) {
            removeTopicFromKey(topicName) match {
              case None => acc
              case Some(key) =>
                val eventKey = key + "_events"
                acc.get(eventKey) match {
                  case None => acc + (eventKey -> Array(bcpPacket))
                  case Some(buffer) => acc + (eventKey -> (buffer :+ bcpPacket))
                }
            }
          } else acc
        }


    override def receive = {
      case Start =>
        log.info("Stream begins on topics " + topics.mkString("-") + " with client id :" + this.clientId)
        mqttSource
          .groupedWithin(10000, 1.seconds)
          .map { batch =>
            log.info("Fetch {} msgs", batch.length)
            val grouped = this.groupDataByTopics(batch)
            val futures = grouped
              .map { case (_, data) =>
                Future.successful(data.length)
              }
            val t0 = System.currentTimeMillis()
            Future
              .sequence(futures)
              .map { i =>
                val t1 = System.currentTimeMillis()
                log.info("{} futures completed in {} millis",i.size,t1-t0)
              }
          }.toMat(Sink.ignore)(Keep.both)
          .run()
    }
  }
}
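
To make the key mapping in removeTopicFromKey and groupDataByTopics easier 
to follow, here is a minimal standalone sketch of the same rule, with no 
Akka or MQTT dependency. The object name TopicKeys, the helper groupKey and 
the topic names in the comments are hypothetical and only serve as 
illustration; they are not part of the consumer above.

```scala
// Standalone sketch of the topic-to-key rule used by the consumer.
object TopicKeys {
  val StreamingPrefix = "streaming_batch"
  val EventPrefix     = "event_batch"
  val TopicSeparator  = "/"
  val KeySeparator    = "_"

  // Same rule as removeTopicFromKey: the topic must have exactly three
  // segments; the first one (the prefix) is dropped.
  def removeTopicFromKey(topic: String): Option[String] =
    topic.split(TopicSeparator) match {
      case Array(_, a, b) => Some(a + KeySeparator + b)
      case _              => None
    }

  // Hypothetical helper: the map key that groupDataByTopics would use
  // for a message on this topic, or None if the message is skipped.
  def groupKey(topic: String): Option[String] =
    if (topic.startsWith(StreamingPrefix))
      removeTopicFromKey(topic).map(_ + "_data")
    else if (topic.startsWith(EventPrefix))
      removeTopicFromKey(topic).map(_ + "_events")
    else None
}

// TopicKeys.groupKey("streaming_batch/dev1/ch2") == Some("dev1_ch2_data")
// TopicKeys.groupKey("event_batch/dev1/ch2")     == Some("dev1_ch2_events")
// TopicKeys.groupKey("streaming_batch/dev1")     == None (only two segments)
```

Messages on topics that do not have exactly three segments, or that start 
with neither prefix, are silently dropped from the grouped map.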

I see that the buffer size is used for backpressure. I have not tested a 
value higher than 8. Do you think that, with a high number of messages per 
second, it would be better to increase the buffer size?
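
For scale, here is a rough back-of-the-envelope calculation with the 
numbers above. It assumes that a single consumer sees the full publish 
rate, which may not hold with three nodes and load dispatching, and it 
reads "1.3 KB" as 1,300 bytes. The object name LoadEstimate is 
hypothetical.

```scala
// Rough load estimate; all inputs are taken from the figures in this post.
object LoadEstimate {
  val msgsPerSec = 15000.0 // publish rate during the load test
  val payloadKB  = 1.3     // payload size per message
  val bufferSize = 8       // bufferSize passed to MqttSource
  val batchLimit = 10000   // element limit passed to groupedWithin

  // Payload data rate in MB/s (decimal units): about 19.5
  val dataRateMBps: Double = msgsPerSec * payloadKB / 1000.0

  // Time covered by a full buffer at this rate, in ms: about 0.53
  val bufferMillis: Double = bufferSize / msgsPerSec * 1000.0

  // Time to fill one groupedWithin batch, in seconds: about 0.67
  val batchFillSecs: Double = batchLimit / msgsPerSec
}
```

Under these assumptions a buffer of 8 holds roughly half a millisecond of 
traffic, and groupedWithin(10000, 1.seconds) would hit its 10,000-element 
limit before the 1-second timer fires.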

Thanks in advance,
Alifirat. 
