Repository: spark
Updated Branches:
  refs/heads/master 9047cc0f2 -> 14d7c1c3e


[SPARK-24863][SS] Report Kafka offset lag as a custom metric

## What changes were proposed in this pull request?

This builds on top of SPARK-24748 to report 'offset lag' as a custom metric 
for the Kafka structured streaming source.

This lag is the difference between the latest offsets in Kafka at the time the 
metric is reported (just after a micro-batch completes) and the latest offsets 
Spark has processed. It can be 0 (or close to 0) if Spark keeps up with the 
rate at which messages are ingested into the Kafka topics in steady state. The 
metric measures how far the Spark source has fallen behind (per partition) and 
can aid in tuning the application.
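For illustration, the reported per-partition lag boils down to the following minimal sketch (hypothetical offset snapshots, not code from this patch):

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical snapshots for a two-partition topic: latest offsets available
// in Kafka vs. the latest offsets Spark has processed.
val latestOffsets = Map(
  new TopicPartition("topic", 0) -> 50L,
  new TopicPartition("topic", 1) -> 50L)
val processedOffsets = Map(
  new TopicPartition("topic", 0) -> 5L,
  new TopicPartition("topic", 1) -> 5L)

// Lag per partition; a partition Spark has not processed yet counts as fully lagging.
val lags = latestOffsets.map { case (tp, latest) =>
  tp -> (latest - processedOffsets.getOrElse(tp, 0L))
}
// lags: Map(topic-0 -> 45, topic-1 -> 45)
```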

## How was this patch tested?

Existing and new unit tests
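
For reference, the metric surfaces as a JSON string in the source's progress; a sketch of how a caller might read it (this mirrors the parsing in the new test; `query` is assumed to be a running `StreamingQuery`):

```scala
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._

implicit val formats = DefaultFormats

// customMetrics is a JSON string such as {"lag":{"my-topic":{"0":45,"1":45}}}
val custom = parse(query.lastProgress.sources(0).customMetrics)
  .extract[Map[String, Map[String, Map[String, Long]]]]
val lagForPartition0 = custom("lag")("my-topic")("0")
```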


Closes #21819 from arunmahadevan/SPARK-24863.

Authored-by: Arun Mahadevan <ar...@apache.org>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14d7c1c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14d7c1c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14d7c1c3

Branch: refs/heads/master
Commit: 14d7c1c3e99e7523c757628d411525aa9d8e0709
Parents: 9047cc0
Author: Arun Mahadevan <ar...@apache.org>
Authored: Sat Aug 18 17:31:52 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Sat Aug 18 17:31:52 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/JsonUtils.scala   | 33 ++++++++++++-----
 .../sql/kafka010/KafkaMicroBatchReader.scala    | 26 ++++++++++++--
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 38 +++++++++++++++++++-
 3 files changed, 85 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/14d7c1c3/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 868edb5..92b13f2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -29,6 +29,11 @@ import org.json4s.jackson.Serialization
  */
 private object JsonUtils {
   private implicit val formats = Serialization.formats(NoTypeHints)
+  implicit val ordering = new Ordering[TopicPartition] {
+    override def compare(x: TopicPartition, y: TopicPartition): Int = {
+      Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+    }
+  }
 
   /**
    * Read TopicPartitions from json string
@@ -51,7 +56,7 @@ private object JsonUtils {
    * Write TopicPartitions as json string
    */
   def partitions(partitions: Iterable[TopicPartition]): String = {
-    val result = new HashMap[String, List[Int]]
+    val result = HashMap.empty[String, List[Int]]
     partitions.foreach { tp =>
       val parts: List[Int] = result.getOrElse(tp.topic, Nil)
       result += tp.topic -> (tp.partition::parts)
@@ -80,19 +85,31 @@ private object JsonUtils {
    * Write per-TopicPartition offsets as json string
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
-    val result = new HashMap[String, HashMap[Int, Long]]()
-    implicit val ordering = new Ordering[TopicPartition] {
-      override def compare(x: TopicPartition, y: TopicPartition): Int = {
-        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
-      }
-    }
+    val result = HashMap.empty[String, HashMap[Int, Long]]
     val partitions = partitionOffsets.keySet.toSeq.sorted  // sort for more determinism
     partitions.foreach { tp =>
         val off = partitionOffsets(tp)
-        val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
+        val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
         parts += tp.partition -> off
         result += tp.topic -> parts
     }
     Serialization.write(result)
   }
+
+  /**
+   * Write per-topic partition lag as json string
+   */
+  def partitionLags(
+      latestOffsets: Map[TopicPartition, Long],
+      processedOffsets: Map[TopicPartition, Long]): String = {
+    val result = HashMap.empty[String, HashMap[Int, Long]]
+    val partitions = latestOffsets.keySet.toSeq.sorted
+    partitions.foreach { tp =>
+      val lag = latestOffsets(tp) - processedOffsets.getOrElse(tp, 0L)
+      val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
+      parts += tp.partition -> lag
+      result += tp.topic -> parts
+    }
+    Serialization.write(Map("lag" -> result))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/14d7c1c3/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index 6c95b2b..900c9f4 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets
 import scala.collection.JavaConverters._
 
 import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
@@ -33,9 +34,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset, SupportsCustomReaderMetrics}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.UninterruptibleThread
 
@@ -62,7 +63,7 @@ private[kafka010] class KafkaMicroBatchReader(
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
     failOnDataLoss: Boolean)
-  extends MicroBatchReader with Logging {
+  extends MicroBatchReader with SupportsCustomReaderMetrics with Logging {
 
   private var startPartitionOffsets: PartitionOffsetMap = _
   private var endPartitionOffsets: PartitionOffsetMap = _
@@ -158,6 +159,10 @@ private[kafka010] class KafkaMicroBatchReader(
     KafkaSourceOffset(endPartitionOffsets)
   }
 
+  override def getCustomMetrics: CustomMetrics = {
+    KafkaCustomMetrics(kafkaOffsetReader.fetchLatestOffsets(), endPartitionOffsets)
+  }
+
   override def deserializeOffset(json: String): Offset = {
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
@@ -380,3 +385,18 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader(
     }
   }
 }
+
+/**
+ * Currently reports per topic-partition lag.
+ * This is the difference between the offset of the latest available data
+ * in a topic-partition and the latest offset that has been processed.
+ */
+private[kafka010] case class KafkaCustomMetrics(
+    latestOffsets: Map[TopicPartition, Long],
+    processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics {
+  override def json(): String = {
+    JsonUtils.partitionLags(latestOffsets, processedOffsets)
+  }
+
+  override def toString: String = json()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/14d7c1c3/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 172c0ef..c7b74f3 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -31,12 +31,13 @@ import scala.util.Random
 
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
-import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
@@ -701,6 +702,41 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
     intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) }
   }
 
+  test("custom lag metrics") {
+    import testImplicits._
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 100).map(_.toString).toArray)
+    require(testUtils.getLatestOffsets(Set(topic)).size === 2)
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("subscribe", topic)
+      .option("startingOffsets", s"earliest")
+      .option("maxOffsetsPerTrigger", 10)
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+
+    implicit val formats = DefaultFormats
+
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+    testStream(mapped)(
+      StartStream(trigger = OneTimeTrigger),
+      AssertOnQuery { query =>
+        query.awaitTermination()
+        val source = query.lastProgress.sources(0)
+        // maxOffsetsPerTrigger is 10, and there are two partitions containing 50 events each,
+        // so 5 events should be processed from each partition, leaving a lag of 45 events
+        val custom = parse(source.customMetrics)
+          .extract[Map[String, Map[String, Map[String, Long]]]]
+        custom("lag")(topic)("0") == 45 && custom("lag")(topic)("1") == 45
+      }
+    )
+  }
+
 }
 
 abstract class KafkaSourceSuiteBase extends KafkaSourceTest {

