Hey Endre,
Thanks so much for doing this. I've looked through it (and skimmed the
new doc page) but haven't digested it well enough to say anything other
than "thanks". :) I'll get back to you in the next day or so, perhaps
with a couple of questions.
Akka Team wrote:
I have now added another version in which the server streams random
numbers until the client disconnects, then closes the connection.
It needed a custom stage, though, to make emitting from an Iterable
interruptible (mapConcat does not interrupt on completion, only on
errors).
On Wed, Jul 29, 2015 at 1:59 PM, Endre Varga <[email protected]> wrote:
I have now updated the gist with the reverse direction: a client
sends a String command and expects an Iterable[Int] back as a
response. I currently limited the functionality to one request per
connection, since otherwise I would need a slightly more elaborate
codec, which would complicate the example (I would need to add a
delimiter between the iterables on the wire; not too hard to add,
though). It still shows how these things are supposed to work.
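One way to separate several iterables on a single connection can be sketched as follows. This is not the gist's codec: instead of a delimiter it swaps in a count prefix (each frame starts with the number of Ints that follow), and the names FramedInts, encodeFrame and decodeFrames are hypothetical. It uses only plain Scala and java.nio, without any Akka stages.

```scala
import java.nio.ByteBuffer

// Hypothetical helper, not part of the gist or of Akka.
object FramedInts {
  // One frame = a 4-byte element count followed by that many 4-byte Ints
  // (big-endian, the ByteBuffer default).
  def encodeFrame(values: Seq[Int]): Array[Byte] = {
    val buf = ByteBuffer.allocate(4 + values.size * 4)
    buf.putInt(values.size)
    values.foreach(v => buf.putInt(v))
    buf.array()
  }

  // Split a byte array holding zero or more complete frames back into sequences.
  def decodeFrames(bytes: Array[Byte]): Vector[Vector[Int]] = {
    val buf = ByteBuffer.wrap(bytes)
    val out = Vector.newBuilder[Vector[Int]]
    while (buf.remaining() >= 4) {
      val count = buf.getInt()
      require(count >= 0 && buf.remaining() >= count * 4L, "truncated or corrupt frame")
      out += Vector.fill(count)(buf.getInt())
    }
    require(!buf.hasRemaining, "trailing bytes after last frame")
    out.result()
  }
}
```

With a count prefix the receiver knows where each response ends, so the connection would not have to be closed after one request. A streaming decoder would additionally have to buffer partial frames, which this sketch does not attempt.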
-Endre
On Wed, Jul 29, 2015 at 1:14 PM, Akka Team <[email protected]> wrote:
Hi Derek,
It is not that hard, but you need to develop a certain kind of
intuition to attack these problems. I very much recommend the
new documentation page
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html
as it helps to visualize the ideas.
I created a sample app that does what you want; you can find
the gist here: https://gist.github.com/drewhk/25bf7472db04b5699b80
The features in that app:
- exposes the client API as a Source[Int, Unit]. Any time you
materialize that source and send it data, it opens a TCP
connection, dumps the integers to the server, and then closes
the connection.
- exposes the server API as a Source[(InetSocketAddress,
Iterable[Int]), Future[ServerBinding]]. It provides you
with a continuous stream of (client address, client data
iterable) pairs.
- includes a simple codec pair for encoding the Ints. It is
kind of stupid for this use case, but it works.
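For illustration, a codec pair of this kind might look like the minimal sketch below. It assumes each Int is written as 4 big-endian bytes; the gist's actual encoding may differ, and the names IntCodec, encode and decode are hypothetical. It is plain Scala over byte arrays rather than an Akka stage.

```scala
import java.nio.ByteBuffer

// Hypothetical helper, not the codec from the gist.
object IntCodec {
  // Encode each Int as 4 bytes (big-endian, the ByteBuffer default).
  def encode(values: Iterable[Int]): Array[Byte] = {
    val buf = ByteBuffer.allocate(values.size * 4)
    values.foreach(v => buf.putInt(v))
    buf.array()
  }

  // Decode a byte array whose length is a multiple of 4 back into Ints.
  def decode(bytes: Array[Byte]): Vector[Int] = {
    require(bytes.length % 4 == 0, "input length must be a multiple of 4")
    val buf = ByteBuffer.wrap(bytes)
    Vector.fill(bytes.length / 4)(buf.getInt())
  }
}
```

In a stream, the decoding side would also have to handle TCP chunks that split an Int across two ByteStrings, which is the part a real stage has to take care of.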
Some notes:
- draining the client data to an Iterable might be suboptimal
if the Iterables are large; in that case a Source[Int] would
be a better abstraction
- the implementation caps the size of the Iterable but
currently just silently ignores overflows (I was too lazy to build
a stage or use fold for this sample, so I used grouped())
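The difference between silently truncating and reporting an overflow can be sketched in plain Scala, outside of any stream. The function name collectCapped and the Either result are assumptions made for this illustration; in the sample app the same idea would live in a fold or a custom stage.

```scala
// Hypothetical helper: collect at most `max` elements and report overflow
// instead of silently dropping the rest.
def collectCapped(it: Iterator[Int], max: Int): Either[String, Vector[Int]] = {
  // Pull one element more than allowed so an overflow can be detected
  // without draining an arbitrarily large input.
  val taken = it.take(max + 1).toVector
  if (taken.size > max) Left(s"more than $max elements received")
  else Right(taken)
}
```

Taking max + 1 elements keeps memory use bounded while still distinguishing "exactly max" from "too many".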
-Endre
On Sun, Jul 26, 2015 at 9:12 PM, Derek Wyatt <[email protected]> wrote:
Hi,
I'm still trying to figure out the best way to work with
TCP flows and, while I've got something working, this
seems really quite wrong, so there's gotta be a better way.
What I want to do is send an Iterable[Int] from the client
to the server and have the server materialize that
resulting flow in a Future[Iterable[Int]].
    val bytesStage = // elided... BidiFlow of serialization and framing

    val serverValuePromise = Promise[Seq[AnyRef]]()

    // Technically, the materialized value isn't important, since it's
    // actually going to be pulled out via the Promise
    val serverConsumerFlow: Flow[AnyRef, AnyRef, Future[Seq[AnyRef]]] = Flow.wrap(
      // Consume the client's stream and complete the serverValuePromise
      // with its folded result
      Sink.fold(Vector.empty[AnyRef])((acc, v: AnyRef) => acc :+ v)
        .mapMaterializedValue(v => { serverValuePromise.completeWith(v); v }),
      // We're not sending anything from this side
      Source.empty)(Keep.left)

    // The server
    val serverSide: Future[ServerBinding] =
      StreamTcp().bindAndHandle(serverConsumerFlow.join(bytesStage), "0.0.0.0", 0, halfClose = true)

    // We really want to stop listening once the client has successfully
    // connected, but this is good enough
    serverValuePromise.future.onComplete {
      case _ =>
        serverSide.onSuccess {
          case binding => binding.unbind()
        }
    }

    // I need the endpoint where the client needs to connect
    val destination = Await.result(serverSide, 1.second).localAddress

    // Get the source running
    Source((1 to 10).map(new Integer(_)))
      .via(bytesStage.joinMat(StreamTcp().outgoingConnection(destination))(Keep.right))
      .to(Sink.ignore)
      .run()

    // Print out what the client has sent to the server
    Await.result(serverValuePromise.future, 1.second).foreach(t => println(s"tt: $t"))

I tried doing this the other way around, where the server
side supplies the source, but this caused me issues with
actually shutting down the socket. Having the client do it
seems to make shutting down the socket on completion of
the source occur naturally. The problem with the
server side providing the source was that the client
source needed to finish "properly". If I created it as
`empty`, it would kill things too quickly. If I instead
created it as an Actor source that just didn't do
anything, I couldn't find a decent way to close it.
There's gotta be a better way to do this, but I'm too much
of a noob to see it. Can anyone improve this code for me?
Thanks,
Derek
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives:
https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to
the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails
from it, send an email to
[email protected]
<mailto:[email protected]>.
To post to this group, send email to
[email protected]
<mailto:[email protected]>.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com <http://letitcrash.com>
Twitter: @akkateam