Hi Johannes,
On 08/25/2015 05:25 PM, Johannes Rudolph wrote:
Hi Simon,
I think there are two conceptual difficulties you need to tackle:
The first is the problem which you describe with infinite / finite
streams which is actually more one of the "traditional" (= actor
based) push-style asynchronous programming versus the "new" [*]
pull-style of reactive/akka streams which was introduced to deal with
backpressure. The issue with backpressure is that it only works if all
components take part in it. If you have one component that opts-out of
backpressure it will have to fail or drop elements if it becomes
overloaded and this component will become the weakest link (or the
"Sollbruchstelle") of your application under load. Akka currently
supports `Source.actorRef` (and `Sink.actorRef` respectively) which
does exactly this translation from a push-style Actor API to the
pull-style streams API. You usually don't want to use them as they
will be limited by design and bound to fail under load.
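To make the "bound to fail under load" point concrete, here is a minimal sketch, assuming Akka 2.6 (where an implicit ActorSystem is enough to run a stream) and the two-argument `Source.actorRef(bufferSize, overflowStrategy)` form used elsewhere in this thread, which newer versions deprecate. The object name, the helper `pushInto` and the throttle rate are made up for illustration.

```scala
import akka.actor.{ActorSystem, Status}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ActorRefOverflowSketch {
  // Pushes the integers 1..n into a stream whose consumer is deliberately slow
  // and returns the elements that actually made it through.
  def pushInto(n: Int, bufferSize: Int)(implicit system: ActorSystem): Seq[Int] = {
    val (ref, received) =
      Source.actorRef[Int](bufferSize, OverflowStrategy.dropHead)
        .throttle(1, 50.millis) // slow downstream: one element per 50 ms
        .toMat(Sink.seq)(Keep.both)
        .run()

    // Push side: the sender is never told to slow down, so once the buffer
    // is full the oldest buffered elements are silently dropped.
    (1 to n).foreach(ref ! _)

    // Complete the stream after the remaining buffered elements are emitted.
    ref ! Status.Success("done")
    Await.result(received, 30.seconds)
  }
}
```

Calling `ActorRefOverflowSketch.pushInto(100, 4)` with an implicit ActorSystem in scope should return only a handful of the 100 elements; with `OverflowStrategy.fail` the stream would fail instead of dropping.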
Pull-style means that you need to write your program so that it is
completely passive and waits for demand (you could also call that
style "reactive": you set up your program to passively wait for a
source to provide elements and then react to them). Writing "passive"
programs is perfectly suited to services that follow the request /
response principle. You set up your handler as a flow and just put it
between the Source[Request] / Sink[Response].
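As a rough illustration of such a passive handler, here is a small sketch, assuming Akka 2.6. The `Request` and `Response` case classes, the `handler` logic and the `serve` helper are placeholders invented for the example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future

object HandlerSketch {
  // Placeholder message types, not part of any Akka API.
  final case class Request(path: String)
  final case class Response(body: String)

  // The handler only reacts: it produces a response when a request arrives
  // and downstream has signalled demand.
  val handler: Flow[Request, Response, NotUsed] =
    Flow[Request].map(req => Response(s"handled ${req.path}"))

  // Puts the handler between a source of requests and a sink of responses.
  def serve(requests: List[Request])(implicit system: ActorSystem): Future[Seq[Response]] =
    Source(requests).via(handler).runWith(Sink.seq)
}
```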
But what does it mean for a client program which usually actively
tries to achieve something? I think you can also build such a program
in a passive style: if it doesn't take any dynamic input it is easy as
you can create all the sources and sinks from static data. If it does
take dynamic input (like user input), you just need a Source of that
user input that only prompts the user for more input if there's
demand. It should be possible to structure a program like this but it
will be a pervasive change that cannot be recommended in all cases.
So, in reality for client applications you will probably use things
like the brittle `Source.actorRef` and just statically configure the
size of the buffers and queues to be big enough for the common use
cases. (You could say that `Source.actorRef` is not more brittle than
the JVM itself which you also need to configure with a maximum heap
size.) In any case, using streams will force you to think about these
kinds of issues.
While I read about the concepts, I was not really aware of them. Thanks
for clarification on that area.
The second difficulty is a shortcoming in your description (IMO)
regarding your notion of "reusing a connection" that is also uncovered
by your use of streams. Look at what this line means:
val resp = Source(byteString).via(tcpFlow).runFold(ByteString.empty)(_ ++ _)
It says, "open a TCP connection, stream the source byteString to the
connection, read all data *until the connection is closed by the other
side* and return this data". So, the end of the response is determined
by looking for the end of the TCP stream. To be able to reuse a
connection you will need a different end-of-response marker than the
signal that the TCP connection has been closed. You will need some framing
protocol on top of TCP that can discern where one response ends and
the next one starts and implement a streaming parser for that. You
would start by implementing a
def requestRenderer: Flow[Request, ByteString]
and a
def responseParser: Flow[ByteString, Response]
Between those you can put the tcp connection:
def pipeline: Flow[Request, Response] =
Flow[Request].via(requestRenderer).via(Tcp.outgoingConnection).via(responseParser)
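To give an idea of what the renderer and the streaming parser could look like, here is a minimal sketch, assuming Akka 2.6 and, purely for illustration, a protocol in which every message is one line terminated by "\n". The case classes and the `parse` helper are hypothetical; `Framing.delimiter` is the existing akka-stream helper used to re-chunk the bytes.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Framing, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

object FramingSketch {
  // Placeholder message types for a line-based protocol (an assumption).
  final case class Request(text: String)
  final case class Response(text: String)

  // Renders each request as one newline-terminated frame.
  val requestRenderer: Flow[Request, ByteString, NotUsed] =
    Flow[Request].map(req => ByteString(req.text + "\n"))

  // Re-chunks the incoming bytes at "\n", independent of how TCP happened to
  // split them, and turns every complete frame into a Response.
  val responseParser: Flow[ByteString, Response, NotUsed] =
    Framing
      .delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = false)
      .map(frame => Response(frame.utf8String))

  // Runs the parser over a list of raw chunks, e.g. as received from a socket.
  def parse(chunks: List[ByteString])(implicit system: ActorSystem): Future[Seq[Response]] =
    Source(chunks).via(responseParser).runWith(Sink.seq)
}
```

Feeding the chunks "first\nsec", "ond\nthi" and "rd\n" through `parse` yields the three responses "first", "second" and "third", even though no chunk boundary lines up with a message boundary.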
Now you still have the problem of how to interface with that Flow. (And
maybe that is what your whole question is about.) If you can structure
your program as hinted above then you could create a
// prompts user for more input
def userInput: Source[UserInput]
and a
def userInputParser: Flow[UserInput, Request]
and a
def output: Sink[Response]
so you could finally create and run your program as
userInput.via(userInputParser).via(pipeline).to(output).run()
(If you are into functional programming, that may be actually very
similar to how you would have structured your program in any case).
For the rest of us, it would be nice if we could wrap the `pipeline`
above with something to either get a function `Request =>
Future[Response]` or an ActorRef to which requests could be sent and
which would send back a message after the response was received.
Unfortunately, The Right Solution (TM) for that case is still missing.
It would be nice if there was a one-to-one Flow wrapper in
akka-stream that would do exactly this translation but unfortunately
there currently isn't one readily available. You can build such a
component yourself (Mathias actually built a specific solution for
akka-http to implement `Http.singleRequest()` which has exactly the
same problem).
So, how can you build something like that? Here is a sketch:
class RequestResponseActor extends Actor {
  // running the stream should return the actorRef for the Source.actorRef
  val pipelineActor =
    Source.actorRef[Request].via(pipeline).to(Sink.actorRef(self)).run()

  def receive = {
    case req: Request =>
      // put request and sender ref at the end of a FIFO data structure
      register(req, sender)
      pipelineActor ! req
    case res: Response =>
      // gets original request and sender from the head of the FIFO data structure
      val (req, originalSender) = unregister()
      // reply with the response (not the request) to whoever sent the request
      originalSender ! res
    // what happens on error? what on premature closing of the connection? etc.
  }
}
All of this is based on the premise that your framing protocol and the
semantics of the service you are talking to are using a
request/response style (like HTTP with HTTP pipelining enabled) where
requests are answered with responses in a FIFO manner. Also, in the
sketch I skimmed over a lot of configuration and subtle semantic
details you may have to consider (this is another reason there's no
such shrink-wrapped component in akka-stream).
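For reference, a self-contained variant of the actor sketch that can be run against a stand-in pipeline might look like the following. It assumes Akka 2.6 and the older two-argument forms of `Source.actorRef` and `Sink.actorRef` (deprecated there). The buffer size of 16, the three-second ask timeout, the `PipelineClosed` message and the `client` helper are arbitrary choices for the example, and the error handling is deliberately crude.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Status}
import akka.pattern.ask
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.Timeout
import scala.collection.immutable.Queue
import scala.concurrent.Future
import scala.concurrent.duration._

object RequestResponseSketch {
  // Placeholder message types (assumptions, not an Akka API).
  final case class Request(text: String)
  final case class Response(text: String)
  case object PipelineClosed

  class RequestResponseActor(pipeline: Flow[Request, Response, Any]) extends Actor {
    private implicit val system: ActorSystem = context.system

    // Materializing the stream yields the ActorRef that feeds the pipeline.
    private val pipelineActor: ActorRef =
      Source.actorRef[Request](16, OverflowStrategy.fail)
        .via(pipeline)
        .to(Sink.actorRef[Response](self, PipelineClosed))
        .run()

    // Senders of requests that have not been answered yet, in FIFO order.
    private var waiting = Queue.empty[ActorRef]

    def receive: Receive = {
      case req: Request =>
        waiting = waiting.enqueue(sender())
        pipelineActor ! req
      case res: Response =>
        val (originalSender, rest) = waiting.dequeue
        waiting = rest
        originalSender ! res
      case PipelineClosed | Status.Failure(_) =>
        // Crude policy: fail everything still outstanding and stop.
        waiting.foreach(_ ! Status.Failure(new RuntimeException("pipeline closed")))
        context.stop(self)
    }
  }

  // Wraps the actor into the `Request => Future[Response]` shape via ask.
  def client(pipeline: Flow[Request, Response, Any])(
      implicit system: ActorSystem): Request => Future[Response] = {
    val actor = system.actorOf(Props(new RequestResponseActor(pipeline)))
    implicit val timeout: Timeout = Timeout(3.seconds)
    req => (actor ? req).mapTo[Response]
  }
}
```

With a stand-in pipeline such as `Flow[Request].map(r => Response(r.text.toUpperCase))`, `client(...)` returns a function that answers each request with its response in order. Like the sketch above, this only holds if the real service answers strictly in FIFO order.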
Does that answer most of your question?
Yes, this was tremendously useful. You described exactly my use case and
a way I could achieve what I want. Big thank you!
For now it solved all of my problems. Actually it was more an answer to
my other question:
https://groups.google.com/forum/#!topic/akka-user/GviQjB08rS0
Not creating multiple connections also solved my problem that some data
was lost, even though I don't understand where the problem was. I guess
data was sent to the wrong connection or something like this.
There are still a few open questions, but nothing that blocks me, so I
will look into them when they turn into problems. With
the knowledge on how to feed the streams, how to get the values out of
there and how to connect the subcomponents I should be less helpless.
The only thing I didn't understand was this part:
Source.actorRef(1, OverflowStrategy.fail)
When I replace the 1 with a 0 (which is allowed according to the
documentation) I get this error message: Dropping element because there
is no downstream demand
Why do I get it? I expected that, since there is a client waiting
for data, I don't need a cache. Maybe it is because the client
(connected over TCP) is not a reactive stream, i.e. didn't tell
beforehand that it awaits data? Anyway, 1 seems to be enough, even for
100s of requests.
Thanks again for all your help!
Simon
This became quite a long answer but it also covers a lot of stuff :)
HTH
Johannes
[*] Of course, there's not too much conceptually new here. E.g. UNIX
shell pipes and filters are very similar to the whole reactive streams
concept (but constrained to byte streams): you have a buffer that can
be asynchronously written to from one side and read from on the other
side. The reader must poll if no data is currently available while the
writer must poll while the buffer is full. Demand is signalled over
the capacity of the shared buffer. Similar for TCP where demand is
exchanged by notifying the peer of the capacity of the receive buffer.
Etc.
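The shared-buffer idea can be tried out without any streams library. This is a small sketch using the JDK's `ArrayBlockingQueue`; the object and method names are made up, and blocking calls stand in for the polling described above.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedBufferSketch {
  // Sends the integers 1..n through a buffer of the given capacity and
  // returns the sum of everything the reader received.
  def transfer(n: Int, capacity: Int): Long = {
    val buffer = new ArrayBlockingQueue[Int](capacity)

    // Writer: put() blocks while the buffer is full, so free capacity in
    // the buffer is the only "demand" signal the writer ever sees.
    val writer = new Thread(() => (1 to n).foreach(i => buffer.put(i)))
    writer.start()

    // Reader: take() blocks while the buffer is empty.
    var sum = 0L
    var received = 0
    while (received < n) {
      sum += buffer.take()
      received += 1
    }
    writer.join()
    sum
  }
}
```

`BoundedBufferSketch.transfer(1000, 4)` returns 500500: nothing is lost, although the writer can never be more than four elements ahead of the reader.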
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the
Google Groups "Akka User List" group.
To unsubscribe from this topic, visit
https://groups.google.com/d/topic/akka-user/qpqWePkADwU/unsubscribe.
To unsubscribe from this group and all its topics, send an email to
[email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.