Hi Endre,

Thanks for the quick response. My thoughts inline, below ...

> On Feb 13, 2015, at 6:40 AM, Endre Varga <[email protected]> wrote:
> 
> Hi Reid,
> 
> On Thu, Feb 12, 2015 at 5:56 PM, Reid Spencer <[email protected] 
> <mailto:[email protected]>> wrote:
> hAkkers,
> 
> I've been unable to grok <http://en.wikipedia.org/wiki/Grok> how to 
> communicate with a TCP socket using akka-stream and the StreamTcp extension. At 
> this point, I'm not sure the fault is entirely mine. :)
> 
> I can understand that many of the concepts might be alien at first. In our 
> defense, it is always hard to see what might be unclear for newcomers, since 
> we build the stuff and know it inside-out :) Your feedback is very valuable 
> in this regard.

I’ve been on your side of things many times and so I’m not bashful about 
telling you where I’m confused. Hopefully it leads to better design/doc for 
everyone in the long run.

> The repl value is defined by invoking Flow[ByteString]. Now, I know the API 
> well enough to know that Flow requires two type parameters: Flow[-In, +Out] 
> <http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/?_ga=1.17864249.1697328887.1413587984#akka.stream.scaladsl.Flow$>.
>  This is confusing because the Flow companion object's apply method takes 
> only one type argument, which it expands by duplicating. So Flow[ByteString] 
> actually instantiates a Flow[ByteString, ByteString]. I only note this because 
> it took some digging around in the API before I understood how this worked, 
> and while it is handy, it is also not straightforward. 
> 
> This might need to be explained in the docs, yes. But the logic is simple: 
> when you create a Flow by Flow[T], what you get is an empty Flow that 
> does nothing to its input; it passes it downstream unmodified (since the 
> transformation pipeline is empty). Obviously this means that you cannot write 
> Flow[Int, String] for an empty flow, since an Int will never become a String if 
> there is no processing step. This setup is needed because every 
> step on a Flow gives you back a new Flow. So:
> 
>  val doNothing: Flow[Int, Int] = Flow[Int]
>  val gimmeString: Flow[Int, String] = doNothing.map(_.toString)
> 
> The doNothing flow must have its two type parameters equal, because that Flow 
> does not transform anything. On the other hand, gimmeString has a 
> different type signature, since a map stage is appended to the empty Flow.

Thank you for that. I completely missed the “transform nothing” aspect of this 
apply function. It would be useful for the docs to mention that this is where to 
start when building a flow or even a flow graph. I see that this is exactly how it 
is used in the example, but the simplicity of it was lost on me. 
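
A minimal sketch of the "empty Flow" idea in plain Scala rather than akka-stream (MiniFlow and EmptyFlowDemo are hypothetical names, not library API): the one-type-argument factory can only produce a pass-through, and each appended stage returns a new flow with a new output type.

```scala
// Toy model only, NOT the akka-stream API: a MiniFlow wraps a function from In to Out.
final class MiniFlow[-In, +Out](val run: In => Out) {
  // Each step returns a NEW flow with the stage appended; the receiver is unchanged.
  def map[B](f: Out => B): MiniFlow[In, B] = new MiniFlow[In, B](run.andThen(f))
}

object MiniFlow {
  // Mirrors the shape of Flow[T]: one type argument, used for both In and Out,
  // because an empty pipeline can only pass its input through unchanged.
  def apply[T]: MiniFlow[T, T] = new MiniFlow[T, T](identity)
}

object EmptyFlowDemo {
  val doNothing: MiniFlow[Int, Int] = MiniFlow[Int]
  val gimmeString: MiniFlow[Int, String] = doNothing.map(_.toString)
}
```

Because doNothing itself is never modified by map, it can still serve as the starting point for other pipelines.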

> 
> Flow is simply a data structure for representing a set of transformations 
> (usually arranged linearly, but not necessarily).
> 
> The documentation implies that a Flow is uni-directional 
> <http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M3/scala/stream-flows-and-basics.html#Defining_and_running_streams>.
>  It says a Flow "connects its up- and downstreams by transforming the data 
> elements flowing through it." That, to me, says "unidirectional". The use of 
> a Flow[ByteString, ByteString] for the repl value indicates to me that a 
> unidirectional "transformation" from ByteString to ByteString is occurring, 
> and yet this code implies that it is doing both reading from and writing to the 
> socket (i.e. it is bidirectional). How can that be? 
> Well, the doc is a bit imprecise. Basically, a Flow is a description of a 
> transformation that has exactly one input port and one output port. In *most* 
> cases this is simply a unidirectional flow of elements coming in through the input 
> and passed down through the output, but there are exceptions to this rule. The 
> docs simplify this issue and just refer to it as a unidirectional element.
> 
> In the TCP example, however, it *is* unidirectional, since what it looks 
> like is:
> 
>    
>   +->(inbytes)-->+
>   |              |
> [TCP]       [HandlerFlow]
>   |              |
>   +<-(outbytes)<-+
> 
> As you can see, the "HandlerFlow" really just takes input bytes and transforms 
> them into output bytes. 

Ah hah! :)

That little diagram needs to make its way into the documentation, please :)

> 
> I see repl as a Flow that does this: takes a ByteString as input, chunks it 
> into \n-terminated lines of up to 256 bytes, prints those lines out prefixed by 
> "Server: " and then discards that input and replaces it with a line read from 
> the console, which is then output with a newline appended, unless the input was 
> "q", in which case it is replaced by "BYE\n" and a termination signal. Okay, 
> that's all great, and it is all unidirectional: writing data to the socket. So, 
> now the questions:
> Where is the reading from the server to get the original line(s) as input to 
> the flow? I.e. where is the Source[ByteString]?
> The source of confusion is the "handleWith" convenience method, which we added 
> precisely to make things simpler. What it actually does is:
> 
> def handleWith(handler: Flow[ByteString, ByteString])(implicit fm: FlowMaterializer) =
>   flow.join(handler).run()

Yes, I saw this in the code and thought I understood it as joining the handler 
to the flow, but ….
> 
> Now you may rightly ask what that flow is. Well, an OutgoingConnection is 
> nothing but a wrapper around a Flow[ByteString, ByteString], which you can 
> get via OutgoingConnection::flow. That's right, a TCP connection is not a 
> Sink and a Source, but a Flow! (Remember, a Flow is something that has exactly 
> one input port and one output port.) Why is it a Flow and not a Sink+Source? 
> Simply because it is often much more convenient in client setups:
> 
> sourceOfRequests -> EncodePackets -> tcpFlowRepresentingRemoteService -> 
> DecodePackets -> sinkOfResponses
> 
> I.e. a TCP connection becomes a transformation step. 

I can see, now, why you think of it this way. I think I need to stop thinking 
of Flow as analogous to a Unix pipe. It really is what the doc says: a 
transformation between some input and some output even if that’s in a cycle or 
graph, not just a uni-directional linear flow.
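
The client-side chain Endre describes can be modelled, very roughly, as plain function composition. Everything here is a hypothetical stand-in (ClientPipelineSketch, encodePackets, fakeRemoteService and decodePackets are illustrative names), and a real TCP flow is asynchronous and delivers chunks rather than whole messages.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Locale

object ClientPipelineSketch {
  type Bytes = Vector[Byte]

  // Hypothetical EncodePackets: a request string becomes a newline-terminated packet.
  val encodePackets: String => Bytes =
    req => (req + "\n").getBytes(UTF_8).toVector

  // Hypothetical stand-in for tcpFlowRepresentingRemoteService: bytes in, bytes out.
  // This fake "remote service" just upper-cases whatever it receives.
  val fakeRemoteService: Bytes => Bytes =
    bytes => new String(bytes.toArray, UTF_8).toUpperCase(Locale.ROOT).getBytes(UTF_8).toVector

  // Hypothetical DecodePackets: decode the bytes and strip the newline terminator.
  val decodePackets: Bytes => String =
    bytes => new String(bytes.toArray, UTF_8).stripSuffix("\n")

  // The "connection" is just one more transformation step in the chain.
  val pipeline: String => String = encodePackets andThen fakeRemoteService andThen decodePackets

  def run(requests: List[String]): List[String] = requests.map(pipeline)
}
```

For example, ClientPipelineSketch.run(List("ping", "hello")) yields List("PING", "HELLO").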

> 
> Now the only remaining mystery is flow.join(handler). This simply wires 
> together two flows by making a cycle: connecting flow1's output to flow2's 
> input, and connecting flow2's output to flow1's input.

That part was non-obvious. I didn’t realize the cyclic nature of “join” here. 
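
A toy model of flow.join(handler), assuming synchronous, one-message-at-a-time turns (real streams run both sides concurrently): the remote side's output feeds the handler's input, and the handler's output feeds the remote side's input. The firstFromServer parameter is an assumption that makes the cycle's starting point explicit, which is the "who speaks first?" question raised in the next paragraph. JoinSketch is a hypothetical name.

```scala
object JoinSketch {
  // remote:  stands in for the TCP connection flow (what we send -> what the server answers)
  // handler: stands in for the user's Flow (what the server sent -> what we send back)
  // Returns the (received, sent) pairs seen by the handler over maxRounds turns.
  def join[A, B](remote: A => B, handler: B => A, firstFromServer: B, maxRounds: Int): List[(B, A)] = {
    @annotation.tailrec
    def loop(incoming: B, round: Int, acc: List[(B, A)]): List[(B, A)] =
      if (round >= maxRounds) acc.reverse
      else {
        val outgoing = handler(incoming)                                // handler output -> TCP input
        loop(remote(outgoing), round + 1, (incoming, outgoing) :: acc) // TCP output -> handler input
      }
    loop(firstFromServer, 0, Nil)
  }
}
```

With a remote that answers n + 1 and a handler that answers n * 2, starting from 1 for three rounds, the handler sees the pairs (1, 2), (3, 6) and (7, 14).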

>  
> Assuming that connection.handleWith(repl) does some magic to set up the 
> Source, how does the conversation get started? Is it assumed the server will 
> send some data upon connection? The echo server example 
> <http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M3/scala/stream-io.html#Accepting_connections__Echo_Server>
>  seems to have the same issue! 
> The echo server does not need to start the conversation, because it just echoes 
> back whatever the client has written. I.e. the echo "service flow" that you attach 
> to the TCP flow of the incoming connection just passes the incoming bytes through 
> to the outgoing side without modification. If you need the server side to start 
> the conversation, you can use the concatenation operators to send an initial 
> sequence of elements before sending anything else.

Okay, fair enough. I can see now that the echo service is likely one of the 
simplest use cases for Flows, as it just does a simple turnaround in the 
“HandlerFlow”.
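
A sketch of the "send an initial element first" idea using plain Scala iterators instead of akka-stream's concatenation operators (whose exact names are not assumed here); GreetingSketch and the "Welcome!" greeting are hypothetical.

```scala
object GreetingSketch {
  // Server-side handler model: emit a greeting first, then echo each incoming line.
  // Iterator.single(...) ++ ... plays the role of concatenating an initial element.
  def serverReplies(incoming: Iterator[String]): Iterator[String] =
    Iterator.single("Welcome!") ++ incoming.map(line => s"echo: $line")
}
```

The greeting is produced even when no client input has arrived yet, which is what lets the server open the conversation.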

> FYI: OutgoingConnection.handleWith's documentation 
> <http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/?_ga=1.17864249.1697328887.1413587984#akka.stream.scaladsl.StreamTcp$$OutgoingConnection>
>  says this:
> "Handles the connection using the given flow. This method can be called 
> several times, every call will materialize the given flow exactly once 
> thereby triggering a new connection attempt to the remoteAddress. If the 
> connection cannot be established the materialized stream will immediately be 
> terminated with a akka.stream.StreamTcpException 
> <http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/akka/stream/StreamTcpException.html>."
> It would be nice if there were a little more description around "Handles the 
> connection". Handles it how? Does it set up a Source and Sink? Is 
> asynchronous, bidirectional reading from and writing to the socket 
> implied?
> 
> The definition is the code snippet I pasted above: it wires the handler Flow's 
> input port to the incoming-bytes port of the TCP flow, and wires the handler 
> Flow's output port to the outgoing-bytes port of the TCP flow.
>  
> Am I to infer from all this that the StreamTcp.outgoingConnection creates a 
> Source[ByteString] from the socket's input and a Sink[ByteString] for the 
> socket's output and that the flow provided to handleWith is run between these 
> two? In other words, the OutgoingConnection can really only transform its 
> input to its output? If so, then:
> Yes, on a conceptual level this is true.
>  
> How is that generally applicable? 
> Every Server/Client is just a transformation between inputs and outputs which 
> is expressed as a protocol.

I suspected this, but it didn’t seem intuitive to me. I suppose this is true at 
some level of abstraction; it just isn’t the way I’m used to thinking about it 
or dealing with I/O (at a much lower level). Since I can’t see how the socket is 
read and written, using a Flow via StreamTcp seems a bit like “smoke and mirrors” 
(magic) to me. Perhaps this is the mental shift I need to make to use 
StreamTcp.

> The missing piece you are probably after is graphs. A Flow (which, as I said, 
> is something with exactly one input port and one output port) can actually host 
> a graph with complex routing in it, calls to other parts of the application via 
> mapAsync, or message sends to actors.

Yes, Eric’s response to my message gave me a clue that graphs are what is 
needed. I’m going to have to study that part of the documentation in more 
detail and figure out how to build the necessary graph to do the kind of 
asynchronous request/response cycles I’m used to thinking about.

> This approach works fine for an echo client, but clearly there are protocols 
> where the input and output can and should be processed independently, aren't 
> there?
> Yes, even HTTP is such a protocol, even though it looks like a simple 
> request/reply protocol (it is more horrible than that).

Yes, unfortunately :)

> How would one do what Mongo needs and have an async flow of requests that is 
> independent of an async flow of responses?
> This is too generic to answer. If you can decompose your problem we can help 
> with some of the subproblems.

Yes, I wasn’t asking for help with Mongo specifically, but only about how to 
separate the input and output when dealing with a Flow. However, you’ve confirmed 
that graphs and mapAsync are tools for making “calls to other parts of the 
application”, so I’m going to work on that over the weekend and see if I can’t 
get all the pieces working. If a smaller subproblem falls out of that, you’ll 
hear from me. OTOH, if I achieve some level of success, then I’ll be in a better 
position to contribute an example of using StreamTcp that is a bit more 
complicated than the echo client.

> I noticed the Add Bi-directional Flow 
> <https://github.com/akka/akka/issues/16416> issue that is slated for 
> inclusion in 1.0-M4. Is this intended for solving this issue where two 
> related flows are paired to do bi-directional input/output? 
> Yes and no. It will basically introduce a new DSL element that has exactly 
> *two* input and *two* output ports. This is rather handy when working with 
> networking. SSL will be such a bidi-stage.
>  
> Am I just trying to implement my mongo driver before the required features 
> are ready?
> 
> I recommend waiting for M4; we have a rewrite in progress. It will, however, 
> make many of the concepts simpler.

Okay. Simplicity is good at this point :) Perhaps I’ll just keep building on 
top of my Akka IO solution for now and re-attempt the conversion to akka-stream 
when M4 is available.  I imagine M4 is some weeks away? Do you have an ETA?
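
A toy model of the bidi stage Endre describes, with exactly two inputs and two outputs (one path per direction). BidiSketch, joinTransport and BidiDemo are hypothetical names, not the planned M4 API. Stacking such a stage on a transport yields an ordinary one-in, one-out flow again, which is why codec-like layers such as SSL fit this shape.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical two-in/two-out stage: `outbound` maps I1 to O1, `inbound` maps I2 to O2.
final case class BidiSketch[I1, O1, I2, O2](outbound: I1 => O1, inbound: I2 => O2) {
  // Put this stage in front of a transport (O1 => I2) to get a plain I1 => O2 flow.
  def joinTransport(transport: O1 => I2): I1 => O2 =
    outbound andThen transport andThen inbound
}

object BidiDemo {
  // Codec layer: messages go down as bytes, bytes come back up as messages.
  val codec: BidiSketch[String, Array[Byte], Array[Byte], String] =
    BidiSketch((s: String) => s.getBytes(UTF_8), (b: Array[Byte]) => new String(b, UTF_8))

  // Hypothetical transport that answers with the reversed bytes it received.
  val reversingTransport: Array[Byte] => Array[Byte] = _.reverse

  val client: String => String = codec.joinTransport(reversingTransport)
}
```

Here BidiDemo.client("abc") returns "cba": the string is encoded on the way down, reversed by the fake transport, and decoded on the way up.
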
>  
> The documentation says, about this code: 
> 
> A resilient REPL client would be more sophisticated than this, for example it 
> should split out the input reading into a separate mapAsync step and have a 
> way to let the server write more data than one ByteString chunk at any given 
> time, these improvements however are left as exercise for the reader.
> 
> I would like a "resilient client" and I think leaving this part as an 
> "exercise for the reader" is asking a bit much from the audience. We need an 
> example of how to do this, as it is likely the typical case, not the exception 
> (nobody needs another echo server/client). I suspect that the answer to my 
> confusion lies in the information intended but not stated by this sentence 
> from the documentation.
> 
> The input reading here refers to reading from the console, i.e. it expresses 
> that simply using a map there is not optimal, we just didn't want to 
> complicate the example.

Okay, I had assumed it meant reading the input from the socket. 

> For example, you can have an actor that actually does the console reading, and 
> then use mapAsync together with the ask pattern to communicate with that 
> actor from within the stream. There will be other options to factor out code 
> and error handling. Please be patient about this :)

As I said above, I’m going to have to look into graphs and mapAsync in more 
detail. I had already realized that “akka-streams alone” won’t be enough, and I 
have wrapped my use of it in an actor. Glad to hear there will be other options. 
May I assume that the M4 changes will leave the Flow API relatively unchanged? 
That is, will mapAsync still be there and function the same way? I’d hate to 
invest a lot of time in stuff that’s going to be tossed. 
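
A rough sketch of the console-reading suggestion: the blocking console read inside map is replaced by a step that returns a Future, and mapAsyncSequential models mapAsync with a parallelism of 1 (one element at a time, results in input order). scriptedConsole is a hypothetical stand-in for asking a console-reading actor; ReplStepSketch and all other names are illustrative, not akka-stream API.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ReplStepSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for "ask the console-reading actor for the next line":
  // it replays scripted lines, each wrapped in an already-completed Future.
  def scriptedConsole(lines: List[String]): () => Future[String] = {
    val it = lines.iterator
    () => Future.successful(it.next())
  }

  // One REPL step: show what the server sent, then obtain the user's next line
  // asynchronously and turn it into the text to send ("q" becomes "BYE\n").
  def replStep(readLine: () => Future[String])(fromServer: String): Future[String] = {
    println(s"Server: $fromServer")
    readLine().map(line => if (line == "q") "BYE\n" else line + "\n")
  }

  // Simplified model of mapAsync(1): f is applied to the next element only after
  // the previous Future has completed, and results keep input order.
  def mapAsyncSequential[A, B](in: List[A])(f: A => Future[B]): Future[List[B]] =
    in.foldLeft(Future.successful(Vector.empty[B])) { (accF, a) =>
      accF.flatMap(acc => f(a).map(acc :+ _))
    }.map(_.toList)
}
```

With scripted input "hello" then "q" and two server chunks, the replies are "hello\n" and "BYE\n"; only the caller at the very edge waits on the final Future.
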
>  
> 
> Specifically, I do not comprehend how mapAsync (or mapAsyncUnordered) helps to 
> split out the input reading because it is NOT obvious to me where this "input 
> reading" is being done!
> 
> As I mentioned, this refers to reading from the console (i.e. user input). 
> Reading from the socket happens behind the curtains: the chain of transformations 
> will grab the next TCP bytes as soon as it is ready for processing again 
> (for example, when a map stage has finished mapping the previous element and 
> could pass it on to the next stage).

Got it. Thank you. 
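
A toy illustration of demand-driven reading, using a lazy Scala Iterator as a stand-in for the socket (PullSketch and its reads counter are hypothetical): chunks are only pulled when the downstream stage asks for the next element. Real akka-stream backpressure is asynchronous and requests elements in batches, so this only shows the principle.

```scala
object PullSketch {
  // Counts how many chunks were actually pulled from the fake "socket".
  var reads = 0

  // Lazy source: a chunk is produced (and counted) only when next() is called.
  def socketChunks: Iterator[String] =
    Iterator.from(1).map { n => reads += 1; s"chunk-$n" }

  // Downstream processing that only ever asks for n elements.
  def firstProcessed(n: Int): List[String] = {
    reads = 0
    socketChunks.map(c => s"processed $c").take(n).toList
  }
}
```

Asking for two processed elements pulls exactly two chunks from the source, no more.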

>  
> If I used mapAsync to obtain the request data from my driver's clients, it 
> seems very obtuse to be setting up numerous Futures as opposed to just 
> allowing them to give me a Source[Request] from which their requests are read 
> and processed.
> 
> I am not sure I got this. If you have actual small-sized subproblems, then we 
> can help.

I think it is a reflection of my misunderstanding about the nature of a Flow. 
What I considered obtuse may be quite natural in Flowville :)  I’m going to 
have to learn more about graphs and integrating actors into a Flow before 
continuing to comment. If a “small sized” subproblem drops out, I’ll be sure to 
let you know.

Thanks much for your help, Endre. It is truly appreciated. 

> 
> -Endre
>  
> 
> Any help you can provide to prevent me from drowning in these waters would be 
> much appreciated!! 
> 
> Thanks,
> 
> Reid.
> 
> -- 
> >>>>>>>>>> Read the docs: http://akka.io/docs/ <http://akka.io/docs/>
> >>>>>>>>>> Check the FAQ: 
> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html 
> >>>>>>>>>> <http://doc.akka.io/docs/akka/current/additional/faq.html>
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user 
> >>>>>>>>>> <https://groups.google.com/group/akka-user>
> --- 
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to [email protected] 
> <mailto:[email protected]>.
> To post to this group, send email to [email protected] 
> <mailto:[email protected]>.
> Visit this group at http://groups.google.com/group/akka-user 
> <http://groups.google.com/group/akka-user>.
> For more options, visit https://groups.google.com/d/optout 
> <https://groups.google.com/d/optout>.
