Hello,
I just started learning Flink (using Scala) recently, and I developed a job
that, in short, does this steps:
- Reads json messages from Kafka
- Enriches the messages, reading data from Cassandra (using Phantom
DSL)
- Puts the enriched messages back to another Kafka topic.
The job looks like this:
env
.addSource(new FlinkKafkaProducer09[String](...))
.map(MyService.enrichMessage _) // Returns Option
.filter(!_.isEmpty)
.map(_.get)
.map(enrichedMessageToJsonMapper)
.addSink(new FlinkKafkaConsumer09[String](...)))
The "enrichMessage" method is where I'm using Phantom DSL to query
Cassandra, and I would like to return a Future, but I can't figure out a way
to do it right now, so I'm using "Await" to force the resolution and return
a result. Is there a way to use a Future here?
I do have a second job that is updating the data in Cassandra, and since I
don't need to sink, I can have my map to return the Futures, and everything
happens asynchronously. I would like to know if it's possible to have a
similar behavior when I want to use a Sink (so, sink to Kafka as the Futures
are completed).
BTW, I'm using Flink 1.1.2 with Scala 2.11.
Thanks a lot for your help!
Kind regards,
Albert
---
This email has been checked for viruses by Avast antivirus software.
https://www.avast.com/antivirus