Hi,

I've been working on a service that connects to GNIP, a tool that streams 
tweets over a TCP connection. I have two services that connect to GNIP 
independently, and yesterday both of them went down simultaneously. I 
checked netstat and saw that the TCP connection was gone, which made me 
think the GNIP guys had redeployed and closed the connection for some 
reason. After reading what Konrad Malawski posted on the matter 
<https://gist.github.com/ktoso/4dda7752bf6f4393d1ac>, I ended up 
implementing the following reconnection logic:

class StreamToKafkaFlowBuilder(config: Config)(implicit val materializer: akka.stream.Materializer) {
  implicit val actorSystem = ActorSystem("ReactiveKafka")
  private val log = Logger(this.getClass.getSimpleName)

  private val bufferSize = 60000

  private val source = Source.actorRef[ByteString](
    bufferSize,
    OverflowStrategy.fail
  )

  val decider: Supervision.Decider = {
    case e =>
      log.error("UNEXPECTED {}", e.getClass)
      e.getStackTrace.foreach(println)
      Supervision.resume
  }

  private val runnableByteStringProcessingFlow =
    FlowGraph.closed(source) { implicit builder =>
      byteStringSource =>
        val tweetBroadCaster = builder.add(Broadcast[String](2))

        val byteStringToTweetFlow = Flow[ByteString]
          .via(Framing.delimiter(ByteString("\r\n"), maximumFrameLength = bufferSize, allowTruncation = true))
          .map(_.utf8String + "\r\n")

        val tweetPrintSink = Sink.foreach[String](t => {
          t match {
            case "\r\n" => FalconStatsD.increment("keepAliveReceivedFromGnipPowerTrack")
            case _      => FalconStatsD.increment("activityReceivedFromGnipPowerTrack")
          }
          log.info(s"PUSHING TO KAFKA: $t")
        })

        val filterKeepAliveAndEmptyStrings = Flow[String].filter(s => s != "\r\n" && s != "")

        val kafkaSink = Sink.actorSubscriber(Props(new ActorSupervisor(config)))

        byteStringSource.outlet ~> Gzip.decoderFlow ~> Flow[ByteString].transform(() => new PushStage[ByteString, ByteString] {
          override def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = ctx.push(elem)

          override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = {
            println("start a new connection!")
            // ctx.fail takes the Throwable itself; a `throw` here would bypass it
            ctx.fail(new RuntimeException("GNIP closed the stream"))
          }
        }) ~> byteStringToTweetFlow ~> tweetBroadCaster.in

        tweetBroadCaster.out(0) ~> filterKeepAliveAndEmptyStrings ~> Flow[String].map(Timer.measure) ~>
          kafkaSink.withAttributes(ActorAttributes.supervisionStrategy(decider))
        tweetBroadCaster.out(1) ~> tweetPrintSink
    }

  def build: ActorRef = runnableByteStringProcessingFlow.run()
}
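For the reconnect policy itself I'd like to back off between attempts rather than reconnect immediately. Here is a minimal, dependency-free sketch of the delay schedule I have in mind (the `ReconnectBackoff` name and the default bounds are mine, not from the code above):

```scala
import scala.concurrent.duration._

// Exponential backoff schedule for reconnect attempts:
// delay(n) = minBackoff * 2^n, capped at maxBackoff.
object ReconnectBackoff {
  def delay(attempt: Int,
            minBackoff: FiniteDuration = 1.second,
            maxBackoff: FiniteDuration = 30.seconds): FiniteDuration = {
    val exponential = minBackoff * math.pow(2, attempt).toLong
    if (exponential > maxBackoff) maxBackoff else exponential
  }
}
```

The actor that owns the connection could schedule its next "reconnect" message with `context.system.scheduler.scheduleOnce(ReconnectBackoff.delay(attempt), self, "reconnect")`, resetting `attempt` to 0 once data flows again.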


I use akka-http to establish the connection from an actor. Once connected, 
I forward all the content to the actor that represents the materialized 
stream from above.

class GnipStreamHttpClient(host: String, port: Int, processor: ActorRef)(implicit val materializer: Materializer) extends Actor with ActorLogging {
  this: Authorization =>

  private val system = context.system
  private implicit val executionContext = system.dispatcher
  private val config = system.settings.config

  val client = Http(system).outgoingConnectionTls(host, port, settings = ClientConnectionSettings(system))

  override def receive: Receive = {
    case response: HttpResponse if response.status.intValue / 100 == 2 =>
      log.info(s"Got successful response $response")
      response.entity.dataBytes.map(processor ! _).runWith(Sink.onComplete { case _ => self ! "reconnect" })
    case response: HttpResponse =>
      log.info(s"Got unsuccessful response $response")
      system.shutdown()
    case _ =>
      val req = HttpRequest(GET, Uri(config.getString("gnip.uri")))
        .withHeaders(`Accept-Encoding`(gzip), Connection("Keep-Alive")) ~> authorize

      Source.single(req)
        .via(client)
        .runWith(Sink.head)
        .pipeTo(self)
  }
}
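To sanity-check my framing assumptions outside the stream, I also wrote a tiny dependency-free helper that splits a raw chunk on "\r\n" (roughly what Framing.delimiter does) and classifies each frame the way the sinks above do; the `FrameClassifier` names are mine:

```scala
// Splits a raw GNIP chunk on "\r\n" and classifies each frame.
// An empty frame between two delimiters is GNIP's keep-alive signal;
// anything else is an activity payload.
object FrameClassifier {
  sealed trait Frame
  case object KeepAlive extends Frame
  final case class Activity(json: String) extends Frame

  def classify(chunk: String): Seq[Frame] =
    chunk.split("\r\n", -1).dropRight(1).toSeq.map {
      case ""   => KeepAlive
      case json => Activity(json)
    }
}
```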



To simulate the connection being closed, I used *tcpkill*. Unfortunately, 
I then started getting the following:

2015-11-11 12:22:01 INFO  [-dispatcher-156] a.a.LocalActorRef Message 
[akka.io.Tcp$ResumeReading$] from 
Actor[akka://GNIP/system/IO-TCP-STREAM/client-1-stream.gnip.com%2F216.46.190.179%3A443#1932455454]
 
to Actor[akka://GNIP/system/IO-TCP/selectors/$a/0#1642790516] was not 
delivered. [1] dead letters encountered. This logging can be turned off or 
adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
2015-11-11 12:22:02 INFO  [-dispatcher-155] a.a.LocalActorRef Message 
[akka.io.Tcp$ResumeReading$] from 
Actor[akka://GNIP/system/IO-TCP-STREAM/client-2-stream.gnip.com%2F216.46.190.179%3A443#-1072863245]
 
to Actor[akka://GNIP/system/IO-TCP/selectors/$a/1#790015180] was not 
delivered. [2] dead letters encountered. This logging can be turned off or 
adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
2015-11-11 12:22:02 INFO  [-dispatcher-143] a.a.LocalActorRef Message 
[akka.io.Tcp$ResumeReading$] from 
Actor[akka://GNIP/system/IO-TCP-STREAM/client-3-stream.gnip.com%2F216.46.190.179%3A443#-188102508]
 
to Actor[akka://GNIP/system/IO-TCP/selectors/$a/2#-904885776] was not 
delivered. [3] dead letters encountered. This logging can be turned off or 
adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
2015-11-11 12:22:04 INFO  [-dispatcher-156] 
c.f.t.MainApp$$anonfun$1$$anon$1 Got successful response HttpResponse(200 
OK,List(Vary: Accept-Encoding, Date: Wed, 11 Nov 2015 12:22:03 GMT, 
Content-Encoding: gzip, Connection: 
close),HttpEntity.Chunked(application/json,akka.stream.scaladsl.Source@25867bc),HttpProtocol(HTTP/1.1))
2015-11-11 12:22:04 ERROR [-dispatcher-113] c.s.r.k.KafkaActorSubscriber 
Stopping Kafka subscriber due to fatal error. WARNING arguments left: 1
2015-11-11 12:22:04 INFO  [-dispatcher-156] k.p.Producer Shutting down 
producer
2015-11-11 12:22:04 INFO  [-dispatcher-156] k.p.ProducerPool Closing all 
sync producers
2015-11-11 12:22:04 INFO  [-dispatcher-156] k.p.Producer Producer shutdown 
completed in 10 ms

So the connection is re-established, but the stream doesn't get recreated, 
or at least that's what I think is happening. Can you please let me know 
what I am missing here?

Thank you very much in advance.

/Alex

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.