Hi Akka Team,
Do you have any benchmarks result to share on mqtt connector ?
Because I'm facing a problem during a load tests.
My current mqtt setup is a cluster of 3 nodes, I created a stream reading
data coming from each mqtt node using an automatic load dispatcher provided
by EMQ (Erlang Mqtt Broker).
My publishers are sending about 15 000 msg / sec with a payload of 1,3 K
using QOS 1, my consumers have the good throughput during 1/2 hours and
then, my consumers throughput decreases without any reasons. I mean that no
exceptions have been raised or my mqtt cluster didn't crash.
I also profile some network informations about TCP errors or something else
but I found nothing in this side too.
The only information (see picture) that I have is about a peak of CPU
before my consumers throughtput decreases.
About my code, I'm doing nothing special :
object MainMqttConsumer {
case object Start
case class ActorConsumer
(
broker : String,
clientId : String,
topics : List[String]
)(implicit val mat : ActorMaterializer) extends Actor with ActorLogging {
import context.dispatcher
val connectionSettings = MqttConnectionSettings(
this.broker,
this.clientId,
new MemoryPersistence
)
val subscriptions = topics.map(t => t -> MqttQoS.AtLeastOnce).toMap
val settings = MqttSourceSettings(connectionSettings, subscriptions)
val mqttSource = MqttSource(settings, bufferSize = 8)
val STREAMING_BATCH_TOPIC_PREFIX = "streaming_batch"
val EVENT_BATCH_TOPIC_PREFIX = "event_batch"
val TOPIC_SEPARATOR = "/"
val FGS_KEY_SEPARATOR = "_"
def removeTopicFromKey(key : String) : Option[String] = {
val chunks = key.split(TOPIC_SEPARATOR)
if(chunks.size != 3) None
else Some(chunks(1) + FGS_KEY_SEPARATOR + chunks(2))
}
def groupDataByTopics(batch : Seq[MqttMessage]) : Map[String, Array[
Array[Byte]]] =
batch
.foldLeft(Map[String, Array[Array[Byte]]]()) { case (acc, msg) =>
val topicName = msg.topic
val bcpPacket =
msg
.payload
.toByteBuffer
.order(ByteOrder.LITTLE_ENDIAN)
.array()
if (topicName.startsWith(STREAMING_BATCH_TOPIC_PREFIX)) {
removeTopicFromKey(topicName) match {
case None => acc
case Some(key) =>
val dataKey = key + "_data"
acc.get(dataKey) match {
case None => acc + (dataKey -> Array(bcpPacket))
case Some(buffer) => acc + (dataKey -> (buffer :+
bcpPacket))
}
}
} else if (topicName.startsWith(EVENT_BATCH_TOPIC_PREFIX)) {
removeTopicFromKey(topicName) match {
case None => acc
case Some(key) =>
val eventKey = key + "_events"
acc.get(eventKey) match {
case None => acc + (eventKey -> Array(bcpPacket))
case Some(buffer) => acc + (eventKey -> (buffer :+
bcpPacket))
}
}
} else acc
}
override def receive = {
case Start =>
log.info("Stream begins on topics " + topics.mkString("-") + " with
client id :" + this.clientId)
mqttSource
.groupedWithin(10000, 1.seconds)
.map { batch =>
log.info("Fetch {} msgs", batch.length)
val grouped = this.groupDataByTopics(batch)
val futures = grouped
.map { case (_, data) =>
Future.successful(data.length)
}
val t0 = System.currentTimeMillis()
Future
.sequence(futures)
.map { i =>
val t1 = System.currentTimeMillis()
log.info("{} futures completed in {} millis",i.size,t1-t0)
}
}.toMat(Sink.ignore)(Keep.both)
.run()
}
}
}
I see that the buffer size is used for the backpressure, I didn't test with
a higher value than 8. Do you think that in context of a high number of
messages per second, it would be better to increase the buffer size ?
Thanking you in advance,
Alifirat.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.