Hi,
I'm currently trying to build a real-time geo-indexing application with
akka streams.
In order to index my location updates I have to buffer them in a
Transformer and
map them to a new value when enough events where received.
My current implementation is like this
val geoSource: Source[LocationUpdate] = Source({ () =>
Thread sleep 100L // throtteling for test purpose
Some(LocationUpdate("#1", random, random, System.currentTimeMillis))
})
class GridCellTransformer(implicit val resolution: Double = 1.0) extends
Transformer[LocationUpdate, IndexedLocationUpdate] {
def onNext(update: LocationUpdate): Seq[IndexedLocationUpdate] = {
val cell = GridCell(update.lon, update.lat)
Seq(IndexedLocationUpdate(update, cell))
}
}
class Compressor extends Transformer[IndexedLocationUpdate, IndexPoint] {
// internal buffer here
def onNext(point: IndexedLocationUpdate): Seq[IndexPoint] = {
if(enoughElements) indexPoints) else Nil
}
}
Now I would like to connect them with the flow materializer dsl like this
geoSource ~> gridCell ~> compressor
However the *Transformer* class is not suitable for this. What is the
approach to realize this?
The other two options I see don't look ideal to me
class GridCellTransformer {
def apply(update: LocationUpdate): Seq[IndexedLocationUpdate] = Nil
}
val t = new GridCellTransformer
geoSource.map(t.apply)
or
geoSource.map {
val buffer = ...;
x => toIndexed
}
thanks :)
Muki
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.