We're a group of experienced backend developers who are fairly new to Spark Streaming (and Scala) and very interested in using the new (in 1.3) DirectKafkaInputDStream impl as part of the metrics reporting service we're building.
Our flow involves reading in metric events, lightly modifying some of the data values, and then creating aggregates via reduceByKey. We're following the approach in Cody Koeninger's blog on exactly-once streaming (https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md), in which the Kafka OffsetRanges are grabbed from the RDD and persisted to a tracking table within the same db transaction as the data within said ranges. Within a short time frame, the offsets in the tracking table fall out of sync with the offsets actually consumed from Kafka. It appears that the writeOffset method (see code below) occasionally doesn't get called, which suggests that some blocks of data aren't being processed either; the aggregate operation makes this difficult to eyeball from the data that's written to the db. Note that we do understand that the reduce operation alters the size/boundaries of the partitions we end up processing. Indeed, without the reduceByKey operation our code seems to work perfectly. But without reduceByKey the db has to perform *a lot* more updates. It's certainly a significant restriction to place on what is otherwise such a promising approach. I'm hoping there's simply something we're missing. Any workarounds or thoughts are welcome.

Here's the code we've got:

    def run(stream: DStream[Input], conf: Config, args: List[String]): Unit = {
      ...
      val sumFunc: (BigDecimal, BigDecimal) => BigDecimal = (_ + _)

      val transformStream = stream.transform { rdd =>
        val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        printOffsets(offsets) // just prints out the offsets for reference
        rdd.mapPartitionsWithIndex { case (i, iter) =>
          iter.flatMap { case (name, msg) => extractMetrics(msg) }
              .map { case (k, v) => ((keyWithFlooredTimestamp(k), offsets(i)), v) }
        }
      }.reduceByKey(sumFunc, 1)

      transformStream.foreachRDD { rdd =>
        rdd.foreachPartition { partition =>
          val conn = DriverManager.getConnection(dbUrl, dbUser, dbPass)
          val db = DB(conn)
          db.autoClose(false)
          db.autoCommit { implicit session =>
            var currentOffset: OffsetRange = null
            partition.foreach { case (key, value) =>
              currentOffset = key._2
              writeMetrics(key._1, value, table)
            }
            writeOffset(currentOffset) // updates the offset positions
          }
          db.close()
        }
      }
    }

Thanks,
Mark

--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/practical-usage-of-the-new-exactly-once-supporting-DirectKafkaInputDStream-tp11916.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
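P.S. To illustrate the partition-boundary issue we suspect we're running into, here's a standalone sketch (plain Scala, no Spark; the data, partition count, and object name are all made up for illustration). It simulates what reduceByKey's shuffle roughly does — hash-partitioning records by key — to show that each post-shuffle partition ends up interleaving offsets from multiple Kafka partitions, so a single "last OffsetRange seen" per output partition no longer describes a contiguous block of consumed messages:

```scala
// Hypothetical sketch: (kafkaPartition, offset, key) triples as they might
// arrive from two Kafka partitions, then hash-partitioned by key the way a
// shuffle such as reduceByKey would redistribute them.
object ShuffleOffsetSketch {
  val input: Seq[(Int, Long, String)] = Seq(
    (0, 100L, "a"), (0, 101L, "b"), (0, 102L, "a"),
    (1, 500L, "b"), (1, 501L, "c"), (1, 502L, "a"))

  val numOutputPartitions = 2

  // Hash-partition by key, approximating Spark's HashPartitioner.
  val byOutput: Map[Int, Seq[(Int, Long, String)]] =
    input.groupBy { case (_, _, k) => math.abs(k.hashCode) % numOutputPartitions }

  def main(args: Array[String]): Unit = {
    byOutput.toSeq.sortBy(_._1).foreach { case (p, recs) =>
      val sources = recs.map { case (kp, off, _) => s"$kp@$off" }
      println(s"output partition $p sees offsets: ${sources.mkString(", ")}")
    }
    // Each output partition mixes offsets from both Kafka partitions, so
    // tracking one currentOffset per output partition drops information.
  }
}
```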