This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 30637a8 [SPARK-31593][SS] Remove unnecessary streaming query progress update
30637a8 is described below
commit 30637a81fb50492e3c54759bb3fd7aac1cd0e326
Author: uncleGen <[email protected]>
AuthorDate: Sun Jun 14 14:49:01 2020 +0900
[SPARK-31593][SS] Remove unnecessary streaming query progress update
### What changes were proposed in this pull request?
The Structured Streaming progress reporter always reports an `empty`
progress when there is no new data. By design, when there is no new data,
progress updates should be reported only every 10s (by default).
Before PR: (image not included in this email)

After PR: (image not included in this email)

### Why are the changes needed?
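Fixes a bug where an empty progress update was reported on every trigger
when there was no new data, instead of only once per no-data progress interval.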
### Does this PR introduce any user-facing change?
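Yes. When there is no new data, progress updates are no longer reported on
every trigger; they are throttled by the no-data progress event interval
(10s by default).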
### How was this patch tested?
Existing unit tests, a new test in `StreamingQueryListenerSuite`, and manual testing.
Closes #28391 from uncleGen/SPARK-31593.
Authored-by: uncleGen <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 1e40bccf447dccad9d31bccc75d21b8fca77ba52)
Signed-off-by: HyukjinKwon <[email protected]>
---
.../sql/execution/streaming/ProgressReporter.scala | 2 +-
.../streaming/StreamingDeduplicationSuite.scala | 7 ++-
.../streaming/StreamingQueryListenerSuite.scala | 56 ++++++++++++++++++++--
3 files changed, 58 insertions(+), 7 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 0dff1c2..ea1f2ce 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -201,7 +201,7 @@ trait ProgressReporter extends Logging {
if (hasExecuted) {
// Reset noDataEventTimestamp if we processed any data
- lastNoExecutionProgressEventTime = Long.MinValue
+ lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
updateProgress(newProgress)
} else {
val now = triggerClock.getTimeMillis()
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index f63778a..51ddc7b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -281,7 +281,12 @@ class StreamingDeduplicationSuite extends
StateStoreMetricsTest {
if (flag) assertNumStateRows(total = 1, updated = 1)
else assertNumStateRows(total = 7, updated = 1)
},
- AssertOnQuery(q => q.lastProgress.sink.numOutputRows == 0L)
+ AssertOnQuery { q =>
+ eventually(timeout(streamingTimeout)) {
+ q.lastProgress.sink.numOutputRows == 0L
+ true
+ }
+ }
)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index e585b8a..6e08b88 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -389,7 +389,7 @@ class StreamingQueryListenerSuite extends StreamTest with
BeforeAndAfter {
// Structured Streaming in Spark 2.0.0. Because we renamed the classes,
// SparkListenerApplicationEnd is the only valid event and it's the last
event. We use it
// to verify that we can skip broken jsons generated by Structured
Streaming.
-
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt",
1)
+
testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.0.txt",
1)
}
testQuietly("ReplayListenerBus should ignore broken event jsons generated in
2_0_1") {
@@ -397,14 +397,14 @@ class StreamingQueryListenerSuite extends StreamTest with
BeforeAndAfter {
// Structured Streaming in Spark 2.0.1. Because we renamed the classes,
// SparkListenerApplicationEnd is the only valid event and it's the last
event. We use it
// to verify that we can skip broken jsons generated by Structured
Streaming.
-
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt",
1)
+
testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.1.txt",
1)
}
testQuietly("ReplayListenerBus should ignore broken event jsons generated in
2_0_2") {
// query-event-logs-version-2.0.2.txt has all types of events generated by
// Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured
Streaming query events
// in 2.1.0. This test is to verify we are able to load events generated
by Spark 2.0.2.
-
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt",
5)
+
testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.2.txt",
5)
}
test("listener propagates observable metrics") {
@@ -433,9 +433,13 @@ class StreamingQueryListenerSuite extends StreamTest with
BeforeAndAfter {
}
try {
+ val noDataProgressIntervalKey =
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key
spark.streams.addListener(listener)
testStream(df, OutputMode.Append)(
- StartStream(Trigger.ProcessingTime(100), triggerClock = clock),
+ StartStream(
+ Trigger.ProcessingTime(100),
+ triggerClock = clock,
+ Map(noDataProgressIntervalKey -> "100")),
// Batch 1
AddData(inputData, 1, 2),
AdvanceManualClock(100),
@@ -464,7 +468,49 @@ class StreamingQueryListenerSuite extends StreamTest with
BeforeAndAfter {
}
}
- private def testReplayListenerBusWithBorkenEventJsons(
+ test("SPARK-31593: remove unnecessary streaming query progress update") {
+ withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key ->
"100") {
+ @volatile var numProgressEvent = 0
+ val listener = new StreamingQueryListener {
+ override def onQueryStarted(event: QueryStartedEvent): Unit = {}
+ override def onQueryProgress(event: QueryProgressEvent): Unit = {
+ numProgressEvent += 1
+ }
+ override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
+ }
+ spark.streams.addListener(listener)
+
+ def checkProgressEvent(count: Int): StreamAction = {
+ AssertOnQuery { _ =>
+ eventually(Timeout(streamingTimeout)) {
+ assert(numProgressEvent == count)
+ }
+ true
+ }
+ }
+
+ try {
+ val input = new MemoryStream[Int](0, sqlContext)
+ val clock = new StreamManualClock()
+ val result = input.toDF().select("value")
+ testStream(result)(
+ StartStream(trigger = Trigger.ProcessingTime(10), triggerClock =
clock),
+ AddData(input, 10),
+ checkProgressEvent(1),
+ AdvanceManualClock(10),
+ checkProgressEvent(2),
+ AdvanceManualClock(90),
+ checkProgressEvent(2),
+ AdvanceManualClock(10),
+ checkProgressEvent(3)
+ )
+ } finally {
+ spark.streams.removeListener(listener)
+ }
+ }
+ }
+
+ private def testReplayListenerBusWithBrokenEventJsons(
fileName: String,
expectedEventSize: Int): Unit = {
val input =
getClass.getResourceAsStream(s"/structured-streaming/$fileName")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]