Hi all,
I'm trying to construct a simple FlowGraph to merge multiple sources (with
ActorProducers supplying the data). However, the following code fails to
compile, complaining about type Source being incompatible with Outlet. Am I
missing some implicits or type parameters? Source below.
I'm using akka-stream-experimental 1.0 on Scala 2.11.5
Regards,
Patryk
Error:(43, 44) type mismatch;
found : akka.stream.scaladsl.Source[(sensor.SerialNumber, Double),Unit]
required: akka.stream.Outlet[?]
sensorSources.foreach(src => b.addEdge(src, merge))
^
import java.io.File
import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl._
import sensor.SerialNumber
import streaming.FileReloader
import scala.concurrent.duration._
object Boot extends App {
implicit val system = ActorSystem("reactive-temperature")
implicit val mat = ActorMaterializer()
def fileReloadSource(period: FiniteDuration, path: String): Source[String,
Unit] = {
Source(ActorPublisher[String](system.actorOf(FileReloader.props(period,
path))))
}
import akka.stream.scaladsl.FlowGraph
import system.log
val sensors = sensor.Sensor.find(new File("exampleData/"))
val consoleSink = Sink.foreach[(String, Double)](st => log.info(s"sensor:
${st._1}, temp: ${st._2} C"))
val sensorSources: List[Source[(SerialNumber, Double), Unit]] =
sensors.map { sensor =>
fileReloadSource(5.seconds, sensor.device.getPath)
.filter(_.contains("t=")).map(_.split("\\s").last.replace("t=", "").
toDouble)
.map((sensor.serialNumber, _))
}
val graph = FlowGraph.partial() { implicit b =>
import FlowGraph.Implicits._
val merge = b.add(Merge[(SerialNumber, Double)](sensorSources.size))
sensorSources.foreach(src => b.addEdge(src, merge))
}
consoleSink.runWith(graph)
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.