Following is a running sample of what I was talking about.
Happy to help, however I'd ask that you please start a new thread with 
related questions since this conversation is not related to the original 
thread (you've hijacked the thread).

The sample code performs normalization of a stream of integers.
It's a simple example of a flow that requires sequential processing, doing a 
first pass over the stream of integers to determine range before emitting 
the normalized version on the second pass.
It's the simplest version of a pattern that you can apply to your own 
problem at hand.
The key is in the fold, repeat, and flatten to create an unbounded source 
that can be zipped with the original numbers.

The full self-contained Gist can be found here: 
https://gist.github.com/lancearlaus/b43b7acb8a3aada51701

Also, if you're doing a variance calculation, there's a well known better 
way to do that incrementally in a single pass.
It just so happens I tackled this very problem. Here's a pointer to the 
relevant code in one of my repos.
https://github.com/scalaspring/akka-http-spring-boot-activator/blob/develop/src/main/scala/sample/quote/Statistics.scala
https://github.com/scalaspring/akka-http-spring-boot-activator/blob/develop/src/main/scala/sample/quote/stage/package.scala


// Creates an unbounded source of random ints with a known seed (for repeatability).
// The Random is constructed inside the iterator factory, so every iterator the
// factory produces starts from the same seed and replays the same sequence.
def randomSource(seed: Int): Source[Int, Unit] = Source(() => {
  val random = new Random(seed)
  Iterator.continually(random.nextInt())
})


// Transforms a source of integers into a normalized source of doubles where
// each element emitted is in the range of 0 to 1.
// When every element of the input is equal (min == max) each element is
// emitted as 1.0.
// Note that the incoming source must be both finite and support multiple
// subscribers: it is consumed once by the range fold and once more by the zip.
def normalize(in: Source[Int, Unit]): Source[Double, Unit] = {

  // First pass: fold over the input source to create a new source that emits
  // a single element, the (min, max) range of integers over the entire stream.
  val rangeOnce = in.fold((Int.MaxValue, Int.MinValue)) {
    case ((lo, hi), n) => (lo.min(n), hi.max(n))
  }

  // Transform the single element range source into an unbounded source that
  // continually emits the same element, so it can be zipped with every number.
  val repeatedRange = rangeOnce.map(r => Source.repeat(r)).flatten(FlattenStrategy.concat)

  // Stage that normalizes each value. The arithmetic is done on doubles so that
  // (max - min) cannot overflow Int.
  val scale = Flow[(Int, (Int, Int))].map {
    case (_, (min, max)) if min == max => 1.0
    case (n, (min, max)) => (n.toDouble - min.toDouble) / (max.toDouble - min.toDouble)
  }

  // Second pass: pair each original number with the range and normalize it.
  // The materialized value of the resulting source is that of `in`.
  Source(in, repeatedRange, Zip[Int, (Int, Int)](), scale)((mat, _, _, _) => mat) {
    implicit b => (numbers, ranges, zip, scaled) =>

    numbers ~> zip.in0
    ranges  ~> zip.in1
               zip.out ~> scaled

    scaled.outlet
  }

}

// Exercises normalize() against a constant stream (the min == max case) and a
// seeded random stream.
class NormalizeSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {

  val seed = 42

  // Runs the source and gathers every emitted element, in emission order.
  // A Vector is used as the accumulator because appending to a List copies
  // the whole list on every element.
  private def collect(source: Source[Double, Unit]) =
    source.runWith(Sink.fold[Vector[Double], Double](Vector.empty)(_ :+ _))

  "Normalize" should "properly calculate for constant stream" in {

    val value = 5
    val count = 100
    // A constant stream has min == max, which normalize emits as 1.0
    val expected = Seq.fill(count)(1.0)

    val constants = Source.repeat(value).take(count)

    whenReady(collect(normalize(constants))) { result =>
      result should have size expected.size
      result.zip(expected).foreach { case (actual, wanted) =>
        actual shouldBe wanted
      }
    }
  }

  it should "properly calculate for random stream" in {

    val count = 100

    val randoms = randomSource(seed).take(count)

    whenReady(collect(normalize(randoms))) { result =>
      result should have size count
      // The minimum and maximum inputs normalize to exactly 0.0 and 1.0
      result should contain (0.0)
      result should contain (1.0)
      // Nothing may fall outside the unit interval
      result.exists(_ < 0.0) shouldBe false
      result.exists(_ > 1.0) shouldBe false
    }
  }

}


On Wednesday, July 29, 2015 at 12:42:39 PM UTC-4, wwagner4 wrote:
>
> As you expected I want to aggregate the elements of a stream. Perhaps 
> creation a second subscriber on the source (as you suggest) could be a 
> solution. I do not know what you mean by 'creation a second subscriber'
>
> //Variance
> //Calculate the variance from a source of integers with a reusable flow 
> ('vari')
> object Urlaub03 extends App {
>   implicit val system = ActorSystem("sys")
>   try {
>     val size = math.pow(2, 6).toInt
>     val settings = 
> ActorMaterializerSettings(system).withInputBuffer(initialSize = 64, maxSize 
> = size)
>     implicit val materializer = ActorMaterializer(settings)
>
>     val src: Source[Int, Unit] = Source(() => new Iterator[Int] {
>       val max = 500
>       var cnt = 0
>       def hasNext = cnt < max;
>       def next = { cnt += 1; (math.random * 1000).toInt }
>     })
>
>     // Utility flow
>     class Fill[A]() extends StatefulStage[A, A] {
>       override def initial: StageState[A, A] =
>         new StageState[A, A] {
>           override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
>             val iter = new Iterator[A] {
>               def hasNext = true;
>               def next = elem
>             }
>             emit(iter, ctx)
>           }
>         }
>     }
>
>     // Utility flow
>     val _cntFlow: Flow[Int, Int, Future[Int]] = {
>       import FlowGraph.Implicits._
>       val cntSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((cuml, 
> _) => cuml + 1)
>       Flow(cntSink) {
>         implicit builder =>
>           fold =>
>             (fold.inlet, 
> builder.materializedValue.mapAsync(4)(identity).outlet)
>       }
>     }
>
>     // Utility flow
>     val _sumFlow: Flow[Int, Int, Future[Int]] = {
>       import FlowGraph.Implicits._
>       val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((cuml, 
> elem) => cuml + elem)
>       Flow(sumSink) {
>         implicit builder =>
>           fold =>
>             (fold.inlet, 
> builder.materializedValue.mapAsync(4)(identity).outlet)
>       }
>     }
>
>     // Reusable flow
>     val vari: Flow[Int, Double, Unit] = {
>       import FlowGraph.Implicits._
>
>       Flow() { implicit b =>
>         val bcast = b.add(Broadcast[Int](3))
>         val zip1 = b.add(Zip[Int, Int]())
>         val zip2 = b.add(Zip[Int, Double]())
>         // Transforms a stream of integers to their sum
>         val sumFlow = b.add(_sumFlow)
>         // Transforms a stream of integers to their amount of elements
>         val cntFlow = b.add(_cntFlow)
>         val mean = b.add(Flow[(Int, Int)].map { case (sum, cnt) => 
> sum.toDouble / cnt })
>         // Takes the first element of a stream and transforms it into an 
> endless stream of that element.
>         val fill = b.add(Flow[Double].transform(() => new Fill[Double]()))
>         val vari = b.add(Flow[(Int, Double)].map { case (value, mean) => 
> value - mean })
>
>         bcast ~> zip2.in0
>         bcast ~> sumFlow ~> zip1.in0
>         bcast ~> cntFlow ~> zip1.in1
>         zip1.out ~> mean ~> fill ~> zip2.in1
>         zip2.out ~> vari
>
>         (bcast.in, vari.outlet)
>       }
>     }
>     var cnt = 1
>     // Usage of the 'vari' flow
>     val re = src.via(vari).runForeach { x =>
>       println("03 vari: %-6d %-8.4f" format (cnt, x))
>       cnt += 1
>     }
>     Await.result(re, 3.second)
>     println("03 complete")
>   } finally {
>     system.shutdown()
>   }
> }
>
>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to