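b.addEdge connects an Outlet to an Inlet, so each Source has to be added to
the builder first (b.add) and then wired to one of the merge inputs. Since the
sink is connected inside the graph as well, FlowGraph.closed fits better than
FlowGraph.partial. I think it should be: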
val graph = FlowGraph.closed() { implicit b =>
  val merge = b.add(Merge[(SerialNumber, Double)](sensorSources.size))
  sensorSources.zipWithIndex.foreach { case (src, i) =>
    b.addEdge(b.add(src), merge.in(i))
  }
  b.addEdge(merge.out, b.add(consoleSink))
}
graph.run()
Or with the ~> implicits:
val graph = FlowGraph.closed() { implicit b =>
  import FlowGraph.Implicits._
  val merge = b.add(Merge[(SerialNumber, Double)](sensorSources.size))
  sensorSources.foreach { src =>
    src ~> merge
  }
  merge ~> consoleSink
}
graph.run()
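In case it helps to try the wiring without the sensor classes, here is a
minimal self-contained sketch against akka-stream-experimental 1.0. The object
name MergeSketch, the fixed sample values, and the println sink are made up for
illustration; they stand in for your FileReloader-backed sources and
consoleSink.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

object MergeSketch {
  // Hypothetical stand-ins for the sensor sources: fixed (id, temperature) pairs.
  val sources: List[Source[(String, Double), Unit]] = List(
    Source(List(("sensor-a", 21.5), ("sensor-a", 21.7))),
    Source(List(("sensor-b", 19.0)))
  )

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("merge-sketch")
    implicit val mat = ActorMaterializer()

    // Stand-in for consoleSink: print each merged element.
    val sink = Sink.foreach[(String, Double)](println)

    val graph = FlowGraph.closed() { implicit b =>
      import FlowGraph.Implicits._
      // One merge input per source; each ~> takes the next free input port.
      val merge = b.add(Merge[(String, Double)](sources.size))
      sources.foreach(src => src ~> merge)
      merge ~> sink
    }

    // Elements from different sources may interleave in any order.
    // The actor system is left running, so stop the JVM when done.
    graph.run()
  }
}
```

The element type of the Merge has to match both the sources and the sink, so
the same applies to (SerialNumber, Double) versus (String, Double) in your code.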
Cheers,
Patrik
On Sun, Sep 13, 2015 at 6:54 PM, Patryk Koryzna <[email protected]> wrote:
> Hi all,
> I'm trying to construct a simple FlowGraph to merge multiple sources (with
> ActorProducers supplying the data). However, the following code fails to
> compile, complaining about type Source being incompatible with Outlet. Am I
> missing some implicits or type parameters? Source below.
> I'm using akka-stream-experimental 1.0 on Scala 2.11.5
>
> Regards,
> Patryk
>
> Error:(43, 44) type mismatch;
> found : akka.stream.scaladsl.Source[(sensor.SerialNumber, Double),Unit]
> required: akka.stream.Outlet[?]
> sensorSources.foreach(src => b.addEdge(src, merge))
> ^
>
> import java.io.File
>
> import akka.actor._
> import akka.stream.ActorMaterializer
> import akka.stream.actor.ActorPublisher
> import akka.stream.scaladsl._
> import sensor.SerialNumber
> import streaming.FileReloader
>
> import scala.concurrent.duration._
>
> object Boot extends App {
>   implicit val system = ActorSystem("reactive-temperature")
>   implicit val mat = ActorMaterializer()
>
>   def fileReloadSource(period: FiniteDuration, path: String): Source[String, Unit] = {
>     Source(ActorPublisher[String](system.actorOf(FileReloader.props(period, path))))
>   }
>
>   import akka.stream.scaladsl.FlowGraph
>   import system.log
>
>   val sensors = sensor.Sensor.find(new File("exampleData/"))
>   val consoleSink = Sink.foreach[(String, Double)](st => log.info(s"sensor: ${st._1}, temp: ${st._2} C"))
>
>   val sensorSources: List[Source[(SerialNumber, Double), Unit]] =
>     sensors.map { sensor =>
>       fileReloadSource(5.seconds, sensor.device.getPath)
>         .filter(_.contains("t=")).map(_.split("\\s").last.replace("t=", "").toDouble)
>         .map((sensor.serialNumber, _))
>     }
>
>   val graph = FlowGraph.partial() { implicit b =>
>     import FlowGraph.Implicits._
>
>     val merge = b.add(Merge[(SerialNumber, Double)](sensorSources.size))
>
>     sensorSources.foreach(src => b.addEdge(src, merge))
>   }
>
>   consoleSink.runWith(graph)
> }
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
--
Patrik Nordwall
Typesafe <http://typesafe.com/> - Reactive apps on the JVM
Twitter: @patriknw