We're a group of experienced backend developers who are fairly new to Spark
Streaming (and Scala) and very interested in using the new (in 1.3)
DirectKafkaInputDStream impl as part of the metrics reporting service we're
building.

Our flow involves reading in metric events, lightly modifying some of the
data values, and then creating aggregates via reduceByKey. We're following
the approach in Cody Koeninger's blog on exactly-once streaming
(https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md) in
which the Kakfa OffsetRanges are grabbed from the RDD and persisted to a
tracking table within the same db transaction as the data within said
ranges. 

Within a short time frame the offsets in the table fall out of synch with
the offsets. It appears that the writeOffsets method (see code below)
occasionally doesn't get called which also indicates that some blocks of
data aren't being processed either; the aggregate operation makes this
difficult to eyeball from the data that's written to the db.

Note that we do understand that the reduce operation alters that
size/boundaries of the partitions we end up processing. Indeed, without the
reduceByKey operation our code seems to work perfectly. But without the
reduceByKey operation the db has to perform *a lot* more updates. It's
certainly a significant restriction to place on what is such a promising
approach. I'm hoping there simply something we're missing.

Any workarounds or thoughts are welcome. Here's the code we've got:

def run(stream: DStream[Input], conf: Config, args: List[String]): Unit = {
    ...
    val sumFunc: (BigDecimal, BigDecimal) => BigDecimal = (_ + _)

    val transformStream = stream.transform { rdd =>
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      printOffsets(offsets) // just prints out the offsets for reference
      rdd.mapPartitionsWithIndex { case (i, iter) =>
        iter.flatMap { case (name, msg) => extractMetrics(msg) }
          .map { case (k,v) => ( ( keyWithFlooredTimestamp(k), offsets(i) ),
v ) }
      }
    }.reduceByKey(sumFunc, 1)

    transformStream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val conn = DriverManager.getConnection(dbUrl, dbUser, dbPass)
        val db = DB(conn)
        db.autoClose(false)

        db.autoCommit { implicit session =>
          var currentOffset: OffsetRange = null
          partition.foreach { case (key, value) =>
            currentOffset = key._2
            writeMetrics(key._1, value, table)
          }
          writeOffset(currentOffset) // updates the offset positions
        }
        db.close()
      }
    }

Thanks,
Mark




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/practical-usage-of-the-new-exactly-once-supporting-DirectKafkaInputDStream-tp11916.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org

Reply via email to