I am starting to work with akka-streams and have a problem that seems 
easy, but I'm having trouble with it.

I have a Source[String, Unit] and the strings can be quite large. I'd like 
to split them into chunks of some maximum size such as 4k, transmit them 
and reassemble them at the other end.

The first thought I had was to create a Flow which:

   - assigns a unique reassembly key of type String, to each original 
   String,
   - outputs a stream of (String, String) pairs, each understood as 
   (reassemblyKey, chunk)

Then create a Flow which does the reassembly. So I coded these as follows 
before having serious doubts about them:


import akka.stream.scaladsl.Flow
import java.util.concurrent.atomic.AtomicLong

object WithStringChunking {

  private val counter = new AtomicLong(0L)

  /**
   * Factory for a string-chunking flow.
   *
   * @param maxChunkSize the maximum size of chunk
   * @return string chunking flow to be connected to a `Source[String]`
   */
  def apply(maxChunkSize: Int): Flow[String, (String, String), Unit] =
    Flow[String].mapConcat { s =>
      // Generate a fresh key per incoming message, not per flow instance,
      // so that each original String gets its own reassembly key.
      val uniqueReassemblyKey = counter.getAndIncrement().toString
      s.grouped(maxChunkSize).map((uniqueReassemblyKey, _)).toList
    }
}
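
The per-message chunking itself is just the standard Scala `String.grouped`, 
so what the flow emits for one message can be checked without a 
materializer (the key `"0"` assumes this is the first message through a 
fresh counter):

```scala
// What mapConcat computes for a single message, with maxChunkSize = 4:
val key = "0" // hypothetical: first value produced by the AtomicLong counter
val pairs = "abcdefghij".grouped(4).map((key, _)).toList
// pairs == List(("0", "abcd"), ("0", "efgh"), ("0", "ij"))
```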


import scala.concurrent.ExecutionContext

import akka.stream.Materializer
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

object WithStringUnchunking {

  /**
   * Factory for a string-unchunking flow.
   *
   * @return string-unchunking flow to be connected to a
   * `Source[(String, String)]` where the string pairs consist of:
   *   - reassemblyKey (all related chunks carry the same key)
   *   - chunk (one piece of the original message)
   *
   * BUG: I think this will buffer too much.
   * Need to take advantage of adjacency.
   */
  def apply()
           (implicit materializer: Materializer, ec: ExecutionContext)
  : Flow[(String, String), String, Unit]
  = {
    Flow[(String, String)]
      // Group together the chunks with the same reassembly key.
      .groupBy { case (key, _) => key }
      .mapAsync(1) { case (_, pairsSource) =>
        // Discard the keys.
        val chunksSource: Source[String, Unit] = pairsSource.map(_._2)

        // Concatenate the chunks (buffers up to 1,000,000 chunks per message).
        chunksSource.grouped(1000000).runWith(Sink.head).map(_.mkString)
      }
  }
}


This code compiles but I think the unchunking part will always buffer "too 
much", since most messages will be much less than 8 million characters.


I really want the unchunking part to:

   - read and buffer chunks until the reassembly key changes
   - concatenate the chunks and emit the original message

How can I express this using the akka-streams API?
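
The adjacency-based reassembly I have in mind can at least be written as a 
pure fold that flushes whenever the key changes, independent of any 
particular streams API (a sketch only; it assumes chunks of the same 
message arrive adjacently, and flushes the final pending message at the 
end):

```scala
object AdjacencyReassembly {
  /** Reassemble (key, chunk) pairs into whole messages, assuming all
    * chunks with the same key arrive adjacently. Pure logic, no streaming.
    */
  def reassemble(pairs: List[(String, String)]): List[String] = {
    // Accumulator: (finished messages, optional in-progress (key, buffer)).
    val (messages, pending) =
      pairs.foldLeft((List.empty[String], Option.empty[(String, StringBuilder)])) {
        // First chunk seen: start a new buffer.
        case ((done, None), (key, chunk)) =>
          (done, Some((key, new StringBuilder(chunk))))
        // Same key as the chunk before: keep appending.
        case ((done, Some((curKey, buf))), (key, chunk)) if key == curKey =>
          (done, Some((curKey, buf.append(chunk))))
        // Key changed: flush the finished message, start a new buffer.
        case ((done, Some((_, buf))), (key, chunk)) =>
          (done :+ buf.toString, Some((key, new StringBuilder(chunk))))
      }
    // Flush whatever message was still in progress when input ended.
    pending.fold(messages)(p => messages :+ p._2.toString)
  }
}
```

Inside a stream, this logic would live in a single stateful stage, so only 
the chunks of the current message are ever buffered.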


Or maybe a more tractable encoding would be to convert the original 
Source[String, Unit] into another Source[String, Unit] which consists of:

   - a string which when interpreted as an integer is understood to be N, 
   the number of chunks to follow which are to be reassembled to a single 
   message
   - the N chunks
   - ....etc
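
The count-prefixed encoding could be sketched as a pure per-message encode 
step (the object and method names here are made up for illustration; the 
stream version would just be `Flow[String].mapConcat(encode(_, 
maxChunkSize))`):

```scala
object CountPrefixed {
  /** Encode one message as a count string followed by its chunks. */
  def encode(msg: String, maxChunkSize: Int): List[String] = {
    // An empty message still needs one (empty) chunk so it survives the trip,
    // since "".grouped(n) yields no elements at all.
    val chunks =
      if (msg.isEmpty) List("") else msg.grouped(maxChunkSize).toList
    chunks.length.toString :: chunks
  }
}
```

The decoder would then read one element, interpret it as N, and consume 
exactly the next N elements before emitting the reassembled message.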
   

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user