Hi Reid,

On Thu, Feb 12, 2015 at 5:56 PM, Reid Spencer <[email protected]> wrote:

> hAkkers,
>
> I've been unable to *grok <http://en.wikipedia.org/wiki/Grok>* how to
> communicate with a TCP socket using akka-stream and StreamTcp extension.
> At this point, I'm not sure the fault is entirely mine. :)
>

I can understand that many of the concepts might be alien at first. In our
defense, it is always hard to see what might be unclear for newcomers,
since we build the stuff and know it inside-out :) Your feedback is very
valuable in this regard.


>
>
>    - The repl value is defined by invoking Flow[ByteString]. Now, I know
>    the API well enough to know that Flow requires two type parameters:
>    Flow[+In,-Out]
>    
> <http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/?_ga=1.17864249.1697328887.1413587984#akka.stream.scaladsl.Flow$>
>    . This is confusing because the Flow companion object's apply method
>    takes only one type argument which it expands by duplicating. So
>    Flow[ByteString] actually instantiates a Flow[ByteString,ByteString].
>    I only note this because it took some digging around in the API before I
>    understood how this worked and while it is handy, it is also not straight
>    forward.
>
>
This might need to be explained in the docs, yes. But the logic is simple:
when you create a Flow by Flow[T], what you get is an empty Flow that
does nothing to the input; it just passes it down unmodified (since the
transformation pipeline is empty). Obviously this means that you cannot say
Flow[Int, String] for an empty flow, since an Int will never become a String
if there is no processing step there. This setup is needed because
every step on a Flow gives you back a new Flow. So:

 val doNothing: Flow[Int, Int] = Flow[Int]
 val gimmeString: Flow[Int, String] = doNothing.map(_.toString)

The doNothing flow must have its two type parameters equal, because that
Flow just does not transform anything. On the other hand gimmeString has a
different type signature since a map stage is appended to the empty Flow.

Flow is simply a data structure for representing a set of transformations
(usually in a linear fashion, but not necessarily).
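
If it helps, here is a toy model of that idea in plain Scala. It is only a
sketch, not the akka-stream implementation: ToyFlow, empty and run are names
made up for illustration, and a flow is reduced to a function between
iterators. It shows why the empty flow must have equal type parameters and
why appending a step changes the type.

```scala
// Toy model only: NOT the akka-stream Flow class. All names are invented.
object ToyFlowModel {
  // A "flow" is reduced to a function from input elements to output elements.
  final case class ToyFlow[In, Out](run: Iterator[In] => Iterator[Out]) {
    // Appending a stage returns a new flow; the original is left untouched.
    def map[T](f: Out => T): ToyFlow[In, T] =
      ToyFlow[In, T](in => run(in).map(f))
  }

  object ToyFlow {
    // With no stages, elements pass through unchanged, so In must equal Out.
    def empty[T]: ToyFlow[T, T] = ToyFlow[T, T](in => in)
  }

  val doNothing: ToyFlow[Int, Int] = ToyFlow.empty[Int]
  val gimmeString: ToyFlow[Int, String] = doNothing.map(_.toString)
}
```

Nothing runs until run is applied to some input, which mirrors the point that
a Flow is a description of transformations rather than a running stream.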


>    - The documentation implies that a Flow is uni-directional
>    
> <http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M3/scala/stream-flows-and-basics.html#Defining_and_running_streams>.
>    It says a Flow "connects its up- and downstreams by transforming the
>    data elements flowing through it." That, to me, says "unidirectional". The
>    use of a Flow[ByteString,ByteString] for the repl value indicates to
>    me that a uni-directional "transformation" from ByteString to ByteString is
>     occurring and yet this code implies that it is doing both reading and
>    writing to the socket (i.e. it is bi-directional). How can that be?
>
Well, the doc is a bit imprecise. Basically a Flow is a description of a
transformation that has exactly one input and one output port. In *most*
cases this is simply a unidirectional flow of elements coming from the
input and passed down through the output, but there are exceptions to this
rule. The docs simplify this issue and just refer to it as a
unidirectional element.

Now in the TCP example it *is* however unidirectional, since what it looks
like is:


  +->(inbytes)-->+
  |              |
[TCP]       [HandlerFlow]
  |              |
  +<-(outbytes)<-+

As you see, the "HandlerFlow" really just takes input bytes and
transforms them into output bytes.


>    - I see repl as a Flow that does this: takes a ByteString as input,
>    chunks it into \n terminated lines up to 256 bytes, prints those lines out
>    prefixed by "Server: " and then discards that input and replaces it with a
>    line read from the console which is then output with a newline appended
>    unless the input was "q" in which case it is replaced by "BYE\n" and a
>    termination signal. Okay, that's all great and it is all unidirectional
>    writing data to the socket. So, now the questions:
>       - Where is the reading from the server to get the original line(s)
>       as input to the flow? I.e. where is the Source[ByteString]?
>
The source of confusion is the "handleWith" convenience method that we
added exactly to make things simpler. But what happens is actually:

def handleWith(handler: Flow[ByteString, ByteString])(implicit fm: FlowMaterializer) =
  flow.join(handler).run()

Now you may rightly ask what that flow is. Well, an OutgoingConnection
is nothing but a wrapper around a Flow[ByteString, ByteString], which you
can get via OutgoingConnection::flow.  That's right, a TCP connection is
not a Sink and a Source, but a Flow! (Remember, a Flow is something that has
exactly one input and one output port.) Why is it a Flow and not a Sink+Source?
Simply because it is often more convenient in client setups:

sourceOfRequests -> EncodePackets -> tcpFlowRepresentingRemoteService ->
DecodePackets -> sinkOfResponses

I.e. a TCP connection becomes a transformation step.

Now the only remaining mystery is flow.join(handler). This simply wires
together two flows by making a cycle: connecting flow1's output to flow2's
input, and connecting flow2's output to flow1's input.
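
To picture the cycle that join creates, here is a small simulation in plain
Scala. It is a sketch under heavy assumptions, not how akka-stream implements
join: Stage, runJoined, fakeConnection and handler are invented names, each
side handles one element at a time, and the loop is seeded with a first
element arriving from the connection.

```scala
// Toy simulation of two joined stages. Not akka-stream code; names are invented.
object JoinSketch {
  // A stage consumes one element and optionally emits one.
  type Stage = String => Option[String]

  // connection output -> handler input, handler output -> connection input.
  // Returns a trace of the elements that travelled around the loop.
  def runJoined(connection: Stage, handler: Stage,
                firstReceived: String, maxRounds: Int): List[String] = {
    @annotation.tailrec
    def loop(received: Option[String], round: Int, trace: List[String]): List[String] =
      received match {
        case Some(in) if round < maxRounds =>
          handler(in) match {
            case Some(out) =>
              loop(connection(out), round + 1, s"sent: $out" :: s"received: $in" :: trace)
            case None =>
              s"received: $in" :: trace
          }
        case _ => trace // nothing more arrived, or the round limit was hit
      }
    loop(Some(firstReceived), 0, Nil).reverse
  }

  // Fake remote end: acknowledges everything except "BYE", which ends the exchange.
  val fakeConnection: Stage = sent => if (sent == "BYE") None else Some("ack " + sent)

  // Handler: echoes what it receives, but answers "BYE" to a double acknowledgement.
  val handler: Stage = received =>
    if (received.startsWith("ack ack")) Some("BYE") else Some(received)
}
```

Calling JoinSketch.runJoined(JoinSketch.fakeConnection, JoinSketch.handler,
"hello", 10) walks the loop three times before the fake remote stops
answering.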


>
>    - Assuming that connection.handleWith(repl) does some magic to set up
>       the Source, how does the conversation get started? Is it assumed the
>       server will send some data upon connection? The echo server example
>       
> <http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M3/scala/stream-io.html#Accepting_connections__Echo_Server>
>       seems to have the same issue!
>
The echo server does not need to start the conversation, because it just
echoes back whatever is written to it. I.e. the echo "service flow" that you
attach to the TCP flow of the incoming connection just transforms the
incoming bytes to outgoing bytes without modification. If you need the
server side to start the conversation, you can use the concatenation
operators to send an initial sequence of elements before sending anything
else.
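
As a rough illustration in plain Scala, with iterators standing in for
streams (echo and greetThenEcho are made-up names, and this is not the
akka-stream API), prepending an initial element to an echo handler could
look like this:

```scala
// Toy model with iterators standing in for streams. Names are invented.
object GreetingSketch {
  // Echo handler: emits each incoming element unchanged.
  def echo(incoming: Iterator[String]): Iterator[String] = incoming

  // Same handler, but one initial element is emitted before any echoed input.
  // Iterator concatenation is lazy, so the greeting does not wait for input.
  def greetThenEcho(incoming: Iterator[String]): Iterator[String] =
    Iterator.single("Welcome!") ++ echo(incoming)
}
```

Even with no input at all, greetThenEcho still produces the greeting, which
is the "server starts the conversation" case.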


>
>    - FYI: OutgoingConnection.handleWith's documentation
>       
> <http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/?_ga=1.17864249.1697328887.1413587984#akka.stream.scaladsl.StreamTcp$$OutgoingConnection>
>       says this:
>
>> "Handles the connection using the given flow. This method can be called
>> several times, every call will materialize the given flow exactly once
>> thereby triggering a new connection attempt to the remoteAddress. If the
>> connection cannot be established the materialized stream will immediately
>> be terminated with a akka.stream.StreamTcpException
>> <http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/akka/stream/StreamTcpException.html>
>> ."
>
>
>    - It would be nice if there was a little more description around
>       "Handles the connection". Handles it how? Does it set up a Source and
>       Sink? Are asynchronous bi-directional reading from the socket and
>       writing to the socket implied?
>
>
The definition is the code snippet I pasted above: it wires the Flow's
input port to the incoming bytes port of the TCP flow, and wires the Flow's
output port to the outgoing bytes port of the TCP flow.


>
>    - Am I to infer from all this that the StreamTcp.outgoingConnection
>    creates a Source[ByteString] from the socket's input and a
>    Sink[ByteString] for the socket's output and that the flow provided to
>    handleWith is run between these two? In other words, the
>    OutgoingConnection can really only transform its input to its output?
>    If so, then:
>
Yes, on a conceptual level this is true.


>
>    - How is that generally applicable?
>
Every Server/Client is just a transformation between inputs and outputs,
which is expressed as a protocol. The missing piece you are probably after
is graphs. A Flow (which, as I said, is something with exactly one input
and one output port) can actually host a graph with complex routing in it,
calls to other parts of the application via mapAsync, or sending off
messages to actors.



>
>    - This approach works fine for an echo client, but clearly there are
>       protocols where the input and output can and should be processed
>       independently, aren't there?
>
Yes, even HTTP is such a protocol, even though it looks like a simple
request/reply protocol (it is more horrible than that).



>
>    - How would one do what Mongo needs and have an asynch flow of
>       requests that is independent of an asynch flow of responses?
>
This is too generic to answer. If you can decompose your problem, we can
help with some of the subproblems.



>
>    - I noticed the Add Bi-directional Flow
>       <https://github.com/akka/akka/issues/16416> issue that is slated
>       for inclusion in 1.0-M4. Is this intended for solving this issue where
>       two related flows are paired to do bi-directional input/output?
>
Yes and no. It will basically introduce a new DSL element that has exactly
*two* input and *two* output ports. This is rather handy when working with
networking. SSL will be such a bidi-stage.
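
To give a feel for the shape of such an element, here is a toy model in plain
Scala. It is purely illustrative and not the planned akka-stream DSL: ToyBidi,
atop, codec and scramble are names invented for this sketch, and the XOR
"scramble" stage is only a stand-in for something like SSL.

```scala
// Toy model of a two-in/two-out element. Not akka-stream code; names are invented.
object BidiSketch {
  // Two independent directions bundled together:
  // outbound (application -> wire) and inbound (wire -> application).
  final case class ToyBidi[I1, O1, I2, O2](outbound: I1 => O1, inbound: I2 => O2) {
    // Stack another stage underneath this one (closer to the wire).
    def atop[OO1, II2](lower: ToyBidi[O1, OO1, II2, I2]): ToyBidi[I1, OO1, II2, O2] =
      ToyBidi(outbound andThen lower.outbound, lower.inbound andThen inbound)
  }

  // Strings on the application side, bytes on the wire side.
  val codec: ToyBidi[String, Vector[Byte], Vector[Byte], String] =
    ToyBidi(
      (s: String) => s.getBytes("UTF-8").toVector,
      (bs: Vector[Byte]) => new String(bs.toArray, "UTF-8"))

  // Stand-in for an encryption layer: XOR every byte in both directions.
  val scramble: ToyBidi[Vector[Byte], Vector[Byte], Vector[Byte], Vector[Byte]] =
    ToyBidi(
      (bs: Vector[Byte]) => bs.map(b => (b ^ 0x5A).toByte),
      (bs: Vector[Byte]) => bs.map(b => (b ^ 0x5A).toByte))

  val stack: ToyBidi[String, Vector[Byte], Vector[Byte], String] = codec.atop(scramble)
}
```

The point of the model is only that both directions travel together as one
unit, so layers can be stacked without wiring each direction separately.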


>
>    - Am I just trying to implement my mongo driver before the required
>       features are ready?
>
>
I recommend waiting for M4; we have a rewrite in progress. It will make
many of the concepts simpler, though.


> The documentation says, about this code:
>
>> A resilient REPL client would be more sophisticated than this, for example
>> it should split out the input reading into a separate mapAsync step and
>> have a way to let the server write more data than one ByteString chunk at
>> any given time, these improvements however are left as exercise for the
>> reader.
>
>
> I would like a "resilient client" and I think leaving this part as an
> "exercise for the reader" is asking a bit much from the audience. We need
> an example of how to do this as it is likely the typical case not the
> exception (nobody needs another echo server/client). I suspect that the
> answer to my confusion lies in the information intended but not stated by
> this sentence from the documentation.
>

The input reading here refers to reading from the console, i.e. it
expresses that simply using a map there is not optimal, we just didn't want
to complicate the example. For example you can have an actor that actually
does the console reading, and then use mapAsync together with the ask
pattern to communicate with that actor from within the stream. There will
be other options to factor out code and error handling. Please be patient
about this :)
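
The shape of that idea can be sketched with plain Scala Futures. This is an
assumption-laden stand-in, not akka-stream code: askConsole is a hypothetical
placeholder for asking a console-reading actor (here it just fabricates a
reply), and mapAsyncLike only mimics the order-preserving behaviour of
mapAsync over a finite batch.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Plain-Futures sketch. Not akka-stream code; all names are invented.
object MapAsyncSketch {
  // Hypothetical stand-in for "ask the console-reader actor for the next line".
  def askConsole(prompt: String): Future[String] =
    Future { s"user reply to [$prompt]" }

  // Runs f for every element asynchronously and keeps results in input order.
  def mapAsyncLike[A, B](in: List[A])(f: A => Future[B]): Future[List[B]] =
    Future.traverse(in)(f)

  // Blocks only at the very edge, to collect the results of the demo.
  def demo(): List[String] =
    Await.result(mapAsyncLike(List("Server: hi", "Server: bye"))(askConsole), 5.seconds)
}
```

The blocking console read would live behind askConsole, so the step that
calls it never blocks the rest of the pipeline.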


>
> Specifically, I do not comprehend how mapAsync (or mapAsyncUnordered) help
> to split out the input reading because it is NOT obvious to me where this
> "input reading" is being done!
>

As I mentioned, this refers to the reading from console (i.e. user input).
Reading from the socket happens behind the curtains: the chain of
transformations will grab the next TCP bytes as soon as it is ready for
processing again (for example, when a map has finished mapping the previous
element and can pass it to the next stage).


> If I used mapAsynch to obtain the request data from my driver's clients,
> it seems very obtuse to be setting up numerous Futures as opposed to just
> allowing them to give me a Source[Request] from which their requests are
> read and processed.
>

I am not sure I got this. If you have actual small-sized subproblems, then
we can help.

-Endre


>
> Any help you can provide to prevent me from drowning in these waters would
> be much appreciated!!
>
> Thanks,
>
> Reid.
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
