This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bb8b691b0f66 [SPARK-48139][CONNECT][TESTS] Try stabilising multi-thread tests in CI
bb8b691b0f66 is described below
commit bb8b691b0f66cf50937f24d0b63342ca0da07e9c
Author: Paddy Xu <[email protected]>
AuthorDate: Wed Oct 30 11:01:24 2024 -0700
[SPARK-48139][CONNECT][TESTS] Try stabilising multi-thread tests in CI
### What changes were proposed in this pull request?
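This PR tries to stabilise flaky tests that run Futures on a dedicated thread pool. Some
tests were failing because the pool ended up with only 2 threads instead of the 3 or 4
the tests need, so the ForkJoinPool-based pools are replaced with
`Executors.newFixedThreadPool(numThreads)`, and the previously ignored
`SparkSessionE2ESuite` "interrupt tag" test is re-enabled.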
### Why are the changes needed?
To let CI pass.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI will tell.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48622 from xupefei/ci-threadpool.
Authored-by: Paddy Xu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala | 7 +++----
core/src/test/scala/org/apache/spark/JobCancellationSuite.scala | 4 ++--
.../spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala | 4 ++--
3 files changed, 7 insertions(+), 8 deletions(-)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
index b116edb7df7c..7c3b92bf9094 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql
-import java.util.concurrent.ForkJoinPool
+import java.util.concurrent.Executors
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
@@ -137,15 +137,14 @@ class SparkSessionE2ESuite extends ConnectFunSuite with
RemoteSparkSession {
assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
}
- // TODO(SPARK-48139): Re-enable `SparkSessionE2ESuite.interrupt tag`
- ignore("interrupt tag") {
+ test("interrupt tag") {
val session = spark
import session.implicits._
// global ExecutionContext has only 2 threads in Apache Spark CI
// create own thread pool for four Futures used in this test
val numThreads = 4
- val fpool = new ForkJoinPool(numThreads)
+ val fpool = Executors.newFixedThreadPool(numThreads)
val executionContext = ExecutionContext.fromExecutorService(fpool)
val q1 = Future {
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index ca51e61f5ed4..857f6b0cd8f7 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark
-import java.util.concurrent.{Semaphore, TimeUnit}
+import java.util.concurrent.{Executors, Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
@@ -302,7 +302,7 @@ class JobCancellationSuite extends SparkFunSuite with
Matchers with BeforeAndAft
// global ExecutionContext has only 2 threads in Apache Spark CI
// create own thread pool for four Futures used in this test
val numThreads = 4
- val fpool = ThreadUtils.newForkJoinPool("job-tags-test-thread-pool", numThreads)
+ val fpool = Executors.newFixedThreadPool(numThreads)
val executionContext = ExecutionContext.fromExecutorService(fpool)
try {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
index e9fd07ecf18b..088abd063d4a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql
-import java.util.concurrent.{ConcurrentHashMap, Semaphore, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, Executors, Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
@@ -121,7 +121,7 @@ class SparkSessionJobTaggingAndCancellationSuite
// global ExecutionContext has only 2 threads in Apache Spark CI
// create own thread pool for four Futures used in this test
val numThreads = 3
- val fpool = ThreadUtils.newForkJoinPool("job-tags-test-thread-pool", numThreads)
+ val fpool = Executors.newFixedThreadPool(numThreads)
val executionContext = ExecutionContext.fromExecutorService(fpool)
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]