As you expected, I want to aggregate the elements of a stream. Perhaps
creating a second subscriber on the source (as you suggest) could be a
solution. However, I do not know what you mean by 'creating a second subscriber'.
// Variance
// Calculate the variance from a source of integers with a reusable flow ('vari').
/** Demo entry point: pushes 500 random integers through the reusable `vari` flow
  * and prints every value that flow emits, then shuts the actor system down.
  *
  * For each input element, `vari` emits `value - mean`, where `mean` is the sum of
  * all inputs divided by the number of inputs.
  */
object Urlaub03 extends App {
  implicit val system = ActorSystem("sys")
  try {
    // Upper bound of the per-stage input buffer (2^6).
    val size = 64
    val settings =
      ActorMaterializerSettings(system).withInputBuffer(initialSize = 64, maxSize = size)
    implicit val materializer = ActorMaterializer(settings)

    // 500 pseudo-random integers in [0, 1000), produced lazily by a fresh iterator.
    val src: Source[Int, Unit] =
      Source(() => Iterator.fill(500)((math.random * 1000).toInt))

    /** Utility stage: takes the first element of a stream and transforms it into an
      * endless stream of that element.
      */
    class Fill[A]() extends StatefulStage[A, A] {
      override def initial: StageState[A, A] =
        new StageState[A, A] {
          override def onPush(elem: A, ctx: Context[A]): SyncDirective =
            emit(Iterator.continually(elem), ctx)
        }
    }

    /** Utility flow: folds every incoming integer with `step` (starting from 0) and
      * emits the fold's result as the flow's single output element.
      *
      * The result is obtained from the fold sink's materialized `Future`, which is
      * fed back into the stream through `builder.materializedValue`.
      *
      * @param step combines the running result with the next element
      * @return a flow whose materialized value is the same `Future` result
      */
    def foldFlow(step: (Int, Int) => Int): Flow[Int, Int, Future[Int]] = {
      import FlowGraph.Implicits._
      val foldSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(step)
      Flow(foldSink) { implicit builder => fold =>
        (fold.inlet, builder.materializedValue.mapAsync(4)(identity).outlet)
      }
    }

    // Transforms a stream of integers to their amount of elements.
    val countFlow: Flow[Int, Int, Future[Int]] = foldFlow((cuml, _) => cuml + 1)
    // Transforms a stream of integers to their sum.
    val sumFlow: Flow[Int, Int, Future[Int]] = foldFlow((cuml, elem) => cuml + elem)

    // Reusable flow: pairs every input element with the mean of all inputs and
    // emits their difference.
    //
    // NOTE(review): the pass-through branch (bcast ~> zip2.in0) can only advance as
    // far as downstream buffering allows, while the sum/count branches emit nothing
    // until the whole input has been folded. With 500 elements and a 64-element
    // input buffer this looks like it could stall until the Await below times out
    // -- please confirm against the Akka Streams version in use.
    val vari: Flow[Int, Double, Unit] = {
      import FlowGraph.Implicits._
      Flow() { implicit b =>
        val bcast = b.add(Broadcast[Int](3))
        val zip1 = b.add(Zip[Int, Int]())
        val zip2 = b.add(Zip[Int, Double]())
        val sumStage = b.add(sumFlow)
        val cntStage = b.add(countFlow)
        val mean = b.add(Flow[(Int, Int)].map { case (sum, count) => sum.toDouble / count })
        // Repeats the single mean value so it can be zipped with every input element.
        val fill = b.add(Flow[Double].transform(() => new Fill[Double]()))
        val deviation = b.add(Flow[(Int, Double)].map { case (value, avg) => value - avg })

        bcast ~> zip2.in0
        bcast ~> sumStage ~> zip1.in0
        bcast ~> cntStage ~> zip1.in1
        zip1.out ~> mean ~> fill ~> zip2.in1
        zip2.out ~> deviation
        (bcast.in, deviation.outlet)
      }
    }

    // 1-based running index of the printed element; only touched by the callback below.
    var cnt = 1
    // Usage of the 'vari' flow
    val re = src.via(vari).runForeach { x =>
      println("03 vari: %-6d %-8.4f" format (cnt, x))
      cnt += 1
    }
    // Blocking is acceptable here: this is the very edge of a demo program.
    Await.result(re, 3.seconds)
    println("03 complete")
  } finally {
    // Always release the actor system, even if the stream failed or timed out.
    system.shutdown()
  }
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.