[SPARK-17731][SQL][STREAMING] Metrics for structured streaming for branch-2.0

**This PR adds the same metrics to branch-2.0 that was added to master in 
#15307.**

The differences compared to the #15307 are
- The new configuration is added only in the `SQLConf `object (i.e. 
`SQLConf.STREAMING_METRICS_ENABLED`) and not in the `SQLConf` class (i.e. no 
`SQLConf.isStreamingMetricsEnabled`). Spark master has all the streaming 
configurations exposed as actual fields in SQLConf class (e.g. 
[streamingPollingDelay](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L642)),
 but [not in Spark 
2.0](https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L608).
 So I didnt add it in this 2.0 PR.

- In the previous master PR, the aboveconfiguration was read in 
`StreamExecution` as 
`sparkSession.sessionState.conf.isStreamingMetricsEnabled`. In this 2.0 PR, I 
am instead reading it as 
`sparkSession.conf.get(STREAMING_METRICS_ENABLED)`(i.e. no `sessionState`) to 
keep it consistent with how other confs are read in `StreamExecution` (e.g. 
[STREAMING_POLLING_DELAY](https://github.com/tdas/spark/blob/ee8e899e4c274c363a8b4d13e8bf57b0b467a50e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L62)).

- Different Mima exclusions

------

## What changes were proposed in this pull request?

Metrics are needed for monitoring structured streaming apps. Here is the design 
doc for implementing the necessary metrics.
https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing

Specifically, this PR adds the following public APIs changes.

- `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed from 
`StreamingQueryInfo`, see later)

- `StreamingQueryStatus` has the following important fields
  - inputRate - Current rate (rows/sec) at which data is being generated by all 
the sources
  - processingRate - Current rate (rows/sec) at which the query is processing 
data from
                                  all the sources
  - ~~outputRate~~ - *Does not work with wholestage codegen*
  - latency - Current average latency between the data being available in 
source and the sink writing the corresponding output
  - sourceStatuses: Array[SourceStatus] - Current statuses of the sources
  - sinkStatus: SinkStatus - Current status of the sink
  - triggerStatus - Low-level detailed status of the last completed/currently 
active trigger
    - latencies - getOffset, getBatch, full trigger, wal writes
    - timestamps - trigger start, finish, after getOffset, after getBatch
    - numRows - input, output, state total/updated rows for aggregations

- `SourceStatus` has the following important fields
  - inputRate - Current rate (rows/sec) at which data is being generated by the 
source
  - processingRate - Current rate (rows/sec) at which the query is processing 
data from the source
  - triggerStatus - Low-level detailed status of the last completed/currently 
active trigger

- Python API for `StreamingQuery.status()`

### Breaking changes to existing APIs

**Existing direct public facing APIs**
- Deprecated direct public-facing APIs `StreamingQuery.sourceStatuses` and 
`StreamingQuery.sinkStatus` in favour of 
`StreamingQuery.status.sourceStatuses/sinkStatus`.
  - Branch 2.0 should have it deprecated, master should have it removed.

**Existing advanced listener APIs**
- `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency with 
`SourceStatus`, `SinkStatus`
   - Earlier StreamingQueryInfo was used only in the advanced listener API, but 
now it is used in direct public-facing API (StreamingQuery.status)

- Field `queryInfo` in listener events `QueryStarted`, `QueryProgress`, 
`QueryTerminated` changed have name `queryStatus` and return type 
`StreamingQueryStatus`.

- Field `offsetDesc` in `SourceStatus` was Option[String], converted it to 
`String`.

- For `SourceStatus` and `SinkStatus` made constructor private instead of 
private[sql] to make them more java-safe. Instead added `private[sql] object 
SourceStatus/SinkStatus.apply()` which are harder to accidentally use in Java.

## How was this patch tested?

Old and new unit tests.
- Rate calculation and other internal logic of StreamMetrics tested by 
StreamMetricsSuite.
- New info in statuses returned through StreamingQueryListener is tested in 
StreamingQueryListenerSuite.
- New and old info returned through StreamingQuery.status is tested in 
StreamingQuerySuite.
- Source-specific tests for making sure input rows are counted are is 
source-specific test suites.
- Additional tests to test minor additions in LocalTableScanExec, StateStore, 
etc.

Metrics also manually tested using Ganglia sink

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #15472 from tdas/SPARK-17731-branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/881e0eb0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/881e0eb0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/881e0eb0

Branch: refs/heads/branch-2.0
Commit: 881e0eb05782ea74cf92a62954466b14ea9e05b6
Parents: a0d9015
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Mon Oct 17 16:56:40 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Oct 17 16:56:40 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |  27 ++
 project/MimaExcludes.scala                      |  10 +
 python/pyspark/sql/streaming.py                 | 301 ++++++++++++++++++
 .../spark/sql/catalyst/trees/TreeNode.scala     |   7 +
 .../sql/execution/LocalTableScanExec.scala      |   5 +-
 .../execution/streaming/StatefulAggregate.scala |  31 +-
 .../execution/streaming/StreamExecution.scala   | 307 ++++++++++++++-----
 .../sql/execution/streaming/StreamMetrics.scala | 242 +++++++++++++++
 .../spark/sql/execution/streaming/memory.scala  |   7 +
 .../state/HDFSBackedStateStoreProvider.scala    |   2 +
 .../execution/streaming/state/StateStore.scala  |   3 +
 .../org/apache/spark/sql/internal/SQLConf.scala |   6 +
 .../apache/spark/sql/streaming/SinkStatus.scala |  28 +-
 .../spark/sql/streaming/SourceStatus.scala      |  54 +++-
 .../spark/sql/streaming/StreamingQuery.scala    |  13 +-
 .../sql/streaming/StreamingQueryInfo.scala      |  37 ---
 .../sql/streaming/StreamingQueryListener.scala  |   8 +-
 .../sql/streaming/StreamingQueryStatus.scala    | 139 +++++++++
 .../sql/execution/metric/SQLMetricsSuite.scala  |  17 +
 .../streaming/StreamMetricsSuite.scala          | 213 +++++++++++++
 .../streaming/TextSocketStreamSuite.scala       |  24 ++
 .../streaming/state/StateStoreSuite.scala       |   5 +
 .../sql/streaming/FileStreamSourceSuite.scala   |  14 +
 .../apache/spark/sql/streaming/StreamTest.scala |  72 +++++
 .../streaming/StreamingAggregationSuite.scala   |  54 ++++
 .../streaming/StreamingQueryListenerSuite.scala | 220 +++++--------
 .../sql/streaming/StreamingQuerySuite.scala     | 180 ++++++++++-
 27 files changed, 1753 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index c640b93..8b5296e 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -264,6 +264,33 @@ class KafkaSourceSuite extends KafkaSourceTest {
     testUnsupportedConfig("kafka.auto.offset.reset", "latest")
   }
 
+  test("input row metrics") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("-1"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("subscribe", topic)
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(2, 3, 4),
+      AssertOnLastQueryStatus { status =>
+        assert(status.triggerDetails.get("numRows.input.total").toInt > 0)
+        assert(status.sourceStatuses(0).processingRate > 0.0)
+      }
+    )
+  }
+
   private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
 
   private def testFromLatestOffsets(topic: String, options: (String, 
String)*): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 423cbd4..ddf53bb 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -787,6 +787,16 @@ object MimaExcludes {
     ) ++ Seq(
       // SPARK-16240: ML persistence backward compatibility for LDA
       
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$")
+    ) ++ Seq(
+      
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryInfo"),
+      
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.this"),
+      
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.queryInfo"),
+      
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.this"),
+      
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.queryInfo"),
+      
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.queryInfo"),
+      
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.SourceStatus.offsetDesc"),
+      
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SourceStatus.this"),
+      
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.status")
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 118a02b..0df63a7 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -189,6 +189,304 @@ class StreamingQueryManager(object):
         self._jsqm.resetTerminated()
 
 
+class StreamingQueryStatus(object):
+    """A class used to report information about the progress of a 
StreamingQuery.
+
+    .. note:: Experimental
+
+    .. versionadded:: 2.1
+    """
+
+    def __init__(self, jsqs):
+        self._jsqs = jsqs
+
+    def __str__(self):
+        """
+        Pretty string of this query status.
+
+        >>> print(sqs)
+        StreamingQueryStatus:
+            Query name: query
+            Query id: 1
+            Status timestamp: 123
+            Input rate: 15.5 rows/sec
+            Processing rate 23.5 rows/sec
+            Latency: 345.0 ms
+            Trigger details:
+                isDataPresentInTrigger: true
+                isTriggerActive: true
+                latency.getBatch.total: 20
+                latency.getOffset.total: 10
+                numRows.input.total: 100
+                triggerId: 5
+            Source statuses [1 source]:
+                Source 1:    MySource1
+                    Available offset: #0
+                    Input rate: 15.5 rows/sec
+                    Processing rate: 23.5 rows/sec
+                    Trigger details:
+                        numRows.input.source: 100
+                        latency.getOffset.source: 10
+                        latency.getBatch.source: 20
+            Sink status:     MySink
+                Committed offsets: [#1, -]
+        """
+        return self._jsqs.toString()
+
+    @property
+    @ignore_unicode_prefix
+    @since(2.1)
+    def name(self):
+        """
+        Name of the query. This name is unique across all active queries.
+
+        >>> sqs.name
+        u'query'
+        """
+        return self._jsqs.name()
+
+    @property
+    @since(2.1)
+    def id(self):
+        """
+        Id of the query. This id is unique across all queries that have been 
started in
+        the current process.
+
+        >>> int(sqs.id)
+        1
+        """
+        return self._jsqs.id()
+
+    @property
+    @since(2.1)
+    def timestamp(self):
+        """
+        Timestamp (ms) of when this query was generated.
+
+        >>> int(sqs.timestamp)
+        123
+        """
+        return self._jsqs.timestamp()
+
+    @property
+    @since(2.1)
+    def inputRate(self):
+        """
+        Current total rate (rows/sec) at which data is being generated by all 
the sources.
+
+        >>> sqs.inputRate
+        15.5
+        """
+        return self._jsqs.inputRate()
+
+    @property
+    @since(2.1)
+    def processingRate(self):
+        """
+        Current rate (rows/sec) at which the query is processing data from all 
the sources.
+
+        >>> sqs.processingRate
+        23.5
+        """
+        return self._jsqs.processingRate()
+
+    @property
+    @since(2.1)
+    def latency(self):
+        """
+        Current average latency between the data being available in source and 
the sink
+        writing the corresponding output.
+
+        >>> sqs.latency
+        345.0
+        """
+        if (self._jsqs.latency().nonEmpty()):
+            return self._jsqs.latency().get()
+        else:
+            return None
+
+    @property
+    @ignore_unicode_prefix
+    @since(2.1)
+    def sourceStatuses(self):
+        """
+        Current statuses of the sources as a list.
+
+        >>> len(sqs.sourceStatuses)
+        1
+        >>> sqs.sourceStatuses[0].description
+        u'MySource1'
+        """
+        return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()]
+
+    @property
+    @ignore_unicode_prefix
+    @since(2.1)
+    def sinkStatus(self):
+        """
+        Current status of the sink.
+
+        >>> sqs.sinkStatus.description
+        u'MySink'
+        """
+        return SinkStatus(self._jsqs.sinkStatus())
+
+    @property
+    @ignore_unicode_prefix
+    @since(2.1)
+    def triggerDetails(self):
+        """
+        Low-level details of the currently active trigger (e.g. number of rows 
processed
+        in trigger, latency of intermediate steps, etc.).
+
+        If no trigger is currently active, then it will have details of the 
last completed trigger.
+
+        >>> sqs.triggerDetails
+        {u'triggerId': u'5', u'latency.getBatch.total': u'20', 
u'numRows.input.total': u'100',
+        u'isTriggerActive': u'true', u'latency.getOffset.total': u'10',
+        u'isDataPresentInTrigger': u'true'}
+        """
+        return self._jsqs.triggerDetails()
+
+
+class SourceStatus(object):
+    """
+    Status and metrics of a streaming Source.
+
+    .. note:: Experimental
+
+    .. versionadded:: 2.1
+    """
+
+    def __init__(self, jss):
+        self._jss = jss
+
+    def __str__(self):
+        """
+        Pretty string of this source status.
+
+        >>> print(sqs.sourceStatuses[0])
+        SourceStatus:    MySource1
+            Available offset: #0
+            Input rate: 15.5 rows/sec
+            Processing rate: 23.5 rows/sec
+            Trigger details:
+                numRows.input.source: 100
+                latency.getOffset.source: 10
+                latency.getBatch.source: 20
+        """
+        return self._jss.toString()
+
+    @property
+    @ignore_unicode_prefix
+    @since(2.1)
+    def description(self):
+        """
+        Description of the source corresponding to this status.
+
+        >>> sqs.sourceStatuses[0].description
+        u'MySource1'
+        """
+        return self._jss.description()
+
+    @property
+    @ignore_unicode_prefix
+    @since(2.1)
+    def offsetDesc(self):
+        """
+        Description of the current offset if known.
+
+        >>> sqs.sourceStatuses[0].offsetDesc
+        u'#0'
+        """
+        return self._jss.offsetDesc()
+
+    @property
+    @since(2.1)
+    def inputRate(self):
+        """
+        Current rate (rows/sec) at which data is being generated by the source.
+
+        >>> sqs.sourceStatuses[0].inputRate
+        15.5
+        """
+        return self._jss.inputRate()
+
+    @property
+    @since(2.1)
+    def processingRate(self):
+        """
+        Current rate (rows/sec) at which the query is processing data from the 
source.
+
+        >>> sqs.sourceStatuses[0].processingRate
+        23.5
+        """
+        return self._jss.processingRate()
+
+    @property
+    @ignore_unicode_prefix
+    @since(2.1)
+    def triggerDetails(self):
+        """
+        Low-level details of the currently active trigger (e.g. number of rows 
processed
+        in trigger, latency of intermediate steps, etc.).
+
+        If no trigger is currently active, then it will have details of the 
last completed trigger.
+
+        >>> sqs.sourceStatuses[0].triggerDetails
+        {u'numRows.input.source': u'100', u'latency.getOffset.source': u'10',
+        u'latency.getBatch.source': u'20'}
+       """
+        return self._jss.triggerDetails()
+
+
+class SinkStatus(object):
+    """
+    Status and metrics of a streaming Sink.
+
+    .. note:: Experimental
+
+    .. versionadded:: 2.1
+    """
+
+    def __init__(self, jss):
+        self._jss = jss
+
+    def __str__(self):
+        """
+        Pretty string of this source status.
+
+        >>> print(sqs.sinkStatus)
+        SinkStatus:    MySink
+            Committed offsets: [#1, -]
+        """
+        return self._jss.toString()
+
+    @property
+    @ignore_unicode_prefix
+    @since(2.1)
+    def description(self):
+        """
+        Description of the source corresponding to this status.
+
+        >>> sqs.sinkStatus.description
+        u'MySink'
+        """
+        return self._jss.description()
+
+    @property
+    @ignore_unicode_prefix
+    @since(2.1)
+    def offsetDesc(self):
+        """
+        Description of the current offsets up to which data has been written 
by the sink.
+
+        >>> sqs.sinkStatus.offsetDesc
+        u'[#1, -]'
+        """
+        return self._jss.offsetDesc()
+
+
 class Trigger(object):
     """Used to indicate how often results should be produced by a 
:class:`StreamingQuery`.
 
@@ -751,11 +1049,14 @@ def _test():
     globs['sdf_schema'] = StructType([StructField("data", StringType(), 
False)])
     globs['df'] = \
         
globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
+    globs['sqs'] = StreamingQueryStatus(
+        
spark.sparkContext._jvm.org.apache.spark.sql.streaming.StreamingQueryStatus.testStatus())
 
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.streaming, globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | 
doctest.REPORT_NDIFF)
     globs['spark'].stop()
+
     if failure_count:
         exit(-1)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index eeccba7..931d14d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -159,6 +159,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product {
   }
 
   /**
+   * Returns a Seq containing the leaves in this tree.
+   */
+  def collectLeaves(): Seq[BaseType] = {
+    this.collect { case p if p.children.isEmpty => p }
+  }
+
+  /**
    * Finds and returns the first [[TreeNode]] of the tree for which the given 
partial function
    * is defined (pre-order), and applies the partial function to it.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index 9f53a99..c998e04 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -57,10 +57,13 @@ case class LocalTableScanExec(
   }
 
   override def executeCollect(): Array[InternalRow] = {
+    longMetric("numOutputRows").add(unsafeRows.size)
     unsafeRows
   }
 
   override def executeTake(limit: Int): Array[InternalRow] = {
-    unsafeRows.take(limit)
+    val taken = unsafeRows.take(limit)
+    longMetric("numOutputRows").add(taken.size)
+    taken
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index 4d0283f..587ea7d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.execution.SparkPlan
 
@@ -56,7 +57,12 @@ case class StateStoreRestoreExec(
     child: SparkPlan)
   extends execution.UnaryExecNode with StatefulOperator {
 
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
+
   override protected def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+
     child.execute().mapPartitionsWithStateStore(
       getStateId.checkpointLocation,
       operatorId = getStateId.operatorId,
@@ -69,6 +75,7 @@ case class StateStoreRestoreExec(
         iter.flatMap { row =>
           val key = getKey(row)
           val savedState = store.get(key)
+          numOutputRows += 1
           row +: savedState.toSeq
         }
     }
@@ -86,7 +93,13 @@ case class StateStoreSaveExec(
     child: SparkPlan)
   extends execution.UnaryExecNode with StatefulOperator {
 
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+    "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of 
total state rows"),
+    "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of 
updated state rows"))
+
   override protected def doExecute(): RDD[InternalRow] = {
+    metrics // force lazy init at driver
     assert(returnAllStates.nonEmpty,
       "Incorrect planning in IncrementalExecution, returnAllStates have not 
been set")
     val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ else 
saveAndReturnUpdated _
@@ -111,6 +124,10 @@ case class StateStoreSaveExec(
   private def saveAndReturnUpdated(
       store: StateStore,
       iter: Iterator[InternalRow]): Iterator[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    val numTotalStateRows = longMetric("numTotalStateRows")
+    val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
     new Iterator[InternalRow] {
       private[this] val baseIterator = iter
       private[this] val getKey = 
GenerateUnsafeProjection.generate(keyExpressions, child.output)
@@ -118,6 +135,7 @@ case class StateStoreSaveExec(
       override def hasNext: Boolean = {
         if (!baseIterator.hasNext) {
           store.commit()
+          numTotalStateRows += store.numKeys()
           false
         } else {
           true
@@ -128,6 +146,8 @@ case class StateStoreSaveExec(
         val row = baseIterator.next().asInstanceOf[UnsafeRow]
         val key = getKey(row)
         store.put(key.copy(), row.copy())
+        numOutputRows += 1
+        numUpdatedStateRows += 1
         row
       }
     }
@@ -142,12 +162,21 @@ case class StateStoreSaveExec(
       store: StateStore,
       iter: Iterator[InternalRow]): Iterator[InternalRow] = {
     val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+    val numOutputRows = longMetric("numOutputRows")
+    val numTotalStateRows = longMetric("numTotalStateRows")
+    val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
     while (iter.hasNext) {
       val row = iter.next().asInstanceOf[UnsafeRow]
       val key = getKey(row)
       store.put(key.copy(), row.copy())
+      numUpdatedStateRows += 1
     }
     store.commit()
-    store.iterator().map(_._2.asInstanceOf[InternalRow])
+    numTotalStateRows += store.numKeys()
+    store.iterator().map { case (k, v) =>
+      numOutputRows += 1
+      v.asInstanceOf[InternalRow]
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8e0688d..6330e0a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
@@ -57,6 +57,7 @@ class StreamExecution(
   extends StreamingQuery with Logging {
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
+  import StreamMetrics._
 
   private val pollingDelayMs = 
sparkSession.conf.get(SQLConf.STREAMING_POLLING_DELAY)
 
@@ -105,11 +106,22 @@ class StreamExecution(
   var lastExecution: QueryExecution = null
 
   @volatile
-  var streamDeathCause: StreamingQueryException = null
+  private var streamDeathCause: StreamingQueryException = null
 
   /* Get the call site in the caller thread; will pass this into the micro 
batch thread */
   private val callSite = Utils.getCallSite()
 
+  /** Metrics for this query */
+  private val streamMetrics =
+    new StreamMetrics(uniqueSources.toSet, triggerClock, 
s"StructuredStreaming.$name")
+
+  @volatile
+  private var currentStatus: StreamingQueryStatus = null
+
+  /** Flag that signals whether any error with input metrics have already been 
logged */
+  @volatile
+  private var metricWarningLogged: Boolean = false
+
   /**
    * The thread that runs the micro-batches of this stream. Note that this 
thread must be
    * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential 
deadlocks in using
@@ -136,16 +148,14 @@ class StreamExecution(
   /** Whether the query is currently active or not */
   override def isActive: Boolean = state == ACTIVE
 
+  /** Returns the current status of the query. */
+  override def status: StreamingQueryStatus = currentStatus
+
   /** Returns current status of all the sources. */
-  override def sourceStatuses: Array[SourceStatus] = {
-    val localAvailableOffsets = availableOffsets
-    sources.map(s =>
-      new SourceStatus(s.toString, 
localAvailableOffsets.get(s).map(_.toString))).toArray
-  }
+  override def sourceStatuses: Array[SourceStatus] = 
currentStatus.sourceStatuses.toArray
 
   /** Returns current status of the sink. */
-  override def sinkStatus: SinkStatus =
-    new SinkStatus(sink.toString, 
committedOffsets.toCompositeOffset(sources).toString)
+  override def sinkStatus: SinkStatus = currentStatus.sinkStatus
 
   /** Returns the [[StreamingQueryException]] if the query was terminated by 
an exception. */
   override def exception: Option[StreamingQueryException] = 
Option(streamDeathCause)
@@ -176,7 +186,11 @@ class StreamExecution(
       // Mark ACTIVE and then post the event. QueryStarted event is 
synchronously sent to listeners,
       // so must mark this as ACTIVE first.
       state = ACTIVE
-      postEvent(new QueryStarted(this.toInfo)) // Assumption: Does not throw 
exception.
+      if (sparkSession.conf.get(SQLConf.STREAMING_METRICS_ENABLED)) {
+        
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
+      }
+      updateStatus()
+      postEvent(new QueryStarted(currentStatus)) // Assumption: Does not throw 
exception.
 
       // Unblock starting thread
       startLatch.countDown()
@@ -185,25 +199,41 @@ class StreamExecution(
       SparkSession.setActiveSession(sparkSession)
 
       triggerExecutor.execute(() => {
-        if (isActive) {
-          if (currentBatchId < 0) {
-            // We'll do this initialization only once
-            populateStartOffsets()
-            logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
+        streamMetrics.reportTriggerStarted(currentBatchId)
+        streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "Finding new data 
from sources")
+        updateStatus()
+        val isTerminated = reportTimeTaken(TRIGGER_LATENCY) {
+          if (isActive) {
+            if (currentBatchId < 0) {
+              // We'll do this initialization only once
+              populateStartOffsets()
+              logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
+            } else {
+              constructNextBatch()
+            }
+            if (dataAvailable) {
+              streamMetrics.reportTriggerDetail(IS_DATA_PRESENT_IN_TRIGGER, 
true)
+              streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "Processing 
new data")
+              updateStatus()
+              runBatch()
+              // We'll increase currentBatchId after we complete processing 
current batch's data
+              currentBatchId += 1
+            } else {
+              streamMetrics.reportTriggerDetail(IS_DATA_PRESENT_IN_TRIGGER, 
false)
+              streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "No new data")
+              updateStatus()
+              Thread.sleep(pollingDelayMs)
+            }
+            true
           } else {
-            constructNextBatch()
+            false
           }
-          if (dataAvailable) {
-            runBatch()
-            // We'll increase currentBatchId after we complete processing 
current batch's data
-            currentBatchId += 1
-          } else {
-            Thread.sleep(pollingDelayMs)
-          }
-          true
-        } else {
-          false
         }
+        // Update metrics and notify others
+        streamMetrics.reportTriggerFinished()
+        updateStatus()
+        postEvent(new QueryProgress(currentStatus))
+        isTerminated
       })
     } catch {
       case _: InterruptedException if state == TERMINATED => // interrupted by 
stop()
@@ -221,8 +251,16 @@ class StreamExecution(
         }
     } finally {
       state = TERMINATED
+
+      // Update metrics and status
+      streamMetrics.stop()
+      sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
+      updateStatus()
+
+      // Notify others
       sparkSession.streams.notifyQueryTermination(StreamExecution.this)
-      postEvent(new QueryTerminated(this.toInfo, 
exception.map(_.cause).map(Utils.exceptionString)))
+      postEvent(
+        new QueryTerminated(currentStatus, 
exception.map(_.cause).map(Utils.exceptionString)))
       terminationLatch.countDown()
     }
   }
@@ -248,7 +286,6 @@ class StreamExecution(
             committedOffsets = lastOffsets.toStreamProgress(sources)
             logDebug(s"Resuming with committed offsets: $committedOffsets")
         }
-
       case None => // We are starting this stream for the first time.
         logInfo(s"Starting new streaming query.")
         currentBatchId = 0
@@ -278,8 +315,14 @@ class StreamExecution(
     val hasNewData = {
       awaitBatchLock.lock()
       try {
-        val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-        availableOffsets ++= newData
+        reportTimeTaken(GET_OFFSET_LATENCY) {
+          val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { 
s =>
+            reportTimeTaken(s, SOURCE_GET_OFFSET_LATENCY) {
+              (s, s.getOffset)
+            }
+          }.toMap
+          availableOffsets ++= latestOffsets.filter { case (s, o) => 
o.nonEmpty }.mapValues(_.get)
+        }
 
         if (dataAvailable) {
           true
@@ -292,16 +335,19 @@ class StreamExecution(
       }
     }
     if (hasNewData) {
-      assert(offsetLog.add(currentBatchId, 
availableOffsets.toCompositeOffset(sources)),
-        s"Concurrent update to the log. Multiple streaming jobs detected for 
$currentBatchId")
-      logInfo(s"Committed offsets for batch $currentBatchId.")
-
-      // Now that we have logged the new batch, no further processing will 
happen for
-      // the previous batch, and it is safe to discard the old metadata.
-      // Note that purge is exclusive, i.e. it purges everything before 
currentBatchId.
-      // NOTE: If StreamExecution implements pipeline parallelism (multiple 
batches in
-      // flight at the same time), this cleanup logic will need to change.
-      offsetLog.purge(currentBatchId)
+      reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
+        assert(
+          offsetLog.add(currentBatchId, 
availableOffsets.toCompositeOffset(sources)),
+          s"Concurrent update to the log. Multiple streaming jobs detected for 
$currentBatchId")
+        logInfo(s"Committed offsets for batch $currentBatchId.")
+
+        // Now that we have logged the new batch, no further processing will 
happen for
+        // the previous batch, and it is safe to discard the old metadata.
+        // Note that purge is exclusive, i.e. it purges everything before 
currentBatchId.
+        // NOTE: If StreamExecution implements pipeline parallelism (multiple 
batches in
+        // flight at the same time), this cleanup logic will need to change.
+        offsetLog.purge(currentBatchId)
+      }
     } else {
       awaitBatchLock.lock()
       try {
@@ -311,26 +357,30 @@ class StreamExecution(
         awaitBatchLock.unlock()
       }
     }
+    reportTimestamp(GET_OFFSET_TIMESTAMP)
   }
 
   /**
    * Processes any data available between `availableOffsets` and 
`committedOffsets`.
    */
   private def runBatch(): Unit = {
-    val startTime = System.nanoTime()
-
     // TODO: Move this to IncrementalExecution.
 
     // Request unprocessed data from all sources.
-    val newData = availableOffsets.flatMap {
-      case (source, available)
+    val newData = reportTimeTaken(GET_BATCH_LATENCY) {
+      availableOffsets.flatMap {
+        case (source, available)
           if committedOffsets.get(source).map(_ != available).getOrElse(true) 
=>
-        val current = committedOffsets.get(source)
-        val batch = source.getBatch(current, available)
-        logDebug(s"Retrieving data from $source: $current -> $available")
-        Some(source -> batch)
-      case _ => None
-    }.toMap
+          val current = committedOffsets.get(source)
+          val batch = reportTimeTaken(source, SOURCE_GET_BATCH_LATENCY) {
+            source.getBatch(current, available)
+          }
+          logDebug(s"Retrieving data from $source: $current -> $available")
+          Some(source -> batch)
+        case _ => None
+      }
+    }
+    reportTimestamp(GET_BATCH_TIMESTAMP)
 
     // A list of attributes that will need to be updated.
     var replacements = new ArrayBuffer[(Attribute, Attribute)]
@@ -351,25 +401,24 @@ class StreamExecution(
 
     // Rewire the plan to use the new attributes that were returned by the 
source.
     val replacementMap = AttributeMap(replacements)
-    val newPlan = withNewSources transformAllExpressions {
+    val triggerLogicalPlan = withNewSources transformAllExpressions {
       case a: Attribute if replacementMap.contains(a) => replacementMap(a)
     }
 
-    val optimizerStart = System.nanoTime()
-    lastExecution = new IncrementalExecution(
-      sparkSession,
-      newPlan,
-      outputMode,
-      checkpointFile("state"),
-      currentBatchId)
-
-    lastExecution.executedPlan
-    val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
-    logDebug(s"Optimized batch in ${optimizerTime}ms")
+    val executedPlan = reportTimeTaken(OPTIMIZER_LATENCY) {
+      lastExecution = new IncrementalExecution(
+        sparkSession,
+        triggerLogicalPlan,
+        outputMode,
+        checkpointFile("state"),
+        currentBatchId)
+      lastExecution.executedPlan // Force the lazy generation of execution plan
+    }
 
     val nextBatch =
       new Dataset(sparkSession, lastExecution, 
RowEncoder(lastExecution.analyzed.schema))
     sink.addBatch(currentBatchId, nextBatch)
+    reportNumRows(executedPlan, triggerLogicalPlan, newData)
 
     awaitBatchLock.lock()
     try {
@@ -379,11 +428,8 @@ class StreamExecution(
       awaitBatchLock.unlock()
     }
 
-    val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
-    logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
     // Update committed offsets.
     committedOffsets ++= availableOffsets
-    postEvent(new QueryProgress(this.toInfo))
   }
 
   private def postEvent(event: StreamingQueryListener.Event) {
@@ -516,12 +562,131 @@ class StreamExecution(
      """.stripMargin
   }
 
