Hi,

I'm still trying to figure out the best way to work with TCP flows and, 
while I've got something working, this seems really quite wrong, so there's 
gotta be a better way.

What I want to do is send an Iterable[Int] from the client to the server 
and have the server materialize the resulting flow as a 
Future[Iterable[Int]].


val bytesStage = // elided... BidiFlow of serialization and framing

val serverValuePromise = Promise[Seq[AnyRef]]()

// Technically, the materialized value isn't important, since it's actually
// going to be pulled out via the Promise
val serverConsumerFlow: Flow[AnyRef, AnyRef, Future[Seq[AnyRef]]] = Flow.wrap(
  // Consume the client's stream and complete the serverValuePromise with its
  // folded result
  Sink.fold(Vector.empty[AnyRef])((acc, v: AnyRef) => acc :+ v)
    .mapMaterializedValue(v => { serverValuePromise.completeWith(v); v }),
  // We're not sending anything from this side
  Source.empty)(Keep.left)

// The server
val serverSide: Future[ServerBinding] = StreamTcp().bindAndHandle(
  serverConsumerFlow.join(bytesStage), "0.0.0.0", 0, halfClose = true)

// We really want to stop listening once the client has successfully
// connected, but this is good enough
serverValuePromise.future.onComplete {
  case _ =>
    serverSide.onSuccess {
      case binding => binding.unbind()
    }
}

// I need the endpoint where the client needs to connect
val destination = Await.result(serverSide, 1.second).localAddress

// Get the source running
Source((1 to 10).map(new Integer(_)))
  .via(bytesStage.joinMat(StreamTcp().outgoingConnection(destination))(Keep.right))
  .to(Sink.ignore)
  .run()

// Print out what the client has sent to the server
Await.result(serverValuePromise.future, 1.second)
  .foreach(t => println(s"tt: $t"))

I tried doing this the other way around, where the server side supplies 
the source, but that caused issues with actually shutting down the socket. 
Having the client supply it makes the socket shut down naturally when the 
source completes.  The problem with the server side providing the source 
was that the client source needed to finish "properly".  If I created it 
as `empty`, it would kill things too quickly.  If I instead created it as 
an Actor source that just didn't do anything, I couldn't find a decent way 
to close it.

There's gotta be a better way to do this, but I'm too much of a noob to see 
it.  Can anyone improve this code for me?

Thanks,
Derek


