Hi all,
I have the below code.  I have a list of document ids that I want to return 
the documents for.  However, I want to stream each document's data chunk by 
chunk.  So the general structure would be

Seq[DocId] -> Document(DocId, ByteString) -> sink

So for instance for document id X whose document size is 10 bytes and my 
chunk size is 1 byte I would get
Document(X, ByteString[Byte 0])
Document(X, ByteString[Byte 1])
...
Document(X, ByteString[Byte 9])

I totally realize that Application.flow function I have is not correct as 
it will only give me the first chunk size.  It is meant as a "here is the 
stateful action I would want to do for my flow."  




import java.io.{InputStream}

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString


object Database {

  case class DocId(id: String)

  case class Document(id: DocId, bs: ByteString)

  trait Database {
    def getDocumentStream(id: DocId): Option[InputStream]

  }

  object ResourceDatabase extends Database {
    def getDocumentStream(id: DocId): Option[InputStream] = {
      val rs = getClass.getResourceAsStream("/" + id.id + ".json")
      if (rs != null) {
        Some(rs)
      }
      else {
        None
      }
    }
  }
}

object Application extends App {

  import Database._

  val source = Source(Vector(DocId("1"), DocId("2")))

  def flow(): Flow[DocId, Option[Document], Unit] = {
    val emptyByteString = ByteString("")
    Flow[DocId].map { x =>
      val ds = ResourceDatabase.getDocumentStream(x)
      ds map { y =>
        /* Know this is not correct but would like to know how to do this 
properly */
        val b = new Array[Byte](8192)
        val bytesRead = y.read(b, 0, 8192)
        if (bytesRead >= 0) {
          Document(x, ByteString(b.take(bytesRead)))
        }
        else {
          y.close
          Document(x, emptyByteString)
        }
      }
    }
  }

  import scala.concurrent.ExecutionContext.Implicits.global

  implicit val system = ActorSystem("application-streams")
  implicit val materializer = ActorMaterializer()
  val f = source.via(flow).runForeach(x => x map {
    println _
  })

  f onFailure {
    case t => println("Failed with " + t.getMessage)
  }
  f onSuccess {
    case t => println("Success")
  }
  system.awaitTermination()
}


-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to