-  private def toInfo: StreamingQueryInfo = {
-    new StreamingQueryInfo(
-      this.name,
-      this.id,
-      this.sourceStatuses,
-      this.sinkStatus)
+  /**
+   * Report row metrics of the executed trigger
+   * @param triggerExecutionPlan Execution plan of the trigger
+   * @param triggerLogicalPlan Logical plan of the trigger, generated from the 
query logical plan
+   * @param sourceToDF Source to DataFrame returned by the source.getBatch in 
this trigger
+   */
+  private def reportNumRows(
+      triggerExecutionPlan: SparkPlan,
+      triggerLogicalPlan: LogicalPlan,
+      sourceToDF: Map[Source, DataFrame]): Unit = {
+    // We want to associate execution plan leaves to sources that generate 
them, so that we match
+    // the their metrics (e.g. numOutputRows) to the sources. To do this we do 
the following.
+    // Consider the translation from the streaming logical plan to the final 
executed plan.
+    //
+    //  streaming logical plan (with sources) <==> trigger's logical plan <==> 
executed plan
+    //
+    // 1. We keep track of streaming sources associated with each leaf in the 
trigger's logical plan
+    //    - Each logical plan leaf will be associated with a single streaming 
source.
+    //    - There can be multiple logical plan leaves associated with a 
streaming source.
+    //    - There can be leaves not associated with any streaming source, 
because they were
+    //      generated from a batch source (e.g. stream-batch joins)
+    //
+    // 2. Assuming that the executed plan has same number of leaves in the 
same order as that of
+    //    the trigger logical plan, we associate executed plan leaves with 
corresponding
+    //    streaming sources.
+    //
+    // 3. For each source, we sum the metrics of the associated execution plan 
leaves.
+    //
+    val logicalPlanLeafToSource = sourceToDF.flatMap { case (source, df) =>
+      df.logicalPlan.collectLeaves().map { leaf => leaf -> source }
+    }
+    val allLogicalPlanLeaves = triggerLogicalPlan.collectLeaves() // includes 
non-streaming sources
+    val allExecPlanLeaves = triggerExecutionPlan.collectLeaves()
+    val sourceToNumInputRows: Map[Source, Long] =
+      if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
+        val execLeafToSource = 
allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
+          case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep 
-> source }
+        }
+        val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, 
source) =>
+          val numRows = 
execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
+          source -> numRows
+        }
+        sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum 
up rows for each source
+      } else {
+        if (!metricWarningLogged) {
+          def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), 
${seq.mkString(", ")}"
+          logWarning(
+            "Could not report metrics as number leaves in trigger logical plan 
did not match that" +
+              s" of the execution plan:\n" +
+              s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
+              s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
+          metricWarningLogged = true
+        }
+        Map.empty
+      }
+    val numOutputRows = 
triggerExecutionPlan.metrics.get("numOutputRows").map(_.value)
+    val stateNodes = triggerExecutionPlan.collect {
+      case p if p.isInstanceOf[StateStoreSaveExec] => p
+    }
+
+    streamMetrics.reportNumInputRows(sourceToNumInputRows)
+    stateNodes.zipWithIndex.foreach { case (s, i) =>
+      streamMetrics.reportTriggerDetail(
+        NUM_TOTAL_STATE_ROWS(i + 1),
+        s.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L))
+      streamMetrics.reportTriggerDetail(
+        NUM_UPDATED_STATE_ROWS(i + 1),
+        s.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
+    }
+    updateStatus()
+  }
+
+  private def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
+    val startTime = triggerClock.getTimeMillis()
+    val result = body
+    val endTime = triggerClock.getTimeMillis()
+    val timeTaken = math.max(endTime - startTime, 0)
+    streamMetrics.reportTriggerDetail(triggerDetailKey, timeTaken)
+    updateStatus()
+    if (triggerDetailKey == TRIGGER_LATENCY) {
+      logInfo(s"Completed up to $availableOffsets in $timeTaken ms")
+    }
+    result
+  }
+
+  private def reportTimeTaken[T](source: Source, triggerDetailKey: 
String)(body: => T): T = {
+    val startTime = triggerClock.getTimeMillis()
+    val result = body
+    val endTime = triggerClock.getTimeMillis()
+    streamMetrics.reportSourceTriggerDetail(
+      source, triggerDetailKey, math.max(endTime - startTime, 0))
+    updateStatus()
+    result
+  }
+
+  private def reportTimestamp(triggerDetailKey: String): Unit = {
+    streamMetrics.reportTriggerDetail(triggerDetailKey, 
triggerClock.getTimeMillis)
+    updateStatus()
+  }
+
+  private def updateStatus(): Unit = {
+    val localAvailableOffsets = availableOffsets
+    val sourceStatuses = sources.map { s =>
+      SourceStatus(
+        s.toString,
+        localAvailableOffsets.get(s).map(_.toString).getOrElse("-"), // TODO: 
use json if available
+        streamMetrics.currentSourceInputRate(s),
+        streamMetrics.currentSourceProcessingRate(s),
+        streamMetrics.currentSourceTriggerDetails(s))
+    }.toArray
+    val sinkStatus = SinkStatus(
+      sink.toString,
+      committedOffsets.toCompositeOffset(sources).toString)
+
+    currentStatus =
+      StreamingQueryStatus(
+        name = name,
+        id = id,
+        timestamp = triggerClock.getTimeMillis(),
+        inputRate = streamMetrics.currentInputRate(),
+        processingRate = streamMetrics.currentProcessingRate(),
+        latency = streamMetrics.currentLatency(),
+        sourceStatuses = sourceStatuses,
+        sinkStatus = sinkStatus,
+        triggerDetails = streamMetrics.currentTriggerDetails())
   }
 
   trait State

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
new file mode 100644
index 0000000..e98d188
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.{util => ju}
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.util.Clock
+
+/**
+ * Class that manages all the metrics related to a StreamingQuery. It does the 
following.
+ * - Calculates metrics (rates, latencies, etc.) based on information reported 
by StreamExecution.
+ * - Allows the current metric values to be queried
+ * - Serves some of the metrics through Codahale/DropWizard metrics
+ *
+ * @param sources Unique set of sources in a query
+ * @param triggerClock Clock used for triggering in StreamExecution
+ * @param codahaleSourceName Root name for all the Codahale metrics
+ */
+class StreamMetrics(sources: Set[Source], triggerClock: Clock, 
codahaleSourceName: String)
+  extends CodahaleSource with Logging {
+
+  import StreamMetrics._
+
+  // Trigger infos
+  private val triggerDetails = new mutable.HashMap[String, String]
+  private val sourceTriggerDetails = new mutable.HashMap[Source, 
mutable.HashMap[String, String]]
+
+  // Rate estimators for sources and sinks
+  private val inputRates = new mutable.HashMap[Source, RateCalculator]
+  private val processingRates = new mutable.HashMap[Source, RateCalculator]
+
+  // Number of input rows in the current trigger
+  private val numInputRows = new mutable.HashMap[Source, Long]
+  private var currentTriggerStartTimestamp: Long = -1
+  private var previousTriggerStartTimestamp: Long = -1
+  private var latency: Option[Double] = None
+
+  override val sourceName: String = codahaleSourceName
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  // =========== Initialization ===========
+
+  // Metric names should not have . in them, so that all the metrics of a 
query are identified
+  // together in Ganglia as a single metric group
+  registerGauge("inputRate-total", currentInputRate)
+  registerGauge("processingRate-total", () => currentProcessingRate)
+  registerGauge("latency", () => currentLatency().getOrElse(-1.0))
+
+  sources.foreach { s =>
+    inputRates.put(s, new RateCalculator)
+    processingRates.put(s, new RateCalculator)
+    sourceTriggerDetails.put(s, new mutable.HashMap[String, String])
+
+    registerGauge(s"inputRate-${s.toString}", () => currentSourceInputRate(s))
+    registerGauge(s"processingRate-${s.toString}", () => 
currentSourceProcessingRate(s))
+  }
+
+  // =========== Setter methods ===========
+
+  def reportTriggerStarted(triggerId: Long): Unit = synchronized {
+    numInputRows.clear()
+    triggerDetails.clear()
+    sourceTriggerDetails.values.foreach(_.clear())
+
+    reportTriggerDetail(TRIGGER_ID, triggerId)
+    sources.foreach(s => reportSourceTriggerDetail(s, TRIGGER_ID, triggerId))
+    reportTriggerDetail(IS_TRIGGER_ACTIVE, true)
+    currentTriggerStartTimestamp = triggerClock.getTimeMillis()
+    reportTriggerDetail(START_TIMESTAMP, currentTriggerStartTimestamp)
+  }
+
+  def reportTriggerDetail[T](key: String, value: T): Unit = synchronized {
+    triggerDetails.put(key, value.toString)
+  }
+
+  def reportSourceTriggerDetail[T](source: Source, key: String, value: T): 
Unit = synchronized {
+    sourceTriggerDetails(source).put(key, value.toString)
+  }
+
+  def reportNumInputRows(inputRows: Map[Source, Long]): Unit = synchronized {
+    numInputRows ++= inputRows
+  }
+
+  def reportTriggerFinished(): Unit = synchronized {
+    require(currentTriggerStartTimestamp >= 0)
+    val currentTriggerFinishTimestamp = triggerClock.getTimeMillis()
+    reportTriggerDetail(FINISH_TIMESTAMP, currentTriggerFinishTimestamp)
+    triggerDetails.remove(STATUS_MESSAGE)
+    reportTriggerDetail(IS_TRIGGER_ACTIVE, false)
+
+    // Report number of rows
+    val totalNumInputRows = numInputRows.values.sum
+    reportTriggerDetail(NUM_INPUT_ROWS, totalNumInputRows)
+    numInputRows.foreach { case (s, r) =>
+      reportSourceTriggerDetail(s, NUM_SOURCE_INPUT_ROWS, r)
+    }
+
+    val currentTriggerDuration = currentTriggerFinishTimestamp - 
currentTriggerStartTimestamp
+    val previousInputIntervalOption = if (previousTriggerStartTimestamp >= 0) {
+      Some(currentTriggerStartTimestamp - previousTriggerStartTimestamp)
+    } else None
+
+    // Update input rate = num rows received by each source during the 
previous trigger interval
+    // Interval is measures as interval between start times of previous and 
current trigger.
+    //
+    // TODO: Instead of trigger start, we should use time when getOffset was 
called on each source
+    // as this may be different for each source if there are many sources in 
the query plan
+    // and getOffset is called serially on them.
+    if (previousInputIntervalOption.nonEmpty) {
+      sources.foreach { s =>
+        inputRates(s).update(numInputRows.getOrElse(s, 0), 
previousInputIntervalOption.get)
+      }
+    }
+
+    // Update processing rate = num rows processed for each source in current 
trigger duration
+    sources.foreach { s =>
+      processingRates(s).update(numInputRows.getOrElse(s, 0), 
currentTriggerDuration)
+    }
+
+    // Update latency = if data present, 0.5 * previous trigger interval + 
current trigger duration
+    if (previousInputIntervalOption.nonEmpty && totalNumInputRows > 0) {
+      latency = Some((previousInputIntervalOption.get.toDouble / 2) + 
currentTriggerDuration)
+    } else {
+      latency = None
+    }
+
+    previousTriggerStartTimestamp = currentTriggerStartTimestamp
+    currentTriggerStartTimestamp = -1
+  }
+
+  // =========== Getter methods ===========
+
+  def currentInputRate(): Double = synchronized {
+    // Since we are calculating source input rates using the same time 
interval for all sources
+    // it is fine to calculate total input rate as the sum of per source input 
rate.
+    inputRates.map(_._2.currentRate).sum
+  }
+
+  def currentSourceInputRate(source: Source): Double = synchronized {
+    inputRates(source).currentRate
+  }
+
+  def currentProcessingRate(): Double = synchronized {
+    // Since we are calculating source processing rates using the same time 
interval for all sources
+    // it is fine to calculate total processing rate as the sum of per source 
processing rate.
+    processingRates.map(_._2.currentRate).sum
+  }
+
+  def currentSourceProcessingRate(source: Source): Double = synchronized {
+    processingRates(source).currentRate
+  }
+
+  def currentLatency(): Option[Double] = synchronized { latency }
+
+  def currentTriggerDetails(): Map[String, String] = synchronized { 
triggerDetails.toMap }
+
+  def currentSourceTriggerDetails(source: Source): Map[String, String] = 
synchronized {
+    sourceTriggerDetails(source).toMap
+  }
+
+  // =========== Other methods ===========
+
+  private def registerGauge[T](name: String, f: () => T)(implicit num: 
Numeric[T]): Unit = {
+    synchronized {
+      metricRegistry.register(name, new Gauge[T] {
+        override def getValue: T = f()
+      })
+    }
+  }
+
+  def stop(): Unit = synchronized {
+    triggerDetails.clear()
+    inputRates.valuesIterator.foreach { _.stop() }
+    processingRates.valuesIterator.foreach { _.stop() }
+    latency = None
+  }
+}
+
+object StreamMetrics extends Logging {
+  /** Simple utility class to calculate rate while avoiding DivideByZero */
+  class RateCalculator {
+    @volatile private var rate: Option[Double] = None
+
+    def update(numRows: Long, timeGapMs: Long): Unit = {
+      if (timeGapMs > 0) {
+        rate = Some(numRows.toDouble * 1000 / timeGapMs)
+      } else {
+        rate = None
+        logDebug(s"Rate updates cannot with zero or negative time gap 
$timeGapMs")
+      }
+    }
+
+    def currentRate: Double = rate.getOrElse(0.0)
+
+    def stop(): Unit = { rate = None }
+  }
+
+
+  val TRIGGER_ID = "triggerId"
+  val IS_TRIGGER_ACTIVE = "isTriggerActive"
+  val IS_DATA_PRESENT_IN_TRIGGER = "isDataPresentInTrigger"
+  val STATUS_MESSAGE = "statusMessage"
+
+  val START_TIMESTAMP = "timestamp.triggerStart"
+  val GET_OFFSET_TIMESTAMP = "timestamp.afterGetOffset"
+  val GET_BATCH_TIMESTAMP = "timestamp.afterGetBatch"
+  val FINISH_TIMESTAMP = "timestamp.triggerFinish"
+
+  val GET_OFFSET_LATENCY = "latency.getOffset.total"
+  val GET_BATCH_LATENCY = "latency.getBatch.total"
+  val OFFSET_WAL_WRITE_LATENCY = "latency.offsetLogWrite"
+  val OPTIMIZER_LATENCY = "latency.optimizer"
+  val TRIGGER_LATENCY = "latency.fullTrigger"
+  val SOURCE_GET_OFFSET_LATENCY = "latency.getOffset.source"
+  val SOURCE_GET_BATCH_LATENCY = "latency.getBatch.source"
+
+  val NUM_INPUT_ROWS = "numRows.input.total"
+  val NUM_SOURCE_INPUT_ROWS = "numRows.input.source"
+  def NUM_TOTAL_STATE_ROWS(aggId: Int): String = 
s"numRows.state.aggregation$aggId.total"
+  def NUM_UPDATED_STATE_ROWS(aggId: Int): String = 
s"numRows.state.aggregation$aggId.updated"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index e37f0c7..53eebae 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -112,6 +112,11 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: 
SQLContext)
   }
 
   override def stop() {}
