OK, I found some further simplification, so here is my latest code.

package net.kolotyluk.laboratory.akka.streams

import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

import net.kolotyluk.scala.LogbackLogging

import scala.concurrent.Future
import scala.language.postfixOps
import scala.util.Failure
import scala.util.Success


/** Basic Akka Stream Usage
  *
  * Demonstrate a simple case of using a Stream.
  * 
  * Akka Streams are based on processing stages
  * 
  * =Source=
  * A processing stage with exactly one output,
  * emitting data elements whenever downstream
  * processing stages are ready to receive them.
  * 
  * =Sink=
  * A processing stage with exactly one input,
  * requesting and accepting data elements possibly
  * slowing down the upstream producer of elements
  * 
  * =Flow=
  * A processing stage which has exactly one input and output,
  * which connects its up- and downstreams by transforming
  * the data elements flowing through it.
  * 
  * =RunnableFlow=
  * A Flow that has both ends "attached" to a Source and Sink respectively, 
and is ready to be run().
  * 
  * Using Streams is the practice of composing these stages in useful ways.
  * You first compose a blueprint or template for the streams, then you
  * materialize the stream by running it.
  * 
  * It helps to pay attention to the log messages, in particular which
  * thread messages are logged from.
  * 
  * @author Eric Kolotyluk
  * @see 
[[http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-flows-and-basics.html
 
Stream Basics]]
  * 
  */
object Experiment1 extends App with LogbackLogging {

    logger.info("Hello Akka Streams - Experiment 1")
    
    logger.info("Create and start the Akka Actor System")
    val system = ActorSystem("System")
    // Consumed by materializedFlow.onComplete
    import system.dispatcher

    logger.info("Create a Source of Int based on a simple Iterable[T]")
    val source = Source(1 to 10)

    logger.info("Create a sink that can be connected to the Source")
    val sink = Sink.foreach{ int: Int => logger.info("sink: " + int) } 

    logger.info("Create flowMaterializer for running the stream")
    val flowMaterializer = ActorFlowMaterializer()(system)
    
    logger.info("Materialize the stream by connecting the sink to the 
source and running it")
    // A RunnableFlow is implicitly created and run by .runWith()
    val materializedFlow = source.runWith(sink)(flowMaterializer)
    
    logger.info("Create callback for when the flow is complete")
    materializedFlow.onComplete {
      result => result match {
        case Success(value) => logger.info("materializedFlow complete")
        case Failure(cause) => logger.error("materializedFlow", cause)
      }
      // We need to explicitly shutdown the Actor System when there
      // is no more work left to do.
      system.shutdown()
    }
    
    logger.info("Do some silly work to show the foreground is not blocked")
    for (x <- 1 to 10) {
      logger.info("foreground: " + x)
      Thread.sleep(1)
    }
}


-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to