This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new af7dd18 [SPARK-36764][SS][TEST] Fix race-condition on "ensure
continuous stream is being used" in KafkaContinuousTest
af7dd18 is described below
commit af7dd18a5ede5ed87c3f1a13100633f7f60d2cd8
Author: Jungtaek Lim <[email protected]>
AuthorDate: Fri Sep 17 21:28:02 2021 +0800
[SPARK-36764][SS][TEST] Fix race-condition on "ensure continuous stream is
being used" in KafkaContinuousTest
### What changes were proposed in this pull request?
The test “ensure continuous stream is being used” in KafkaContinuousTest
checks the actual type of the execution and then immediately stops the query.
Stopping a streaming query in continuous mode is done by interrupting the query
execution thread and joining it with an indefinite timeout.
In parallel, the started streaming query generates its execution plan, which
includes running the optimizer. Some parts of SessionState can be built at that
time, as they are defined as lazy. The problem is that some of them seem to
“swallow” the InterruptedException and let the thread keep running. As a
result, the query cannot tell that a stop was requested, so it never stops.
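
The effect of a swallowed interrupt can be illustrated with plain JVM threads. This is only a sketch, not Spark code: `initSwallowingInterrupt` is a hypothetical stand-in for lazy initialization that catches InterruptedException without restoring the interrupt status, and the join timeout is bounded here so the example itself cannot hang.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object SwallowedInterruptSketch {
  // Hypothetical stand-in for initialization code that catches the
  // interrupt and drops it (the interrupt status is NOT restored).
  def initSwallowingInterrupt(): Unit = {
    try {
      Thread.sleep(200)
    } catch {
      case _: InterruptedException => // swallowed on purpose for the demo
    }
  }

  // Returns whether the worker still sees the interrupt after initialization.
  def sawInterruptAfterInit(): Boolean = {
    val saw = new AtomicBoolean(false)
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        initSwallowingInterrupt()
        // A loop guarded by this flag would keep running, because the
        // interrupt was consumed inside initSwallowingInterrupt().
        saw.set(Thread.currentThread().isInterrupted)
      }
    })
    worker.start()
    worker.interrupt()  // the "stop" request arrives during initialization
    worker.join(5000)   // bounded here; the test described above joins indefinitely
    saw.get()
  }
}
```

Once the exception has been caught and dropped, the worker has no remaining signal that a stop was requested, which is why an unbounded join on such a thread may never return.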
This PR fixes this scenario by ensuring that the streaming query has started
before the test stops the query.
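
The general idea of stopping only after the work has visibly started can be sketched with a latch. This is an illustration under assumptions and does not reproduce `makeSureGetOffsetCalled`: the `started` latch and the worker loop are hypothetical, and the timeouts are arbitrary.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object StopAfterStartedSketch {
  // Returns true if the worker signalled that it started and then
  // terminated after being interrupted.
  def stopOnlyAfterStarted(): Boolean = {
    val started = new CountDownLatch(1)
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        // Hypothetical: any interrupt-unsafe initialization would finish
        // before this point.
        started.countDown()
        try {
          while (true) Thread.sleep(10) // main loop, responsive to interrupts
        } catch {
          case _: InterruptedException => // exit the loop and let the thread end
        }
      }
    })
    worker.start()
    // Wait for the start signal before requesting the stop.
    val sawStart = started.await(5, TimeUnit.SECONDS)
    worker.interrupt()
    worker.join(5000)
    sawStart && !worker.isAlive
  }
}
```

Because the interrupt is sent only after the start signal, it reaches the interrupt-aware main loop and not the initialization phase.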
### Why are the changes needed?
The race condition could cause the test to hang until the test framework marks
it as timed out.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #34004 from HeartSaVioR/SPARK-36764.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 6099edc66eb35db548230eeaba791c730eb38f84)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala | 1 +
1 file changed, 1 insertion(+)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index 9ee8cbf..4b6a5b8 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -91,6 +91,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
       .load()
     testStream(query)(
+      makeSureGetOffsetCalled,
       Execute(q => assert(q.isInstanceOf[ContinuousExecution]))
     )
   }