My 2nd attempt. It type checks so it should work :-)

import akka.stream.scaladsl.Flow
import akka.stream.stage.Context
import akka.stream.stage.PushPullStage
import akka.stream.stage.SyncDirective

object WithStringUnchunking {

  type StringPair = (String, String)

  /**
   * Factory for a string-unchunking flow.
   *
   * @return string-unchunking flow to be connected to a `Source[(String, String)]`
   * where the string pairs consist of:
   *   - reassemblyKey (all related chunks have the same key)
   *   - chunk
   */
  def apply(): Flow[StringPair, String, Unit] =
    Flow[StringPair].transform(() => new Unchunker)

  private class Unchunker extends PushPullStage[StringPair, String] {

    private var currentKey: Option[String] = None
    private val currentMsg = new StringBuilder()

    override def onPush(pair: StringPair, ctx: Context[String]): SyncDirective = {
      val (key, chunk) = pair

      if (currentKey.isEmpty) {
        // Case: 1st time
        currentKey = Some(key)
        currentMsg ++= chunk
        ctx.pull()
      } else if (currentKey == Some(key)) {
        // Case: new chunk for current message.
        currentMsg ++= chunk
        ctx.pull()
      } else {
        // Case: chunk for next message arrived. Push out previous message.
        val result = ctx.push(currentMsg.mkString)
        currentMsg.clear()

        currentKey = Some(key)
        currentMsg ++= chunk
        result
      }
    }

    override def onPull(ctx: Context[String]): SyncDirective =
      if (!ctx.isFinishing) ctx.pull()
      else if (currentMsg.nonEmpty) ctx.pushAndFinish(currentMsg.mkString)
      else ctx.finish()

    // Absorb upstream completion so onPull still runs with isFinishing set;
    // otherwise the stage completes immediately on upstream finish and the
    // last buffered message is dropped.
    override def onUpstreamFinish(ctx: Context[String]) =
      ctx.absorbTermination()
  }
}
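
For reference, the adjacency-based reassembly the stage performs can be sketched without Akka as a plain fold over an ordered sequence of (key, chunk) pairs. The names here (`UnchunkSketch`, `unchunk`) are hypothetical, just to illustrate the logic:

```scala
object UnchunkSketch {

  // Reassemble messages from an ordered sequence of (key, chunk) pairs,
  // relying only on adjacency: a change of key closes the current message.
  def unchunk(pairs: Seq[(String, String)]): Seq[String] =
    if (pairs.isEmpty) Nil
    else {
      val (done, _, buf) =
        pairs.foldLeft((Vector.empty[String], pairs.head._1, new StringBuilder)) {
          case ((done, key, buf), (k, chunk)) =>
            if (k == key) (done, key, buf ++= chunk)                    // same message: keep buffering
            else (done :+ buf.mkString, k, (new StringBuilder) ++= chunk) // new key: emit and restart
        }
      done :+ buf.mkString // flush the final in-progress message
    }
}
```

Like the stage, this only works if all chunks of a message arrive contiguously; if pairs with different keys interleave, partial messages are emitted.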



On Tuesday, October 27, 2015 at 12:18:58 PM UTC-4, Kevin Esler wrote:
>
> I am starting to work with akka-streams and have a problem to solve which 
> seems easy but I'm having trouble with it.
>
> I have a Source[String, Unit] and the strings can be quite large. I'd like 
> to split them into chunks of some maximum size such as 4k, transmit them 
> and reassemble them at the other end.
>
> The first thought I had was to create a Flow which:
>
>    - assigns a unique reassembly key of type String, to each original 
>    String,
>    - outputs a Stream of  (String, String), elements of which are 
>    understood as (reassemblyKey, chunk)
>
> Then create a Flow which does the reassembly. So I coded these as follows 
> before having serious doubts about them:
>
>
> import akka.stream.scaladsl.{ Flow, FlowGraph, MergePreferred, Source }
> import scala.concurrent.duration.FiniteDuration
> import java.util.concurrent.atomic.AtomicLong
>
> object WithStringChunking {
>
>   private val counter = new AtomicLong(0L)
>
>   /**
>    * Factory for a string-chunking flow.
>    *
>    * @param maxChunkSize the maximum size of chunk
>    * @return string chunking flow to be connected to a `Source[String]`
>    */
>   def apply(maxChunkSize: Int): Flow[String, (String, String), Unit] = {
>     val uniqueReassemblyKey = counter.getAndIncrement().toString
>
>     Flow[String].mapConcat { s =>
>       s.grouped(maxChunkSize).map((uniqueReassemblyKey, _)).toList
>     }
>   }
> }
>
>
> import scala.concurrent.ExecutionContext
>
> import akka.stream.Materializer
> import akka.stream.scaladsl.Flow
> import akka.stream.scaladsl.Sink
> import akka.stream.scaladsl.Source
>
> object WithStringUnchunking {
>
>   /**
>    * Factory for a string-unchunking flow.
>    *
>    * @return string-unchunking flow to be connected to a
>    * `Source[(String, String)]` where the string pairs consist of:
>    *   - reassemblyKey (all related chunks have the same key)
>    *   - chunk
>    *
>    * BUG: I think this will buffer too much
>    * Need to take advantage of adjacency
>    */
>   def apply()
>            (implicit materializer: Materializer, ec: ExecutionContext)
>   : Flow[(String, String), String, Unit]
>   = {
>     val result = Flow[(String, String)]
>       // Group together the chunks with the same re-assembly key
>       .groupBy { case (key, chunk) => key }
>       .mapAsync(1) { case (_, pairsSource) =>
>         // Discard the keys.
>         val chunksSource: Source[String, Unit] = pairsSource.map { _._2 }
>
>         // Concatenate the chunks
>         for {
>           chunks <- chunksSource.grouped(1000000).runWith(Sink.head)
>         } yield chunks.fold(""){ _ + _ }
>       }
>     result
>   }
> }
>
>
> This code compiles but I think the unchunking part will always buffer "too 
> much", since most messages will be much less than 8 million characters.
>
>
> I really want the unchunking part to:
>
>    - read and buffer chunks until the reassembly key changes
>    - concatenate the chunks and emit the original message
>
> How do I express this using the akka-streams API? 
>
>
> Or maybe a more tractable encoding would be to convert the original 
> Source[String, Unit] to another Source[String, Unit] which consists of:
>
>    - a string which when interpreted as an integer is understood to be N, 
>    the number of chunks to follow which are to be reassembled to a single 
>    message
>    - the N chunks
>    - ....etc
>    
>
