And now I added another version where the server just streams random numbers until the client disconnects, then it closes the connection. It needed a custom stage though to make emitting from an Iterable interruptible (mapConcat does not interrupt on completion, only on errors).
On Wed, Jul 29, 2015 at 1:59 PM, Endre Varga <[email protected]> wrote: > I now updated the gist with the reverse direction: Now a client sends a > String command and expects an Iterable[Int] back as a response. I currently > limited the funcionality to one request per connection, since otherwise I > would need a bit more elaborate codec which would complicate the example (I > would need to add a delimiter between the iterables on the wire. Not too > hard to add it though). It still shows how these things are supposed to > work. > > -Endre > > On Wed, Jul 29, 2015 at 1:14 PM, Akka Team <[email protected]> > wrote: > >> Hi Derek, >> >> It is not that hard, but you need to develop a certain kind of intuition >> to attack these problems. I very much recommend the new documentation page >> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html >> as it helps to visualize the ideas. >> >> I created a sample app that does what you want, you can find the gist >> here: https://gist.github.com/drewhk/25bf7472db04b5699b80 >> >> The features in that app: >> - exposes the client API as a Source[Int, Unit]. Anytime you >> materialize that source and send it data, it will open a TCP connection and >> dump the integers to the server, then closes the connection >> - exposes the server API as a Source[(InetSocketAddress, >> Iterable[Int]), Future[ServerBinding]]. It will provide you with a >> continuous stream of client address, client data iterable pairs. >> - includes a simple codec pair for encoding the Ints. It is kind of >> stupid for this use case, but it works. >> >> Some notes: >> - draining the client data to an Iterable might be suboptimal if the >> Iterables are large, in this case a Source[Int] would be a better >> abstraction >> - the implementation caps the size of the Iterable but currently just >> silently ignores overflows (I was lazy to build a stage or use fold for >> this sample, so I used grouped()) >> >> -Endre >> >> >> On Sun, Jul 26, 2015 at 9:12 PM, Derek Wyatt <[email protected]> >> wrote: >> >>> Hi, >>> >>> I'm still trying to figure out the best way to work with TCP flows and, >>> while I've got something working, this seems really quite wrong, so there's >>> gotta be a better way. >>> >>> What I want to do is send an Iterable[Int] from the client to the >>> server and have the server materialize that resulting flow in a >>> Future[Iterable[Int]]. >>> >>> >>> val bytesStage = // elided... BidiFlow of serialization and framing >>> >>> val serverValuePromise = Promise[Seq[AnyRef]]() >>> >>> // Technically, the materialized value isn't important, since it's >>> actually going to be pulled out >>> // via the Promise >>> val serverConsumerFlow: Flow[AnyRef, AnyRef, Future[Seq[AnyRef]]] = Flow >>> .wrap( >>> // Consume the client's stream and complete the serverValuePromise >>> with its folded result >>> Sink.fold(Vector.empty[AnyRef])((acc, v: AnyRef) => acc :+ v). >>> mapMaterializedValue(v => { serverValuePromise.completeWith(v); v }), >>> // We're not sending anything from this side >>> Source.empty)(Keep.left) >>> >>> // The server >>> val serverSide: Future[ServerBinding] = StreamTcp().bindAndHandle( >>> serverConsumerFlow.join(bytesStage), "0.0.0.0", 0, halfClose = true) >>> >>> // We really want to stop listening once the client has successfully >>> connected, but this is good >>> // enough >>> serverValuePromise.future.onComplete { >>> case _ => >>> serverSide.onSuccess { >>> case binding => binding.unbind() >>> } >>> } >>> >>> // I need the endpoint where the client needs to connect >>> val destination = Await.result(serverSide, 1.second).localAddress >>> >>> // Get the source running >>> Source((1 to 10).map(new Integer(_))).via(bytesStage.joinMat(StreamTcp >>> ().outgoingConnection(destination))(Keep.right)).to(Sink.ignore).run() >>> >>> // Print out what the client has sent to the server >>> Await.result(serverValuePromise.future, 1.second).foreach(t => println(s"tt: >>> $t")) >>> >>> I tried doing this the other way around - where the server side supplies >>> source - but this caused me issues with actually shutting down the socket. >>> Having the client do it seems to make shutting down the socket on >>> completion of the source, just naturally occur. The problem with the >>> server side providing the source was that the client source needed to >>> finish "properly". If I created it as `empty` then it would kill things >>> too quickly. If I then created it as a n Actor source that just didn't do >>> anything, I couldn't find a decent way to close it. >>> >>> There's gotta be a better way to do this, but I'm too much of a noob to >>> see it. Can anyone improve this code for me? >>> >>> Thanks, >>> Derek >>> >>> >>> >>> -- >>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>> >>>>>>>>>> Check the FAQ: >>> http://doc.akka.io/docs/akka/current/additional/faq.html >>> >>>>>>>>>> Search the archives: >>> https://groups.google.com/group/akka-user >>> --- >>> You received this message because you are subscribed to the Google >>> Groups "Akka User List" group. >>> To unsubscribe from this group and stop receiving emails from it, send >>> an email to [email protected]. >>> To post to this group, send email to [email protected]. >>> Visit this group at http://groups.google.com/group/akka-user. >>> For more options, visit https://groups.google.com/d/optout. >>> >> >> >> >> -- >> Akka Team >> Typesafe - Reactive apps on the JVM >> Blog: letitcrash.com >> Twitter: @akkateam >> >> -- >> >>>>>>>>>> Read the docs: http://akka.io/docs/ >> >>>>>>>>>> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to the Google Groups >> "Akka User List" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to [email protected]. >> To post to this group, send email to [email protected]. >> Visit this group at http://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> > > -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at http://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- Akka Team Typesafe - Reactive apps on the JVM Blog: letitcrash.com Twitter: @akkateam -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
