Hi Konrad, The Source<ByteString, CompletableFuture<Optional<ByteString>>> clientSource = Source.<ByteString>maybe(); does the trick, don't see a disconnect on the netcat server send. Also figured, I shouldn't be running the graph multiple times once to connect and the second time to send data. Once its run, any data fed by the source will be sent out. This was the reason I was seeing multiple connections.
Thank you. Appreciate the help. -chhil On Monday, August 1, 2016 at 2:57:47 PM UTC+5:30, Konrad Malawski wrote: > > Source.single completes the stream (closes the connection) once it has > signalled the data. > Simply put a Source.maybe behind it (a.concat(b)) or use a different > source, like unfold or an iterator). > > -- > Konrad `ktoso` Malawski > Akka <http://akka.io> @ Lightbend <http://lightbend.com> > > On 1 August 2016 at 11:24:51, murtuza chhil ([email protected] > <javascript:>) wrote: > > Hi Johan, > > Thank you very much. Yes indeed stream + tcp is hurting me for sure and I > have tried reading the docs and sample code over and over. > > So yes, I made some progress. Please ignore the verbosity of the code, I > just need to familiarize myself with return values and look at the source > of those easily via eclipse. > > > final ActorSystem system = ActorSystem.create("Client"); > final ActorMaterializer mat = ActorMaterializer.create(system); > > Tcp tcp = Tcp.get(system); > Flow<ByteString, ByteString, CompletionStage<OutgoingConnection>> > outgoingConnection = tcp > .outgoingConnection("127.0.0.1", 6000); > > Source<ByteString, NotUsed> clientSource = Source.empty(); > > CompletionStage<Done> cs = clientSource.via(outgoingConnection) > .runWith(Sink.ignore(), mat); > > cs.exceptionally(ex -> { > ex.printStackTrace(); > return null; > }); > > Source<ByteString, NotUsed> clientSource1 = Source.single(ByteString > .fromString("Chhil")); > > NotUsed var = clientSource1.via(outgoingConnection).to(Sink.ignore()) > .run(mat); > > > > Managed to get a connection made usingSource.empty() for the initial > connect. > The CompletionStage kicks in nicely. You mentioned I should be able to get > CompletionStage<OutgoingConnection> to determine problems with the > connection, this I don't know how or when it gets returned when the graph > is run. Please do correct me if understanding is wrong. > I use the materialized value from the connect and that works fine, as I > don't see a new connect after the initial connect with the Source.empty(). > I assume this is the right way to get the initial connection and then > change the source for regular communication. > > *Now the problem I have run into is, that after sending data, the channel > disconnects. What must I do to keep it open?* > > -chhil > > > On Monday, August 1, 2016 at 12:31:08 PM UTC+5:30, Johan Andrén wrote: >> >> Hi Chhil, >> >> The flow returned by outgoingConnection materializes into a >> CompletionStage<OutgoingConnection> which will be failed if the >> connection fails, so that is where you can implement your error handling >> for the initial connection. You would have to keep the materialized value >> when you construct the flow though, so that it is returned from run(). >> >> You can read more about materialized values here: >> http://doc.akka.io/docs/akka/2.4/java/stream/stream-composition.html#materialized-values >> >> and TCP with streams here: >> http://doc.akka.io/docs/akka/2.4/java/stream/stream-io.html#stream-io-java >> >> >> In general, read as much of the streams docs as you can to get a good >> understanding of how Akka Streams work, trial and error with streams + tcp >> will hurt! >> >> -- >> Johan >> Akka Team >> >> On Monday, August 1, 2016 at 7:20:23 AM UTC+2, murtuza chhil wrote: >>> >>> Hello, >>> >>> I am a newbie trying to understand Akka Streams. >>> >>> Coming from regular socket programming I am trying to mimic a client >>> connection and have the following runnable graph. >>> >>> Flow<ByteString, ByteString, CompletionStage<OutgoingConnection>> >>> flow = Tcp >>> .get(system).outgoingConnection("127.0.0.1", 6000); >>> Source<ByteString, NotUsed> clientSource = Source.single(ByteString >>> .fromString("XYZ")); >>> >>> clientSource.via(flow).to(Sink.ignore()).run(mat); >>> >>> >>> >>> This works fine when used with the samples in the doc, where the source >>> is attached that has a list and there is a server listening and we run the >>> source->flow->sink through a materializer. >>> >>> But this is not a typical use case, typically one would attempt to >>> connect to a server, and either it connects or fails. If connected send >>> data that will be made available and if connection fails, have the ability >>> to backoff. >>> >>> >>> Currently I don't know how to access the error when a server is not >>> available and want to establish a connection before data is available. >>> Don't know how to prevent connections every time data is sent, I >>> understand why it happens, but don't know how to reuse a materialized flow >>> that has already connected. >>> What I am trying to get my head around is how does one run a graph >>> without any source data and get a completable future that tells me if it >>> succeeded or not. >>> Then I want to use the same connection to send data later using the >>> source. >>> >>> I have attempted with a runfold and that give me the akka TCP Exception >>> when the server is not availble, but I simply want to forward the data from >>> the source without manipulation. >>> >>> There was another thread that I had started >>> https://groups.google.com/d/msg/akka-user/1ztizMy9FnI/U9toR-cOBgAJ, >>> would appreciate if someone responds to it. >>> >>> Any pointers or snippets would be greatly appreciated. Yes I am not used >>> to this paradigm of programming but trying to figure it out. >>> >>> -chhil >>> >>> -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected] <javascript:>. > To post to this group, send email to [email protected] > <javascript:>. > Visit this group at https://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
