After struggling for a while to make sense of Akka Streams, I have put
together the following experiment. I would appreciate comments on it so
that I can understand and document it better. Eventually I will release my
experiments on GitHub, once I am confident they are valuable learning tools.
Can this be simplified any further without losing the sense of what is
really going on?
Is creating the FlowGraph really necessary in order to get a Future from
FlowMaterializer?
Cheers, Eric
package net.kolotyluk.laboratory.akka.streams
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import net.kolotyluk.scala.LogbackLogging
import scala.concurrent.Future
import scala.language.postfixOps
import scala.util.Failure
import scala.util.Success
/** Basic Akka Stream Usage
*
* Demonstrate a simple case of using a stream.
*
* Akka Streams are based on a Source and Sink, connected via a Flow,
* defined via a FlowGraph, and finally materialized into something
* runnable.
*
* It helps to pay attention to the log messages, in particular which
* thread messages are logged from.
*
* @author Eric Kolotyluk
*/
object Experiment1 extends App with LogbackLogging {

  logger.info("Hello Akka Streams - Experiment 1")

  val system = ActorSystem("System")
  import system.dispatcher

  logger.info("Create a Source based on a simple Iterable[T]")
  val source = Source(1 to 10)

  logger.info("Create a sink that can be connected to the Source via a flow")
  val sink = Sink.foreach { int: Int => logger.info("sink: " + int) }

  logger.info("create a Flow")
  val flow = Flow[Int]

  logger.info("Create a FlowGraph from the Source")
  val flowGraph = source via flow

  logger.info("Create a RunnableFlow from the FlowGraph")
  // Because we want a Future that completes when the FlowGraph completes,
  // we specify combine: (unit, future) => future
  val runnableFlow = flowGraph.toMat(sink)((unit, future) => future)

  logger.info("Create flowMaterializer")
  val flowMaterializer = ActorFlowMaterializer()(system)

  logger.info("Materialize runnableFlow as materializedFlow")
  val materializedFlow = runnableFlow.run()(flowMaterializer)

  logger.info("Create callback for when the flow is complete")
  materializedFlow.onComplete { result =>
    result match {
      case Success(value) => logger.info("materializedFlow complete")
      case Failure(cause) => logger.error("materializedFlow", cause)
    }
    // We need to explicitly shut down the ActorSystem when there
    // is no more work left to do.
    system.shutdown()
  }

  logger.info("Do some silly work to show the foreground is not blocked")
  val range = 1 to 10
  range.foreach { x =>
    logger.info("foreground: " + x)
    Thread.sleep(1)
  }
}
15:32:43,714 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could
NOT find resource [logback.groovy]
15:32:43,714 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could
NOT find resource [logback-test.xml]
15:32:43,715 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could
NOT find resource [logback.xml]
15:32:43,719 |-INFO in ch.qos.logback.classic.LoggerContext[default] -
Setting up default configuration.
15:32:43.800 [main] INFO n.k.l.akka.streams.Experiment1$ - Hello Akka
Streams - Experiment 1
15:32:44.162 [main] INFO n.k.l.akka.streams.Experiment1$ - Create a Source
based on a simple Iterable[T]
15:32:44.185 [main] INFO n.k.l.akka.streams.Experiment1$ - Create a sink
that can be connected to the Source via a flow
15:32:44.200 [main] INFO n.k.l.akka.streams.Experiment1$ - create a Flow
15:32:44.200 [main] INFO n.k.l.akka.streams.Experiment1$ - Create a
FlowGraph from the Source
15:32:44.201 [main] INFO n.k.l.akka.streams.Experiment1$ - Create a
RunnableFlow from the FlowGraph
15:32:44.203 [main] INFO n.k.l.akka.streams.Experiment1$ - Create
flowMaterializer
15:32:44.216 [main] INFO n.k.l.akka.streams.Experiment1$ - Materialize
runnableFlow as materializedFlow
15:32:44.250 [main] INFO n.k.l.akka.streams.Experiment1$ - Create callback
for when the flow is complete
15:32:44.251 [main] INFO n.k.l.akka.streams.Experiment1$ - Do some silly
work to show the foreground is not blocked
15:32:44.252 [main] INFO n.k.l.akka.streams.Experiment1$ - foreground: 1
15:32:44.253 [main] INFO n.k.l.akka.streams.Experiment1$ - foreground: 2
15:32:44.254 [main] INFO n.k.l.akka.streams.Experiment1$ - foreground: 3
15:32:44.255 [main] INFO n.k.l.akka.streams.Experiment1$ - foreground: 4
15:32:44.256 [main] INFO n.k.l.akka.streams.Experiment1$ - foreground: 5
15:32:44.258 [main] INFO n.k.l.akka.streams.Experiment1$ - foreground: 6
15:32:44.259 [main] INFO n.k.l.akka.streams.Experiment1$ - foreground: 7
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO n.k.l.akka.streams.Experiment1$ - sink: 1
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO n.k.l.akka.streams.Experiment1$ - sink: 2
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO n.k.l.akka.streams.Experiment1$ - sink: 3
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO n.k.l.akka.streams.Experiment1$ - sink: 4
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO n.k.l.akka.streams.Experiment1$ - sink: 5
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO n.k.l.akka.streams.Experiment1$ - sink: 6
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO n.k.l.akka.streams.Experiment1$ - sink: 7
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO n.k.l.akka.streams.Experiment1$ - sink: 8
15:32:44.260 [System-akka.actor.default-dispatcher-3] INFO n.k.l.akka.streams.Experiment1$ - sink: 9
15:32:44.260 [main] INFO n.k.l.akka.streams.Experiment1$ - foreground: 8
15:32:44.260 [System-akka.actor.default-dispatcher-3] INFO n.k.l.akka.streams.Experiment1$ - sink: 10
15:32:44.261 [main] INFO n.k.l.akka.streams.Experiment1$ - foreground: 9
15:32:44.261 [System-akka.actor.default-dispatcher-2] INFO n.k.l.akka.streams.Experiment1$ - materializedFlow complete
15:32:44.263 [main] INFO n.k.l.akka.streams.Experiment1$ - foreground: 10
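
The interleaving in the log above (foreground lines on [main], completion
logged from a dispatcher thread) is the same thing one gets from a plain
scala.concurrent.Future with an onComplete callback. As a rough sketch of
just that part, with no Akka involved: the names CallbackSketch and run are
made up for illustration, and the global ExecutionContext stands in for
system.dispatcher, so this is an approximation of the pattern and not what
the materializer does internally.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical stand-in for the experiment: background work plus a
// completion callback, using only the standard library.
object CallbackSketch {

  /** Returns the background result and the name of the thread on which
    * the completion callback ran.
    */
  def run(): (Int, String) = {
    // Assumption: the global ExecutionContext plays the role that
    // system.dispatcher plays in Experiment1.
    val background: Future[Int] = Future { (1 to 10).sum }

    // Records which thread executed the callback.
    val callbackThread = Promise[String]()

    background.onComplete {
      case Success(_)     => callbackThread.success(Thread.currentThread.getName)
      case Failure(cause) => callbackThread.failure(cause)
    }

    // The calling thread is not blocked by registering the callback.
    (1 to 3).foreach(x => println("foreground: " + x))

    // Block only at the very end, so the sketch can return its findings.
    val sum = Await.result(background, 5.seconds)
    val thread = Await.result(callbackThread.future, 5.seconds)
    (sum, thread)
  }
}
```

Calling CallbackSketch.run() and printing the second element of the result
shows which thread ran the callback, which can then be compared with the
thread names in the log above.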