Hi,
I'm building a pipeline in Flink with Kafka as both source and sink. As part
of this pipeline I have multiple stages in my run method, and I would
like to publish the output of some intermediate stages to a separate Kafka topic.
My question is: can I write the output of different stages of run to different Kafka topics?
private val env = StreamExecutionEnvironment.getExecutionEnvironment
private val src = env.addSource(Source.kafka(streams.abc.topic))
override def run(stream: DataStream[TypeX]): DataStream[String] = {
  val stage1 = stream
    .map(doA)
    .map(doB)
    .map(doC)
  val stage2 = stage1.map(doD) // Returns (isTrue: Boolean, somethingElse: TypeT)
  val stage3 = stage2.filter(_.isTrue)
  stage3.addSink(Write_To_Kafka_Topic_Y) // <-- Can I do this outside the run method?
  val stage4 = stage2.filter(!_.isTrue)
  stage4.map(_.toString)
}
run(src).addSink(Write_To_Kafka_Topic_X)
Ideally I would prefer not to call addSink inside run (the stage3.addSink
line marked with a comment above).
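To make the question concrete, here is a rough sketch of the shape I have in mind: run returns both branches as a tuple, and the sinks are attached by the caller. I have not confirmed this is the recommended approach. Everything specific in it is an assumption for illustration only: the Checked case class stands in for the output of doD, the input is a plain String stream from fromElements instead of the Kafka source, and print() stands in for the two Kafka sinks.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical stand-in for the (isTrue, somethingElse) value produced by doD.
case class Checked(isTrue: Boolean, somethingElse: String)

object MultiSinkSketch {

  // Returns both branches instead of attaching a sink inside the method.
  def run(stream: DataStream[String]): (DataStream[Checked], DataStream[String]) = {
    // Placeholder for the doA/doB/doC/doD chain.
    val stage2 = stream.map(s => Checked(s.nonEmpty, s))
    val stage3 = stage2.filter(_.isTrue)
    val stage4 = stage2.filter(!_.isTrue).map(_.toString)
    (stage3, stage4)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Stand-in for env.addSource(Source.kafka(streams.abc.topic)).
    val src = env.fromElements("a", "", "b")

    val (forTopicY, forTopicX) = run(src)
    forTopicY.print() // would be addSink(Write_To_Kafka_Topic_Y)
    forTopicX.print() // would be addSink(Write_To_Kafka_Topic_X)

    env.execute("multi-sink-sketch")
  }
}
```

With this shape, run only describes the transformations, and the decision about which topic each branch goes to stays with the caller.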
--
Thanks,
Deepak Jha