Below is a running sample of what I was talking about. Happy to help; however, I'd ask that you please start a new thread for related questions, since this conversation isn't related to the original thread (you've hijacked the thread).

The sample code normalizes a stream of integers. It's a simple example of a flow that requires sequential processing: a first pass over the stream determines the range, and a second pass emits the normalized values. It's the simplest version of a pattern that you can apply to your own problem. The key is the fold, repeat, and flatten, which together create an unbounded source that can be zipped with the original numbers.

The full self-contained Gist can be found here:
https://gist.github.com/lancearlaus/b43b7acb8a3aada51701

Also, if you're doing a variance calculation, there's a well-known better way to do that incrementally in a single pass. It just so happens I tackled this very problem. Here are pointers to the relevant code in one of my repos:
https://github.com/scalaspring/akka-http-spring-boot-activator/blob/develop/src/main/scala/sample/quote/Statistics.scala
https://github.com/scalaspring/akka-http-spring-boot-activator/blob/develop/src/main/scala/sample/quote/stage/package.scala

// Imports assumed for Akka Streams 1.0; the Gist above has the complete file
import akka.stream.scaladsl._
import akka.stream.scaladsl.FlowGraph.Implicits._
import scala.util.Random

// Creates an unbounded source of random ints with a known seed (for repeatability)
def randomSource(seed: Int) = Source(() => {
  val random = new Random(seed)
  Iterator.continually(random.nextInt)
})

// Transform a source of integers into a normalized source of doubles where
// each element emitted is in the range of 0 to 1
// Note that the incoming source must be both finite and support multiple subscribers
def normalize(in: Source[Int, Unit]): Source[Double, Unit] = {

  // Fold over the input source to create a new source that emits a single element
  // which is the range of integers over the entire stream
  val fold = in.fold((Int.MaxValue, Int.MinValue)) {
    (range, n) => range match {
      case (l, u) => (l.min(n), u.max(n))
    }
  }

  // Transform the single element range source into an unbounded source
  // that continually emits the same element
  val range = fold.map(r => Source.repeat(r)).flatten(FlattenStrategy.concat)

  // Create a stage that normalizes each value
  // (a constant stream, where min == max, normalizes to 1.0)
  val normalize = Flow[(Int, (Int, Int))].map {
    case (n, (min, max)) if (min == max) => 1.0
    case (n, (min, max)) => (n.toDouble - min.toDouble) / (max.toDouble - min.toDouble)
  }

  // Create the final source using a flow that combines the prior constructs
  Source(in, range, Zip[Int, (Int, Int)], normalize)((mat, _, _, _) => mat) {
    implicit b => (in, range, zip, normalize) =>

      in    ~> zip.in0
      range ~> zip.in1
               zip.out ~> normalize

      normalize.outlet
  }
}

class NormalizeSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {

  val seed = 42

  "Normalize" should "properly calculate for constant stream" in {

    val value = 5
    val size = 100
    val expected = Seq.fill(size)(1.0)

    val constants = Source.repeat(value).take(size)
    val normalized = normalize(constants)

    val future = normalized.runWith(Sink.fold(List[Double]())(_ :+ _))

    whenReady(future) { result =>
      //println(s"result: $result")
      result should have size expected.size
      result.zip(expected).foreach { case (actual, expected) =>
        actual shouldBe expected
      }
    }
  }

  it should "properly calculate for random stream" in {

    val size = 100
    val expected = Seq.fill(size)(1.0)

    val randoms = randomSource(seed).take(size)
    val normalized = normalize(randoms)

    val future = normalized.runWith(Sink.fold(List[Double]())(_ :+ _))

    whenReady(future) { result =>
      //println(s"result: $result")
      result should have size expected.size
      result should contain (0.0)
      result should contain (1.0)
      result.exists(_ < 0.0) shouldBe false
      result.exists(_ > 1.0) shouldBe false
    }
  }
}

On Wednesday, July 29, 2015 at 12:42:39 PM UTC-4, wwagner4 wrote:
>
> As you expected I want to aggregate the elements of a stream. Perhaps
> creation a second subscriber on the source (as you suggest) could be a
> solution.
> I do not know what you mean by 'creation a second subscriber'
>
> //Variance
> //Calculate the variance from a source of integers with a reusable flow ('vari')
> object Urlaub03 extends App {
>   implicit val system = ActorSystem("sys")
>   try {
>     val size = math.pow(2, 6).toInt
>     val settings = ActorMaterializerSettings(system).withInputBuffer(initialSize = 64, maxSize = size)
>     implicit val materializer = ActorMaterializer(settings)
>
>     val src: Source[Int, Unit] = Source(() => new Iterator[Int] {
>       val max = 500
>       var cnt = 0
>       def hasNext = cnt < max;
>       def next = { cnt += 1; (math.random * 1000).toInt }
>     })
>
>     // Utility flow
>     class Fill[A]() extends StatefulStage[A, A] {
>       override def initial: StageState[A, A] =
>         new StageState[A, A] {
>           override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
>             val iter = new Iterator[A] {
>               def hasNext = true;
>               def next = elem
>             }
>             emit(iter, ctx)
>           }
>         }
>     }
>
>     // Utility flow
>     val _cntFlow: Flow[Int, Int, Future[Int]] = {
>       import FlowGraph.Implicits._
>       val cntSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((cuml, _) => cuml + 1)
>       Flow(cntSink) {
>         implicit builder =>
>           fold =>
>             (fold.inlet, builder.materializedValue.mapAsync(4)(identity).outlet)
>       }
>     }
>
>     // Utility flow
>     val _sumFlow: Flow[Int, Int, Future[Int]] = {
>       import FlowGraph.Implicits._
>       val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((cuml, elem) => cuml + elem)
>       Flow(sumSink) {
>         implicit builder =>
>           fold =>
>             (fold.inlet, builder.materializedValue.mapAsync(4)(identity).outlet)
>       }
>     }
>
>     // Reusable flow
>     val vari: Flow[Int, Double, Unit] = {
>       import FlowGraph.Implicits._
>
>       Flow() { implicit b =>
>         val bcast = b.add(Broadcast[Int](3))
>         val zip1 = b.add(Zip[Int, Int]())
>         val zip2 = b.add(Zip[Int, Double]())
>         // Transforms a stream of integers to their sum
>         val sumFlow = b.add(_sumFlow)
>         // Transforms a stream of integers to their amount of elements
>         val cntFlow = b.add(_cntFlow)
>         val mean = b.add(Flow[(Int, Int)].map { case (sum, cnt) => sum.toDouble / cnt })
>         // Takes the first element of a stream and transforms it into an endless stream of that element.
>         val fill = b.add(Flow[Double].transform(() => new Fill[Double]()))
>         val vari = b.add(Flow[(Int, Double)].map { case (value, mean) => value - mean })
>
>         bcast ~> zip2.in0
>         bcast ~> sumFlow ~> zip1.in0
>         bcast ~> cntFlow ~> zip1.in1
>         zip1.out ~> mean ~> fill ~> zip2.in1
>         zip2.out ~> vari
>
>         (bcast.in, vari.outlet)
>       }
>     }
>     var cnt = 1
>     // Usage of the 'vari' flow
>     val re = src.via(vari).runForeach { x =>
>       println("03 vari: %-6d %-8.4f" format (cnt, x))
>       cnt += 1
>     }
>     Await.result(re, 3.second)
>     println("03 complete")
>   } finally {
>     system.shutdown()
>   }
> }
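
P.S. On the single-pass variance mentioned earlier: one commonly used incremental approach is Welford's algorithm, which keeps a running count, mean, and sum of squared deviations. The sketch below is only an illustration in plain Scala with no Akka dependency. The names `SinglePassVariance`, `Stats`, `add`, and `variance` are hypothetical, and I'm not claiming this matches the linked Statistics.scala line for line.

```scala
object SinglePassVariance {

  // Running state for Welford's algorithm (illustrative names, not from the linked repos):
  // count = elements seen, mean = running mean, m2 = running sum of squared deviations.
  final case class Stats(count: Long = 0L, mean: Double = 0.0, m2: Double = 0.0) {

    // Fold one more value into the running statistics.
    def add(x: Double): Stats = {
      val n = count + 1
      val delta = x - mean
      val newMean = mean + delta / n
      Stats(n, newMean, m2 + delta * (x - newMean))
    }

    // Population variance; NaN when no elements have been seen.
    def variance: Double = if (count == 0) Double.NaN else m2 / count
  }

  // Single pass over the input: no second subscriber and no buffering of elements.
  def variance(xs: Iterator[Int]): Double =
    xs.foldLeft(Stats())((s, x) => s.add(x.toDouble)).variance
}
```

For the values 2, 4, 4, 4, 5, 5, 7, 9 the mean is 5 and the population variance is 4, so `SinglePassVariance.variance(Iterator(2, 4, 4, 4, 5, 5, 7, 9))` should agree with 4.0 up to floating-point rounding. Since the state is updated one element at a time, the same `add` step should in principle fit a stream `fold` like the one in the normalization sample, though I haven't wired it into a flow here.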
