Hi Johannes,
First, I would recommend upgrading to the latest Akka Stream, 2.4.3, if
possible. It contains many fixes and improvements over the version you seem
to be using (OutputStreamSink is a private, internal API nowadays).
The corresponding method in the newer versions is
StreamConverters.fromOutputStream(creator), and it materializes into a
CompletionStage<IOResult> which completes when the OutputStream is closed.
To get the materialized value out of the stream you would run it with
source.toMat(sink, Keep.right()).run(mat).
Secondly, it is not safe to use PipedInputStream/PipedOutputStream here, as
they require each end to be used from one specific thread, which
StreamConverters does not guarantee.
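To illustrate the thread binding, here is a minimal JDK-only sketch (no Akka involved; the class and method names are made up for this example). The pipe remembers the thread that last wrote to it, and once that thread is no longer alive and the buffer is drained, the reading side fails even though nobody closed the stream:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

class PipedThreadAffinitySketch {
    // Writes three bytes from a short-lived thread, then reads them from the
    // calling thread. Returns the IOException thrown by the read that follows
    // the buffered bytes, or null if that read did not fail.
    static IOException readAfterWriterThreadExits() {
        try {
            PipedOutputStream out = new PipedOutputStream();
            PipedInputStream in = new PipedInputStream(out);

            Thread writer = new Thread(() -> {
                try {
                    // deliberately not closed: imagine a writer that would
                    // continue later from whatever thread it runs on next
                    out.write(new byte[] {1, 2, 3});
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();
            writer.join(); // the writing thread has terminated

            // the bytes already in the pipe's buffer can still be read
            for (int i = 0; i < 3; i++) {
                in.read();
            }
            try {
                in.read(); // buffer empty, last writer thread not alive
                return null;
            } catch (IOException e) {
                return e;
            }
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException("unexpected failure during setup", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readAfterWriterThreadExits());
    }
}
```

In a stream the writes may come from different pooled threads over time, so whether the pipe considers its writer alive becomes a matter of luck.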
This is how you would do something like what you are asking for with Akka
2.4.3:
ActorSystem system = ActorSystem.create();
Materializer mat = ActorMaterializer.create(system);

CompletionStage<HttpResponse> responseFuture =
    Http.get(system).singleRequest(HttpRequest.create("http://example.com"), mat);

CompletionStage<IOResult> done = responseFuture.thenCompose(response -> {
    Source<ByteString, Object> source = response.entity().getDataBytes();

    // note that it is not safe/correct to create the OutputStream outside of
    // the lambda/creator given to fromOutputStream
    Sink<ByteString, CompletionStage<IOResult>> sink =
        StreamConverters.fromOutputStream(HttpClientExample::someApiReturningANewOutputStream);

    // just to make the type clear, of course you can just return it
    CompletionStage<IOResult> completionStage =
        source.toMat(sink, Keep.right()).run(mat);
    return completionStage;
});

done.thenAccept((result) -> {
    if (result.wasSuccessful())
        System.out.println("Done, wrote " + result.getCount() + " bytes");
    else
        System.out.println("Failed: " + result.getError().getMessage());
});
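One thing to keep in mind with the callback above: thenAccept only runs when the CompletionStage completes normally, so if the stage itself fails (for example because the request could not be sent), it is skipped. A minimal JDK-only sketch of a callback that sees both outcomes, using handle. The fakeDownload method is a hypothetical stand-in for the stage built above, with a plain byte count instead of IOResult:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class CompletionCallbackSketch {
    // Hypothetical stand-in for the CompletionStage produced by running the
    // stream: completes with a byte count, or fails when `fail` is true.
    static CompletionStage<Long> fakeDownload(boolean fail) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new RuntimeException("connection refused"));
        } else {
            future.complete(1024L);
        }
        return future;
    }

    // handle is invoked for both normal and exceptional completion; in the
    // failure case `bytes` is null and `error` carries the cause. In a longer
    // chain the cause may arrive wrapped in a CompletionException.
    static CompletionStage<String> describe(CompletionStage<Long> done) {
        return done.handle((bytes, error) ->
            error != null
                ? "Failed: " + error.getMessage()
                : "Done, wrote " + bytes + " bytes");
    }

    public static void main(String[] args) {
        describe(fakeDownload(false)).thenAccept(System.out::println);
        describe(fakeDownload(true)).thenAccept(System.out::println);
    }
}
```

If you only want a side effect and do not need to transform the result, whenComplete takes the same (value, error) pair.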
I hope this helps
--
Johan Andrén
Akka Team, Lightbend Inc.
On Wednesday, March 30, 2016 at 11:51:13 AM UTC+2, Johannes Berg wrote:
>
> Hi!
>
> I have a streaming Akka HTTP client request that I would want to register
> an oncomplete callback on (in addition to the streaming which is working
> fine) which is called when the complete request-response is done. There
> doesn't seem to be anything in the Akka HTTP API directly (only toStrict
> method but I can't use that as I need to stream this) but I guess I can be
> looking at when the Akka stream is completed, right? If the stream is
> completed, is the full HTTP request-response completed then (all sockets
> closed and other resources released)?
>
> I'm very new to Akka streams but how can I register an oncomplete callback
> on an Akka stream in Java?
>
> This is roughly what I'm doing and I would want an oncomplete callback
> when it's all done streaming.
>
> Future<HttpResponse> response_future = Http.get(system).singleRequest(
> HttpRequest.create(url), am);
> Future<Source<ByteString, ?>> source_future = response_future.map(new
> Mapper<HttpResponse, Source<ByteString, ?>>() {
> public Source<ByteString, ?> apply(HttpResponse response) {
> return response.entity().getDataBytes();
> }
> }, system.dispatcher());
>
> source_future.flatMap(new Mapper<Source<ByteString, ?>, Future<String>>()
> {
> public Future<String> apply(Source<ByteString, ?> source) {
> PipedOutputStream output = new PipedOutputStream();
> InputStream input = new PipedInputStream(output);
>
> source.to(OutputStreamSink.create(new akka.japi.function.Creator<
> OutputStream>() {
> public OutputStream create() {
> return output;
> }
> })).run(am);
>
> //doing stuff with input
>
> //how to register oncomplete callback?
> source.onComplete ???
> }
> }, system.dispatcher());
>
> Anyone have any good suggestions?
>
> Thanks,
> Johannes
>