Hi all,
I have the below code. I have a list of document ids that I want to return
the documents for. However, I want to stream each document's data chunk by
chunk. So the general structure would be
Seq[DocId] -> Document(DocId, ByteString) -> sink
So for instance for document id X whose document size is 10 bytes and my
chunk size is 1 byte I would get
Document(X, ByteString[Byte 0])
Document(X, ByteString[Byte 1])
...
Document(X, ByteString[Byte 9])
I totally realize that Application.flow function I have is not correct as
it will only give me the first chunk size. It is meant as a "here is the
stateful action I would want to do for my flow."
import java.io.{InputStream}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
object Database {
case class DocId(id: String)
case class Document(id: DocId, bs: ByteString)
trait Database {
def getDocumentStream(id: DocId): Option[InputStream]
}
object ResourceDatabase extends Database {
def getDocumentStream(id: DocId): Option[InputStream] = {
val rs = getClass.getResourceAsStream("/" + id.id + ".json")
if (rs != null) {
Some(rs)
}
else {
None
}
}
}
}
object Application extends App {
import Database._
val source = Source(Vector(DocId("1"), DocId("2")))
def flow(): Flow[DocId, Option[Document], Unit] = {
val emptyByteString = ByteString("")
Flow[DocId].map { x =>
val ds = ResourceDatabase.getDocumentStream(x)
ds map { y =>
/* Know this is not correct but would like to know how to do this
properly */
val b = new Array[Byte](8192)
val bytesRead = y.read(b, 0, 8192)
if (bytesRead >= 0) {
Document(x, ByteString(b.take(bytesRead)))
}
else {
y.close
Document(x, emptyByteString)
}
}
}
}
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem("application-streams")
implicit val materializer = ActorMaterializer()
val f = source.via(flow).runForeach(x => x map {
println _
})
f onFailure {
case t => println("Failed with " + t.getMessage)
}
f onSuccess {
case t => println("Success")
}
system.awaitTermination()
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.