Hi there,
I actually implemented something like a “reconnect” a number of times (most
of them ending up in overcomplication though…).

The style we ended up with is indeed an Actor which encapsulates the
connection handling, and gets notified if the stream is completed
(i.e. .toMat(Sink.onComplete
{ case _ => self ! Disconnected })(Keep.left), and then it creates the flow
a-new simply). The upside of this is that it can easily handle “wait a bit
before reconnecting”, since you can use the Actor’s access to the system
scheduler.

You may need to fan-out explicitly into the completion-signal-emitting-part
and the data-signalling-part (just use Broadcast :-)).
​

On Tue, Sep 8, 2015 at 3:17 PM, אדם חונן <[email protected]> wrote:

> I'm OK with lost data - I assume messages can be lost anyway, even without
> disconnection, so the API I want to provide will not guarantee message
> delivery and users will be aware of that, and that they are actually
> sending asynchronous messages.
>
> I was also thinking that a possible alternative might be to create the
> Akka Streams' graph within a stateful object (an Actor probably) which will
> be responsible of recreating the graph when it breaks (preferably depending
> on reason, of course).
> Is there a way to do that?
> Basically I need something similar to Actor's watch so I know when my flow
> has terminated.
>
> I'm not sure what's the mechanism that can be used for this - a call to
> recover at the end of the flow, a materialized future value, something
> else...
>
> On Tue, Sep 8, 2015 at 2:05 PM, Akka Team <[email protected]> wrote:
>
>> Hi Adam,
>>
>> This is not a trivial task to be honest, and I cannot really give you a
>> nice way to use it. The difference from Rx Observables is that RS
>> implements explicit backpressure (i.e. instead of blocking it uses explicit
>> asynchronous signals) and therefore makes all processing stage inherently
>> stateful (even if the processing stage itself implements only a simple
>> stateless transformation like map or filter).
>>
>> This means that a layout like this:
>>
>> tcpWriter -> tcpConnection -> tcpReader
>>
>> has to deal with
>>  - stopping the tcpConnection without the writer and reader realizing it
>>  - take the backpressure state of the tcpConnection (pending demand,
>> buffered elements)
>>  - create a new connection, and initialize it with the backpressure state
>> of the previous connection
>>
>> This is not simple to do but I hope we can provide such functionality
>> soon. You have to prepare though for lost data (this is independent of Rx
>> or RS) since there might been bytes in the TCP send buffer in the kernel at
>> the point when the connection was lost.
>>
>> -Endre
>>
>> On Sun, Sep 6, 2015 at 12:25 PM, Adam <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I've been looking for a way to create a client that upon disconnection
>>> will try to reconnect.
>>> I've done this in the past with RxJava observables, but I'm not sure how
>>> to do this using Akka Streams.
>>>
>>> I saw some code examples where PushStage is being used to implement
>>> this, but the code samples are old (the API has changed) and I'm not sure
>>> exactly how to migrate them correctly.
>>> In the code examples, it seems like past versions of the API allowed
>>> outgoing TCP connections to be handled using a function (by calling
>>> handleWith), which means a lazy approach could be used.
>>> I *think* this is no longer possible.
>>>
>>> So I figured I can get the same effect with an approach similar to what
>>> I've used in the past with Rx observables - create a Source of TCP outgoing
>>> connection flows.
>>> Essentially what I need is similar to a theoretic Source.repeat[T](gen:
>>> ()=>T).
>>> There's no such method on Source, so I implemented this using
>>> ActorPublisher.
>>> I plan to also have some delay between re-connection attempts with an
>>> exponential backoff and also support multiple target hosts, but for now I
>>> only want to immediately reconnect to the same address.
>>>
>>> If I didn't care about how many connections my client actually opens at
>>> the same time, I could say this approach works.
>>> In actuality I do, of course, care and I only want a single connection
>>> to be open at any given time.
>>> Unfortunately, the Request objects my ActorPublisher receives contain
>>> numbers larger than 1.
>>>
>>> Is there a way to force the downstream flow to only ask for a single
>>> item at a time?
>>> Alternatively, is there actually a better way to achieve my original
>>> goal (a TCP client that reconnects)?
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at http://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>>
>> --
>> Akka Team
>> Typesafe - Reactive apps on the JVM
>> Blog: letitcrash.com
>> Twitter: @akkateam
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to a topic in the
>> Google Groups "Akka User List" group.
>> To unsubscribe from this topic, visit
>> https://groups.google.com/d/topic/akka-user/hBSiWAYKEiw/unsubscribe.
>> To unsubscribe from this group and all its topics, send an email to
>> [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
Konrad 'ktoso' Malawski
Akka <http://akka.io/> @ Typesafe <http://typesafe.com/>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to