We have a scenario where events from three Kafka topics sharing the same keys need to be merged. One topic carries the master events; most events in the other two topics arrive within 10 minutes of the master event's arrival. I wrote the pseudocode below. I'd love to hear your thoughts on whether I'm on the right track.
// Scenario
// (1) Merging events from Kafka topic1, topic2 and topic3 sharing the same keys
// (2) Events in topic1 are master events
// (3) One master event may have an associated event in topic2 and/or topic3 sharing the same key
// (4) Most events in topic2 and topic3 will arrive within 10 minutes of the master event arrival
//
// Pseudo code
// Use a 1-minute window of events from topic1 to left-outer-join with the next 10 minutes of
// events from topic2 and topic3

// parse the event to form a key-value pair (first CSV field is the key)
def parse(v: String) = (v.split(",")(0), v)

// Create context with 1 minute batch interval
val sparkConf = new SparkConf().setAppName("MergeLogs")
val ssc = new StreamingContext(sparkConf, Minutes(1))
ssc.checkpoint(checkpointDirectory)

// Create direct Kafka streams with brokers and topics
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topic1"))
stream1.checkpoint(Minutes(5))
val pairStream1 = stream1.map(_._2).map(s => parse(s))

val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topic2"))
stream2.checkpoint(Minutes(5))
val pairStream2 = stream2.map(_._2).map(s => parse(s))

val stream3 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topic3"))
stream3.checkpoint(Minutes(5))
val pairStream3 = stream3.map(_._2).map(s => parse(s))

// load 1 minute of master events from topic1
val windowedStream1 = pairStream1.window(Minutes(1))

// load 10 minutes of topic2 and topic3, sliding every minute
val windowedStream2 = pairStream2.window(Minutes(10), Minutes(1))
val windowedStream3 = pairStream3.window(Minutes(10), Minutes(1))

// left-outer-join topic1 with topic2 and topic3
val joinedStream = windowedStream1.leftOuterJoin(windowedStream2).leftOuterJoin(windowedStream3)

// dump merged events; create the connection per partition on the workers
// (a connection created once at the driver would have to be serialized to the workers)
joinedStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val connection = createNewConnection() // executed at the worker
    partition.foreach(record => connection.send(record))
  }
}

// For recovery from a checkpoint, the setup above would move into a createContext
// function and the context would instead be obtained via:
// val ssc = StreamingContext.getOrCreate(checkpointDirectory,
//   () => createContext(ip, port, outputPath, checkpointDirectory))

// Start the computation
ssc.start()
ssc.awaitTermination()

Thanks,
Daniel
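To sanity-check the result shape of the plan, here is a minimal, Spark-free sketch (an illustrative helper, not the Spark API) showing what `parse` produces and what a `leftOuterJoin` over the windowed pair streams yields: Spark's pair-RDD `leftOuterJoin` returns `(K, (V, Option[W]))`, so a master event with no match in topic2 still comes through with `None`.

```scala
object JoinSketch {
  // Same key extraction as parse() above: first CSV field is the key
  def parse(v: String): (String, String) = (v.split(",")(0), v)

  // Hand-rolled left outer join with the same result shape as Spark's
  // PairRDD leftOuterJoin: (K, (V, Option[W]))
  def leftOuterJoin[K, V, W](left: Seq[(K, V)],
                             right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] = {
    val rightByKey = right.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }
    left.flatMap { case (k, v) =>
      rightByKey.get(k) match {
        case Some(ws) => ws.map(w => (k, (v, Some(w))))   // one row per match
        case None     => Seq((k, (v, None)))               // keep unmatched master
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val masters = Seq("k1,m1", "k2,m2").map(parse)  // topic1 (master events)
    val details = Seq("k1,d1").map(parse)           // topic2 (only k1 has a match)
    leftOuterJoin(masters, details).foreach(println)
    // (k1,(k1,m1,Some(k1,d1)))
    // (k2,(k2,m2,None))
  }
}
```

Chaining a second `leftOuterJoin` for topic3, as in the pseudocode, nests this again: `(K, ((V, Option[W]), Option[X]))`.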