Hi

If it can help someone in scala you can do this like this

val future : Future[NetworkResponse] =
 Source.single(req)
 .log("Start Http")
 .map(req => (HttpRequest(HttpMethods.GET, Uri(req.url)), req))
 .log("Map to httpRequest")
 .map(httpReq => httpReq._1 -> (httpReq._2.id, httpReq._2.start))
 .log("Map to request")
 .via(connectionPool)
 .log("after connection pool")
 .map(r => (r._1.get.entity.dataBytes.runWith(Sink.head), r._2))
 .log("after reading response")
 .mapAsync(10)(r => r._1.map(bytes => NetworkResponse(r._2._1, r._2._2, 
bytes.decodeString("UTF-8"))))
 .log("after map async")
 .runWith(Sink.head)
future pipeTo sender

It start the flow for each source as single 
run the flow via the connectionPool
Then map to response that read the data from the http connection
Finally mapAsync send back a future on any structure that you want with the 
data inside


On Saturday, January 30, 2016 at 6:15:36 PM UTC+2, [email protected] wrote:
>
> I ended up creating a flow with flatMapConcat:
> With this flow the bytes of the response get accumulated to a single 
> ByteString  
> for example:
>
> final Flow<Pair, ByteString, BoxedUnit> bytestringFlow = 
> Flow.of(Pair.class).flatMapConcat((Pair pair) -> {
>    Try<HttpResponse> responseTry = (Try<HttpResponse>) pair.first();
>    Source<ByteString, Object> dataBytes = 
> responseTry.get().entity().getDataBytes();
>    return dataBytes;
> });
>
>
> Am Mittwoch, 27. Januar 2016 15:13:32 UTC+1 schrieb Richard Grossman:
>>
>> Hi
>>
>> I need to exactly the same I must get the response on http call into my 
>> flow.
>> I see that you think this tickets can solve your problem is it true ?
>>
>> Thanks
>>
>> On Tuesday, August 4, 2015 at 6:45:07 PM UTC+3, [email protected] wrote:
>>>
>>> i think I need to wait for https://github.com/akka/akka/issues/15089
>>>
>>> Am Donnerstag, 30. Juli 2015 08:18:18 UTC+2 schrieb [email protected]:
>>>>
>>>> // this is part of a BidiFlow
>>>>
>>>> FlowShape<Tuple2<Try<HttpResponse>, RequestResult>,
>>>>       Tuple2<ByteString, Object>>
>>>>       bottom =
>>>>       b.graph(Flow.<Tuple2<Try<HttpResponse>, Object>>empty().
>>>>             mapAsync(4, pair ->
>>>>                         getEntityBytes(pair._1().get(), pair._2(), 
>>>> materializer)
>>>>             ).map((pair) -> new Tuple2<>(pair._1(), pair._2())));
>>>>
>>>>
>>>>
>>>> static Future<Tuple2<ByteString, RequestResult>>
>>>>    getEntityBytes( final HttpResponse response,
>>>>                    final Object requestResult,
>>>>                    final ActorMaterializer materializer) {
>>>>
>>>>    return response.entity().getDataBytes().runFold(
>>>>          new Tuple2(ByteString.empty(),requestResult),
>>>>          (aggr, next) -> new Tuple2(aggr._1().concat(next),aggr._2()), 
>>>> materializer);
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> What looks a little funny to me is that I need to pass a materializer 
>>>> to the inner flow?
>>>>
>>>> I am a little unsure because the docs Modularity, Composition and 
>>>> Hierarchy state:
>>>> "It is rarely useful to embed a closed graph shape in a larger graph"
>>>>
>>>>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to