Repository: spark Updated Branches: refs/heads/master 1076e4f00 -> 6afe6f32c
[SPARK-24637][SS] Add metrics regarding state and watermark to dropwizard metrics ## What changes were proposed in this pull request? The patch adds state and watermark metrics to the Dropwizard metrics, so that the watermark and state rows/size can be tracked in a time-series manner. ## How was this patch tested? Manually tested with the CSV metric sink. Closes #21622 from HeartSaVioR/SPARK-24637. Authored-by: Jungtaek Lim <kabh...@gmail.com> Signed-off-by: hyukjinkwon <gurwls...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6afe6f32 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6afe6f32 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6afe6f32 Branch: refs/heads/master Commit: 6afe6f32ca2880b13bb5fb4397b2058eef12952b Parents: 1076e4f Author: Jungtaek Lim <kabh...@gmail.com> Authored: Tue Aug 7 10:12:22 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Tue Aug 7 10:12:22 2018 +0800 ---------------------------------------------------------------------- .../execution/streaming/MetricsReporter.scala | 20 ++++++++++++++++++++ .../sql/streaming/StreamingQuerySuite.scala | 3 +++ 2 files changed, 23 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6afe6f32/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala index 66b11ec..8709822 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala @@ -17,10 +17,13 @@ package
org.apache.spark.sql.execution.streaming +import java.text.SimpleDateFormat + import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.{Source => CodahaleSource} +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.streaming.StreamingQueryProgress /** @@ -39,6 +42,23 @@ class MetricsReporter( registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0) registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L) + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) + + registerGauge("eventTime-watermark", + progress => convertStringDateToMillis(progress.eventTime.get("watermark")), 0L) + + registerGauge("states-rowsTotal", _.stateOperators.map(_.numRowsTotal).sum, 0L) + registerGauge("states-usedBytes", _.stateOperators.map(_.memoryUsedBytes).sum, 0L) + + private def convertStringDateToMillis(isoUtcDateStr: String) = { + if (isoUtcDateStr != null) { + timestampFormat.parse(isoUtcDateStr).getTime + } else { + 0L + } + } + private def registerGauge[T]( name: String, f: StreamingQueryProgress => T, http://git-wip-us.apache.org/repos/asf/spark/blob/6afe6f32/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index f37f368..9cceec9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -467,6 +467,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(gauges.get("latency").getValue.asInstanceOf[Long] 
== 0) assert(gauges.get("processingRate-total").getValue.asInstanceOf[Double] == 0.0) assert(gauges.get("inputRate-total").getValue.asInstanceOf[Double] == 0.0) + assert(gauges.get("eventTime-watermark").getValue.asInstanceOf[Long] == 0) + assert(gauges.get("states-rowsTotal").getValue.asInstanceOf[Long] == 0) + assert(gauges.get("states-usedBytes").getValue.asInstanceOf[Long] == 0) sq.stop() } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org