In my particular application the messages I'm receiving a few kB in size. 
Sometimes they arrive as Strict and sometimes as Streaming, but they are 
small enough to be collected and processed as a single chunk. Here's a 
pipeline stage I'm using to handle both cases in an uniform way:

    







*val unframing: Flow[Message, String, NotUsed] =      
Flow.fromFunction[Message, Future[String]](_ match {        case 
TextMessage.Strict(text) =>          Future(text)        case 
TextMessage.Streamed(textStream) =>          textStream.runFold("")(_ + _)      
  case _ =>          throw new Exception("unexpected binary message")      
}).mapAsyncUnordered(1)(identity)*


This flow is attached to webSocketClientFlow output.

An alternative solution would be looking up websocket buffering settings 
and jacking it up enough to receive all messages as Strict :)

cheers,
Rafał

W dniu poniedziałek, 10 października 2016 12:43:20 UTC+2 użytkownik Narayan 
Kumar napisał:
>
> thanks  for quick reply rafal .
> Yes you hav mentioned right , I am getting the 'message does no arrive'  , 
> but not any exception .
>
> I have tried with  the TextMessage.Streamed , but no progress from here 
> also . Please find the code below : -
>
> case TextMessage.Streamed(stream) => {
>   stream
>     .limit(10000) // Max frames we are willing to wait for
>     .completionTimeout(50 seconds) // Max time until last frame
>     .runFold("")(_ + _) // Merges the frames
>     .flatMap { msg =>
>       logger.info("Getting streamed message " + msg)
>       val action = (parse(msg) \ (ACTION)).extractOpt[String]
>       if (action.isDefined) {
>         action.get match {
>           case UPDATE_USER_PROFILE =>
>             val userProfile = parse(msg).extractOpt[UserProfileJson].get
>             val cover = userProfile.cover
>             val picture = userProfile.picture
>             val uploadCoverFile = 
> FileUploadUtility.decodeBase64String(cover.get)
>             val uploadPictureFile = 
> FileUploadUtility.decodeBase64String(picture.get)
>             println("@@@@@@@@@@@@ 1 " + uploadCoverFile + " @@@@@@@@@@@  2 " 
> + uploadPictureFile)
>             Future(TextMessage("File Uploaded"))
>           case _ => Future(TextMessage(INVALID_ACTION))
>         }
>       } else {
>         Future(TextMessage(INVALID_ACTION))
>       }
>     }
> }
>
>
>
> Also I cannot test the case  properly cause the web socket client gets 
> hang's every time I send the large file Json.,
> and using the above code also we are not getting any response back . 
> Please can you suggest any possible way to handle the scenarios.
>
>
> Thanks in advance , waiting for positive early reply :) .
>
>
>
> On Monday, October 10, 2016 at 3:12:36 PM UTC+5:30, Rafał Krzewski wrote:
>>
>> Please elaborate on "unable to handle it" -- are you getting an 
>> exception, message does no arrive, something other?
>> Also it would be helpful if you showed your code for TextMessage.Streamed 
>> case because that's how large messages would show up. 
>> I don't know the specifics but there appears to be a buffer in Akka 
>> WebSockets client code: if the incoming message fits into this buffer in 
>> full it's sent to the client code as Strict message, but when the message 
>> is too large to fit in the buffer Akka switches to streaming mode: Streamed 
>> message is sent to client code, carrying a stream that will deliver the 
>> message contents in buffer-sized chunks.
>>
>> Cheers,
>> aRafał
>>
>>
>> W dniu poniedziałek, 10 października 2016 08:21:14 UTC+2 użytkownik 
>> Narayan Kumar napisał:
>>>
>>>
>>>
>>> On Friday, October 7, 2016 at 7:52:03 PM UTC+5:30, √ wrote:
>>>>
>>>> Why are you assuming that it is a Strict message?
>>>>
>>>> On Fri, Oct 7, 2016 at 2:11 PM, Narayan Kumar <nar...@knoldus.com> 
>>>> wrote:
>>>>
>>>>> Hi everyone,
>>>>> Actually i was trying to handle a Web Socket message for base64 
>>>>> encoded string of 10mb file.but unable to handle it.
>>>>> is there any way to handle large message please suggest ?
>>>>>
>>>>> Here is the code:
>>>>>
>>>>> def mediaUploadHandler: Flow[Message, Message, _] = {
>>>>>     val (accountSource, accountQueue) = sourceQueue
>>>>>     Flow[Message]
>>>>>       .collect {
>>>>>         case TextMessage.Strict(txt) ⇒ {
>>>>>               logger.info(s"${phoneNumber}: Got the request. Now 
>>>>> redirecting to account api !!!!!!!!")
>>>>>               val userProfile = parse(txt).extractOpt[UserProfileJson]
>>>>>               println("user profile is ", userProfile)
>>>>>             }
>>>>>         
>>>>>         case _ => TextMessage(INVALID_ACTION)
>>>>>       }
>>>>>       .via(connectSource(accountSource)) // ... and route them through 
>>>>> the receiveNotification ...
>>>>>       .map {
>>>>>       case msg: String ⇒ {
>>>>>         info(s"Huhh !! Why I am getting this message ${msg}")
>>>>>         TextMessage.Strict(msg)
>>>>>       }
>>>>>     }
>>>>>   }
>>>>>
>>>>>
>>>>> Thanks in advance!
>>>>>
>>>>> -- 
>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>> >>>>>>>>>> Check the FAQ: 
>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>> >>>>>>>>>> Search the archives: 
>>>>> https://groups.google.com/group/akka-user
>>>>> --- 
>>>>> You received this message because you are subscribed to the Google 
>>>>> Groups "Akka User List" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>>> an email to akka-user+...@googlegroups.com.
>>>>> To post to this group, send email to akka...@googlegroups.com.
>>>>> Visit this group at https://groups.google.com/group/akka-user.
>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>>
>>>>
>>>>
>>>> -- 
>>>> Cheers,
>>>> √
>>>>
>>>
>>>
>>>
>>> Actually i have applied both approach of akka-http websocket 
>>> "TextMessage.Strict(txt)" and "TextMessage.Streamed(stream)",but both 
>>> approach didn't work.
>>> is there is another approach to handle it please suggest ?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>  
>>>
>>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to