hAkkers,

I've been unable to *grok <http://en.wikipedia.org/wiki/Grok>* how to 
communicate with a TCP socket using akka-stream and StreamTcp extension. 
 At this point, I'm not sure the fault is entirely mine. :)  I'm building a 
MongoDB 
driver <https://github.com/reactific/RxMongo> that uses Akka and I have it 
working well with akka.io. Mongo requires asynchronous reading and writing 
on a TCP socket. You can write requests to it as they happen and you read 
responses as they can be satisfied by the server. Requests and responses 
are matched with an ID number (i.e. each response indicates the request ID 
to which it responds). This seems to be an ideal candidate for 
akka-streams, at least on the surface. I'm now trying to transition my 
design to use akka-streams and StreamTcp. After several days of fumbling 
around, I'm still not able to grasp how to connect all the pieces. So, I'm 
hoping the group can help and that this might be instructive for users of 
akka-stream, or at least shine some light on needed documentation or 
features. 

Just to address the obligatory: 

   - I've read the akka-stream (1.0-M3) documentation, many times, every 
   page. 
   - I've looked at the akka-stream code and discovered that without some 
   sort of internal design document, much of it will be unintelligible because 
   I don't have a conceptual model for how the pieces fit together 
   (essentially a forest/trees issue).
   - I've read the (insufficient, IMO) API documentation. 
   - I've built, tried and studied the TcpEcho sample program. 

That sample TcpEcho program is the source of most of my misunderstanding as 
it is the only sample that relates to what I'm doing and I cannot 
extrapolate from it to do what I want to do.  Here's the program, from the 
documentation 
<http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M3/scala/stream-io.html#Connecting__REPL_Client>
:


val connection: OutgoingConnection = StreamTcp().outgoingConnection(
localhost)
 
val replParser = new PushStage[String, ByteString] {
override def onPush(elem: String, ctx: Context[ByteString]): Directive = {
elem match {
case "q" ⇒ ctx.pushAndFinish(ByteString("BYE\n"))
case _ ⇒ ctx.push(ByteString(s"$elem\n"))
}
}
}
 
val repl = Flow[ByteString]
.transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
.map(text => println("Server: " + text))
.map(_ => readLine("> "))
.transform(() ⇒ replParser)
 
connection.handleWith(repl)


Here are the things that I find confusing:

   - The repl value is defined by invoking Flow[ByteString]. Now, I know 
   the API well enough to know that Flow requires two type parameters: 
   Flow[+In,-Out] 
   
<http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/?_ga=1.17864249.1697328887.1413587984#akka.stream.scaladsl.Flow$>
   . This is confusing because the Flow companion object's apply method 
   takes only one type argument which it expands by duplicating. So 
   Flow[ByteString] actually instantiates a Flow[ByteString,ByteString]. I 
   only note this because it took some digging around in the API before I 
   understood how this worked and while it is handy, it is also not straight 
   forward. 
   - The documentation implies that a Flow is uni-directional 
   
<http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M3/scala/stream-flows-and-basics.html#Defining_and_running_streams>.
 
   It says a Flow "connects its up- and downstreams by transforming the 
   data elements flowing through it." That, to me, says "unidirectional". The 
   use of a Flow[ByteString,ByteString] for the repl value indicates to me 
   that a uni-directional "transformation" from ByteString to ByteString is 
    occurring and yet this code implies that it is doing both reading and 
   writing to the socket (i.e. it is bi-directional). How can that be? 
   - I see repl as a Flow that does this: takes a ByteString as input, 
   chunks it into \n terminated lines up to 256 bytes, prints those lines out 
   prefixed by "Server: " and then discards that input and replaces it with a 
   line read from the console which is then output with a newline appended 
   unless the input was "q" in which case it is replaced by "BYE\n" and a 
   termination signal. Okay, that's all great and it is all unidirectional 
   writing data to the socket. So, now the questions:
      - Where is the reading from the server to get the original line(s) as 
      input to the flow? I.e. where is the Source[ByteString]?
      - Assuming that connection.handleWith(repl)does some magic to set up 
      the Source, how does the conversation get started? Is it assumed the 
server 
      will send some data upon connection? The echo server example 
      
<http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M3/scala/stream-io.html#Accepting_connections__Echo_Server>
 
      seems to have the same issue! 
      - FYI: OutgoingConnection.handleWith's documentation 
      
<http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/?_ga=1.17864249.1697328887.1413587984#akka.stream.scaladsl.StreamTcp$$OutgoingConnection>
 
      says this:
   
"Handles the connection using the given flow. This method can be called 
> several times, every call will materialize the given flow exactly once 
> thereby triggering a new connection attempt to the remoteAddress. If the 
> connection cannot be established the materialized stream will immediately 
> be terminated with a akka.stream.StreamTcpException 
> <http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/akka/stream/StreamTcpException.html>
> ."


   - It would be nice if there was a little more description around 
      "Handles the connection". Handles it how? Does it set up a Source and 
Sink? 
      Are asynchronous bi-directional reading from the socket and writing to 
the 
      socket implied?
   - Am I to infer from all this that the StreamTcp.outgoingConnection 
   creates a Source[ByteString] from the socket's input and a 
   Sink[ByteString] for the socket's output and that the flow provided to 
   handleWith is run between these two? In other words, the 
   OutgoingConnection can really only transform its input to its output? If 
   so, then:
      - How is that generally applicable? 
      - This approach works fine for an echo client, but clearly there are 
      protocols where the input and output can and should be processed 
      independently, aren't there?
      - How would one do what Mongo needs and have an asynch flow of 
      requests that is independent of an asynch flow of responses?
      - I noticed the Add Bi-directional Flow 
      <https://github.com/akka/akka/issues/16416> issue that is slated for 
      inclusion in 1.0-M4. Is this intended for solving this issue where two 
      related flows are paired to do bi-directional input/output? 
      - Am I just trying to implement my mongo driver before the required 
      features are ready?
   
The documentation says, about this code: 

A resilient REPL client would be more sophisticated than this, for example 
> it should split out the input reading into a separate mapAsync step and 
> have a way to let the server write more data than one ByteString chunk at 
> any given time, these improvements however are left as exercise for the 
> reader.


I would like a "resilient client" and I think leaving this part as an 
"exercise for the reader" is asking a bit much from the audience. We need 
an example of how to do this as it is likely the typical case not the 
exception (nobody needs another echo server/client). I suspect that the 
answer to my confusion lies in the information intended but not stated by 
this sentence from the documentation.

Specifically, I do not comprehend how mapAsync (or mapAsyncUnordered) help 
to split out the input reading because it is NOT obvious to me where this 
"input reading" is being done! If I used mapAsynch to obtain the request 
data from my driver's clients, it seems very obtuse to be setting up 
numerous Futures as opposed to just allowing them to give me a 
Source[Request] from which their requests are read and processed.

Any help you can provide to prevent me from drowning in these waters would 
be much appreciated!! 

Thanks,

Reid.

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to