This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d7afb06f68 [SPARK-38723][SS][TESTS] Add test for streaming query resume race condition
7d7afb06f68 is described below

commit 7d7afb06f682c10f3900eb8adeab9fad6d49cb24
Author: Phil Dakin <phil.daki...@gmail.com>
AuthorDate: Thu Oct 26 14:24:09 2023 +0900

    [SPARK-38723][SS][TESTS] Add test for streaming query resume race condition
    
    ### What changes were proposed in this pull request?
    Add a test for the CONCURRENT_QUERY error raised when multiple sessions try to simultaneously resume the same streaming query from checkpoint.
    
    ### Why are the changes needed?
    Improve testing coverage per https://issues.apache.org/jira/browse/SPARK-38723.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Change is itself a test - ran locally and confirmed the suite passes.
    ```
    [info] All tests passed.
    [success] Total time: 129 s (02:09), completed Oct 17, 2023, 2:11:34 PM
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #43405 from PhilDakin/pdakin.SPARK-38723.
    
    Authored-by: Phil Dakin <phil.daki...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/errors/QueryExecutionErrorsSuite.scala     | 48 ++++++++++++++++++++++
 1 file changed, 48 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 78bbabb1a3f..fb1d05f2a9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -25,6 +25,7 @@ import java.util.{Locale, Properties, ServiceConfigurationError}
 import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 import org.mockito.Mockito.{mock, spy, when}
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, QueryTest, Row, SaveMode}
@@ -49,6 +50,7 @@ import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
 import org.apache.spark.sql.streaming.StreamingQueryException
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{DataType, DecimalType, LongType, MetadataBuilder, StructType}
+import org.apache.spark.util.ThreadUtils
 import org.apache.spark.util.Utils
 
 class QueryExecutionErrorsSuite
@@ -876,6 +878,52 @@ class QueryExecutionErrorsSuite
     assert(e.getCause.isInstanceOf[NullPointerException])
   }
 
+  test("CONCURRENT_QUERY: streaming query is resumed from many sessions") {
+    failAfter(90 seconds) {
+      withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") {
+        withTempDir { dir =>
+          val ds = spark.readStream.format("rate").load()
+
+          // Queries have the same ID when they are resumed from the same checkpoint.
+          val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+          val dataLocation = new File(dir, "data").getCanonicalPath
+
+          // Run an initial query to setup the checkpoint.
+          val initialQuery = ds.writeStream.format("parquet")
+            .option("checkpointLocation", chkLocation).start(dataLocation)
+
+          // Error is thrown due to a race condition. Ensure it happens with high likelihood in the
+          // test by spawning many threads.
+          val exceptions = ThreadUtils.parmap(Seq.range(1, 50), "QueryExecutionErrorsSuite", 50)
+            { _ =>
+              var exception = None : Option[SparkConcurrentModificationException]
+              try {
+                val restartedQuery = ds.writeStream.format("parquet")
+                  .option("checkpointLocation", chkLocation).start(dataLocation)
+                restartedQuery.stop()
+                restartedQuery.awaitTermination()
+              } catch {
+                case e: SparkConcurrentModificationException =>
+                  exception = Some(e)
+              }
+              exception
+            }
+          assert(exceptions.map(e => e.isDefined).reduceLeft(_ || _))
+          exceptions.map { e =>
+            if (e.isDefined) {
+              checkError(
+                e.get,
+                errorClass = "CONCURRENT_QUERY",
+                sqlState = Some("0A000")
+              )
+            }
+          }
+          spark.streams.active.foreach(_.stop())
+        }
+      }
+    }
+  }
+
   test("UNSUPPORTED_EXPR_FOR_WINDOW: to_date is not supported with WINDOW") {
     withTable("t") {
       sql("CREATE TABLE t(c String) USING parquet")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to