Hi Richard, unfortunately my Scala is very bad. (Actually, I am learning Scala
by looking at the Akka source and this list.)
But if I am right, the line
.map(r => (r._1.get.entity.dataBytes.runWith(Sink.head), r._2))
uses an implicit materializer. So you are aggregating the whole ByteBuffer
in memory.
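One caveat, as far as I understand the API: Sink.head completes with only the first element of the stream, so for an entity that arrives in several chunks it would keep just the first chunk, while a fold over dataBytes (like the runFold in the first mail of this thread) is what really accumulates the whole body. Here is a minimal plain-Java analogy, with no Akka involved. The class EntityChunks and both method names are made up for illustration, and each byte[] merely stands in for one ByteString chunk.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Plain-Java analogy (hypothetical names, not Akka API): each byte[]
// stands in for one ByteString chunk emitted by entity.dataBytes.
class EntityChunks {

    // Analogue of runWith(Sink.head): only the first chunk is kept.
    static String headOnly(List<byte[]> chunks) {
        return new String(chunks.get(0), StandardCharsets.UTF_8);
    }

    // Analogue of a fold that concatenates chunks: every chunk is appended,
    // so the complete body is held in memory at once.
    static String foldAll(List<byte[]> chunks) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            buffer.write(chunk, 0, chunk.length);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<byte[]> chunks = Arrays.asList(
            "Hello, ".getBytes(StandardCharsets.UTF_8),
            "chunked ".getBytes(StandardCharsets.UTF_8),
            "world".getBytes(StandardCharsets.UTF_8));
        System.out.println(headOnly(chunks)); // prints "Hello, "
        System.out.println(foldAll(chunks));  // prints "Hello, chunked world"
    }
}
```

If the entity happens to consist of a single chunk, both variants return the same thing, which may be why the difference is easy to miss in small tests.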
Hacker √ complained about this in the 4th mail in this thread.
Please forgive me if I am wrong!
Many greetings, John
On Tuesday, February 2, 2016 at 10:41:07 UTC+1, Richard Grossman wrote:
>
> Hi
>
> If it helps someone: in Scala you can do it like this
>
> val future : Future[NetworkResponse] =
> Source.single(req)
> .log("Start Http")
> .map(req => (HttpRequest(HttpMethods.GET, Uri(req.url)), req))
> .log("Map to httpRequest")
> .map(httpReq => httpReq._1 -> (httpReq._2.id, httpReq._2.start))
> .log("Map to request")
> .via(connectionPool)
> .log("after connection pool")
> .map(r => (r._1.get.entity.dataBytes.runWith(Sink.head), r._2))
> .log("after reading response")
> .mapAsync(10)(r => r._1.map(bytes => NetworkResponse(r._2._1, r._2._2,
> bytes.decodeString("UTF-8"))))
> .log("after map async")
> .runWith(Sink.head)
> future pipeTo sender
>
> It starts the flow for each request as a Source.single,
> runs the flow via the connectionPool,
> then maps to a response that reads the data from the HTTP connection.
> Finally, mapAsync sends back a future of any structure that you want with
> the data inside.
>
>
> On Saturday, January 30, 2016 at 6:15:36 PM UTC+2, [email protected]
> wrote:
>>
>> I ended up creating a flow with flatMapConcat.
>> With this flow the bytes of the response get accumulated into a single
>> ByteString. For example:
>>
>> final Flow<Pair, ByteString, BoxedUnit> bytestringFlow =
>> Flow.of(Pair.class).flatMapConcat((Pair pair) -> {
>> Try<HttpResponse> responseTry = (Try<HttpResponse>) pair.first();
>> Source<ByteString, Object> dataBytes =
>> responseTry.get().entity().getDataBytes();
>> return dataBytes;
>> });
>>
>>
>> On Wednesday, January 27, 2016 at 15:13:32 UTC+1, Richard Grossman wrote:
>>>
>>> Hi
>>>
>>> I need exactly the same: I must get the response of an HTTP call into my
>>> flow.
>>> I see that you think this ticket can solve your problem. Is that true?
>>>
>>> Thanks
>>>
>>> On Tuesday, August 4, 2015 at 6:45:07 PM UTC+3, [email protected]
>>> wrote:
>>>>
>>>> I think I need to wait for https://github.com/akka/akka/issues/15089
>>>>
>>>> On Thursday, July 30, 2015 at 08:18:18 UTC+2, [email protected] wrote:
>>>>>
>>>>> // this is part of a BidiFlow
>>>>>
>>>>> FlowShape<Tuple2<Try<HttpResponse>, RequestResult>,
>>>>> Tuple2<ByteString, Object>>
>>>>> bottom =
>>>>> b.graph(Flow.<Tuple2<Try<HttpResponse>, Object>>empty().
>>>>> mapAsync(4, pair ->
>>>>> getEntityBytes(pair._1().get(), pair._2(),
>>>>> materializer)
>>>>> ).map((pair) -> new Tuple2<>(pair._1(), pair._2())));
>>>>>
>>>>>
>>>>>
>>>>> static Future<Tuple2<ByteString, RequestResult>>
>>>>> getEntityBytes( final HttpResponse response,
>>>>> final Object requestResult,
>>>>> final ActorMaterializer materializer) {
>>>>>
>>>>> return response.entity().getDataBytes().runFold(
>>>>> new Tuple2(ByteString.empty(),requestResult),
>>>>> (aggr, next) -> new Tuple2(aggr._1().concat(next),aggr._2()),
>>>>> materializer);
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> What looks a little funny to me is that I need to pass a materializer
>>>>> to the inner flow?
>>>>>
>>>>> I am a little unsure because the docs section "Modularity, Composition
>>>>> and Hierarchy" states:
>>>>> "It is rarely useful to embed a closed graph shape in a larger graph"
>>>>>
>>>>>