Repository: spark
Updated Branches:
  refs/heads/master 1076e4f00 -> 6afe6f32c


[SPARK-24637][SS] Add metrics regarding state and watermark to dropwizard metrics

## What changes were proposed in this pull request?

The patch adds metrics regarding state and watermark to dropwizard metrics, so 
that watermark and state rows/size can be tracked via time-series manner.

## How was this patch tested?

Manually tested with CSV metric sink.

Closes #21622 from HeartSaVioR/SPARK-24637.

Authored-by: Jungtaek Lim <kabh...@gmail.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6afe6f32
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6afe6f32
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6afe6f32

Branch: refs/heads/master
Commit: 6afe6f32ca2880b13bb5fb4397b2058eef12952b
Parents: 1076e4f
Author: Jungtaek Lim <kabh...@gmail.com>
Authored: Tue Aug 7 10:12:22 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Tue Aug 7 10:12:22 2018 +0800

----------------------------------------------------------------------
 .../execution/streaming/MetricsReporter.scala   | 20 ++++++++++++++++++++
 .../sql/streaming/StreamingQuerySuite.scala     |  3 +++
 2 files changed, 23 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6afe6f32/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
index 66b11ec..8709822 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.text.SimpleDateFormat
+
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.streaming.StreamingQueryProgress
 
 /**
@@ -39,6 +42,23 @@ class MetricsReporter(
   registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
  registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L)
 
+  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+
+  registerGauge("eventTime-watermark",
+    progress => convertStringDateToMillis(progress.eventTime.get("watermark")), 0L)
+
+  registerGauge("states-rowsTotal", _.stateOperators.map(_.numRowsTotal).sum, 0L)
+  registerGauge("states-usedBytes", _.stateOperators.map(_.memoryUsedBytes).sum, 0L)
+
+  private def convertStringDateToMillis(isoUtcDateStr: String) = {
+    if (isoUtcDateStr != null) {
+      timestampFormat.parse(isoUtcDateStr).getTime
+    } else {
+      0L
+    }
+  }
+
   private def registerGauge[T](
       name: String,
       f: StreamingQueryProgress => T,

http://git-wip-us.apache.org/repos/asf/spark/blob/6afe6f32/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index f37f368..9cceec9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -467,6 +467,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         assert(gauges.get("latency").getValue.asInstanceOf[Long] == 0)
         assert(gauges.get("processingRate-total").getValue.asInstanceOf[Double] == 0.0)
         assert(gauges.get("inputRate-total").getValue.asInstanceOf[Double] == 0.0)
+        assert(gauges.get("eventTime-watermark").getValue.asInstanceOf[Long] == 0)
+        assert(gauges.get("states-rowsTotal").getValue.asInstanceOf[Long] == 0)
+        assert(gauges.get("states-usedBytes").getValue.asInstanceOf[Long] == 0)
         sq.stop()
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to