Hi Eric,

On Tue, Jun 16, 2015 at 12:41 AM, Eric Kolotyluk <[email protected]>
wrote:

> After struggling for a while to make sense of Akka Streams, I have the
> following experiment I would appreciate comments on so that I can
> understand and document it better. Eventually I will release my experiments
> on GitHub when I am confident they are valuable learning tools.
>
> Can this be simplified any further without losing the sense of what is
> really going on?
>

What is the goal you want to achieve? I can only simplify it properly if I
know what task you wanted to solve; nevertheless, I will try.


>
> Is creating the FlowGraph really necessary in order to get a Future from
> FlowMaterializer?
>

Umm, you have not used a FlowGraph anywhere in the code below. Also, have
you looked at this section of the docs?
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-flows-and-basics.html#Stream_Materialization
See especially the part showing how to handle materialized values:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-flows-and-basics.html#Combining_materialized_values


>
> Cheers, Eric
>
> package net.kolotyluk.laboratory.akka.streams
>
> import akka.actor.ActorSystem
> import akka.stream.ActorFlowMaterializer
> import akka.stream.scaladsl.Flow
> import akka.stream.scaladsl.Sink
> import akka.stream.scaladsl.Source
>
> import net.kolotyluk.scala.LogbackLogging
>
> import scala.concurrent.Future
> import scala.language.postfixOps
> import scala.util.Failure
> import scala.util.Success
>
>
> /** Basic Akka Stream Usage
>   *
>   * Demonstrate a simple case of using a stream.
>   *
>   * Akka Streams are based on a Source and Sink, connected via a Flow,
>   * defined via a FlowGraph, and finally materialized into something
>   * runnable.
>   *
>   * It helps to pay attention to the log messages, in particular which
>   * thread messages are logged from.
>   *
>   * @author Eric Kolotyluk
>   */
> object Experiment1 extends App with LogbackLogging {
>
>     logger.info( "Hello Akka Streams - Experiment 1" )
>
>     val system = ActorSystem("System")
>     import system.dispatcher
>
>     logger.info("Create a Source based on a simple Iterable[T]")
>     val source = Source(1 to 10)
>
>     logger.info("Create a sink that can be connected to the Source via a flow")
>     val sink = Sink.foreach{ int: Int => logger.info("sink: " + int) }
>
>     logger.info("create a Flow")
>     val flow = Flow[Int]
>
>     logger.info("Create a FlowGraph from the Source")
>     val flowGraph = source via flow
>

It is just a Source, not a FlowGraph. The type will be Source[Int, Unit],
the same type as "source" itself. In fact, since you appended an empty flow
to a source, you just get back an equivalent of the source you started with.
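
To make the types concrete, here is a minimal sketch, assuming the 1.0-RC3
API used in this thread. The object name TypesSketch and the println in the
sink are only illustrative. The type ascriptions let the compiler confirm
what each value really is:

```scala
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future

object TypesSketch {
  // A Source of Ints whose materialized value is Unit.
  val source: Source[Int, Unit] = Source(1 to 10)

  // An "empty" Flow: it passes every Int through unchanged.
  val flow: Flow[Int, Int, Unit] = Flow[Int]

  // Appending a Flow to a Source yields another Source, not a FlowGraph.
  val stillASource: Source[Int, Unit] = source via flow

  // The foreach sink materializes a Future that completes with the stream.
  val sink: Sink[Int, Future[Unit]] =
    Sink.foreach[Int](i => println("sink: " + i))
}
```

Nothing is executed here; these are only descriptions of stream parts.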

As the documentation states:

Source: A processing stage with *exactly one output*, emitting data elements
  whenever downstream processing stages are ready to receive them.

Sink: A processing stage with *exactly one input*, requesting and accepting
  data elements possibly slowing down the upstream producer of elements

Flow: A processing stage which has *exactly one input and output*, which
  connects its up- and downstreams by transforming the data elements flowing
  through it.

Remember that Source/Flow/Sink are blueprints: they don't execute right
away, they only describe the stream you want to run. You can think of them
as the "Props for streams". With ordinary actors you have Props that
describe the actor to be created, but you need to explicitly create actors
from a Props. Also, you can create multiple actors using the same Props.

Streams are similar: you first create a blueprint, then you can create
multiple instances of that blueprint.
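
As a rough illustration of the blueprint idea (a sketch assuming the 1.0-RC3
API; BlueprintSketch, sums and the fold sink are names and choices made up
for this example, not taken from your code), the same runnable blueprint can
be materialized twice, and each run is an independent stream:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BlueprintSketch {
  def sums(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("BlueprintSketch")
    implicit val materializer: ActorFlowMaterializer = ActorFlowMaterializer()
    try {
      // A blueprint: defining it does not start any processing.
      val blueprint =
        Source(1 to 3).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

      // Each run() creates a fresh stream instance from the same blueprint.
      val first: Future[Int] = blueprint.run()
      val second: Future[Int] = blueprint.run()

      (Await.result(first, 5.seconds), Await.result(second, 5.seconds))
    } finally system.shutdown()
  }

  def main(args: Array[String]): Unit = println(sums()) // prints (6,6)
}
```

Await is used only to keep the sketch short; in real code you would react to
the Futures instead of blocking.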


>
>     logger.info("Create a RunnableFlow from the FlowGraph")
>     // Because we want a Future that completes when the FlowGraph completes,
>     // we specify combine: (unit, future) => future
>     val runnableFlow = flowGraph.toMat(sink)((unit, future) => future)
>

You can use the Keep.right prepackaged function, which is documented here:
http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC3/#akka.stream.scaladsl.Keep$

And also demonstrated here:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-flows-and-basics.html#Combining_materialized_values

You could also write source.toMat(sink)(Keep.right), there is no need for
"flowGraph" (which is the same source anyway, since it is just "source" and
an attached empty Flow)

If you want to run the stream right away, you can use runWith(sink), which
will automatically use the materialized value of "sink". For convenience,
the runWith methods always return the materialized value of the object you
passed to them as an argument. Again, refer to the docs:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-flows-and-basics.html#Combining_materialized_values
(lines 20-22 in the example)
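
The two styles side by side, as a minimal sketch assuming the 1.0-RC3 API
(KeepRightSketch and the println-based sink are illustrative only). Both
Futures come from the foreach sink and complete when their stream is done:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object KeepRightSketch {
  def run(): Unit = {
    implicit val system: ActorSystem = ActorSystem("KeepRightSketch")
    implicit val materializer: ActorFlowMaterializer = ActorFlowMaterializer()
    try {
      val source = Source(1 to 10)
      val sink = Sink.foreach[Int](i => println("sink: " + i))

      // Explicit form: keep the sink's materialized value, then run.
      val viaToMat: Future[Unit] = source.toMat(sink)(Keep.right).run()

      // Shortcut: runWith runs the stream and returns the sink's value.
      val viaRunWith: Future[Unit] = source.runWith(sink)

      // Blocking only so the sketch can shut down cleanly afterwards.
      Await.ready(viaToMat, 5.seconds)
      Await.ready(viaRunWith, 5.seconds)
    } finally system.shutdown()
  }

  def main(args: Array[String]): Unit = run()
}
```

Since the two runs are independent streams, their "sink: n" lines may
interleave in the output.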


>
>     logger.info("Create flowMaterializer")
>     val flowMaterializer = ActorFlowMaterializer()(system)
>

If you use "implicit val flowMaterializer" ...


>
>
>     logger.info("Materialize runnableFlow as materializedFlow")
>     val materializedFlow = runnableFlow.run()(flowMaterializer)
>

... then you don't need to explicitly give the materializer here as an
argument. Also, I would not name this variable "materializedFlow" since it
will be just a Future from the ForeachSink.

Btw, your whole example so far can be written like this:

val completionFuture = Source(1 to 10).runForeach(i => logger.info("sink: " + i))
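
Put into a complete program, this could look roughly like the sketch below.
It assumes the 1.0-RC3 API; Experiment1Short and the "log" parameter (which
stands in for logger.info so the example does not depend on LogbackLogging)
are names invented for illustration:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.Future
import scala.util.{Failure, Success}

object Experiment1Short {
  // `log` is a hypothetical stand-in for logger.info.
  def run(log: String => Unit): Unit = {
    implicit val system: ActorSystem = ActorSystem("System")
    // Implicit, so run methods do not need an explicit materializer argument.
    implicit val materializer: ActorFlowMaterializer = ActorFlowMaterializer()
    import system.dispatcher // ExecutionContext for onComplete

    // Source plus foreach sink, materialized and started in one call.
    val completionFuture: Future[Unit] =
      Source(1 to 10).runForeach(i => log("sink: " + i))

    completionFuture.onComplete { result =>
      result match {
        case Success(_)     => log("stream complete")
        case Failure(cause) => log("stream failed: " + cause)
      }
      // Shut the ActorSystem down once there is no more work to do.
      system.shutdown()
    }

    // Block the caller until the shutdown above has finished.
    system.awaitTermination()
  }

  def main(args: Array[String]): Unit = run(println)
}
```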



>
>
>     logger.info("Create callback for when the flow is complete")
>     materializedFlow.onComplete {
>       result => result match {
>         case Success(value) => logger.info("materializedFlow complete")
>         case Failure(cause) => logger.error("materializedFlow", cause)
>       }
>       // We need to explicitly shutdown the Actor System when there
>       // is no more work left to do.
>       system.shutdown()
>     }
>
>     logger.info("Do some silly work to show the foreground is not blocked")
>     val range = 1 to 10
>     range.foreach { x =>
>       logger.info("foreground: " + x)
>       Thread.sleep(1)
>     }
> }
>
> 15:32:43,714 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
> 15:32:43,714 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
> 15:32:43,715 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.xml]
> 15:32:43,719 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Setting up default configuration.
>
> 15:32:43.800 [main] INFO  n.k.l.akka.streams.Experiment1$ - Hello Akka Streams - Experiment 1
> 15:32:44.162 [main] INFO  n.k.l.akka.streams.Experiment1$ - Create a Source based on a simple Iterable[T]
> 15:32:44.185 [main] INFO  n.k.l.akka.streams.Experiment1$ - Create a sink that can be connected to the Source via a flow
> 15:32:44.200 [main] INFO  n.k.l.akka.streams.Experiment1$ - create a Flow
> 15:32:44.200 [main] INFO  n.k.l.akka.streams.Experiment1$ - Create a FlowGraph from the Source
> 15:32:44.201 [main] INFO  n.k.l.akka.streams.Experiment1$ - Create a RunnableFlow from the FlowGraph
> 15:32:44.203 [main] INFO  n.k.l.akka.streams.Experiment1$ - Create flowMaterializer
> 15:32:44.216 [main] INFO  n.k.l.akka.streams.Experiment1$ - Materialize runnableFlow as materializedFlow
> 15:32:44.250 [main] INFO  n.k.l.akka.streams.Experiment1$ - Create callback for when the flow is complete
> 15:32:44.251 [main] INFO  n.k.l.akka.streams.Experiment1$ - Do some silly work to show the foreground is not blocked
> 15:32:44.252 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 1
> 15:32:44.253 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 2
> 15:32:44.254 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 3
> 15:32:44.255 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 4
> 15:32:44.256 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 5
> 15:32:44.258 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 6
> 15:32:44.259 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 7
> 15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.streams.Experiment1$ - sink: 1
> 15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.streams.Experiment1$ - sink: 2
> 15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.streams.Experiment1$ - sink: 3
> 15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.streams.Experiment1$ - sink: 4
> 15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.streams.Experiment1$ - sink: 5
> 15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.streams.Experiment1$ - sink: 6
> 15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.streams.Experiment1$ - sink: 7
> 15:32:44.259 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.streams.Experiment1$ - sink: 8
> 15:32:44.260 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.streams.Experiment1$ - sink: 9
> 15:32:44.260 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 8
> 15:32:44.260 [System-akka.actor.default-dispatcher-3] INFO  n.k.l.akka.streams.Experiment1$ - sink: 10
> 15:32:44.261 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 9
> 15:32:44.261 [System-akka.actor.default-dispatcher-2] INFO  n.k.l.akka.streams.Experiment1$ - materializedFlow complete
> 15:32:44.263 [main] INFO  n.k.l.akka.streams.Experiment1$ - foreground: 10
>
>
>
You can also look at the cookbook for various examples:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-cookbook.html

-Endre




>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
