Hello,
I have an application that reads events from a Kafka queue and, based on
their content, calls an API endpoint.
Here is the relevant part of the class that creates the stream:

  private def createFlowsFor(consumer: Publisher[StringKafkaMessage]) =
    Source(consumer)
      .map(_.message())
      .map(RuleActionBuilder.build)
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .via(ruleActionFlow)
      .to(Sink.foreach(x => {
        FalconStatsD.increment("addingRule")
        logger.info("addingRule " + x)
      })).run()

  private def ruleActionFlow =
    Flow[RuleAction].mapAsync(parallelism) {
      ruleAction => {
        val ruleHttpClient = new GnipRuleHttpClient(
          config.getString("gnip.host"),
          config.getInt("gnip.port"),
          config.getString("gnip.uri")) with BasicAuthorization
        ruleHttpClient.updateRules(ruleAction)
      }
    }

And here is the class that is responsible for calling the HTTP endpoint:

class GnipRuleHttpClient(host: String, port: Int, uri: String)(implicit
    system: ActorSystem, implicit val materializer: Materializer) {
  this: Authorization =>

  private implicit val executionContext = system.dispatcher

  lazy val connectionFlow = Http().outgoingConnectionTls(host, port)

  def updateRules(ruleAction: RuleAction): Future[HttpResponse] =
    ruleAction match {
      case RuleAction(ActionType.Post, rule) =>
        requestFlow(POST, rule)
      case RuleAction(ActionType.Delete, rule) =>
        requestFlow(HttpMethod.custom("DELETE", safe = false,
          idempotent = true, entityAccepted = true), rule)
    }

  private def requestFlow(httpsMethod: HttpMethod, rule: Rule): Future[HttpResponse] = {
    val format: String = String.format(
      "{\"rules\":[{\"value\":\"from:%s\"},{\"value\":\"@%s\"}]}",
      rule.profileId.id, rule.handle.handle)
    val json = ByteString(format)
    val request = HttpRequest(
      httpsMethod,
      uri = Uri(uri),
      entity = HttpEntity(
        ContentType(MediaTypes.`application/json`, HttpCharsets.`UTF-8`), json)
    ) ~> authorize

    Source.single(request)
      .via(connectionFlow)
      .runWith(Sink.head)
  }
}

Every now and then the AWS instance that the service is running on gets an
unusually high load. I suspect that I am materializing too many streams,
which build up over time. In the docs I read:
> Closing Connections
>
> Akka HTTP actively closes an established connection upon reception of a
> response containing Connection: close header. The connection can also be
> closed by the server.
>
> An application can actively trigger the closing of the connection by
> completing the request stream. In this case the underlying TCP connection
> will be closed when the last pending response has been received.
>
I have checked, and the endpoint is not returning a Connection: close
header. So the question now is: how do I enforce the closing of a
connection? Do you agree that this could be the problem, i.e. inefficient
use of Akka HTTP?
Any ideas or suggestions are appreciated!
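
For reference, here is a rough sketch of one alternative I am considering:
instead of building a new GnipRuleHttpClient and materializing a fresh
outgoing connection stream for every element in ruleActionFlow, create one
client up front and send requests through the request-level API, which
reuses pooled connections. This is only a sketch under assumptions, not
something I have verified fixes the load: it assumes an Akka HTTP 10.x
version where Http().singleRequest and discardEntityBytes() are available,
and PooledRuleHttpClient, send, and path are hypothetical names that are not
in my code. It also assumes path starts with "/" and leaves out the
authorization step.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.Materializer

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical pooled variant of the client; all names here are illustrative.
class PooledRuleHttpClient(host: String, port: Int, path: String)(
    implicit system: ActorSystem, materializer: Materializer) {

  private implicit val ec: ExecutionContext = system.dispatcher

  // The request-level API picks the connection pool from the request URI,
  // so the URI has to be absolute (scheme, host and port included).
  private val target: Uri =
    Uri.from(scheme = "https", host = host, port = port, path = path)

  // Sends one JSON request and returns only the status code.
  def send(method: HttpMethod, json: String): Future[StatusCode] =
    Http()
      .singleRequest(
        HttpRequest(method, uri = target,
          entity = HttpEntity(ContentTypes.`application/json`, json)))
      .map { response =>
        // Drain the response body so the pooled connection can be reused.
        response.discardEntityBytes()
        response.status
      }
}
```

The idea would be to construct a single instance next to the stream and call
it from mapAsync, so that the number of open connections is bounded by the
pool settings rather than by the number of events.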