I am starting to work with akka-streams and have a problem to solve which
seems easy but I'm having trouble with it.
I have a Source[String, Unit] and the strings can be quite large. I'd like
to split them into chunks of some maximum size such as 4k, transmit them
and reassemble them at the other end.
The first thought I had was to create a Flow which:
- assigns a unique reassembly key of type String to each original String,
- outputs a stream of (String, String) elements, which are understood as
(reassemblyKey, chunk).
Then create a Flow which does the reassembly. So I coded these as follows,
before having serious doubts about them:
import java.util.concurrent.atomic.AtomicLong
import akka.stream.scaladsl.Flow

object WithStringChunking {
  private val counter = new AtomicLong(0L)

  /**
   * Factory for a string-chunking flow.
   *
   * @param maxChunkSize the maximum size of each chunk
   * @return string-chunking flow to be connected to a `Source[String]`
   */
  def apply(maxChunkSize: Int): Flow[String, (String, String), Unit] =
    Flow[String].mapConcat { s =>
      // Assign a fresh reassembly key per element (not per flow), so that
      // distinct messages never share a key.
      val uniqueReassemblyKey = counter.getAndIncrement().toString
      s.grouped(maxChunkSize).map((uniqueReassemblyKey, _)).toList
    }
}
import scala.concurrent.ExecutionContext
import akka.stream.Materializer
import akka.stream.scaladsl.{ Flow, Sink, Source }

object WithStringUnchunking {
  /**
   * Factory for a string-unchunking flow.
   *
   * @return string-unchunking flow to be connected to a
   *         `Source[(String, String)]`, where the string pairs consist of:
   *         - reassemblyKey (all related chunks have the same key)
   *         - chunk
   *
   * BUG: I think this will buffer too much.
   * Need to take advantage of adjacency.
   */
  def apply()(implicit materializer: Materializer, ec: ExecutionContext)
      : Flow[(String, String), String, Unit] =
    Flow[(String, String)]
      // Group together the chunks with the same reassembly key.
      .groupBy { case (key, chunk) => key }
      .mapAsync(1) { case (_, pairsSource) =>
        // Discard the keys.
        val chunksSource: Source[String, Unit] = pairsSource.map { _._2 }
        // Concatenate the chunks.
        for {
          chunks <- chunksSource.grouped(1000000).runWith(Sink.head)
        } yield chunks.fold("") { _ + _ }
      }
}
This code compiles but I think the unchunking part will always buffer "too
much", since most messages will be much less than 8 million characters.
I really want the unchunking part to:
- read and buffer chunks until the reassembly key changes
- concatenate the chunks and emit the original message
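One way to state that adjacency-based reassembly, independent of any stream API, is as a small state machine over (key, chunk) pairs. Here is a pure-Scala sketch of the logic (the object and method names are mine; in a streaming setting the same accumulator state would live inside a stage such as statefulMapConcat, which later Akka Streams releases provide):

```scala
object AdjacentReassembly {
  // Reassemble (key, chunk) pairs by concatenating each run of adjacent
  // pairs sharing the same key. Emits one string per run, so only one
  // message is ever buffered at a time.
  def reassemble(pairs: Seq[(String, String)]): Seq[String] = {
    val (done, pending) =
      pairs.foldLeft((Vector.empty[String], Option.empty[(String, StringBuilder)])) {
        case ((done, None), (key, chunk)) =>
          // First pair: start buffering a new message.
          (done, Some((key, new StringBuilder(chunk))))
        case ((done, Some((curKey, buf))), (key, chunk)) if key == curKey =>
          // Same key as the previous pair: keep accumulating.
          buf.append(chunk)
          (done, Some((curKey, buf)))
        case ((done, Some((_, buf))), (key, chunk)) =>
          // Key changed: the previous message is complete; emit it.
          (done :+ buf.toString, Some((key, new StringBuilder(chunk))))
      }
    // Flush whatever was still being buffered at the end of the input.
    pending.fold(done) { case (_, buf) => done :+ buf.toString }
  }
}
```

Note this relies on chunks of one message arriving adjacently, which holds as long as nothing between the two flows interleaves elements from different messages.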
How can I express this using the akka-streams API?
Or maybe a more tractable encoding would be to convert the original
Source[String, Unit] into another Source[String, Unit] which consists of:
- a string which, when interpreted as an integer, is understood to be N,
the number of chunks to follow, which are to be reassembled into a single
message
- the N chunks
- ....etc
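That count-prefixed encoding could be sketched as a pair of pure functions (names are mine; the decoder below works over a whole sequence, whereas a streaming decoder would carry the pending count as state in a stage like statefulMapConcat):

```scala
object CountPrefixed {
  // Encode one message as its chunk count (rendered as a string)
  // followed by the chunks themselves.
  def encode(message: String, maxChunkSize: Int): Seq[String] = {
    val chunks = message.grouped(maxChunkSize).toList
    chunks.length.toString +: chunks
  }

  // Decode a sequence of count-prefixed elements back into the original
  // messages. No error handling for malformed counts in this sketch.
  def decode(elements: Seq[String]): Seq[String] = {
    var rest = elements
    val out = Vector.newBuilder[String]
    while (rest.nonEmpty) {
      val n = rest.head.toInt                  // number of chunks to follow
      val (chunks, tail) = rest.tail.splitAt(n)
      out += chunks.mkString                   // reassemble one message
      rest = tail
    }
    out.result()
  }
}
```

An empty message round-trips as the single element "0", so the encoding needs no separate reassembly key at all, only ordered, non-interleaved delivery.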
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.