Hi,
I've been working on a service that connects to a tool called GNIP, which
streams tweets over a TCP connection. I have two services that connect to
GNIP independently, and yesterday both of them went down at the same time.
I checked netstat and saw that the TCP connection was gone, which made me
think that the GNIP guys had redeployed and closed the connection for some
reason. After looking into what Konrad Malawski posted on the matter
<https://gist.github.com/ktoso/4dda7752bf6f4393d1ac>, I ended up
implementing the following reconnection logic:
class StreamToKafkaFlowBuilder(config: Config)(
    implicit val materializer: akka.stream.Materializer) {

  implicit val actorSystem = ActorSystem("ReactiveKafka")

  private val log = Logger(this.getClass.getSimpleName)
  private val bufferSize = 60000

  private val source = Source.actorRef[ByteString](
    bufferSize,
    OverflowStrategy.fail
  )

  // Log any failure and keep the stage running.
  val decider: Supervision.Decider = {
    case e =>
      log.error(s"UNEXPECTED ${e.getClass}")
      e.getStackTrace.foreach(println)
      Supervision.resume
  }

  private val runnableByteStringProcessingFlow =
    FlowGraph.closed(source) { implicit builder => byteStringSource =>

      val tweetBroadCaster = builder.add(Broadcast[String](2))

      // Split the decompressed bytes on "\r\n" and re-append the delimiter.
      val byteStringToTweetFlow = Flow[ByteString]
        .via(Framing.delimiter(
          ByteString("\r\n"), maximumFrameLength = bufferSize,
          allowTruncation = true))
        .map(_.utf8String + "\r\n")

      val tweetPrintSink = Sink.foreach[String](t => {
        t match {
          case "\r\n" =>
            FalconStatsD.increment("keepAliveReceivedFromGnipPowerTrack")
          case _ =>
            FalconStatsD.increment("activityReceivedFromGnipPowerTrack")
        }
        log.info(s"PUSHING TO KAFKA: $t")
      })

      val filterKeepAliveAndEmptyStrings =
        Flow[String].filter(s => s != "\r\n" && s != "")

      val kafkaSink = Sink.actorSubscriber(Props(new ActorSupervisor(config)))

      byteStringSource.outlet ~> Gzip.decoderFlow ~>
        Flow[ByteString].transform(() => new PushStage[ByteString, ByteString] {
          override def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective =
            ctx.push(elem)

          // Treat upstream completion as a failure of the stream.
          override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = {
            println("start a new connection!")
            ctx.fail(new RuntimeException("GNIP closed the stream"))
          }
        }) ~> byteStringToTweetFlow ~> tweetBroadCaster.in

      tweetBroadCaster.out(0) ~> filterKeepAliveAndEmptyStrings ~>
        Flow[String].map(Timer.measure) ~>
        kafkaSink.withAttributes(ActorAttributes.supervisionStrategy(decider))

      tweetBroadCaster.out(1) ~> tweetPrintSink
    }

  def build: ActorRef = runnableByteStringProcessingFlow.run()
}
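To make the framing and keep-alive filtering above concrete, here is a
minimal standalone sketch in plain Scala (no Akka). The object FrameSketch
and its methods are made up for illustration and are not part of my service;
it only mimics, on an already decoded string, what I expect Framing.delimiter
followed by the filter to produce.

```scala
// Hypothetical helper, for illustration only (not part of the service above).
object FrameSketch {
  val Delimiter = "\r\n"

  // Split decoded text into frames and re-append the delimiter,
  // as the byteStringToTweetFlow step does with its map.
  def frames(decoded: String): List[String] = {
    val parts = decoded.split(Delimiter, -1).toList
    // Whatever follows the last delimiter is an unfinished frame;
    // drop it when it is empty.
    val complete = if (parts.lastOption.contains("")) parts.init else parts
    complete.map(_ + Delimiter)
  }

  // A keep-alive is a frame that consists of the delimiter only.
  def isKeepAlive(frame: String): Boolean = frame == Delimiter

  // Same predicate as filterKeepAliveAndEmptyStrings.
  def activities(decoded: String): List[String] =
    frames(decoded).filter(f => f != Delimiter && f != "")
}
```

For example, FrameSketch.activities("a\r\n\r\nb\r\n") yields the two frames
"a\r\n" and "b\r\n", with the bare keep-alive in between dropped.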
I use akka-http, from inside an actor, to establish the connection to GNIP.
Once I am connected, I forward all the received content to the actor that
represents the materialized stream from above.
class GnipStreamHttpClient(host: String, port: Int, processor: ActorRef)(
    implicit val materializer: Materializer) extends Actor with ActorLogging {
  this: Authorization =>

  private val system = context.system
  private implicit val executionContext = system.dispatcher
  private val config = system.settings.config

  val client = Http(system).outgoingConnectionTls(
    host, port, settings = ClientConnectionSettings(system))

  override def receive: Receive = {
    case response: HttpResponse if response.status.intValue / 100 == 2 =>
      log.info(s"Got successful response $response")
      response.entity.dataBytes
        .map(processor ! _)
        .runWith(Sink.onComplete { case _ => self ! "reconnect" })
    case response: HttpResponse =>
      log.info(s"Got unsuccessful response $response")
      system.shutdown()
    case _ =>
      val req = HttpRequest(GET, Uri(config.getString("gnip.uri")))
        .withHeaders(`Accept-Encoding`(gzip), Connection("Keep-Alive")) ~> authorize
      Source.single(req)
        .via(client)
        .runWith(Sink.head)
        .pipeTo(self)
  }
}
To simulate the connection being closed, I used *tcpkill*. Unfortunately, I
started getting the following log output:
2015-11-11 12:22:01 INFO [-dispatcher-156] a.a.LocalActorRef Message [akka.io.Tcp$ResumeReading$] from Actor[akka://GNIP/system/IO-TCP-STREAM/client-1-stream.gnip.com%2F216.46.190.179%3A443#1932455454] to Actor[akka://GNIP/system/IO-TCP/selectors/$a/0#1642790516] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2015-11-11 12:22:02 INFO [-dispatcher-155] a.a.LocalActorRef Message [akka.io.Tcp$ResumeReading$] from Actor[akka://GNIP/system/IO-TCP-STREAM/client-2-stream.gnip.com%2F216.46.190.179%3A443#-1072863245] to Actor[akka://GNIP/system/IO-TCP/selectors/$a/1#790015180] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2015-11-11 12:22:02 INFO [-dispatcher-143] a.a.LocalActorRef Message [akka.io.Tcp$ResumeReading$] from Actor[akka://GNIP/system/IO-TCP-STREAM/client-3-stream.gnip.com%2F216.46.190.179%3A443#-188102508] to Actor[akka://GNIP/system/IO-TCP/selectors/$a/2#-904885776] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2015-11-11 12:22:04 INFO [-dispatcher-156] c.f.t.MainApp$$anonfun$1$$anon$1 Got successful response HttpResponse(200 OK,List(Vary: Accept-Encoding, Date: Wed, 11 Nov 2015 12:22:03 GMT, Content-Encoding: gzip, Connection: close),HttpEntity.Chunked(application/json,akka.stream.scaladsl.Source@25867bc),HttpProtocol(HTTP/1.1))
2015-11-11 12:22:04 ERROR [-dispatcher-113] c.s.r.k.KafkaActorSubscriber Stopping Kafka subscriber due to fatal error. WARNING arguments left: 1
2015-11-11 12:22:04 INFO [-dispatcher-156] k.p.Producer Shutting down producer
2015-11-11 12:22:04 INFO [-dispatcher-156] k.p.ProducerPool Closing all sync producers
2015-11-11 12:22:04 INFO [-dispatcher-156] k.p.Producer Producer shutdown completed in 10 ms
So the connection is re-established, but the stream doesn't get recreated,
or at least that's what I think is happening. Can you please let me know
what I am missing here?
Thank you very much in advance.
/Alex