Thanks Johan!
This looks promising. I have to see when we're able to upgrade, but it
should be possible soonish.

Johannes
On Thursday, April 7, 2016 at 6:37:02 PM UTC+3, Johan Andrén wrote:
>
> Hi Johannes,
>
> First, I would recommend upgrading to the latest Akka Streams, 2.4.3, if
> possible. It contains many fixes and improvements over the version you seem
> to be using (OutputStreamSink is a private, internal API nowadays).
> The corresponding method in the newer versions is
> StreamConverters.fromOutputStream(creator) and it materializes into a
> CompletionStage<IOResult> which completes when the OutputStream is closed.
> To get the materialized value out of the stream you would run it with
> source.toMat(sink, Keep.right()).run(mat), as in the example below.
>
> Secondly, it is not safe to use PipedInputStream/PipedOutputStream here, as
> they require each end to be used from one specific thread, which
> StreamConverters does not guarantee.
>
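A note on the piped-stream point above: java.io's PipedInputStream and PipedOutputStream are meant to be driven by two different threads, one per end. Here is a minimal sketch of that intended usage with plain JDK classes. No Akka is involved, and the class and method names (PipedStreamThreads, roundTrip) are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class PipedStreamThreads {

    // Writes `payload` from a dedicated writer thread and reads it back on the
    // calling thread, so each end of the pipe is used by exactly one thread.
    static String roundTrip(String payload) {
        try {
            PipedOutputStream out = new PipedOutputStream();
            PipedInputStream in = new PipedInputStream(out);

            Thread writer = new Thread(() -> {
                // Closing the write end is what signals end-of-stream to the reader.
                try (PipedOutputStream o = out) {
                    o.write(payload.getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, "pipe-writer");
            writer.start();

            // Drain the read end on the calling thread until end-of-stream.
            ByteArrayOutputStream collected = new ByteArrayOutputStream();
            try (PipedInputStream i = in) {
                byte[] buffer = new byte[512];
                int n;
                while ((n = i.read(buffer)) != -1) {
                    collected.write(buffer, 0, n);
                }
            }
            writer.join();
            return new String(collected.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello from the writer thread"));
    }
}
```

This only works because the writer and the reader each stay on their own thread for the lifetime of the pipe. As the reply says, the stream sink gives no such guarantee.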
> This is how you would do something like what you are asking for with Akka
> 2.4.3:
>
> ActorSystem system = ActorSystem.create();
> Materializer mat = ActorMaterializer.create(system);
>
> CompletionStage<HttpResponse> responseFuture =
>     Http.get(system).singleRequest(HttpRequest.create("http://example.com"), mat);
>
> CompletionStage<IOResult> done = responseFuture.thenCompose(response -> {
>     Source<ByteString, Object> source = response.entity().getDataBytes();
>
>     // note that it is not safe/correct to create the outputstream outside of
>     // the lambda/creator given to fromOutputStream
>     Sink<ByteString, CompletionStage<IOResult>> sink =
>         StreamConverters.fromOutputStream(HttpClientExample::someApiReturningANewOutputStream);
>
>     // just to make the type clear, ofc you can just return it
>     CompletionStage<IOResult> completionStage =
>         source.toMat(sink, Keep.right()).run(mat);
>
>     return completionStage;
> });
>
> done.thenAccept((result) -> {
>     if (result.wasSuccessful())
>         System.out.println("Done, wrote " + result.getCount() + " bytes");
>     else
>         System.out.println("Failed: " + result.getError().getMessage());
> });
>
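For anyone who wants to try the callback chaining in the example above without an Akka dependency, here is a rough JDK-only sketch of the same shape: one CompletionStage for the "response", thenCompose into a second stage that writes the bytes, and a completion callback on the result. Everything here (CompletionCallbackSketch, fetchChunks, writeAll, download) is a hypothetical stand-in, not Akka API. The byte count returned plays the role that IOResult plays in the real code.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

final class CompletionCallbackSketch {

    // Hypothetical stand-in for a streamed response body arriving asynchronously.
    static CompletionStage<List<byte[]>> fetchChunks() {
        return CompletableFuture.supplyAsync(() -> Arrays.asList(
                "hello ".getBytes(StandardCharsets.UTF_8),
                "world".getBytes(StandardCharsets.UTF_8)));
    }

    // Writes every chunk to a stream obtained from `creator` and completes with
    // the number of bytes written. The stream is created inside the async task,
    // in the spirit of creating it inside the creator rather than outside.
    static CompletionStage<Long> writeAll(List<byte[]> chunks, Supplier<OutputStream> creator) {
        return CompletableFuture.supplyAsync(() -> {
            long count = 0;
            try (OutputStream out = creator.get()) {
                for (byte[] chunk : chunks) {
                    out.write(chunk);
                    count += chunk.length;
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return count;
        });
    }

    // Chains the two stages and registers a completion callback that covers
    // both the success and the failure case, then waits for its result.
    static String download(Supplier<OutputStream> creator) {
        CompletionStage<Long> done =
                fetchChunks().thenCompose(chunks -> writeAll(chunks, creator));

        return done.handle((count, error) -> {
            if (error == null) {
                return "Done, wrote " + count + " bytes";
            }
            // Failures from async stages arrive wrapped in CompletionException.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            return "Failed: " + cause.getMessage();
        }).toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(download(java.io.ByteArrayOutputStream::new));
    }
}
```

The sketch uses handle so that one callback sees both outcomes. thenAccept, as in the example above, only runs on success, and whenComplete is another option when no result value is needed.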
>
> I hope this helps
> --
> Johan Andrén
> Akka Team, Lightbend Inc.
>
>
> On Wednesday, March 30, 2016 at 11:51:13 AM UTC+2, Johannes Berg wrote:
>>
>> Hi!
>>
>> I have a streaming Akka HTTP client request on which I want to register
>> an on-complete callback (in addition to the streaming, which is working
>> fine) that is called when the complete request-response cycle is done.
>> There doesn't seem to be anything for this directly in the Akka HTTP API
>> (only the toStrict method, which I can't use because I need to stream
>> this), but I guess I can watch for when the Akka stream completes, right?
>> If the stream is completed, is the full HTTP request-response completed
>> too (all sockets closed and other resources released)?
>>
>> I'm very new to Akka streams, so how can I register an on-complete
>> callback on an Akka stream in Java?
>>
>> This is roughly what I'm doing, and I want an on-complete callback
>> when it's all done streaming.
>>
>> Future<HttpResponse> response_future = Http.get(system).singleRequest(
>>     HttpRequest.create(url), am);
>> Future<Source<ByteString, ?>> source_future = response_future.map(
>>     new Mapper<HttpResponse, Source<ByteString, ?>>() {
>>         public Source<ByteString, ?> apply(HttpResponse response) {
>>             return response.entity().getDataBytes();
>>         }
>>     }, system.dispatcher());
>>
>> source_future.flatMap(new Mapper<Source<ByteString, ?>, Future<String>>() {
>>     public Future<String> apply(Source<ByteString, ?> source) {
>>         PipedOutputStream output = new PipedOutputStream();
>>         InputStream input = new PipedInputStream(output);
>>
>>         source.to(OutputStreamSink.create(
>>             new akka.japi.function.Creator<OutputStream>() {
>>                 public OutputStream create() {
>>                     return output;
>>                 }
>>             })).run(am);
>>
>>         //doing stuff with input
>>
>>         //how to register oncomplete callback?
>>         source.onComplete ???
>>     }
>> }, system.dispatcher());
>>
>> Anyone have any good suggestions?
>>
>> Thanks,
>> Johannes
>>
>