Repository: spark
Updated Branches:
  refs/heads/master 59c3c233f -> ff7f6ef75


[SPARK-24697][SS] Fix the reported start offsets in streaming query progress

## What changes were proposed in this pull request?

In the streaming `ProgressReporter`, we use `committedOffsets` as the start 
offset and `availableOffsets` as the end offset when reporting the status of 
a trigger in `finishTrigger`. This is a bad pattern that has existed since 
the beginning of `ProgressReporter`, and it is bad because it is very hard 
to reason about when `availableOffsets` and `committedOffsets` are updated 
versus when they are read for reporting. Case in point: this bug silently 
existed in `ContinuousExecution` since before `MicroBatchExecution` was 
refactored.

The correct fix is to record the offsets explicitly. This PR adds a simple 
method that is explicitly called from `MicroBatchExecution` and 
`ContinuousExecution` before `committedOffsets` is updated.
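
In simplified pseudocode, the ordering this change enforces looks roughly 
like the following (a sketch only, not the literal commit code; `runBatch()` 
stands in here for the real batch/epoch processing):

    // Snapshot the trigger's offset range while `committedOffsets` still
    // holds the pre-trigger values.
    recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
    runBatch()                              // process the data for this trigger
    committedOffsets ++= availableOffsets   // safe: range already snapshotted
    finishTrigger(hasNewData)               // reports the recorded range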

## How was this patch tested?
Added new tests

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #21744 from tdas/SPARK-24697.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff7f6ef7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff7f6ef7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff7f6ef7

Branch: refs/heads/master
Commit: ff7f6ef75c80633480802d537e66432e3bea4785
Parents: 59c3c23
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Wed Jul 11 12:44:42 2018 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Jul 11 12:44:42 2018 -0700

----------------------------------------------------------------------
 .../streaming/MicroBatchExecution.scala         |  3 +++
 .../execution/streaming/ProgressReporter.scala  | 21 ++++++++++++++++----
 .../continuous/ContinuousExecution.scala        |  3 +++
 .../sql/streaming/StreamingQuerySuite.scala     |  6 ++++--
 4 files changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ff7f6ef7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 16651dd..45c43f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -184,6 +184,9 @@ class MicroBatchExecution(
             isCurrentBatchConstructed = constructNextBatch(noDataBatchesEnabled)
           }
 
+          // Record the trigger offset range for progress reporting *before* processing the batch
+          recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
+
           // Remember whether the current batch has data or not. This will be required later
           // for bookkeeping after running the batch, when `isNewDataAvailable` will have changed
           // to false as the batch would have already processed the available data.

http://git-wip-us.apache.org/repos/asf/spark/blob/ff7f6ef7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 16ad3ef..47f4b52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -56,8 +56,6 @@ trait ProgressReporter extends Logging {
   protected def logicalPlan: LogicalPlan
   protected def lastExecution: QueryExecution
   protected def newData: Map[BaseStreamingSource, LogicalPlan]
-  protected def availableOffsets: StreamProgress
-  protected def committedOffsets: StreamProgress
   protected def sources: Seq[BaseStreamingSource]
   protected def sink: BaseStreamingSink
   protected def offsetSeqMetadata: OffsetSeqMetadata
@@ -68,8 +66,11 @@ trait ProgressReporter extends Logging {
   // Local timestamps and counters.
   private var currentTriggerStartTimestamp = -1L
   private var currentTriggerEndTimestamp = -1L
+  private var currentTriggerStartOffsets: Map[BaseStreamingSource, String] = _
+  private var currentTriggerEndOffsets: Map[BaseStreamingSource, String] = _
   // TODO: Restore this from the checkpoint when possible.
   private var lastTriggerStartTimestamp = -1L
+
   private val currentDurationsMs = new mutable.HashMap[String, Long]()
 
   /** Flag that signals whether any error with input metrics have already been logged */
@@ -114,9 +115,20 @@ trait ProgressReporter extends Logging {
     lastTriggerStartTimestamp = currentTriggerStartTimestamp
     currentTriggerStartTimestamp = triggerClock.getTimeMillis()
     currentStatus = currentStatus.copy(isTriggerActive = true)
+    currentTriggerStartOffsets = null
+    currentTriggerEndOffsets = null
     currentDurationsMs.clear()
   }
 
+  /**
+   * Record the offsets range this trigger will process. Call this before updating
+   * `committedOffsets` in `StreamExecution` to make sure that the correct range is recorded.
+   */
+  protected def recordTriggerOffsets(from: StreamProgress, to: StreamProgress): Unit = {
+    currentTriggerStartOffsets = from.mapValues(_.json)
+    currentTriggerEndOffsets = to.mapValues(_.json)
+  }
+
   private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
     progressBuffer.synchronized {
       progressBuffer += newProgress
@@ -130,6 +142,7 @@ trait ProgressReporter extends Logging {
 
   /** Finalizes the query progress and adds it to list of recent status updates. */
   protected def finishTrigger(hasNewData: Boolean): Unit = {
+    assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null)
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
 
     val executionStats = extractExecutionStats(hasNewData)
@@ -147,8 +160,8 @@ trait ProgressReporter extends Logging {
       val numRecords = executionStats.inputRows.getOrElse(source, 0L)
       new SourceProgress(
         description = source.toString,
-        startOffset = committedOffsets.get(source).map(_.json).orNull,
-        endOffset = availableOffsets.get(source).map(_.json).orNull,
+        startOffset = currentTriggerStartOffsets.get(source).orNull,
+        endOffset = currentTriggerEndOffsets.get(source).orNull,
         numInputRows = numRecords,
         inputRowsPerSecond = numRecords / inputTimeSec,
         processedRowsPerSecond = numRecords / processingTimeSec

http://git-wip-us.apache.org/repos/asf/spark/blob/ff7f6ef7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index a0bb829..e991dbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -309,7 +309,10 @@ class ContinuousExecution(
   def commit(epoch: Long): Unit = {
     assert(continuousSources.length == 1, "only one continuous source supported currently")
     assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
+
     synchronized {
+      // Record offsets before updating `committedOffsets`
+      recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
       if (queryExecutionThread.isAlive) {
         commitLog.add(epoch)
         val offset =

http://git-wip-us.apache.org/repos/asf/spark/blob/ff7f6ef7/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index dcf6cb5..936a076 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -335,8 +335,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
         assert(progress.sources.length === 1)
         assert(progress.sources(0).description contains "MemoryStream")
-        assert(progress.sources(0).startOffset === "0")
-        assert(progress.sources(0).endOffset !== null)
+        assert(progress.sources(0).startOffset === null)   // no prior offset
+        assert(progress.sources(0).endOffset === "0")
         assert(progress.sources(0).processedRowsPerSecond === 4.0)  // 2 rows processed in 500 ms
 
         assert(progress.stateOperators.length === 1)
@@ -362,6 +362,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         assert(query.lastProgress.batchId === 1)
         assert(query.lastProgress.inputRowsPerSecond === 2.0)
         assert(query.lastProgress.sources(0).inputRowsPerSecond === 2.0)
+        assert(query.lastProgress.sources(0).startOffset === "0")
+        assert(query.lastProgress.sources(0).endOffset === "1")
         true
       },
 

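For reference, these per-source offsets surface to users on 
`StreamingQueryProgress`. A minimal usage sketch in Scala (assuming `query` 
is a running `StreamingQuery`):

    // Inspect the offset range reported for the most recent trigger
    val progress = query.lastProgress
    progress.sources.foreach { s =>
      println(s"${s.description}: start=${s.startOffset}, end=${s.endOffset}")
    }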
