Here is what I have now...
I am very curious about the following...
1) Is there a better way? I am not sure I am correctly handling a source
that produces multiple sources. For instance, in docStreamToDocFlow each
inner source in the sequence is run explicitly with its own runWith.
2) I am surprised that the output of the 2 documents is interleaved in
certain places. I would not expect that, so maybe I am doing something
wrong?
DocId(1) ByteString(91, 10, 32, 32, 123, 10, 32, 32, 32, 32)
DocId(2) ByteString(91, 10, 32, 32, 123, 10, 32, 32, 32, 32)
DocId(1) ByteString(34, 95, 105, 100, 34, 58, 32, 34, 53, 54)
DocId(2) ByteString(34, 95, 105, 100, 34, 58, 32, 34, 53, 54)
DocId(2) ByteString(97, 56, 50, 49, 98, 50, 102, 51, 53, 99)



import java.io.{InputStream}

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.collection.immutable
import scala.concurrent.Future


object Database {

  case class DocId(id: String)

  case class Document(id: DocId, bs: Iterator[ByteString])
  case class Document2(id: DocId, bs: ByteString)
  case class DocumentStream(id: DocId, source: Source[ByteString, Future[Long]])

  trait Database {
    def getDocumentStream(id: DocId): DocumentStream

  }

  object ResourceDatabase extends Database {
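    def getDocumentStream(id: DocId): DocumentStream = {
      // Open the resource inside the factory function so that a fresh
      // InputStream is created each time the source is materialized.
      DocumentStream(id, StreamConverters.fromInputStream(() =>
        getClass.getResourceAsStream("/" + id.id + ".json")))
    }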
  }
}

object Application extends App {

  import Database._

  val source = Source(Vector(DocId("1"), DocId("2")))

  def flow(): Flow[DocId, DocumentStream, Unit] =
    Flow[DocId] map ResourceDatabase.getDocumentStream

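  def docStreamToDocFlow(): Flow[DocumentStream, Future[Seq[Document]], Unit] = {
    // A second materializer, separate from the one declared further down.
    implicit val materializer = ActorMaterializer()
    Flow[DocumentStream] map { ds =>
      // Split every ByteString chunk read from the resource into 10-byte pieces.
      val grouped: Source[Iterator[ByteString], Future[Long]] =
        ds.source.map(x => x.grouped(10))
      // runWith starts a separate stream per document; the Future completes
      // once that document has been read to the end.
      val f: Future[Seq[Document]] =
        (grouped map { (s: Iterator[ByteString]) => Document(ds.id, s) }).runWith(Sink.seq)
      f
    }
  }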

  import scala.concurrent.ExecutionContext.Implicits.global

  implicit val system: ActorSystem = ActorSystem("application-streams")
  implicit val materializer: ActorMaterializer = ActorMaterializer()


  val f = source.via(flow).via(docStreamToDocFlow).runForeach { docsFuture =>
    // Each element is a Future for one document; its callbacks run on the
    // global ExecutionContext, possibly at the same time as the other document's.
    docsFuture.onSuccess {
      case docs =>
        println("OnSuccess")
        docs.foreach { doc =>
          doc.bs.foreach { piece =>
            println("\t" + doc.id + " " + piece)
          }
        }
    }
    docsFuture.onFailure {
      case e => println("Inner failure " + e)
    }
  }

  f onSuccess {
    case t => println("Success")
  }
  f onFailure {
    case t => println("Failure")
  }

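  // Blocks until the actor system shuts down; nothing above shuts it down,
  // so the application keeps running after the streams complete.
  system.awaitTermination()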
}
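Regarding both questions: the interleaving most likely comes from docStreamToDocFlow materializing each inner source as its own stream with runWith, so both documents are read concurrently, and the onSuccess callbacks that print them run in parallel on the global ExecutionContext. A possibly simpler shape is to keep everything in one stream and flatten the per-document sources with flatMapConcat, which consumes one inner source at a time, so every piece of document 1 is emitted before any piece of document 2. The sketch below assumes the Akka 2.4-era Scala DSL and is not checked against the exact version used above; `FlattenedDocuments`, `chunkAll` and the in-memory `contents` map are hypothetical stand-ins for ResourceDatabase, and it emits the existing Document2 shape (one ByteString per element) rather than an Iterator.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object FlattenedDocuments {
  // Same shapes as in Database above, repeated so the sketch compiles on its own.
  case class DocId(id: String)
  case class Document2(id: DocId, bs: ByteString)

  // Hypothetical helper: emits every `size`-byte piece of every document as a
  // Document2, one document after the other. `contents` is an in-memory
  // stand-in for the JSON resources.
  def chunkAll(ids: immutable.Seq[DocId], contents: Map[DocId, ByteString], size: Int)
              (implicit mat: Materializer): immutable.Seq[Document2] = {
    val done = Source(ids)
      // flatMapConcat subscribes to one inner source at a time, so the output
      // keeps the document order of `ids`.
      .flatMapConcat { id =>
        Source.single(contents(id)) // real app: ResourceDatabase.getDocumentStream(id).source
          .mapConcat(bytes => bytes.grouped(size).toList)
          .map(piece => Document2(id, piece))
      }
      .runWith(Sink.seq)
    // Blocking here only keeps the example short.
    Await.result(done, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("flatten-example")
    implicit val mat: Materializer = ActorMaterializer()
    val docs = Map(
      DocId("1") -> ByteString("the first document"),
      DocId("2") -> ByteString("the second one"))
    try chunkAll(List(DocId("1"), DocId("2")), docs, 10).foreach(d => println(s"${d.id} ${d.bs.utf8String}"))
    finally system.terminate()
  }
}
```

If reading several documents at once matters more than ordering, flatMapMerge(breadth, ...) could take the place of flatMapConcat, which would bring the interleaving back but make it explicit in the stream rather than a side effect of separate materializations.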



On Wednesday, January 27, 2016 at 8:31:09 AM UTC-5, √ wrote:
>
> What about Document(X, Source[ByteString]) ?
>
> On Wed, Jan 27, 2016 at 2:18 PM, Ryan Stradling <[email protected]> wrote:
>
>> I absolutely don't want to read the whole file and apologize if it came 
>> across that way.  I want to export a tuple that contains the doc id and the 
>> bytes of the chunk.  The doc id is so that I can keep track of eventually 
>> what chunks go with which document.   So for example if I had in document X 
>> the bytes 012 and my chunk size was 1 byte I would expect 4 Document ids 
>> streamed
>> Document(X, ["0"])
>> Document(X, ["1"])
>> Document(X, ["2"])
>> Document(X, [""])
>>
>> And then downstream would write this to a sink.
>>
>>
>> --
>> >>>>>>>>>>      Read the docs: http://akka.io/docs/
>> >>>>>>>>>>      Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>>      Search the archives: 
>> https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Cheers,
> √
>
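The chunking scheme described in the quoted message (document X holding the bytes 012, a chunk size of 1, and four Documents with an empty last one) can be pinned down without any Akka types. This plain-Scala sketch uses a hypothetical `Chunk` case class and `Vector[Byte]` as stand-ins for Document2 and ByteString; reading the empty last chunk as an end-of-document marker is an interpretation of the example, not something Akka provides.

```scala
object ChunkProtocol {
  // Hypothetical stand-in for Document2: a document id plus one chunk of bytes.
  final case class Chunk(docId: String, bytes: Vector[Byte])

  // Splits `data` into chunks of at most `size` bytes, then appends an empty
  // chunk that marks the end of the document.
  def chunks(docId: String, data: Vector[Byte], size: Int): List[Chunk] = {
    require(size > 0, "chunk size must be positive")
    val pieces = data.grouped(size).map(piece => Chunk(docId, piece)).toList
    pieces :+ Chunk(docId, Vector.empty[Byte])
  }

  def main(args: Array[String]): Unit = {
    val data = "012".getBytes("US-ASCII").toVector
    chunks("X", data, 1).foreach { c =>
      val text = new String(c.bytes.toArray, "US-ASCII")
      println(s"""Document(${c.docId}, ["$text"])""")
    }
  }
}
```

Running main prints the four lines from the quoted example, ending with Document(X, [""]). Inside a stream, the same marker could presumably be appended per document with something like `concat(Source.single(ByteString.empty))` on the inner source, so the downstream sink can tell where one document ends.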