+
+  def reset(): Unit = synchronized {
+    batches.clear()
+    currentOffset = new LongOffset(-1)
+  }
 }
 
 /**
@@ -165,6 +170,8 @@ class MemorySink(val schema: StructType, outputMode: 
OutputMode) extends Sink wi
       logDebug(s"Skipping already committed batch: $batchId")
     }
   }
+
+  override def toString(): String = "MemorySink"
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 3335755..dce5349 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -197,6 +197,8 @@ private[state] class HDFSBackedStateStoreProvider(
       allUpdates.values().asScala.toIterator
     }
 
+    override def numKeys(): Long = mapToUpdate.size()
+
     /**
      * Whether all updates have been committed
      */

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index a67fdce..7132e28 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -77,6 +77,9 @@ trait StateStore {
    */
   def updates(): Iterator[StoreUpdate]
 
+  /** Number of keys in the state store */
+  def numKeys(): Long
+
   /**
    * Whether all updates have been committed
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 22f29c7..452eeed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -582,6 +582,12 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(10L)
 
+  val STREAMING_METRICS_ENABLED =
+    SQLConfigBuilder("spark.sql.streaming.metricsEnabled")
+      .doc("Whether Dropwizard/Codahale metrics will be reported for active 
streaming queries.")
+      .booleanConf
+      .createWithDefault(false)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
index de1efe9..c991166 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
@@ -18,17 +18,33 @@
 package org.apache.spark.sql.streaming
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
 
 /**
  * :: Experimental ::
- * Status and metrics of a streaming [[Sink]].
+ * Status and metrics of a streaming sink.
  *
- * @param description Description of the source corresponding to this status
- * @param offsetDesc Description of the current offset up to which data has 
been written by the sink
+ * @param description Description of the source corresponding to this status.
+ * @param offsetDesc Description of the current offsets up to which data has 
been written
+ *                   by the sink.
  * @since 2.0.0
  */
 @Experimental
-class SinkStatus private[sql](
+class SinkStatus private(
     val description: String,
-    val offsetDesc: String)
+    val offsetDesc: String) {
+
+  override def toString: String =
+    "SinkStatus:" + indent(prettyString)
+
+  private[sql] def prettyString: String = {
+    s"""$description
+       |Committed offsets: $offsetDesc
+       |""".stripMargin
+  }
+}
+
+/** Companion object, primarily for creating SinkStatus instances internally */
+private[sql] object SinkStatus {
+  def apply(desc: String, offsetDesc: String): SinkStatus = new 
SinkStatus(desc, offsetDesc)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
index bd0c848..6ace483 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
@@ -17,18 +17,60 @@
 
 package org.apache.spark.sql.streaming
 
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.Source
+import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
 
 /**
  * :: Experimental ::
- * Status and metrics of a streaming [[Source]].
+ * Status and metrics of a streaming Source.
  *
- * @param description Description of the source corresponding to this status
- * @param offsetDesc Description of the current [[Source]] offset if known
+ * @param description Description of the source corresponding to this status.
+ * @param offsetDesc Description of the current offset if known.
+ * @param inputRate Current rate (rows/sec) at which data is being generated 
by the source.
+ * @param processingRate Current rate (rows/sec) at which the query is 
processing data from
+ *                       the source.
+ * @param triggerDetails Low-level details of the currently active trigger 
(e.g. number of
+ *                      rows processed in trigger, latency of intermediate 
steps, etc.).
+ *                      If no trigger is active, then it will have details of 
the last completed
+ *                      trigger.
  * @since 2.0.0
  */
 @Experimental
-class SourceStatus private[sql] (
+class SourceStatus private(
     val description: String,
-    val offsetDesc: Option[String])
+    val offsetDesc: String,
+    val inputRate: Double,
+    val processingRate: Double,
+    val triggerDetails: ju.Map[String, String]) {
+
+  override def toString: String =
+    "SourceStatus:" + indent(prettyString)
+
+  private[sql] def prettyString: String = {
+    val triggerDetailsLines =
+      triggerDetails.asScala.map { case (k, v) => s"$k: $v" }
+    s"""$description
+       |Available offset: $offsetDesc
+       |Input rate: $inputRate rows/sec
+       |Processing rate: $processingRate rows/sec
+       |Trigger details:
+       |""".stripMargin + indent(triggerDetailsLines)
+
+  }
+}
+
+/** Companion object, primarily for creating SourceStatus instances internally 
*/
+private[sql] object SourceStatus {
+  def apply(
+      desc: String,
+      offsetDesc: String,
+      inputRate: Double,
+      processingRate: Double,
+      triggerDetails: Map[String, String]): SourceStatus = {
+    new SourceStatus(desc, offsetDesc, inputRate, processingRate, 
triggerDetails.asJava)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 91f0a1e..0a85414 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -63,12 +63,23 @@ trait StreamingQuery {
   def exception: Option[StreamingQueryException]
 
   /**
+   * Returns the current status of the query.
+   * @since 2.0.2
+   */
+  def status: StreamingQueryStatus
+
+  /**
    * Returns current status of all the sources.
    * @since 2.0.0
    */
+  @deprecated("use status.sourceStatuses", "2.0.2")
   def sourceStatuses: Array[SourceStatus]
 
-  /** Returns current status of the sink. */
+  /**
+   * Returns current status of the sink.
+   * @since 2.0.0
+   */
+  @deprecated("use status.sinkStatus", "2.0.2")
   def sinkStatus: SinkStatus
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala
deleted file mode 100644
index 1af2668..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.annotation.Experimental
-
-/**
- * :: Experimental ::
- * A class used to report information about the progress of a 
[[StreamingQuery]].
- *
- * @param name The [[StreamingQuery]] name. This name is unique across all 
active queries.
- * @param id The [[StreamingQuery]] id. This id is unique across
-  *          all queries that have been started in the current process.
- * @param sourceStatuses The current statuses of the [[StreamingQuery]]'s 
sources.
- * @param sinkStatus The current status of the [[StreamingQuery]]'s sink.
- */
-@Experimental
-class StreamingQueryInfo private[sql](
-  val name: String,
-  val id: Long,
-  val sourceStatuses: Seq[SourceStatus],
-  val sinkStatus: SinkStatus)

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 8a8855d..69790e3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -84,7 +84,7 @@ object StreamingQueryListener {
    * @since 2.0.0
    */
   @Experimental
-  class QueryStarted private[sql](val queryInfo: StreamingQueryInfo) extends 
Event
+  class QueryStarted private[sql](val queryStatus: StreamingQueryStatus) 
extends Event
 
   /**
    * :: Experimental ::
@@ -92,19 +92,19 @@ object StreamingQueryListener {
    * @since 2.0.0
    */
   @Experimental
-  class QueryProgress private[sql](val queryInfo: StreamingQueryInfo) extends 
Event
+  class QueryProgress private[sql](val queryStatus: StreamingQueryStatus) 
extends Event
 
   /**
    * :: Experimental ::
    * Event representing that termination of a query
    *
-   * @param queryInfo Information about the status of the query.
+   * @param queryStatus Information about the status of the query.
    * @param exception The exception message of the [[StreamingQuery]] if the 
query was terminated
    *                  with an exception. Otherwise, it will be `None`.
    * @since 2.0.0
    */
   @Experimental
   class QueryTerminated private[sql](
-      val queryInfo: StreamingQueryInfo,
+      val queryStatus: StreamingQueryStatus,
       val exception: Option[String]) extends Event
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
new file mode 100644
index 0000000..4768992
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset}
+
+/**
+ * :: Experimental ::
+ * A class used to report information about the progress of a 
[[StreamingQuery]].
+ *
+ * @param name Name of the query. This name is unique across all active 
queries.
+ * @param id Id of the query. This id is unique across
+ *          all queries that have been started in the current process.
+ * @param timestamp Timestamp (ms) of when this query was generated.
+ * @param inputRate Current rate (rows/sec) at which data is being generated 
by all the sources.
+ * @param processingRate Current rate (rows/sec) at which the query is 
processing data from
+ *                       all the sources.
+ * @param latency  Current average latency between the data being available in 
source and the sink
+ *                   writing the corresponding output.
+ * @param sourceStatuses Current statuses of the sources.
+ * @param sinkStatus Current status of the sink.
+ * @param triggerDetails Low-level details of the currently active trigger 
(e.g. number of
+ *                      rows processed in trigger, latency of intermediate 
steps, etc.).
+ *                      If no trigger is active, then it will have details of 
the last completed
+ *                      trigger.
+ * @since 2.0.0
+ */
+@Experimental
+class StreamingQueryStatus private(
+  val name: String,
+  val id: Long,
+  val timestamp: Long,
+  val inputRate: Double,
+  val processingRate: Double,
+  val latency: Option[Double],
+  val sourceStatuses: Array[SourceStatus],
+  val sinkStatus: SinkStatus,
+  val triggerDetails: ju.Map[String, String]) {
+
+  import StreamingQueryStatus._
+
+  override def toString: String = {
+    val sourceStatusLines = sourceStatuses.zipWithIndex.map { case (s, i) =>
+      s"Source ${i + 1}:" + indent(s.prettyString)
+    }
+    val sinkStatusLines = sinkStatus.prettyString
+    val triggerDetailsLines = triggerDetails.asScala.map { case (k, v) => 
s"$k: $v" }.toSeq.sorted
+    val numSources = sourceStatuses.length
+    val numSourcesString = s"$numSources source" + { if (numSources > 1) "s" 
else "" }
+
+    val allLines = s"""
+        |Query name: $name
+        |Query id: $id
+        |Status timestamp: $timestamp
+        |Input rate: $inputRate rows/sec
+        |Processing rate $processingRate rows/sec
+        |Latency: ${latency.getOrElse("-")} ms
+        |Trigger details:
+        |${indent(triggerDetailsLines)}
+        |Source statuses [$numSourcesString]:
+        |${indent(sourceStatusLines)}
+        |Sink status: ${indent(sinkStatusLines)}""".stripMargin
+
+    s"StreamingQueryStatus:${indent(allLines)}"
+  }
+}
+
+/** Companion object, primarily for creating StreamingQueryInfo instances 
internally */
+private[sql] object StreamingQueryStatus {
+  def apply(
+      name: String,
+      id: Long,
+      timestamp: Long,
+      inputRate: Double,
+      processingRate: Double,
+      latency: Option[Double],
+      sourceStatuses: Array[SourceStatus],
+      sinkStatus: SinkStatus,
+      triggerDetails: Map[String, String]): StreamingQueryStatus = {
+    new StreamingQueryStatus(name, id, timestamp, inputRate, processingRate,
+      latency, sourceStatuses, sinkStatus, triggerDetails.asJava)
+  }
+
+  def indent(strings: Iterable[String]): String = 
strings.map(indent).mkString("\n")
+  def indent(string: String): String = string.split("\n").map("    " + 
_).mkString("\n")
+
+  /** Create an instance of status for python testing */
+  def testStatus(): StreamingQueryStatus = {
+    import org.apache.spark.sql.execution.streaming.StreamMetrics._
+    StreamingQueryStatus(
+      name = "query",
+      id = 1,
+      timestamp = 123,
+      inputRate = 15.5,
+      processingRate = 23.5,
+      latency = Some(345),
+      sourceStatuses = Array(
+        SourceStatus(
+          desc = "MySource1",
+          offsetDesc = LongOffset(0).toString,
+          inputRate = 15.5,
+          processingRate = 23.5,
+          triggerDetails = Map(
+            NUM_SOURCE_INPUT_ROWS -> "100",
+            SOURCE_GET_OFFSET_LATENCY -> "10",
+            SOURCE_GET_BATCH_LATENCY -> "20"))),
+      sinkStatus = SinkStatus(
+        desc = "MySink",
+        offsetDesc = CompositeOffset(Some(LongOffset(1)) :: None :: 
Nil).toString),
+      triggerDetails = Map(
+        TRIGGER_ID -> "5",
+        IS_TRIGGER_ACTIVE -> "true",
+        IS_DATA_PRESENT_IN_TRIGGER -> "true",
+        GET_OFFSET_LATENCY -> "10",
+        GET_BATCH_LATENCY -> "20",
+        NUM_INPUT_ROWS -> "100"
+      ))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index bba40c6..229d881 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.metric
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.SparkPlanGraph
 import org.apache.spark.sql.functions._
@@ -85,6 +86,22 @@ class SQLMetricsSuite extends SparkFunSuite with 
SharedSQLContext {
     }
   }
 
+  test("LocalTableScanExec computes metrics in collect and take") {
+    val df1 = spark.createDataset(Seq(1, 2, 3))
+    val logical = df1.queryExecution.logical
+    require(logical.isInstanceOf[LocalRelation])
+    df1.collect()
+    val metrics1 = df1.queryExecution.executedPlan.collectLeaves().head.metrics
+    assert(metrics1.contains("numOutputRows"))
+    assert(metrics1("numOutputRows").value === 3)
+
+    val df2 = spark.createDataset(Seq(1, 2, 3)).limit(2)
+    df2.collect()
+    val metrics2 = df2.queryExecution.executedPlan.collectLeaves().head.metrics
+    assert(metrics2.contains("numOutputRows"))
+    assert(metrics2("numOutputRows").value === 2)
+  }
+
   test("Filter metrics") {
     // Assume the execution plan is
     // PhysicalRDD(nodeId = 1) -> Filter(nodeId = 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
new file mode 100644
index 0000000..938423d
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.scalactic.TolerantNumerics
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.ManualClock
+
+class StreamMetricsSuite extends SparkFunSuite {
+  import StreamMetrics._
+
+  // To make === between double tolerate inexact values
+  implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
+
+  test("rates, latencies, trigger details - basic life cycle") {
+    val sm = newStreamMetrics(source)
+    assert(sm.currentInputRate() === 0.0)
+    assert(sm.currentProcessingRate() === 0.0)
+    assert(sm.currentSourceInputRate(source) === 0.0)
+    assert(sm.currentSourceProcessingRate(source) === 0.0)
+    assert(sm.currentLatency() === None)
+    assert(sm.currentTriggerDetails().isEmpty)
+
+    // When trigger started, the rates should not change, but should return
+    // reported trigger details
+    sm.reportTriggerStarted(1)
+    sm.reportTriggerDetail("key", "value")
+    sm.reportSourceTriggerDetail(source, "key2", "value2")
+    assert(sm.currentInputRate() === 0.0)
+    assert(sm.currentProcessingRate() === 0.0)
+    assert(sm.currentSourceInputRate(source) === 0.0)
+    assert(sm.currentSourceProcessingRate(source) === 0.0)
+    assert(sm.currentLatency() === None)
+    assert(sm.currentTriggerDetails() ===
+      Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
+        START_TIMESTAMP -> "0", "key" -> "value"))
+    assert(sm.currentSourceTriggerDetails(source) ===
+      Map(TRIGGER_ID -> "1", "key2" -> "value2"))
+
+    // Finishing the trigger should calculate the rates, except input rate 
which needs
+    // to have another trigger interval
+    sm.reportNumInputRows(Map(source -> 100L)) // 100 input rows, 10 output 
rows
+    clock.advance(1000)
+    sm.reportTriggerFinished()
+    assert(sm.currentInputRate() === 0.0)
+    assert(sm.currentProcessingRate() === 100.0)  // 100 input rows processed 
in 1 sec
+    assert(sm.currentSourceInputRate(source) === 0.0)
+    assert(sm.currentSourceProcessingRate(source) === 100.0)
+    assert(sm.currentLatency() === None)
+    assert(sm.currentTriggerDetails() ===
+      Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
+        START_TIMESTAMP -> "0", FINISH_TIMESTAMP -> "1000",
+        NUM_INPUT_ROWS -> "100", "key" -> "value"))
+    assert(sm.currentSourceTriggerDetails(source) ===
+      Map(TRIGGER_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> 
"value2"))
+
+    // After another trigger starts, the rates and latencies should not change 
until
+    // new rows are reported
+    clock.advance(1000)
+    sm.reportTriggerStarted(2)
+    assert(sm.currentInputRate() === 0.0)
+    assert(sm.currentProcessingRate() === 100.0)
+    assert(sm.currentSourceInputRate(source) === 0.0)
+    assert(sm.currentSourceProcessingRate(source) === 100.0)
+    assert(sm.currentLatency() === None)
+
+    // Reporting new rows should update the rates and latencies
+    sm.reportNumInputRows(Map(source -> 200L))     // 200 input rows
+    clock.advance(500)
+    sm.reportTriggerFinished()
+    assert(sm.currentInputRate() === 100.0)      // 200 input rows generated 
in 2 seconds b/w starts
+    assert(sm.currentProcessingRate() === 400.0) // 200 output rows processed 
in 0.5 sec
+    assert(sm.currentSourceInputRate(source) === 100.0)
+    assert(sm.currentSourceProcessingRate(source) === 400.0)
+    assert(sm.currentLatency().get === 1500.0)       // 2000 ms / 2 + 500 ms
+
+    // Rates should be set to 0 after stop
+    sm.stop()
+    assert(sm.currentInputRate() === 0.0)
+    assert(sm.currentProcessingRate() === 0.0)
+    assert(sm.currentSourceInputRate(source) === 0.0)
+    assert(sm.currentSourceProcessingRate(source) === 0.0)
+    assert(sm.currentLatency() === None)
+    assert(sm.currentTriggerDetails().isEmpty)
+  }
+
+  test("rates and latencies - after trigger with no data") {
+    val sm = newStreamMetrics(source)
+    // Trigger 1 with data
+    sm.reportTriggerStarted(1)
+    sm.reportNumInputRows(Map(source -> 100L)) // 100 input rows
+    clock.advance(1000)
+    sm.reportTriggerFinished()
+
+    // Trigger 2 with data
+    clock.advance(1000)
+    sm.reportTriggerStarted(2)
+    sm.reportNumInputRows(Map(source -> 200L)) // 200 input rows
+    clock.advance(500)
+    sm.reportTriggerFinished()
+
+    // Make sure that all rates are set
+    require(sm.currentInputRate() === 100.0) // 200 input rows generated in 2 
seconds b/w starts
+    require(sm.currentProcessingRate() === 400.0) // 200 output rows processed 
in 0.5 sec
+    require(sm.currentSourceInputRate(source) === 100.0)
+    require(sm.currentSourceProcessingRate(source) === 400.0)
+    require(sm.currentLatency().get === 1500.0) // 2000 ms / 2 + 500 ms
+
+    // Trigger 3 with data
+    clock.advance(500)
+    sm.reportTriggerStarted(3)
+    clock.advance(500)
+    sm.reportTriggerFinished()
+
+    // Rates are set to zero and latency is set to None
+    assert(sm.currentInputRate() === 0.0)
+    assert(sm.currentProcessingRate() === 0.0)
+    assert(sm.currentSourceInputRate(source) === 0.0)
+    assert(sm.currentSourceProcessingRate(source) === 0.0)
+    assert(sm.currentLatency() === None)
+    sm.stop()
+  }
+
+  test("rates - after trigger with multiple sources, and one source having no 
info") {
+    val source1 = TestSource(1)
+    val source2 = TestSource(2)
+    val sm = newStreamMetrics(source1, source2)
+    // Trigger 1 with data
+    sm.reportTriggerStarted(1)
+    sm.reportNumInputRows(Map(source1 -> 100L, source2 -> 100L))
+    clock.advance(1000)
+    sm.reportTriggerFinished()
+
+    // Trigger 2 with data
+    clock.advance(1000)
+    sm.reportTriggerStarted(2)
+    sm.reportNumInputRows(Map(source1 -> 200L, source2 -> 200L))
+    clock.advance(500)
+    sm.reportTriggerFinished()
+
+    // Make sure that all rates are set
+    assert(sm.currentInputRate() === 200.0) // 200*2 input rows generated in 2 
seconds b/w starts
+    assert(sm.currentProcessingRate() === 800.0) // 200*2 output rows 
processed in 0.5 sec
+    assert(sm.currentSourceInputRate(source1) === 100.0)
+    assert(sm.currentSourceInputRate(source2) === 100.0)
+    assert(sm.currentSourceProcessingRate(source1) === 400.0)
+    assert(sm.currentSourceProcessingRate(source2) === 400.0)
+
+    // Trigger 3 with data
+    clock.advance(500)
+    sm.reportTriggerStarted(3)
+    clock.advance(500)
+    sm.reportNumInputRows(Map(source1 -> 200L))
+    sm.reportTriggerFinished()
+
+    // Rates are set to zero and latency is set to None
+    assert(sm.currentInputRate() === 200.0)
+    assert(sm.currentProcessingRate() === 400.0)
+    assert(sm.currentSourceInputRate(source1) === 200.0)
+    assert(sm.currentSourceInputRate(source2) === 0.0)
+    assert(sm.currentSourceProcessingRate(source1) === 400.0)
+    assert(sm.currentSourceProcessingRate(source2) === 0.0)
+    sm.stop()
+  }
+
+  test("registered Codahale metrics") {
+    import scala.collection.JavaConverters._
+    val sm = newStreamMetrics(source)
+    val gaugeNames = sm.metricRegistry.getGauges().keySet().asScala
+
+    // so that all metrics are considered as a single metric group in Ganglia
+    assert(!gaugeNames.exists(_.contains(".")))
+    assert(gaugeNames === Set(
+      "inputRate-total",
+      "inputRate-source0",
+      "processingRate-total",
+      "processingRate-source0",
+      "latency"))
+  }
+
+  private def newStreamMetrics(sources: Source*): StreamMetrics = {
+    new StreamMetrics(sources.toSet, clock, "test")
+  }
+
+  private val clock = new ManualClock()
+  private val source = TestSource(0)
+
+  case class TestSource(id: Int) extends Source {
+    override def schema: StructType = StructType(Array.empty[StructField])
+    override def getOffset: Option[Offset] = Some(new LongOffset(0))
+    override def getBatch(start: Option[Offset], end: Offset): DataFrame = { 
null }
+    override def stop() {}
+    override def toString(): String = s"source$id"
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
index 6b0ba7a..5174a04 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
@@ -156,6 +156,30 @@ class TextSocketStreamSuite extends StreamTest with 
SharedSQLContext with Before
     }
   }
 
+  test("input row metrics") {
+    serverThread = new ServerThread()
+    serverThread.start()
+
+    val provider = new TextSocketSourceProvider
+    val parameters = Map("host" -> "localhost", "port" -> 
serverThread.port.toString)
+    source = provider.createSource(sqlContext, "", None, "", parameters)
+
+    failAfter(streamingTimeout) {
+      serverThread.enqueue("hello")
+      while (source.getOffset.isEmpty) {
+        Thread.sleep(10)
+      }
+      val batch = source.getBatch(None, source.getOffset.get).as[String]
+      batch.collect()
+      val numRowsMetric =
+        
batch.queryExecution.executedPlan.collectLeaves().head.metrics.get("numOutputRows")
+      assert(numRowsMetric.nonEmpty)
+      assert(numRowsMetric.get.value === 1)
+      source.stop()
+      source = null
+    }
+  }
+
   private class ServerThread extends Thread with Logging {
     private val serverSocket = new ServerSocket(0)
     private val messageQueue = new LinkedBlockingQueue[String]()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to