This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bb8b691b0f66 [SPARK-48139][CONNECT][TESTS] Try stabilising multi-thread tests in CI
bb8b691b0f66 is described below

commit bb8b691b0f66cf50937f24d0b63342ca0da07e9c
Author: Paddy Xu <[email protected]>
AuthorDate: Wed Oct 30 11:01:24 2024 -0700

    [SPARK-48139][CONNECT][TESTS] Try stabilising multi-thread tests in CI
    
    ### What changes were proposed in this pull request?
    
    This PR tries to stabilise flaky tests which involve thread pools. Some tests are failing due to the thread pool having 2 threads instead of 3 or 4.
    
    ### Why are the changes needed?
    
    To let CI pass.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI will tell.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #48622 from xupefei/ci-threadpool.
    
    Authored-by: Paddy Xu <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala | 7 +++----
 core/src/test/scala/org/apache/spark/JobCancellationSuite.scala    | 4 ++--
 .../spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala     | 4 ++--
 3 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
index b116edb7df7c..7c3b92bf9094 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql
 
-import java.util.concurrent.ForkJoinPool
+import java.util.concurrent.Executors
 
 import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
@@ -137,15 +137,14 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
     assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
   }
 
-  // TODO(SPARK-48139): Re-enable `SparkSessionE2ESuite.interrupt tag`
-  ignore("interrupt tag") {
+  test("interrupt tag") {
     val session = spark
     import session.implicits._
 
     // global ExecutionContext has only 2 threads in Apache Spark CI
     // create own thread pool for four Futures used in this test
     val numThreads = 4
-    val fpool = new ForkJoinPool(numThreads)
+    val fpool = Executors.newFixedThreadPool(numThreads)
     val executionContext = ExecutionContext.fromExecutorService(fpool)
 
     val q1 = Future {
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index ca51e61f5ed4..857f6b0cd8f7 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import java.util.concurrent.{Semaphore, TimeUnit}
+import java.util.concurrent.{Executors, Semaphore, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.ArrayBuffer
@@ -302,7 +302,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     // global ExecutionContext has only 2 threads in Apache Spark CI
     // create own thread pool for four Futures used in this test
     val numThreads = 4
-    val fpool = ThreadUtils.newForkJoinPool("job-tags-test-thread-pool", numThreads)
+    val fpool = Executors.newFixedThreadPool(numThreads)
     val executionContext = ExecutionContext.fromExecutorService(fpool)
 
     try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
index e9fd07ecf18b..088abd063d4a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import java.util.concurrent.{ConcurrentHashMap, Semaphore, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, Executors, Semaphore, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.concurrent.{ExecutionContext, Future}
@@ -121,7 +121,7 @@ class SparkSessionJobTaggingAndCancellationSuite
     // global ExecutionContext has only 2 threads in Apache Spark CI
     // create own thread pool for four Futures used in this test
     val numThreads = 3
-    val fpool = ThreadUtils.newForkJoinPool("job-tags-test-thread-pool", numThreads)
+    val fpool = Executors.newFixedThreadPool(numThreads)
     val executionContext = ExecutionContext.fromExecutorService(fpool)
 
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to