Hi Alex,

Currently there is no nice way to achieve this that I would recommend wholeheartedly (i.e., one that does not involve ugly hacks which are easy to get wrong). We need to fill in some of the puzzle pieces in the 2.0-Mx series, and one of those will hopefully provide a maintainable way to rematerialize Flows on demand (reconnecting is one such use case).

-Endre

On Wed, Nov 11, 2015 at 1:48 PM, Alexander Zafirov <[email protected]> wrote:

> Hi,
>
> I've been working on a service that connects to a tool called GNIP that
> streams tweets through a TCP connection. I have two services that
> independently connect to GNIP and yesterday both of them went
> simultaneously down. I checked netstat and saw that the tcp connection was
> gone which made me think that the GNIP guys have redeployed and closed the
> connection for some reason. After looking into what Konrad Malawski posted
> on the matter <https://gist.github.com/ktoso/4dda7752bf6f4393d1ac>, I
> ended up implementing the following reconnection logic:
>
> class StreamToKafkaFlowBuilder(config: Config)(implicit val materializer: akka.stream.Materializer) {
>   implicit val actorSystem = ActorSystem("ReactiveKafka")
>   private val log = Logger(this.getClass.getSimpleName)
>
>   private val bufferSize = 60000
>
>   private val source = Source.actorRef[ByteString](
>     bufferSize,
>     OverflowStrategy.fail
>   )
>
>   val decider: Supervision.Decider = {
>     case e =>
>       log.error("UNEXPECTED {}" + e.getClass)
>       e.getStackTrace.foreach(println)
>       Supervision.resume
>   }
>
>   private val runnableByteStringProcessingFlow = FlowGraph.closed(source) { implicit builder =>
>     byteStringSource =>
>       val tweetBroadCaster = builder.add(Broadcast[String](2))
>
>       val byteStringToTweetFlow = Flow[ByteString].via(Framing.delimiter(
>         ByteString("\r\n"), maximumFrameLength = bufferSize, allowTruncation = true))
>         .map(_.utf8String + "\r\n")
>
>       val tweetPrintSink = Sink.foreach[String](t => {
>         t match {
>           case "\r\n" =>
>             FalconStatsD.increment("keepAliveReceivedFromGnipPowerTrack")
>           case _ =>
>             FalconStatsD.increment("activityReceivedFromGnipPowerTrack")
>         }
>         log.info(s"PUSHING TO KAFKA: $t")
>       })
>
>       val filterKeepAliveAndEmptyStrings = Flow[String].filter(s => s != "\r\n" && s != "")
>
>       val kafkaSink = Sink.actorSubscriber(Props(new ActorSupervisor(config)))
>
>       byteStringSource.outlet ~> Gzip.decoderFlow ~> Flow[ByteString].transform(() => new PushStage[ByteString, ByteString] {
>         override def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = ctx.push(elem)
>
>         override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = {
>           println("start a new connection!")
>           ctx.fail(throw new RuntimeException("GNIP closed the stream"))
>         }
>
>       }) ~> byteStringToTweetFlow ~> tweetBroadCaster.in
>       tweetBroadCaster.out(0) ~> filterKeepAliveAndEmptyStrings ~> Flow[String].map(Timer.measure) ~>
>         kafkaSink.withAttributes(ActorAttributes.supervisionStrategy(decider))
>       tweetBroadCaster.out(1) ~> tweetPrintSink
>   }
>
>   def build: ActorRef = runnableByteStringProcessingFlow.run()
> }
>
>
> I use akka-http to establish a connection with an actor. Once I am
> connected I forward all the content to an actor that represents a
> materialized stream from above.
>
> class GnipStreamHttpClient(host: String, port: Int, processor: ActorRef)(implicit val materializer: Materializer) extends Actor with ActorLogging {
>   this: Authorization =>
>
>   private val system = context.system
>   private implicit val executionContext = system.dispatcher
>   private val config = system.settings.config
>
>   val client = Http(system).outgoingConnectionTls(host, port, settings = ClientConnectionSettings(system))
>
>   override def receive: Receive = {
>     case response: HttpResponse if response.status.intValue / 100 == 2 =>
>       log.info(s"Got successful response $response")
>       response.entity.dataBytes.map(processor ! _).runWith(Sink.onComplete { case _ => self ! "reconnect" })
>     case response: HttpResponse =>
>       log.info(s"Got unsuccessful response $response")
>       system.shutdown()
>     case _ =>
>       val req = HttpRequest(GET, Uri(config.getString("gnip.uri")))
>         .withHeaders(`Accept-Encoding`(gzip), Connection("Keep-Alive")) ~> authorize
>
>       Source.single(req)
>         .via(client)
>         .runWith(Sink.head)
>         .pipeTo(self)
>   }
> }
>
>
>
> In order to simulate the closing of the connection I used *tcpkill*.
> Unfortunately I started getting
>
> 2015-11-11 12:22:01 INFO [-dispatcher-156] a.a.LocalActorRef Message [akka.io.Tcp$ResumeReading$] from Actor[akka://GNIP/system/IO-TCP-STREAM/client-1-stream.gnip.com%2F216.46.190.179%3A443#1932455454] to Actor[akka://GNIP/system/IO-TCP/selectors/$a/0#1642790516] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
> 2015-11-11 12:22:02 INFO [-dispatcher-155] a.a.LocalActorRef Message [akka.io.Tcp$ResumeReading$] from Actor[akka://GNIP/system/IO-TCP-STREAM/client-2-stream.gnip.com%2F216.46.190.179%3A443#-1072863245] to Actor[akka://GNIP/system/IO-TCP/selectors/$a/1#790015180] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
> 2015-11-11 12:22:02 INFO [-dispatcher-143] a.a.LocalActorRef Message [akka.io.Tcp$ResumeReading$] from Actor[akka://GNIP/system/IO-TCP-STREAM/client-3-stream.gnip.com%2F216.46.190.179%3A443#-188102508] to Actor[akka://GNIP/system/IO-TCP/selectors/$a/2#-904885776] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
> 2015-11-11 12:22:04 INFO [-dispatcher-156] c.f.t.MainApp$$anonfun$1$$anon$1 Got successful response HttpResponse(200 OK,List(Vary: Accept-Encoding, Date: Wed, 11 Nov 2015 12:22:03 GMT, Content-Encoding: gzip, Connection: close),HttpEntity.Chunked(application/json,akka.stream.scaladsl.Source@25867bc),HttpProtocol(HTTP/1.1))
> 2015-11-11 12:22:04 ERROR [-dispatcher-113] c.s.r.k.KafkaActorSubscriber Stopping Kafka subscriber due to fatal error. WARNING arguments left: 1
> 2015-11-11 12:22:04 INFO [-dispatcher-156] k.p.Producer Shutting down producer
> 2015-11-11 12:22:04 INFO [-dispatcher-156] k.p.ProducerPool Closing all sync producers
> 2015-11-11 12:22:04 INFO [-dispatcher-156] k.p.Producer Producer shutdown completed in 10 ms
>
> So the connection is reestablished but the stream doesn't get recreated.
> Or at least that's what I think. Can you please let me know what I am
> missing here?
>
> Thank you very much in advance.
>
> /Alex
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam
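
A note for readers following the thread: the idea of rematerializing on demand can be illustrated without any Akka API at all. The sketch below is only that, a sketch under stated assumptions. `runWithRestarts` and `runOnce` are hypothetical names, not anything from Akka or from the code quoted above; `runOnce` stands in for "open a new connection and run the stream blueprint again", and the Future it returns is assumed to complete or fail when that run ends. There is no delay between attempts here, which a real client would almost certainly want.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object Rematerialize {
  // Hypothetical helper, not part of Akka. `runOnce` stands in for whatever
  // opens a fresh connection and materializes the stream blueprint again; the
  // Future it returns completes (or fails) when that run terminates.
  // The result is the number of restarts performed.
  def runWithRestarts(maxRestarts: Int)(runOnce: () => Future[Unit])(
      implicit ec: ExecutionContext): Future[Int] = {
    def loop(restartsSoFar: Int): Future[Int] =
      Future.successful(())
        .flatMap(_ => runOnce())            // also captures a synchronous throw from runOnce
        .recover { case NonFatal(_) => () } // a failed run is treated like a finished one
        .flatMap { _ =>
          if (restartsSoFar >= maxRestarts) Future.successful(restartsSoFar)
          else loop(restartsSoFar + 1)      // run again; a real client would wait first
        }
    loop(0)
  }

  def main(args: Array[String]): Unit = {
    val attempts = new AtomicInteger(0)
    // Stand-in for a connection that the remote side keeps closing.
    val done = runWithRestarts(maxRestarts = 3) { () =>
      attempts.incrementAndGet()
      Future.failed(new RuntimeException("GNIP closed the stream"))
    }(ExecutionContext.global)
    val restarts = Await.result(done, 5.seconds)
    println(s"restarts=$restarts attempts=${attempts.get}")
  }
}
```

The point of the design is that every attempt calls the factory again instead of reusing anything from the previous run, so no state from a dead connection leaks into the next one. Whether this maps cleanly onto a given Akka Streams version depends on how that version exposes stream completion, which is an assumption left open here.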
