This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new be76ee9  [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
be76ee9 is described below

commit be76ee96bdb168dc2b44b25c01c3dd6cf7946f6b
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Fri Sep 11 11:48:34 2020 -0700

    [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
    
    This PR aims to add `sinkParameters` to check sink options robustly and independently in DataStreamReaderWriterSuite.
    
    `LastOptions.parameters` is designed to catch three cases: `sourceSchema`, `createSource`, and `createSink`. However, `StreamingQuery.stop` invokes `queryExecutionThread.join`, `runStream`, and `createSource` immediately, which overwrites the options that were stored by `createSink`.
    
    To catch the `createSink` options, the test suite currently relies on a workaround pattern, and we have observed occasional flakiness with it. If we store the `createSink` options separately, we no longer need the workaround and can eliminate the flakiness.
    
    ```scala
    val query = df.writeStream
      ...
      .start()
    assert(LastOptions.parameters(..))
    query.stop()
    ```
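    
    With the sink options stored in a separate `sinkParameters` field, the assertions can safely run after `stop()`. A minimal sketch mirroring the updated test in the diff below (`df` and `newMetadataDir` come from the existing suite):
    
    ```scala
    df.writeStream
      .format("org.apache.spark.sql.streaming.test")
      .option("opt1", "5")
      .option("checkpointLocation", newMetadataDir)
      .start()
      .stop()
    
    // sinkParameters is written only by createSink, so createSource
    // can no longer overwrite it during stop().
    assert(LastOptions.sinkParameters("opt1") == "5")
    ```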
    
    No user-facing change; this is a test-only change.
    
    Tested by passing the newly updated test case.
    
    Closes #29730 from dongjoon-hyun/SPARK-32845.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit b4be6a6d12bf62f02cffe0bcc97ef32d27827d57)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../test/DataStreamReaderWriterSuite.scala         | 23 +++++++++++-----------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 4744aca..38d5c74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -43,11 +43,13 @@ object LastOptions {
   var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
   var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
   var parameters: Map[String, String] = null
+  var sinkParameters: Map[String, String] = null
   var schema: Option[StructType] = null
   var partitionColumns: Seq[String] = Nil
 
   def clear(): Unit = {
     parameters = null
+    sinkParameters = null
     schema = null
     partitionColumns = null
     reset(mockStreamSourceProvider)
@@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
       parameters: Map[String, String],
       partitionColumns: Seq[String],
       outputMode: OutputMode): Sink = {
-    LastOptions.parameters = parameters
+    LastOptions.sinkParameters = parameters
     LastOptions.partitionColumns = partitionColumns
     LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode)
     new Sink {
@@ -171,20 +173,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     LastOptions.clear()
 
-    val query = df.writeStream
+    df.writeStream
       .format("org.apache.spark.sql.streaming.test")
       .option("opt1", "5")
       .options(Map("opt2" -> "4"))
       .options(map)
       .option("checkpointLocation", newMetadataDir)
       .start()
+      .stop()
 
-    assert(LastOptions.parameters("opt1") == "5")
-    assert(LastOptions.parameters("opt2") == "4")
-    assert(LastOptions.parameters("opt3") == "3")
-    assert(LastOptions.parameters.contains("checkpointLocation"))
-
-    query.stop()
+    assert(LastOptions.sinkParameters("opt1") == "5")
+    assert(LastOptions.sinkParameters("opt2") == "4")
+    assert(LastOptions.sinkParameters("opt3") == "3")
+    assert(LastOptions.sinkParameters.contains("checkpointLocation"))
   }
 
   test("SPARK-32832: later option should override earlier options for load()") 
{
@@ -205,7 +206,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .load()
     assert(LastOptions.parameters.isEmpty)
 
-    val query = ds.writeStream
+    ds.writeStream
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .option("paTh", "1")
@@ -214,8 +215,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .option("patH", "4")
       .option("path", "5")
       .start()
-    assert(LastOptions.parameters("path") == "5")
-    query.stop()
+      .stop()
+    assert(LastOptions.sinkParameters("path") == "5")
   }
 
   test("partitioning") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
