I cannot find a way to access the elements of a stream more than once
inside a flow without buffering the complete stream. To illustrate the
problem, I created the following example:

  import java.util.concurrent.ThreadLocalRandom
  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.{Flow, Source}
  import scala.concurrent.Await
  import scala.concurrent.duration._

  // Emits `size` random integers in the range [0, 100).
  def randomIntegersSource(size: Int): Source[Int, _] = {
    val iter =
      Iterator.continually(ThreadLocalRandom.current().nextInt(100))
    Source(() => iter.take(size))
  }

  test("normalize a stream of integers") {
    implicit val sys = ActorSystem("sys")
    try {
      implicit val materializer = ActorMaterializer()
      val src: Source[Int, _] = randomIntegersSource(size = 20)
      // Converts a stream of positive integers to doubles ranging
      // from 0 to 1. The greatest input value converts to 1.
      val normalizeFlow: Flow[Int, Double, _] = ???
      val f = src.via(normalizeFlow).runForeach { norm =>
        println("%.3f" format norm)
      }
      Await.result(f, 2.second)
    } finally {
      sys.shutdown()
    }
  }

Inside 'normalizeFlow' I have to process the stream twice: first to
find the maximum value, and then to calculate the normalized values.
Does anyone know an implementation of 'normalizeFlow' that works with
arbitrarily sized streams and does not need to buffer the whole stream?
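
To make the two passes concrete, here is a minimal sketch using plain Scala
iterators instead of Akka Streams. It assumes the input can be produced a
second time on demand, which is exactly what a general stream does not offer.
The names TwoPassNormalize, seededIntegers and normalize are hypothetical and
are not part of the linked example.

```scala
import scala.util.Random

object TwoPassNormalize {
  // Hypothetical helper: a replayable input. Each call rebuilds the same
  // sequence from a fixed seed, so both passes see identical values.
  def seededIntegers(size: Int, seed: Long): () => Iterator[Int] =
    () => {
      val rnd = new Random(seed)
      Iterator.fill(size)(rnd.nextInt(100))
    }

  // Pass 1 finds the maximum, pass 2 divides every element by it.
  // No elements are buffered, but the input is generated twice.
  def normalize(input: () => Iterator[Int]): Iterator[Double] = {
    val first = input()
    if (!first.hasNext) Iterator.empty
    else {
      val max = first.max
      // Assumption: if every value is 0, map everything to 0.0.
      if (max == 0) input().map(_ => 0.0)
      else input().map(_.toDouble / max)
    }
  }
}
```

This only trades buffering for re-running the input, so it does not answer
the question for a source that can be consumed just once.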
The complete example, with a buffering solution, is in normalize.scala:
<https://github.com/wwagner4/akka-streams-tryout/blob/master/src/test/scala/normalize.scala>