Each incoming websocket request materializes a new Flow. For every message
passing through that Flow you then create and run another, separate stream
(the dump method), which ends in the sink created by FileIO.toFile. The
documentation for that method says "Overwrites existing files by default",
so every message sent over the WS connection recreates the files from scratch.

You need to build the flow so that it is continuous, i.e. messages travel
all the way down to the file sinks within a single stream. Rather than calling
dump inside the collect, you can branch off the flow using alsoTo, something
like (untested):
  Flow[Message].collect {
    case TextMessage.Strict(text) => (text, longToIP(id))
  }
  .alsoTo(fileSink)
  .map(pair => TextMessage(s"I got your message: ${pair._1}!"))
Then fileSink looks like:
  val fileSink =
    Flow[(String, String)]
      // do something to extract the data you want here
      .map(s => ByteString(s + "\n"))
      .to(FileIO.toFile(new File(filename)))
You can add in additional steps to write to other files, either using the
builder DSL as you did, or using alsoTo again.
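For instance, chaining alsoTo twice to reach both of your files might look roughly like the sketch below. I have not compiled it against your project; the object name TwoFileFlow and the two path parameters are made up for illustration, and lineSink is a simplified version of yours.

```scala
import java.io.File

import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{FileIO, Flow, Sink}
import akka.util.ByteString

// Hypothetical wrapper object, only here to keep the sketch self-contained.
object TwoFileFlow {

  // One line per element, written to the given file.
  def lineSink(filename: String): Sink[String, Any] =
    Flow[String]
      .map(s => ByteString(s + "\n"))
      .to(FileIO.toFile(new File(filename)))

  // A single continuous flow: every text message is copied to both file
  // sinks via alsoTo and then turned into the reply.
  def twoFiles(path1: String, path2: String): Flow[Message, Message, Any] =
    Flow[Message]
      .collect { case TextMessage.Strict(text) => text }
      .alsoTo(lineSink(path1))
      .alsoTo(lineSink(path2))
      .map(text => TextMessage(s"I got your message: $text!"))
}
```

Each file sink is materialized once per WS connection here, so all messages on that connection end up in the same file.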
This gets around the issue for multiple messages on one WS connection, but
a second WS connection will again overwrite the file. Two solutions that
spring to mind for that:
1. Use a sink that appends to the file rather than overwriting it (though
note that, depending on the volume of requests, you could well end up with
contention on the file)
2. Use a MergeHub to route the requests from all the websockets into a single
Flow which then writes to the files.
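A rough sketch of both ideas combined, again untested against your setup. It assumes an Akka version that has MergeHub and a FileIO.toPath overload taking open options; SharedFileWriter, appendingLineSink, sharedSink and the buffer size of 16 are names and values I picked for illustration.

```scala
import java.nio.file.Paths
import java.nio.file.StandardOpenOption.{APPEND, CREATE, WRITE}

import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.{IOResult, Materializer}
import akka.stream.scaladsl.{FileIO, Flow, Keep, MergeHub, Sink}
import akka.util.ByteString

import scala.concurrent.Future

// Hypothetical wrapper object, only here to keep the sketch self-contained.
object SharedFileWriter {

  // Option 1: open the file with APPEND so existing content is kept.
  def appendingLineSink(filename: String): Sink[String, Future[IOResult]] =
    Flow[String]
      .map(s => ByteString(s + "\n"))
      .toMat(FileIO.toPath(Paths.get(filename), Set(WRITE, APPEND, CREATE)))(Keep.right)

  // Option 2: run one long-lived stream that owns both files. Running it
  // yields a Sink that any number of producers can attach to.
  def sharedSink(file1: String, file2: String)(
      implicit mat: Materializer): Sink[String, NotUsed] =
    MergeHub.source[String](perProducerBufferSize = 16)
      .alsoTo(appendingLineSink(file1))
      .to(appendingLineSink(file2))
      .run()

  // Per-connection flow: copy each text message into the shared sink,
  // then reply to the client.
  def broadcast(shared: Sink[String, NotUsed]): Flow[Message, Message, Any] =
    Flow[Message]
      .collect { case TextMessage.Strict(text) => text }
      .alsoTo(shared)
      .map(text => TextMessage(s"I got your message: $text!"))
}
```

The idea is to call sharedSink once at startup and pass the result to broadcast for every connection, e.g. handleWebSocketMessages(SharedFileWriter.broadcast(shared)). With the hub, only one stream ever has the files open, so the contention concern from option 1 should not arise; the APPEND option then mainly protects against truncating the files on restart.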
Julian
On Saturday, April 1, 2017 at 8:39:43 AM UTC+1, Madabhattula Rajesh Kumar
wrote:
>
> Hi,
>
> I need to read messages from clients(web socket clients) and persist these
> messages into two files. But, messages are overriding to previous messages.
> Every time only last message is present in the file.
>
> How to append the messages. Please help me to resolve this issue.
>
> *Code :- *
>
> def route(implicit actorSystem: ActorSystem, materializer: Materializer):
> Route =
> pathPrefix("ssys" / LongNumber) { id =>
> parameter('name) { name ⇒
> handleWebSocketMessages(broadcast(id, name))
> }
> }
> def broadcast(id: Long, name: String): Flow[Message, Message, Any] = {
> Flow[Message].collect {
> case TextMessage.Strict(text) =>
> dump(text, longToIP(id))
> TextMessage(s"I got your message: $text!")
> }
> }
>
> def dump(text: String, id: String ) {
> val value1 = Source.single(text, id)
> val sink1 = lineSink("/Users/log/data/test1.txt")
> val sink2 = lineSink("/Users/log/data/test2.txt")
> val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
> import GraphDSL.Implicits._
> val bcast = b.add(Broadcast[String](2))
> value1.map(x => x) ~> bcast.in
> bcast.out(0) ~> sink1
> bcast.out(1) ~> sink2
> ClosedShape
> })
> g.run()
>
> }
>
> def longToIP(long: Long): String = {
> (0 until 4).map(a => long / math.pow(256, a).floor.toInt %
> 256).reverse.mkString(".")
> }
>
> def lineSink(filename: String): Sink[String, Future[IOResult]] = {
> Flow[String]
> .map(s => ByteString(s + "\n"))
> .toMat(FileIO.toFile(new File(filename)))(Keep.right)
> }
>
> Regards,
> Rajesh
>