This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5c48806a2941 [SPARK-49688][CONNECT][TESTS] Fix a sporadic
`SparkConnectServiceSuite` failure
5c48806a2941 is described below
commit 5c48806a2941070e23a81b4e7e4f3225fe341535
Author: Changgyoo Park <[email protected]>
AuthorDate: Thu Sep 19 09:08:59 2024 +0900
[SPARK-49688][CONNECT][TESTS] Fix a sporadic `SparkConnectServiceSuite`
failure
### What changes were proposed in this pull request?
Add a short wait loop to ensure that the test pre-condition is met. To be
specific, VerifyEvents.executeHolder is set asynchronously by
MockSparkListener.onOtherEvent whereas the test assumes that
VerifyEvents.executeHolder is always available.
### Why are the changes needed?
The test fails sporadically because of this race; removing the flakiness gives a smoother development experience.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By running SparkConnectServiceSuite.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48142 from changgyoopark-db/SPARK-49688.
Authored-by: Changgyoo Park <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/connect/planner/SparkConnectServiceSuite.scala | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
diff --git
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index 579fdb47aef3..62146f19328a 100644
---
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -871,10 +871,16 @@ class SparkConnectServiceSuite
class VerifyEvents(val sparkContext: SparkContext) {
val listener: MockSparkListener = new MockSparkListener()
val listenerBus = sparkContext.listenerBus
+ val EVENT_WAIT_TIMEOUT = timeout(10.seconds)
val LISTENER_BUS_TIMEOUT = 30000
def executeHolder: ExecuteHolder = {
- assert(listener.executeHolder.isDefined)
- listener.executeHolder.get
+ // An ExecuteHolder shall be set eventually through MockSparkListener
+ Eventually.eventually(EVENT_WAIT_TIMEOUT) {
+ assert(
+ listener.executeHolder.isDefined,
+ s"No events have been posted in $EVENT_WAIT_TIMEOUT")
+ listener.executeHolder.get
+ }
}
def onNext(v: proto.ExecutePlanResponse): Unit = {
if (v.hasSchema) {
@@ -891,8 +897,10 @@ class SparkConnectServiceSuite
def onCompleted(producedRowCount: Option[Long] = None): Unit = {
assert(executeHolder.eventsManager.getProducedRowCount ==
producedRowCount)
// The eventsManager is closed asynchronously
- Eventually.eventually(timeout(1.seconds)) {
- assert(executeHolder.eventsManager.status == ExecuteStatus.Closed)
+ Eventually.eventually(EVENT_WAIT_TIMEOUT) {
+ assert(
+ executeHolder.eventsManager.status == ExecuteStatus.Closed,
+ s"Execution has not been completed in $EVENT_WAIT_TIMEOUT")
}
}
def onCanceled(): Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]