I have accidently posted too much of my code. Relevant for my question is
only the code in 'object Urlab03' :(
Am Dienstag, 28. Juli 2015 09:27:38 UTC+2 schrieb wwagner4:
>
> As you expected I want to aggregate the elements of a stream. Perhaps
> creation a second subscriber on the source (as you suggest) could be a
> solution. I do not know what you mean by 'creation a second subscriber'
>
> This is my example.
> package net.entelijan
>
> import akka.actor._
>
> import akka.stream._
> import akka.stream.scaladsl._
> import akka.stream.stage._
>
> import scala.concurrent._
> import scala.concurrent.duration._
>
> import scala.concurrent.ExecutionContext.Implicits.global
>
> object Urlaub01 extends App {
>
> implicit val system = ActorSystem("sys")
> implicit val materializer = ActorMaterializer()
>
> val src: Source[Int, Unit] = Source(1 to 10)
>
> val flow: Flow[Int, Int, Unit] = Flow[Int].map { x => x * x }
>
> val sink1: Sink[Int, Future[Unit]] = Sink.foreach { x: Int =>
> if (x < 500) println(x)
> else throw new IllegalStateException("OK")
> }
> val sink2: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((a, b) => a +
> b)
> val src1: Source[Int, Unit] = src.via(flow)
> val rf = src1.toMat(sink2)(Keep.both)
>
> val re = rf.run()
> println("left :%s" format re._1)
> println("right :%s" format re._2)
> re._2.onComplete { x => println(x) }
>
> Thread.sleep(1000)
> system.shutdown()
>
> }
>
> /*
> Elementtype and Materializingtype
> Souce[E, M]
> Flow [E1, E2, M]
> Sink [E, M]
> E : elementtype
> M : materializingtype
> */
> object Urlaub02 extends App {
>
> implicit val system = ActorSystem("sys")
> implicit val materializer = ActorMaterializer()
>
> val src: Source[String, Unit] = Source(List("A", "B"))
> val sink: Sink[String, Future[String]] = Sink.fold[String, String]("")((
> a, b) => a + b)
>
> val rgSrc: RunnableGraph[Unit] = src.toMat(sink)(Keep.left)
> val reSrc1: Unit = rgSrc.run()
> println("mat value of source 1: '%s'" format reSrc1)
>
> val rgSink: RunnableGraph[Future[String]] = src.toMat(sink)(Keep.right)
> val reSink1: Future[String] = rgSink.run()
> reSink1.onComplete { x => println("mat value of sink 1: '%s'" format x
> ) }
>
> val reSrc2: Unit = src.to(sink).run()
> println("mat value of source 2: '%s'" format reSrc2)
>
> val reSrc3: Unit = src.runWith(sink)
> println("mat value of source 3: '%s'" format reSrc3)
>
> val reSrc4 = sink.runWith(src)
> println("mat value of source 4: '%s'" format reSrc4)
>
> Thread.sleep(1000)
> system.shutdown()
>
> }
>
> //Variance
> //Calculate the variance from a source of integers with a reusable flow
> ('vari')
> object Urlaub03 extends App {
> implicit val system = ActorSystem("sys")
> try {
> val size = math.pow(2, 6).toInt
> val settings =
> ActorMaterializerSettings(system).withInputBuffer(initialSize
> = 64, maxSize = size)
> implicit val materializer = ActorMaterializer(settings)
>
> val src: Source[Int, Unit]
> ...
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.