Hi Simon,

I think there are two conceptual difficulties you need to tackle:

The first is the problem you describe with infinite / finite streams. It is 
really a mismatch between the "traditional" (= actor based) push-style of 
asynchronous programming and the "new" [*] pull-style of reactive/akka 
streams, which was introduced to deal with backpressure. The issue with 
backpressure is that it only works if all components take part in it. If 
one component opts out of backpressure, it will have to fail or drop 
elements when it becomes overloaded, and this component will become the 
weakest link (the "Sollbruchstelle", i.e. the predetermined breaking 
point) of your application under load. 
Akka currently supports `Source.actorRef` (and `Sink.actorRef` 
respectively), which does exactly this translation from a push-style Actor 
API to the pull-style streams API. You usually don't want to use them, as 
they are limited by design and bound to fail under load.
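
To make the "fail or drop" point concrete, here is a minimal sketch in 
plain Scala. It is not how Akka implements `Source.actorRef`; the names 
`DroppingInbox`, `offer` and `poll` are made up for illustration. It only 
shows that a push-style entry point with a fixed buffer and no way to slow 
down the sender has to drop elements once a burst exceeds the capacity.

```scala
import scala.collection.mutable

// Hypothetical illustration, not Akka's implementation: a push-style inbox
// with a fixed capacity and no way to signal "slow down" to the sender.
final class DroppingInbox[A](capacity: Int) {
  private val buffer = mutable.Queue.empty[A]
  private var droppedCount = 0

  // Push side: the caller never waits, so overflow has to be dropped.
  def offer(elem: A): Boolean =
    if (buffer.size < capacity) { buffer.enqueue(elem); true }
    else { droppedCount += 1; false }

  // Pull side: the consumer takes an element when it is ready for one.
  def poll(): Option[A] =
    if (buffer.isEmpty) None else Some(buffer.dequeue())

  def dropped: Int = droppedCount
}

object DroppingInboxDemo {
  // Pushes a burst of 5 elements into a buffer of size 3, then drains it.
  def run(): (Int, List[Int]) = {
    val inbox = new DroppingInbox[Int](capacity = 3)
    (1 to 5).foreach(i => inbox.offer(i))
    val received = mutable.ListBuffer.empty[Int]
    var next = inbox.poll()
    while (next.isDefined) {
      received += next.get
      next = inbox.poll()
    }
    (inbox.dropped, received.toList)
  }
}
```

Making the buffer bigger only moves the point at which elements get 
dropped; it does not remove it.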

Pull-style means that you need to write your program so that it is 
completely passive and waits for demand. (You could also call that style 
"reactive": you set up your program to passively wait for a source to 
provide elements and then react to them.) Writing "passive" programs is 
perfectly suited to services that follow the request / response principle: 
you set up your handler as a flow and just put it between the 
Source[Request] and the Sink[Response].

But what does it mean for a client program which usually actively tries to 
achieve something? I think you can also build such a program in a passive 
style: if it doesn't take any dynamic input it is easy as you can create 
all the sources and sinks from static data. If it does take dynamic input 
(like user input), you just need a Source of that user input that only 
prompts the user for more input if there's demand. It should be possible to 
structure a program like this but it will be a pervasive change that cannot 
be recommended in all cases.
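
As a library-independent sketch of "only prompt the user if there's 
demand", a plain Scala `Iterator` already behaves this way, because it is 
pull-based. The `prompt` function below is a hypothetical stand-in for 
whatever asks the user for input, and the demo replaces the real user with 
scripted input so that the number of prompts can be counted.

```scala
object DemandDrivenInput {
  // `prompt` is a hypothetical stand-in for asking the user; it returns None
  // when the user is done. An Iterator is pull-based, so `prompt` only runs
  // when a consumer actually asks for the next element.
  def userInputs(prompt: () => Option[String]): Iterator[String] =
    Iterator.continually(prompt()).takeWhile(_.isDefined).map(_.get)

  // Returns the prompt count before any demand, the prompt count after two
  // elements were requested, and those two elements.
  def run(): (Int, Int, List[String]) = {
    val scripted = Iterator("GET a", "GET b", "GET c")
    var prompts = 0
    def prompt(): Option[String] = {
      prompts += 1
      if (scripted.hasNext) Some(scripted.next()) else None
    }

    val inputs = userInputs(() => prompt())
    val promptsBeforeDemand = prompts // nothing has been requested yet
    val first = inputs.next()         // demand for one element
    val second = inputs.next()        // demand for one more
    (promptsBeforeDemand, prompts, List(first, second))
  }
}
```

The third scripted input is never requested, so the "user" is never asked 
for it. A stream `Source` built on the same idea would let downstream 
demand decide when the next prompt appears.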

So, in reality, for client applications you will probably use things like 
the brittle `Source.actorRef` and just statically configure the size of the 
buffers and queues to be big enough for the common use cases. (You could 
say that `Source.actorRef` is no more brittle than the JVM itself, which 
you also need to configure with a maximum heap size.) In any case, using 
streams will force you to think about these kinds of issues.

The second difficulty is a shortcoming in your description (IMO) regarding 
your notion of "reusing a connection" that is also uncovered by your use of 
streams. Look at what this line means:

    val resp = Source(byteString).via(tcpFlow).runFold(ByteString.empty)(_ ++ _)

It says, "open a TCP connection, stream the source byteString to the 
connection, read all data *until the connection is closed by the other 
side*, and return this data". So, the end of the response is determined by 
looking for the end of the TCP stream. To be able to reuse a connection, 
you will need a different end-of-response marker than the signal that the 
TCP connection has been closed. You will need some framing protocol on top 
of TCP that can discern where one response ends and the next one starts, 
and you will need to implement a streaming parser for it. You would start 
by implementing a

def requestRenderer: Flow[Request, ByteString]

and a

def responseParser: Flow[ByteString, Response]

Between those you can put the tcp connection:

def pipeline: Flow[Request, Response] =
  Flow[Request].via(requestRenderer).via(Tcp.outgoingConnection).via(responseParser)
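
To illustrate what such a framing protocol and streaming parser could look 
like, here is a minimal sketch in plain Scala, without any Akka types. The 
wire format (a 4-byte big-endian length followed by the payload) and the 
names `FrameParser`, `feed` and `encode` are assumptions made up for this 
example, not something your service necessarily speaks. The point is that 
the parser keeps incomplete data between chunks, so message boundaries no 
longer depend on how TCP happens to split the bytes or on the connection 
being closed.

```scala
import java.nio.ByteBuffer

// Hypothetical framing: each message is a 4-byte big-endian length followed
// by that many payload bytes. This format is an assumption for illustration.
final class FrameParser {
  private var pending: Vector[Byte] = Vector.empty

  // Feed one chunk as it arrived from the socket. Returns every frame that is
  // now complete and keeps the unfinished remainder for the next chunk.
  def feed(chunk: Array[Byte]): List[Array[Byte]] = {
    pending = pending ++ chunk
    val frames = List.newBuilder[Array[Byte]]
    var done = false
    while (!done) {
      if (pending.length < 4) done = true // length header not complete yet
      else {
        val length = ByteBuffer.wrap(pending.take(4).toArray).getInt
        require(length >= 0, s"invalid frame length: $length")
        if (pending.length < 4 + length) done = true // payload not complete yet
        else {
          frames += pending.slice(4, 4 + length).toArray
          pending = pending.drop(4 + length)
        }
      }
    }
    frames.result()
  }
}

object FrameParser {
  // The matching renderer: prefix the payload with its length.
  def encode(payload: Array[Byte]): Array[Byte] =
    ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array()
}

object FramingDemo {
  // Two messages on one connection, split at an arbitrary byte boundary.
  def run(): List[String] = {
    val wire = FrameParser.encode("hello".getBytes("UTF-8")) ++
      FrameParser.encode("world!".getBytes("UTF-8"))
    val (chunk1, chunk2) = wire.splitAt(6)
    val parser = new FrameParser
    val frames = parser.feed(chunk1) ++ parser.feed(chunk2)
    frames.map(bytes => new String(bytes, "UTF-8"))
  }
}
```

The first chunk contains the length header and only one payload byte, so 
it yields no frame; the second chunk completes both messages. A 
`responseParser` flow would wrap the same kind of stateful logic.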

Now you still have the problem of how to interface with the `pipeline` 
flow. (And maybe that is what your whole question is about.) If you can 
structure your program as hinted above, then you could create a

// prompts user for more input
def userInput: Source[UserInput]

and a 

def userInputParser: Flow[UserInput, Request]

and a

def output: Sink[Response]

so you could finally create and run your program as 

userInput.via(userInputParser).via(pipeline).to(output).run()

(If you are into functional programming, that may be actually very similar 
to how you would have structured your program in any case).

For the rest of us, it would be nice if we could wrap the `pipeline` above 
with something to either get a function `Request => Future[Response]` or an 
ActorRef to which requests could be sent and which would send back a 
message after the response was received. Unfortunately, The Right Solution 
(TM) for that case is still missing: there is currently no readily 
available one-to-one Flow wrapper in akka-stream that does exactly this 
translation. You can build such a component yourself (Mathias actually 
built a specific solution for akka-http to implement 
`Http.singleRequest()`, which has exactly the same problem).

So, how can you build something like that? Here is a sketch:

class RequestResponseActor extends Actor {
  // should return the ActorRef for the Source.actorRef
  val pipelineActor =
    Source.actorRef[Request].via(pipeline).to(Sink.actorRef(self)).run()

  def receive = {
    case req: Request =>
      // put request and sender ref at the end of a FIFO data structure
      register(req, sender)
      pipelineActor ! req
    case res: Response =>
      // gets original request and sender from the head of the FIFO data
      // structure
      val (req, originalSender) = unregister()
      originalSender ! res // reply with the response, not the request
    // what happens on error? what on premature closing of the connection?
    // etc.
  }
}

All of this is based on the premise that your framing protocol and the 
semantics of the service you are talking to are using a request/response 
style (like HTTP with HTTP pipelining enabled) where requests are answered 
with responses in a FIFO manner. Also, in the sketch I glossed over a lot 
of configuration and subtle semantic details you may have to consider (this 
is another reason there's no such shrink-wrapped component in akka-stream).
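
For the `Request => Future[Response]` variant, the same FIFO bookkeeping 
can be sketched with promises instead of sender refs. This is plain Scala 
under the FIFO premise above, not an akka-stream component: `send` is a 
hypothetical stand-in for pushing a request into the pipeline, and 
`onResponse` / `onFailure` are hypothetical hooks that the output side of 
the pipeline would have to call.

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

// Hypothetical sketch: `send` stands in for writing a request into the
// pipeline; `onResponse` / `onFailure` are called from its output side.
final class FifoCorrelator[Req, Res](send: Req => Unit) {
  private val pending = mutable.Queue.empty[Promise[Res]]

  // The `Req => Future[Res]` function that callers want to see.
  def request(req: Req): Future[Res] = synchronized {
    val promise = Promise[Res]()
    pending.enqueue(promise) // register first so a fast response finds it
    send(req)
    promise.future
  }

  // Responses arrive in request order, so the oldest pending promise gets it.
  def onResponse(res: Res): Unit = synchronized {
    if (pending.nonEmpty) pending.dequeue().success(res)
  }

  // A failed or prematurely closed connection fails everything still waiting.
  def onFailure(cause: Throwable): Unit = synchronized {
    pending.dequeueAll(_ => true).foreach(_.failure(cause))
  }
}

object FifoCorrelatorDemo {
  // Sends three requests, answers two, then simulates a closed connection.
  def run(): (Option[String], Option[String], Boolean) = {
    val sent = mutable.Buffer.empty[String]
    val correlator = new FifoCorrelator[String, String](req => { sent += req; () })
    val a = correlator.request("a")
    val b = correlator.request("b")
    val c = correlator.request("c")
    correlator.onResponse("A")
    correlator.onResponse("B")
    correlator.onFailure(new RuntimeException("connection closed"))
    (a.value.flatMap(_.toOption), b.value.flatMap(_.toOption), c.value.exists(_.isFailure))
  }
}
```

The error handling here is deliberately crude (every outstanding request 
fails together); timeouts, retries and buffer limits are exactly the kind 
of details that depend on your protocol.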

Does that answer most of your question?

This became quite a long answer but it also covers a lot of stuff :)

HTH
Johannes

[*] Of course, there's not too much conceptually new here. E.g. UNIX shell 
pipes and filters are very similar to the whole reactive streams concept 
(but constrained to byte streams): you have a buffer that can be 
asynchronously written to from one side and read from on the other side. 
The reader must poll if no data is currently available, while the writer 
must poll while the buffer is full. Demand is signalled through the 
capacity of the shared buffer. The same holds for TCP, where demand is 
exchanged by notifying the peer of the capacity of the receive buffer. Etc.
