After struggling for a while to make sense of Akka Streams, I have the 
following experiment I would appreciate comments on so that I can 
understand and document it better. Eventually I will release my experiments 
on GitHub when I am confident they are valuable learning tools.

Can this be simplified any further without loosing the sense of what is 
really going on?

Is creating the FlowGraph really necessary in order to get a Future from 
FlowMaterializer?

Cheers, Eric

package net.kolotyluk.laboratory.akka.streams

import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

import net.kolotyluk.scala.LogbackLogging

import scala.concurrent.Future
import scala.language.postfixOps
import scala.util.Failure
import scala.util.Success


/** Basic Akka Stream Usage
  *
  * Demonstrate a simple case of using a stream.
  * 
  * Akka Streams are based on a Source and Sink, connected via a Flow,
  * defined via a FlowGraph, and finally materialized into something
  * runnable.
  * 
  * It helps to pay attention to the log messages, in particular which
  * thread messages are logged from.
  * 
  * @author Eric Kolotyluk
  */
object Experiment1 extends App with LogbackLogging {

    logger.info( "Hello Akka Streams - Experiment 1" )
    
    val system = ActorSystem("System")
    import system.dispatcher

    logger.info("Create a Source based on a simple Iterable[T]")
    val source = Source(1 to 10)

    logger.info("Create a sink that can be connected to the Source via a 
flow" )
    val sink = Sink.foreach{ int: Int => logger.info("sink: " + int) } 
    
    logger.info("create a Flow")
    val flow = Flow[Int]
     
    logger.info("Create a FlowGraph from the Source")
    val flowGraph = source via flow

    logger.info("Create a RunnableFlow from the FlowGraph")
    // Because we want a Future that completes when the FlowGraph completes,
    // we specify combine: (unit, future) => future
    val runnableFlow = flowGraph.toMat(sink)((unit, future) => future)

    logger.info("Create flowMaterializer")
    val flowMaterializer = ActorFlowMaterializer()(system)
    
    logger.info("Materialize runnableFlow as materializedFlow")
    val materializedFlow = runnableFlow.run()(flowMaterializer)
    
    logger.info("Create callback for when the flow is complete")
    materializedFlow.onComplete {
      result => result match {
        case Success(value) => logger.info("materializedFlow complete")
        case Failure(cause) => logger.error("materializedFlow", cause)
      }
      // We need to explicitly shutdown the Actor System when there
      // is no more work left to do.
      system.shutdown()
    }
    
    logger.info("Do some silly work to show the foreground is not blocked")
    val range = 1 to 10
    range.foreach { x => 
      logger.info("foreground: " + x)
      Thread.sleep(1)
    }
}

15:32:43,714 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could 
NOT find resource [logback.groovy]
15:32:43,714 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could 
NOT find resource [logback-test.xml]
15:32:43,715 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could 
NOT find resource [logback.xml]
15:32:43,719 |-INFO in ch.qos.logback.classic.LoggerContext[default] - 
Setting up default configuration.

15:32:43.800 [main] INFO  n.k.l.akka.streams.Experiment1$ - Hello Akka 
Streams - Experiment 1
15:32:44.162 [main] INFO  n.k.l.akka.streams.Experiment1$ - Create a Source 
based on a simple Iterable[T]
15:32:44.185 [main] INFO  n.k.l.akka.streams.Experiment1$ - Create a sink 
that can be connected to the Source via a flow
15:32:44.200 [main] INFO  n.k.l.akka.streams.Experiment1$ - create a Flow
15:32:44.200 [main] INFO  n.k.l.akka.streams.Experiment1$ - Create a 
FlowGraph from the Source
15:32:44.201 [main] INFO  n.k.l.akka.streams.Experiment1$ - Create a 
RunnableFlow from the FlowGraph
15:32:44.203 [main] INFO  n.k.l.akka.streams.Experiment1$ - Create 
flowMaterializer
15:32:44.216 [main] INFO  n.k.l.akka.streams.Experiment1$ - Materialize 
runnableFlow as materializedFlow
15:32:44.250 [main] INFO  n.k.l.akka.streams.Experiment1$ - Create callback 
for when the flow is complete
15:32:44.251 [main] INFO  n.k.l.akka.streams.Experiment1$ - Do some silly 
work to show the foreground is not blocked
15:32:44.252 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 1
15:32:44.253 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 2
15:32:44.254 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 3
15:32:44.255 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 4
15:32:44.256 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 5
15:32:44.258 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 6
15:32:44.259 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 7
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.
streams.Experiment1$ - sink: 1
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.
streams.Experiment1$ - sink: 2
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.
streams.Experiment1$ - sink: 3
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.
streams.Experiment1$ - sink: 4
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.
streams.Experiment1$ - sink: 5
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.
streams.Experiment1$ - sink: 6
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.
streams.Experiment1$ - sink: 7
15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.
streams.Experiment1$ - sink: 8
15:32:44.260 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.
streams.Experiment1$ - sink: 9
15:32:44.260 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 8
15:32:44.260 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.
streams.Experiment1$ - sink: 10
15:32:44.261 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 9
15:32:44.261 [System-akka.actor.default-dispatcher-2] INFO  n.k.l.akka.
streams.Experiment1$ - materializedFlow complete
15:32:44.263 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 10


-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to