As you expected I want to aggregate the elements of a stream. Perhaps 
creation a second subscriber on the source (as you suggest) could be a 
solution. I do not know what you mean by 'creation a second subscriber'

//Variance
//Calculate the variance from a source of integers with a reusable flow 
('vari')
object Urlaub03 extends App {
  implicit val system = ActorSystem("sys")
  try {
    val size = math.pow(2, 6).toInt
    val settings = 
ActorMaterializerSettings(system).withInputBuffer(initialSize = 64, maxSize 
= size)
    implicit val materializer = ActorMaterializer(settings)

    val src: Source[Int, Unit] = Source(() => new Iterator[Int] {
      val max = 500
      var cnt = 0
      def hasNext = cnt < max;
      def next = { cnt += 1; (math.random * 1000).toInt }
    })

    // Utility flow
    class Fill[A]() extends StatefulStage[A, A] {
      override def initial: StageState[A, A] =
        new StageState[A, A] {
          override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
            val iter = new Iterator[A] {
              def hasNext = true;
              def next = elem
            }
            emit(iter, ctx)
          }
        }
    }

    // Utility flow
    val _cntFlow: Flow[Int, Int, Future[Int]] = {
      import FlowGraph.Implicits._
      val cntSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((cuml, 
_) => cuml + 1)
      Flow(cntSink) {
        implicit builder =>
          fold =>
            (fold.inlet, 
builder.materializedValue.mapAsync(4)(identity).outlet)
      }
    }

    // Utility flow
    val _sumFlow: Flow[Int, Int, Future[Int]] = {
      import FlowGraph.Implicits._
      val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((cuml, 
elem) => cuml + elem)
      Flow(sumSink) {
        implicit builder =>
          fold =>
            (fold.inlet, 
builder.materializedValue.mapAsync(4)(identity).outlet)
      }
    }

    // Reusable flow
    val vari: Flow[Int, Double, Unit] = {
      import FlowGraph.Implicits._

      Flow() { implicit b =>
        val bcast = b.add(Broadcast[Int](3))
        val zip1 = b.add(Zip[Int, Int]())
        val zip2 = b.add(Zip[Int, Double]())
        // Transforms a stream of integers to their sum
        val sumFlow = b.add(_sumFlow)
        // Transforms a stream of integers to their amount of elements
        val cntFlow = b.add(_cntFlow)
        val mean = b.add(Flow[(Int, Int)].map { case (sum, cnt) => 
sum.toDouble / cnt })
        // Takes the first element of a stream and transforms it into an 
endless stream of that element.
        val fill = b.add(Flow[Double].transform(() => new Fill[Double]()))
        val vari = b.add(Flow[(Int, Double)].map { case (value, mean) => 
value - mean })

        bcast ~> zip2.in0
        bcast ~> sumFlow ~> zip1.in0
        bcast ~> cntFlow ~> zip1.in1
        zip1.out ~> mean ~> fill ~> zip2.in1
        zip2.out ~> vari

        (bcast.in, vari.outlet)
      }
    }
    var cnt = 1
    // Usage of the 'vari' flow
    val re = src.via(vari).runForeach { x =>
      println("03 vari: %-6d %-8.4f" format (cnt, x))
      cnt += 1
    }
    Await.result(re, 3.second)
    println("03 complete")
  } finally {
    system.shutdown()
  }
}


-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to