Repository: spark
Updated Branches:
  refs/heads/master 9047cc0f2 -> 14d7c1c3e
[SPARK-24863][SS] Report Kafka offset lag as a custom metric

## What changes were proposed in this pull request?

This builds on top of SPARK-24748 to report 'offset lag' as a custom metric for the Kafka structured streaming source. The lag is the difference between the latest offsets in Kafka at the time the metric is reported (just after a micro-batch completes) and the latest offsets Spark has processed. It can be 0 (or close to 0) if, in steady state, Spark keeps up with the rate at which messages are ingested into the Kafka topics. The metric measures how far the Spark source has fallen behind (per partition) and can aid in tuning the application.

## How was this patch tested?

Existing and new unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #21819 from arunmahadevan/SPARK-24863.

Authored-by: Arun Mahadevan <ar...@apache.org>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14d7c1c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14d7c1c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14d7c1c3

Branch: refs/heads/master
Commit: 14d7c1c3e99e7523c757628d411525aa9d8e0709
Parents: 9047cc0
Author: Arun Mahadevan <ar...@apache.org>
Authored: Sat Aug 18 17:31:52 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Sat Aug 18 17:31:52 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/JsonUtils.scala   | 33 ++++++++++++-----
 .../sql/kafka010/KafkaMicroBatchReader.scala    | 26 ++++++++++++--
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 38 +++++++++++++++++++-
 3 files changed, 85 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
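For context, the new lag metric surfaces through the standard progress API. Below is a minimal consumer-side sketch, not part of this patch: it assumes `query` is a handle to an already-started streaming query over the Kafka source, and it relies on the nested {"lag": {topic: {partition: lag}}} JSON layout produced by the partitionLags helper introduced below:

    import org.json4s.DefaultFormats
    import org.json4s.jackson.JsonMethods._

    implicit val formats = DefaultFormats

    // `query` is an assumed handle to an already-started streaming query.
    val source = query.lastProgress.sources(0)
    // customMetrics holds the JSON string emitted by KafkaCustomMetrics.json().
    val lags = parse(source.customMetrics)
      .extract[Map[String, Map[String, Map[String, Long]]]]("lag")
    lags.foreach { case (topic, partitions) =>
      partitions.foreach { case (partition, lag) =>
        println(s"$topic-$partition is $lag offsets behind")
      }
    }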
http://git-wip-us.apache.org/repos/asf/spark/blob/14d7c1c3/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 868edb5..92b13f2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -29,6 +29,11 @@ import org.json4s.jackson.Serialization
  */
 private object JsonUtils {
   private implicit val formats = Serialization.formats(NoTypeHints)
+  implicit val ordering = new Ordering[TopicPartition] {
+    override def compare(x: TopicPartition, y: TopicPartition): Int = {
+      Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+    }
+  }
 
   /**
    * Read TopicPartitions from json string
@@ -51,7 +56,7 @@
    * Write TopicPartitions as json string
    */
   def partitions(partitions: Iterable[TopicPartition]): String = {
-    val result = new HashMap[String, List[Int]]
+    val result = HashMap.empty[String, List[Int]]
     partitions.foreach { tp =>
       val parts: List[Int] = result.getOrElse(tp.topic, Nil)
       result += tp.topic -> (tp.partition::parts)
@@ -80,19 +85,31 @@
    * Write per-TopicPartition offsets as json string
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
-    val result = new HashMap[String, HashMap[Int, Long]]()
-    implicit val ordering = new Ordering[TopicPartition] {
-      override def compare(x: TopicPartition, y: TopicPartition): Int = {
-        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
-      }
-    }
+    val result = HashMap.empty[String, HashMap[Int, Long]]
     val partitions = partitionOffsets.keySet.toSeq.sorted  // sort for more determinism
     partitions.foreach { tp =>
       val off = partitionOffsets(tp)
-      val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
+      val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
       parts += tp.partition -> off
       result += tp.topic -> parts
     }
     Serialization.write(result)
   }
+
+  /**
+   * Write per-topic partition lag as json string
+   */
+  def partitionLags(
+      latestOffsets: Map[TopicPartition, Long],
+      processedOffsets: Map[TopicPartition, Long]): String = {
+    val result = HashMap.empty[String, HashMap[Int, Long]]
+    val partitions = latestOffsets.keySet.toSeq.sorted
+    partitions.foreach { tp =>
+      val lag = latestOffsets(tp) - processedOffsets.getOrElse(tp, 0L)
+      val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
+      parts += tp.partition -> lag
+      result += tp.topic -> parts
+    }
+    Serialization.write(Map("lag" -> result))
+  }
 }
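To make the emitted JSON concrete, here is a small hypothetical call to the new partitionLags helper (topic name and offsets invented for illustration; note JsonUtils is package-private, so this only compiles from within org.apache.spark.sql.kafka010):

    import org.apache.kafka.common.TopicPartition

    // Two partitions with latest offset 50 each and 5 processed each => lag 45.
    val latest = Map(
      new TopicPartition("t", 0) -> 50L,
      new TopicPartition("t", 1) -> 50L)
    val processed = Map(
      new TopicPartition("t", 0) -> 5L,
      new TopicPartition("t", 1) -> 5L)

    JsonUtils.partitionLags(latest, processed)
    // => {"lag":{"t":{"0":45,"1":45}}}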
endPartitionOffsets) + } + override def deserializeOffset(json: String): Offset = { KafkaSourceOffset(JsonUtils.partitionOffsets(json)) } @@ -380,3 +385,18 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader( } } } + +/** + * Currently reports per topic-partition lag. + * This is the difference between the offset of the latest available data + * in a topic-partition and the latest offset that has been processed. + */ +private[kafka010] case class KafkaCustomMetrics( + latestOffsets: Map[TopicPartition, Long], + processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics { + override def json(): String = { + JsonUtils.partitionLags(latestOffsets, processedOffsets) + } + + override def toString: String = json() +} http://git-wip-us.apache.org/repos/asf/spark/blob/14d7c1c3/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 172c0ef..c7b74f3 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -31,12 +31,13 @@ import scala.util.Random import org.apache.kafka.clients.producer.RecordMetadata import org.apache.kafka.common.TopicPartition +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods._ import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkContext import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession} -import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution @@ -701,6 +702,41 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase { intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) } } + test("custom lag metrics") { + import testImplicits._ + val topic = newTopic() + testUtils.createTopic(topic, partitions = 2) + testUtils.sendMessages(topic, (1 to 100).map(_.toString).toArray) + require(testUtils.getLatestOffsets(Set(topic)).size === 2) + + val kafka = spark + .readStream + .format("kafka") + .option("subscribe", topic) + .option("startingOffsets", s"earliest") + .option("maxOffsetsPerTrigger", 10) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + + implicit val formats = DefaultFormats + + val mapped = kafka.map(kv => kv._2.toInt + 1) + testStream(mapped)( + StartStream(trigger = OneTimeTrigger), + AssertOnQuery { query => + query.awaitTermination() + val source = query.lastProgress.sources(0) + // masOffsetsPerTrigger is 10, and there are two partitions containing 50 events each + // so 5 events should be processed from each partition and a lag of 45 events + val custom = parse(source.customMetrics) + .extract[Map[String, Map[String, Map[String, Long]]]] + custom("lag")(topic)("0") == 45 && custom("lag")(topic)("1") == 45 + } + ) + } + } abstract class KafkaSourceSuiteBase 