Source.single completes the stream (and thereby closes the connection) once it
has emitted its element.
Simply concatenate a Source.maybe after it (a.concat(b)), or use a different
source, such as unfold or an iterator.
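
A minimal sketch of the concat approach, assuming Akka Streams 2.4 (Java DSL) and a server listening on 127.0.0.1:6000 as in the code quoted below. The class name KeepOpenClient and the helper keepOpen are hypothetical names used only for illustration.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Tcp;
import akka.util.ByteString;

public class KeepOpenClient {

    // Hypothetical helper: emits `first`, then stays open instead of completing.
    // Source.maybe() emits nothing and does not complete until its materialized
    // CompletableFuture is completed, so the concatenated source never signals
    // completion on its own.
    static Source<ByteString, NotUsed> keepOpen(ByteString first) {
        return Source.single(first).concat(Source.<ByteString>maybe());
    }

    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create("Client");
        final ActorMaterializer mat = ActorMaterializer.create(system);

        // Host and port are taken from the thread; adjust as needed.
        keepOpen(ByteString.fromString("Chhil"))
            .via(Tcp.get(system).outgoingConnection("127.0.0.1", 6000))
            .runWith(Sink.ignore(), mat);
    }
}
```

With this source the client side no longer completes after the first element; the server can of course still close the connection. If the stream should be closable later, concatMat(Source.<ByteString>maybe(), Keep.right()) should expose the CompletableFuture of the maybe source, which can then be completed to finish the stream.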

-- 
Konrad `ktoso` Malawski
Akka <http://akka.io> @ Lightbend <http://lightbend.com>

On 1 August 2016 at 11:24:51, murtuza chhil (chil...@gmail.com) wrote:

Hi Johan,

Thank you very much. Yes indeed stream + tcp is hurting me for sure and I
have tried reading the docs and sample code over and over.

So yes, I made some progress. Please ignore the verbosity of the code, I
just need to familiarize myself with return values and look at the source
of those easily via eclipse.


        final ActorSystem system = ActorSystem.create("Client");
        final ActorMaterializer mat = ActorMaterializer.create(system);

        Tcp tcp = Tcp.get(system);
        Flow<ByteString, ByteString, CompletionStage<OutgoingConnection>> outgoingConnection =
                tcp.outgoingConnection("127.0.0.1", 6000);

        Source<ByteString, NotUsed> clientSource = Source.empty();

        CompletionStage<Done> cs = clientSource.via(outgoingConnection)
                .runWith(Sink.ignore(), mat);

        cs.exceptionally(ex -> {
            ex.printStackTrace();
            return null;
        });

        Source<ByteString, NotUsed> clientSource1 = Source.single(ByteString
                .fromString("Chhil"));

        NotUsed var = clientSource1.via(outgoingConnection).to(Sink.ignore())
                .run(mat);

I managed to establish a connection using Source.empty() for the initial
connect.
The CompletionStage kicks in nicely. You mentioned that I should be able to get
a CompletionStage<OutgoingConnection> to detect problems with the
connection, but I don't know how or when it gets returned when the graph
is run. Please correct me if my understanding is wrong.
I use the materialized value from the connect and that works fine, as I
don't see a new connect after the initial connect with Source.empty().
I assume this is the right way to get the initial connection and then
change the source for regular communication.

*Now the problem I have run into is that, after sending data, the channel
disconnects. What must I do to keep it open?*

-chhil


On Monday, August 1, 2016 at 12:31:08 PM UTC+5:30, Johan Andrén wrote:
>
> Hi Chhil,
>
> The flow returned by outgoingConnection materializes into a
> CompletionStage<OutgoingConnection> which will be failed if the
> connection fails, so that is where you can implement your error handling
> for the initial connection. You would have to keep the materialized value
> when you construct the flow though, so that it is returned from run().
>
> You can read more about materialized values here:
> http://doc.akka.io/docs/akka/2.4/java/stream/stream-composition.html#materialized-values
> and TCP with streams here:
> http://doc.akka.io/docs/akka/2.4/java/stream/stream-io.html#stream-io-java
>
>
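
A minimal sketch of keeping the materialized value of the connection flow, assuming Akka Streams 2.4 (Java DSL). The class name ConnectWithErrorHandling and the helper connect are hypothetical; Source.maybe is used here only as an assumption, so that the stream can connect without sending any data and without completing immediately.

```java
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Tcp;
import akka.stream.javadsl.Tcp.OutgoingConnection;
import akka.util.ByteString;
import java.util.concurrent.CompletionStage;

public class ConnectWithErrorHandling {

    // Hypothetical helper: runs a stream over the TCP flow and returns the
    // flow's materialized value instead of discarding it.
    static CompletionStage<OutgoingConnection> connect(
            ActorSystem system, Materializer mat, String host, int port) {
        return Source.<ByteString>maybe()
            // Keep.right() keeps the flow's CompletionStage<OutgoingConnection>
            .viaMat(Tcp.get(system).outgoingConnection(host, port), Keep.right())
            // Keep.left() keeps it again, dropping the sink's CompletionStage<Done>
            .toMat(Sink.ignore(), Keep.left())
            .run(mat);
    }

    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create("Client");
        final ActorMaterializer mat = ActorMaterializer.create(system);

        connect(system, mat, "127.0.0.1", 6000).whenComplete((connection, failure) -> {
            if (failure != null) {
                // Initial connection failed: a retry or backoff could start here.
                System.err.println("Connect failed: " + failure);
            } else {
                System.out.println("Connected to " + connection.remoteAddress());
            }
        });
    }
}
```

Using via(...) and to(...) instead of viaMat/toMat keeps only the source's materialized value, which is why a plain run() returns NotUsed for a Source.single and the connection's CompletionStage is lost.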
> In general, read as much of the streams docs as you can to get a good
> understanding of how Akka Streams work, trial and error with streams + tcp
> will hurt!
>
> --
> Johan
> Akka Team
>
> On Monday, August 1, 2016 at 7:20:23 AM UTC+2, murtuza chhil wrote:
>>
>> Hello,
>>
>> I am a newbie trying to understand Akka Streams.
>>
>> Coming from regular socket programming I am trying to mimic a client
>> connection and have the following runnable graph.
>>
>>         Flow<ByteString, ByteString, CompletionStage<OutgoingConnection>> flow =
>>                 Tcp.get(system).outgoingConnection("127.0.0.1", 6000);
>>         Source<ByteString, NotUsed> clientSource = Source.single(ByteString
>>                 .fromString("XYZ"));
>>
>>         clientSource.via(flow).to(Sink.ignore()).run(mat);
>>
>>
>> This works fine with the samples in the docs, where a source backed by a
>> list is attached, a server is listening, and we run
>> source->flow->sink through a materializer.
>>
>> But this is not a typical use case. Typically one would attempt to
>> connect to a server, and the connection either succeeds or fails. If it
>> succeeds, send data as it becomes available; if it fails, have the ability
>> to back off.
>>
>>
>> Currently I don't know how to access the error when a server is not
>> available, and I want to establish a connection before data is available.
>> I don't know how to prevent a new connection every time data is sent. I
>> understand why it happens, but I don't know how to reuse a materialized flow
>> that has already connected.
>> What I am trying to get my head around is how one runs a graph
>> without any source data and gets a completable future that tells me whether
>> it succeeded or not.
>> Then I want to use the same connection to send data later using the
>> source.
>>
>> I have tried runFold, and that gives me the Akka TCP exception
>> when the server is not available, but I simply want to forward the data from
>> the source without manipulation.
>>
>> There was another thread that I had started
>> https://groups.google.com/d/msg/akka-user/1ztizMy9FnI/U9toR-cOBgAJ,
>> would appreciate if someone responds to it.
>>
>> Any pointers or snippets would be greatly appreciated. Yes I am not used
>> to this paradigm of programming but trying to figure it out.
>>
>> -chhil
>>
>> --
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups
"Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
