This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 44ab0fc0068 [SPARK-45045][SS] Revert back the behavior of idle
progress for StreamingQuery API from SPARK-43183
44ab0fc0068 is described below
commit 44ab0fc0068f815c7eddcd34ae4343bbfd97b64d
Author: Jungtaek Lim <[email protected]>
AuthorDate: Mon Sep 4 11:41:48 2023 +0900
[SPARK-45045][SS] Revert back the behavior of idle progress for
StreamingQuery API from SPARK-43183
### What changes were proposed in this pull request?
This PR proposes to partially revert the idle-progress behavior change that
[SPARK-43183](https://issues.apache.org/jira/browse/SPARK-43183) made to the
StreamingQuery API. The goal is to avoid breaking tests in third-party data
sources.
### Why are the changes needed?
We found that the behavioral change from SPARK-43183 broke many tests
in third-party data sources.
(Short summary of SPARK-43183: we changed idle-progress handling to fire
only an idle event callback. Previously it also fired a progress update
callback and added the progress to the StreamingQuery API, where it is exposed
via recentProgress/lastProgress.)
The main rationale of SPARK-43183 was to avoid firing a progress update
callback for idle events, which had confused users. That concern is about the
streaming query listener. It did not require changing the behavior of the
StreamingQuery API as well.
### Does this PR introduce _any_ user-facing change?
Yes, but this PR technically reduces the user-facing change compared to
before. It partially reverts the behavioral change from SPARK-43183, which
has not been released yet.
### How was this patch tested?
Modified tests. Also manually ran the third-party data source tests that were
broken with the Spark 3.5.0 RC; they succeeded with this change.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #42773 from HeartSaVioR/SPARK-45045.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit cf0a5cb472efebb4350e48bd82a4f834e8607333)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/execution/streaming/ProgressReporter.scala | 164 ++++++++++++---------
.../streaming/StreamingQueryListenerSuite.scala | 5 +-
.../StreamingQueryStatusAndProgressSuite.scala | 41 +++++-
3 files changed, 135 insertions(+), 75 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 6dbecd186dc..c0bd94e7d6c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -89,7 +89,7 @@ trait ProgressReporter extends Logging {
sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
// The timestamp we report an event that has not executed anything
- private var lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+ private var lastNoExecutionProgressEventTime = Long.MinValue
private val timestampFormat = new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
@@ -142,21 +142,37 @@ trait ProgressReporter extends Logging {
latestStreamProgress = to
}
- private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
+ private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
progressBuffer.synchronized {
progressBuffer += newProgress
while (progressBuffer.length >=
sparkSession.sqlContext.conf.streamingProgressRetention) {
progressBuffer.dequeue()
}
}
+ }
+
+ private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
+ // Reset noDataEventTimestamp if we processed any data
+ lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+
+ addNewProgress(newProgress)
postEvent(new QueryProgressEvent(newProgress))
logInfo(s"Streaming query made progress: $newProgress")
}
- private def postIdleness(): Unit = {
- postEvent(new QueryIdleEvent(id, runId,
formatTimestamp(currentTriggerStartTimestamp)))
- logInfo(s"Streaming query has been idle and waiting for new data more than
" +
- s"${noDataProgressEventInterval} ms.")
+ private def updateIdleness(newProgress: StreamingQueryProgress): Unit = {
+ val now = triggerClock.getTimeMillis()
+ if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime)
{
+ addNewProgress(newProgress)
+ if (lastNoExecutionProgressEventTime > Long.MinValue) {
+ postEvent(new QueryIdleEvent(newProgress.id, newProgress.runId,
+ formatTimestamp(currentTriggerStartTimestamp)))
+ logInfo(s"Streaming query has been idle and waiting for new data more
than " +
+ s"$noDataProgressEventInterval ms.")
+ }
+
+ lastNoExecutionProgressEventTime = now
+ }
}
/**
@@ -172,96 +188,102 @@ trait ProgressReporter extends Logging {
currentTriggerLatestOffsets != null)
currentTriggerEndTimestamp = triggerClock.getTimeMillis()
- if (hasExecuted) {
- val executionStats = extractExecutionStats(hasNewData)
- val processingTimeMills = currentTriggerEndTimestamp -
currentTriggerStartTimestamp
- val processingTimeSec = Math.max(1L, processingTimeMills).toDouble /
MILLIS_PER_SECOND
+ val executionStats = extractExecutionStats(hasNewData, hasExecuted)
+ val processingTimeMills = currentTriggerEndTimestamp -
currentTriggerStartTimestamp
+ val processingTimeSec = Math.max(1L, processingTimeMills).toDouble /
MILLIS_PER_SECOND
- val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
- (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble /
MILLIS_PER_SECOND
- } else {
- Double.PositiveInfinity
- }
- logDebug(s"Execution stats: $executionStats")
-
- val sourceProgress = sources.distinct.map { source =>
- val numRecords = executionStats.inputRows.getOrElse(source, 0L)
- val sourceMetrics = source match {
- case withMetrics: ReportsSourceMetrics =>
-
withMetrics.metrics(Optional.ofNullable(latestStreamProgress.get(source).orNull))
- case _ => Map[String, String]().asJava
- }
- new SourceProgress(
- description = source.toString,
- startOffset = currentTriggerStartOffsets.get(source).orNull,
- endOffset = currentTriggerEndOffsets.get(source).orNull,
- latestOffset = currentTriggerLatestOffsets.get(source).orNull,
- numInputRows = numRecords,
- inputRowsPerSecond = numRecords / inputTimeSec,
- processedRowsPerSecond = numRecords / processingTimeSec,
- metrics = sourceMetrics
- )
- }
+ val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
+ (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble /
MILLIS_PER_SECOND
+ } else {
+ Double.PositiveInfinity
+ }
+ logDebug(s"Execution stats: $executionStats")
- val sinkOutput = sinkCommitProgress.map(_.numOutputRows)
- val sinkMetrics = sink match {
- case withMetrics: ReportsSinkMetrics =>
- withMetrics.metrics()
+ val sourceProgress = sources.distinct.map { source =>
+ val numRecords = executionStats.inputRows.getOrElse(source, 0L)
+ val sourceMetrics = source match {
+ case withMetrics: ReportsSourceMetrics =>
+
withMetrics.metrics(Optional.ofNullable(latestStreamProgress.get(source).orNull))
case _ => Map[String, String]().asJava
}
+ new SourceProgress(
+ description = source.toString,
+ startOffset = currentTriggerStartOffsets.get(source).orNull,
+ endOffset = currentTriggerEndOffsets.get(source).orNull,
+ latestOffset = currentTriggerLatestOffsets.get(source).orNull,
+ numInputRows = numRecords,
+ inputRowsPerSecond = numRecords / inputTimeSec,
+ processedRowsPerSecond = numRecords / processingTimeSec,
+ metrics = sourceMetrics
+ )
+ }
+
+ val sinkOutput = if (hasExecuted) {
+ sinkCommitProgress.map(_.numOutputRows)
+ } else {
+ sinkCommitProgress.map(_ => 0L)
+ }
- val sinkProgress = SinkProgress(
- sink.toString, sinkOutput, sinkMetrics)
-
- val observedMetrics = extractObservedMetrics(hasNewData, lastExecution)
-
- val newProgress = new StreamingQueryProgress(
- id = id,
- runId = runId,
- name = name,
- timestamp = formatTimestamp(currentTriggerStartTimestamp),
- batchId = currentBatchId,
- batchDuration = processingTimeMills,
- durationMs =
- new
java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).toMap.asJava),
- eventTime = new
java.util.HashMap(executionStats.eventTimeStats.asJava),
- stateOperators = executionStats.stateOperators.toArray,
- sources = sourceProgress.toArray,
- sink = sinkProgress,
- observedMetrics = new java.util.HashMap(observedMetrics.asJava))
-
- // Reset noDataEventTimestamp if we processed any data
- lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+ val sinkMetrics = sink match {
+ case withMetrics: ReportsSinkMetrics =>
+ withMetrics.metrics()
+ case _ => Map[String, String]().asJava
+ }
+
+ val sinkProgress = SinkProgress(
+ sink.toString, sinkOutput, sinkMetrics)
+
+ val observedMetrics = extractObservedMetrics(hasNewData, lastExecution)
+
+ val newProgress = new StreamingQueryProgress(
+ id = id,
+ runId = runId,
+ name = name,
+ timestamp = formatTimestamp(currentTriggerStartTimestamp),
+ batchId = currentBatchId,
+ batchDuration = processingTimeMills,
+ durationMs =
+ new
java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).toMap.asJava),
+ eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
+ stateOperators = executionStats.stateOperators.toArray,
+ sources = sourceProgress.toArray,
+ sink = sinkProgress,
+ observedMetrics = new java.util.HashMap(observedMetrics.asJava))
+
+ if (hasExecuted) {
updateProgress(newProgress)
} else {
- val now = triggerClock.getTimeMillis()
- if (now - noDataProgressEventInterval >=
lastNoExecutionProgressEventTime) {
- lastNoExecutionProgressEventTime = now
- postIdleness()
- }
+ updateIdleness(newProgress)
}
currentStatus = currentStatus.copy(isTriggerActive = false)
}
/** Extract statistics about stateful operators from the executed query
plan. */
- private def extractStateOperatorMetrics(): Seq[StateOperatorProgress] = {
- assert(lastExecution != null, "lastExecution is not available")
+ private def extractStateOperatorMetrics(hasExecuted: Boolean):
Seq[StateOperatorProgress] = {
+ if (lastExecution == null) return Nil
+ // lastExecution could belong to one of the previous triggers if
`!hasExecuted`.
+ // Walking the plan again should be inexpensive.
lastExecution.executedPlan.collect {
case p if p.isInstanceOf[StateStoreWriter] =>
- p.asInstanceOf[StateStoreWriter].getProgress()
+ val progress = p.asInstanceOf[StateStoreWriter].getProgress()
+ if (hasExecuted) {
+ progress
+ } else {
+ progress.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark =
0)
+ }
}
}
/** Extracts statistics from the most recent query execution. */
- private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
+ private def extractExecutionStats(hasNewData: Boolean, hasExecuted:
Boolean): ExecutionStats = {
val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e
}.nonEmpty
val watermarkTimestamp =
if (hasEventTime) Map("watermark" ->
formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
else Map.empty[String, String]
// SPARK-19378: Still report metrics even though no data was processed
while reporting progress.
- val stateOperators = extractStateOperatorMetrics()
+ val stateOperators = extractStateOperatorMetrics(hasExecuted)
if (!hasNewData) {
return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 5b5e8732e0d..52b740bc5c3 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -331,9 +331,9 @@ class StreamingQueryListenerSuite extends StreamTest with
BeforeAndAfter {
}
true
}
- // `recentProgress` should not receive any events
+ // `recentProgress` should not receive too many no data events
actions += AssertOnQuery { q =>
- q.recentProgress.isEmpty
+ q.recentProgress.size > 1 && q.recentProgress.size <= 11
}
testStream(input.toDS)(actions.toSeq: _*)
spark.sparkContext.listenerBus.waitUntilEmpty()
@@ -524,7 +524,6 @@ class StreamingQueryListenerSuite extends StreamTest with
BeforeAndAfter {
testStream(result)(
StartStream(trigger = Trigger.ProcessingTime(10), triggerClock =
clock),
AddData(input, 10),
- // checkProgressEvent(1),
AdvanceManualClock(10),
checkProgressEvent(1),
AdvanceManualClock(90),
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index d016b334627..fa7a3803d05 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -24,11 +24,13 @@ import scala.collection.JavaConverters._
import org.json4s.jackson.JsonMethods._
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
import org.apache.spark.sql.streaming.StreamingQuerySuite.clock
import org.apache.spark.sql.streaming.util.StreamManualClock
@@ -286,6 +288,42 @@ class StreamingQueryStatusAndProgressSuite extends
StreamTest with Eventually {
}
}
+ test("SPARK-19378: Continue reporting stateOp metrics even if there is no
active trigger") {
+ import testImplicits._
+
+ withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10")
{
+ val inputData = MemoryStream[Int]
+
+ val query = inputData.toDS().toDF("value")
+ .select($"value")
+ .groupBy($"value")
+ .agg(count("*"))
+ .writeStream
+ .queryName("metric_continuity")
+ .format("memory")
+ .outputMode("complete")
+ .start()
+ try {
+ inputData.addData(1, 2)
+ query.processAllAvailable()
+
+ val progress = query.lastProgress
+ assert(progress.stateOperators.length > 0)
+ // Should emit new progresses every 10 ms, but we could be facing a
slow Jenkins
+ eventually(timeout(1.minute)) {
+ val nextProgress = query.lastProgress
+ assert(nextProgress.timestamp !== progress.timestamp)
+ assert(nextProgress.numInputRows === 0)
+ assert(nextProgress.stateOperators.head.numRowsTotal === 2)
+ assert(nextProgress.stateOperators.head.numRowsUpdated === 0)
+ assert(nextProgress.sink.numOutputRows === 0)
+ }
+ } finally {
+ query.stop()
+ }
+ }
+ }
+
test("SPARK-29973: Make `processedRowsPerSecond` calculated more accurately
and meaningfully") {
import testImplicits._
@@ -298,7 +336,8 @@ class StreamingQueryStatusAndProgressSuite extends
StreamTest with Eventually {
AdvanceManualClock(1000),
waitUntilBatchProcessed,
AssertOnQuery(query => {
- assert(query.lastProgress == null)
+ assert(query.lastProgress.numInputRows == 0)
+ assert(query.lastProgress.processedRowsPerSecond == 0.0d)
true
}),
AddData(inputData, 1, 2),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]