The parameter to mapAsync is how many currently in progress are allowed.

I'd probably write it as:

val finish = source.via(connectionFlow).mapAsync(1){

  case response if response.status == StatusCodes.OK =>
response.entity.dataBytes.map(println).runForeach(identity)

  case _ => Future.failed(new IllegalArgumentException("wrong"))
}.runForeach(identity)


On Fri, Jun 5, 2015 at 4:36 PM, Prog <programad...@gmail.com> wrote:

> I have done this code which seems to work, is this the right way to do
> what you said? How the parameter of the mapAsync influence the process of
> it?
>
>
> val finish = source.via(connectionFlow).mapAsync(1){ response =>
>   if (response.status == StatusCodes.OK) {
>     val futureInner: Future[Unit] = response.entity.dataBytes.map( data =>
>       println(data)
>     ).runForeach(r => r)
>     futureInner
>   } else{
>     Future.failed(new IllegalArgumentException("wrong"))
>   }
> }.runForeach({resp => resp})
>
>
> finish.onComplete({
>
>   case Success(resp) => {
>     println("---- out succes: " + resp)
>   }
>   case Failure(error) => {
>     println("---- out failure")
>   }
>   case _ => {
>     println("---- out error")
>   }
> })
>
>
>
> On Friday, June 5, 2015 at 2:49:28 PM UTC+2, √ wrote:
>>
>> If you want to break the outer stream on the first failure of the inner
>> stream then I recommend using mapAsync and have the inner stream return a
>> future in the inner stream.
>>
>> On Fri, Jun 5, 2015 at 2:39 PM, Prog <progra...@gmail.com> wrote:
>>
>>> My point is imagine you have a large file (>1GB) received by that stream
>>> (I don't know if the file come as a chunked message or not ), even if the
>>> file was not complete you get the response in the outer stream that it was
>>> successful . So I could not send the outer future to some other function to
>>> do some tasks if the file was downloaded successful or truncated by
>>> failure, as it would send a false positive. Is there any way I could
>>> aggregate the futures in the inner stream and break the outer stream in
>>> case of failure,  as I have in this case a dependency relation between the
>>> inner and outer streams?
>>>
>>>
>>> On Friday, June 5, 2015 at 1:42:05 PM UTC+2, √ wrote:
>>>>
>>>> My point is that if there are multiple elements in the substream then
>>>> there could be multiple failures, multiple successes or a combination
>>>> thereof. How would you represent  that in a single result?
>>>>
>>>> On Fri, Jun 5, 2015 at 12:27 PM, Prog <progra...@gmail.com> wrote:
>>>>
>>>>> I would like to have the same failure as I get in the inner onComplete
>>>>>
>>>>> akka.http.scaladsl.model.EntityStreamException: Entity stream
>>>>> truncation
>>>>>
>>>>>
>>>>> As the stream was not complete I think there is no reason the outer
>>>>> onComplete finish with success.
>>>>>
>>>>>
>>>>>
>>>>> On Friday, June 5, 2015 at 12:22:10 PM UTC+2, √ wrote:
>>>>>>
>>>>>> What behavior would you want when there are multiple elements in the
>>>>>> first source? What would the value in the outer onComplete be?
>>>>>>
>>>>>> On Fri, Jun 5, 2015 at 12:17 PM, Prog <progra...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have the code bellow and I have tried many different variations of
>>>>>>> it with mapAsync in the place of forEach and goes on, but for some 
>>>>>>> reason
>>>>>>> when I break the communication on the server side in the middle of the
>>>>>>> transfer I get the error in the inside Sink.onComplete, but the error 
>>>>>>> it is
>>>>>>> not reflected to the outer Sink.onComplete. So I get in the console:
>>>>>>>  ---- in error
>>>>>>>  ---  out success
>>>>>>>
>>>>>>> I would like to have a Failure in both Sink.onComplete. How can I
>>>>>>> achieve that?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>  val conn: Flow[HttpRequest, HttpResponse, 
>>>>>>> Future[Http.OutgoingConnection]] =
>>>>>>>   Http().outgoingConnection(host)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>  val finish =  Source.single(HttpRequest(uri = 
>>>>>>> uri)).via(conn).runForeach({ response =>
>>>>>>>   if (response.status == StatusCodes.OK) {
>>>>>>>     response.entity.dataBytes.map { data =>
>>>>>>>       println(data)
>>>>>>>     }.runWith(Sink.onComplete({
>>>>>>>       case Success(resp) => {
>>>>>>>         println("---- in success")
>>>>>>>         }
>>>>>>>       case Failure(error) => {
>>>>>>>         println("---- in failure")
>>>>>>>
>>>>>>>       }
>>>>>>>       case _ => {
>>>>>>>         println("---- in error")
>>>>>>>       }
>>>>>>>     }))
>>>>>>>   } else
>>>>>>>    { println("status"+ response.status)}
>>>>>>> })
>>>>>>>
>>>>>>>
>>>>>>> finish.onComplete({
>>>>>>>   case Success(resp) => {
>>>>>>>     println("---- out succes: " + resp)
>>>>>>>   }
>>>>>>>   case Failure(error) => {
>>>>>>>     println("---- out failure")
>>>>>>>   }
>>>>>>>   case _ => {
>>>>>>>     println("---- out error")
>>>>>>>   }
>>>>>>> })
>>>>>>>
>>>>>>>
>>>>>>>  --
>>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>> >>>>>>>>>> Check the FAQ:
>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>> >>>>>>>>>> Search the archives:
>>>>>>> https://groups.google.com/group/akka-user
>>>>>>> ---
>>>>>>> You received this message because you are subscribed to the Google
>>>>>>> Groups "Akka User List" group.
>>>>>>> To unsubscribe from this group and stop receiving emails from it,
>>>>>>> send an email to akka-user+...@googlegroups.com.
>>>>>>> To post to this group, send email to akka...@googlegroups.com.
>>>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> √
>>>>>>
>>>>>  --
>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>> >>>>>>>>>> Check the FAQ:
>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>> >>>>>>>>>> Search the archives:
>>>>> https://groups.google.com/group/akka-user
>>>>> ---
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Akka User List" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to akka-user+...@googlegroups.com.
>>>>> To post to this group, send email to akka...@googlegroups.com.
>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> √
>>>>
>>>  --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+...@googlegroups.com.
>>> To post to this group, send email to akka...@googlegroups.com.
>>> Visit this group at http://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>>
>> --
>> Cheers,
>> √
>>
>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to