And of course I find the solution right after I post. :) :(

val f = source.via(flow).flatMapConcat(identity).runForEach(println(_))

That solves both the interleave and the source of sources

On Thursday, January 28, 2016 at 8:46:45 PM UTC-5, Ryan Stradling wrote:
>
> I have simplified the example further and got rid of some mistakes. 
>  However, I effectively end up with a type of Source of Source.  This does 
> not seem correct to me.  Is there a way to flatten that given the code 
> below?
>
> In addition, how can I make it so all of the first documents are processed 
> before the second document rather than being interleaved?
>
> import java.io.{ByteArrayInputStream}
> import akka.actor.ActorSystem
> import akka.stream.ActorMaterializer
> import akka.stream.scaladsl._
> import akka.util.ByteString
> import scala.concurrent.Future
>
>
> object Database {
>
>   case class DocId(id: String)
>
>   object ResourceDatabase {
>     def getDocumentStream(id: DocId): Source[ByteString, Future[Long]] = {
>       val i =   id.id.toByte
>       val ba = new ByteArrayInputStream (Array.fill[Byte] (10000)(i))
>       StreamConverters.fromInputStream(() => ba, 1000)
>     }
>   }
> }
>
> object Application extends App {
>
>   import Database._
>
>   val source = Source(Vector(DocId("1"), DocId("2")))
>
>   def flow(): Flow[DocId, Source[ByteString, Future[Long]], Unit] = {
>     Flow[DocId] map ResourceDatabase.getDocumentStream
>   }
>
>   import scala.concurrent.ExecutionContext.Implicits.global
>
>   implicit val system = ActorSystem("application-streams")
>   implicit val materializer = ActorMaterializer()
>
>
>   val f = source.via(flow).runForeach { y =>
>     y.runForeach(x => println(x))
>   }
>
>   f onSuccess {
>     case t => println("Success")
>   }
>   f onFailure {
>     case t => println("Failure")
>   }
>
>   system.awaitTermination()
> }
>
>
>
> On Thursday, January 28, 2016 at 12:28:32 AM UTC-5, Ryan Stradling wrote:
>>
>> Here is what I have now...
>> I am very curious about the following...
>> 1) Is there a better way?  I am not sure that I am handling having a 
>> source that generates multiple sources correctly.  For instance in 
>> docStreamToDoc each source in the sequence is being run explicitly.  
>> 2) I am surprised by the output that the 2 documents are interleaved in 
>> certain places.  I would expect that not to be the case so maybe I am doing 
>> something incorrect?
>> DocId(1) ByteString(91, 10, 32, 32, 123, 10, 32, 32, 32, 32)
>> DocId(2) ByteString(91, 10, 32, 32, 123, 10, 32, 32, 32, 32)
>> DocId(1) ByteString(34, 95, 105, 100, 34, 58, 32, 34, 53, 54)
>> DocId(2) ByteString(34, 95, 105, 100, 34, 58, 32, 34, 53, 54)
>> DocId(2) ByteString(97, 56, 50, 49, 98, 50, 102, 51, 53, 99)
>>
>>
>>
>> import java.io.{InputStream}
>>
>> import akka.actor.ActorSystem
>> import akka.stream.ActorMaterializer
>> import akka.stream.scaladsl._
>> import akka.util.ByteString
>>
>> import scala.collection.immutable
>> import scala.concurrent.Future
>>
>>
>> object Database {
>>
>>   case class DocId(id: String)
>>
>>   case class Document(id: DocId, bs: Iterator[ByteString])
>>   case class Document2(id: DocId, bs : ByteString)
>>   case class DocumentStream(id : DocId, source : Source[ByteString, 
>> Future[Long]])
>>
>>   trait Database {
>>     def getDocumentStream(id: DocId): DocumentStream
>>
>>   }
>>
>>   object ResourceDatabase extends Database {
>>     def getDocumentStream(id: DocId): DocumentStream = {
>>       val rs = getClass.getResourceAsStream("/" + id.id + ".json")
>>       DocumentStream(id, StreamConverters.fromInputStream(() => rs))
>>     }
>>   }
>> }
>>
>> object Application extends App {
>>
>>   import Database._
>>
>>   val source = Source(Vector(DocId("1"), DocId("2")))
>>
>>   def flow(): Flow[DocId, DocumentStream, Unit] = {
>>     val emptyByteString = ByteString("")
>>     Flow[DocId] map ResourceDatabase.getDocumentStream
>>   }
>>
>>   def docStreamToDocFlow(): Flow[DocumentStream, Future[Seq[Document]], 
>> Unit] = {
>>     implicit val materializer = ActorMaterializer()
>>     Flow[DocumentStream] map { ds =>
>>       val grouped: Source[Iterator[ByteString], Future[Long]] = 
>> ds.source.map(x => x.grouped(10))
>>       val f: Future[Seq[Document]] = (grouped map { (s : 
>> Iterator[ByteString]) => Document(ds.id, s)}).runWith(Sink.seq)
>>       f
>>     }
>>   }
>>
>>   import scala.concurrent.ExecutionContext.Implicits.global
>>
>>   implicit val system = ActorSystem("application-streams")
>>   implicit val materializer = ActorMaterializer()
>>
>>
>>   val f = source.via(flow).via(docStreamToDocFlow).runForeach { y =>
>>     import scala.collection.JavaConverters._
>>     import scala.collection.JavaConversions._
>>     y.onSuccess {
>>       case t =>
>>         println("OnSuccess")
>>         t.foreach { t =>
>>           t.bs.foreach {
>>             f => println("\t" + t.id + " " + f)
>>           }
>>         }
>>     }
>>     y.onFailure {
>>       case y => println("Inner failure " + y)
>>     }
>>   }
>>
>>   f onSuccess {
>>     case t => println("Success")
>>   }
>>   f onFailure {
>>     case t => println("Failure")
>>   }
>>
>>   system.awaitTermination()
>> }
>>
>>
>>
>> On Wednesday, January 27, 2016 at 8:31:09 AM UTC-5, √ wrote:
>>>
>>> What about Document(X, Source[ByteString]) ?
>>>
>>> On Wed, Jan 27, 2016 at 2:18 PM, Ryan Stradling <[email protected]> 
>>> wrote:
>>>
>>>> I absolutely don't want to read the whole file and apologize if it came 
>>>> across that way.  I want to export a tuple that contains the doc id and 
>>>> the 
>>>> bytes of the chunk.  The doc id is so that I can keep track of eventually 
>>>> what chunks go with which document.   So for example if I had in document 
>>>> X 
>>>> the bytes 012 and my chunk size was 1 byte I would expect 4 Document ids 
>>>> streamed
>>>> Document(X, ["0"])
>>>> Document(X, ["1"])
>>>> Document(X, ["2"])
>>>> Document(X, [""])
>>>>
>>>> And then downstream would write this to a sink.
>>>>
>>>>
>>>> --
>>>> >>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>>      Check the FAQ: 
>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>> >>>>>>>>>>      Search the archives: 
>>>> https://groups.google.com/group/akka-user
>>>> ---
>>>> You received this message because you are subscribed to the Google 
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>> an email to [email protected].
>>>> To post to this group, send email to [email protected].
>>>> Visit this group at https://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>>
>>>
>>> -- 
>>> Cheers,
>>> √
>>>
>>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to