This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 0d1664c  [SPARK-29419][SQL] Fix Encoder thread-safety bug in 
createDataset(Seq)
0d1664c is described below

commit 0d1664c9f8178285c924b44546ebbd059d92e3df
Author: Josh Rosen <[email protected]>
AuthorDate: Mon Mar 2 10:19:12 2020 +0900

    [SPARK-29419][SQL] Fix Encoder thread-safety bug in createDataset(Seq)
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a thread-safety bug in `SparkSession.createDataset(Seq)`: if 
the caller-supplied `Encoder` is used in multiple threads then createDataset's 
usage of the encoder may lead to incorrect / corrupt results because the 
Encoder's internal mutable state will be updated from multiple threads.
    
    Here is an example demonstrating the problem:
    
    ```scala
    import org.apache.spark.sql._
    
    val enc = implicitly[Encoder[(Int, Int)]]
    
    val datasets = (1 to 100).par.map { _ =>
      val pairs = (1 to 100).map(x => (x, x))
      spark.createDataset(pairs)(enc)
    }
    
    datasets.reduce(_ union _).collect().foreach {
      pair => require(pair._1 == pair._2, s"Pair elements are mismatched: 
$pair")
    }
    ```
    
    Before this PR's change, the above example fails because Spark produces 
corrupted records where different input records' fields have been co-mingled.
    
    This bug is similar to SPARK-22355 / #19577, a similar problem in 
`Dataset.collect()`.
    
    The fix implemented here is based on #24735's updated version of the 
`Datataset.collect()` bugfix: use `.copy()`. For consistency, I used same [code 
comment](https://github.com/apache/spark/blob/d841b33ba3a9b0504597dbccd4b0d11fa810abf3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3414)
 / explanation as that PR.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested manually using the example listed above.
    
    Thanks to smcnamara-stripe for identifying this bug.
    
    Closes #26076 from JoshRosen/SPARK-29419.
    
    Authored-by: Josh Rosen <[email protected]>
    Signed-off-by: HyukjinKwon <[email protected]>
    (cherry picked from commit f4499f678dc2e9f72c3ee5d2af083aa6b98f3fc2)
    Signed-off-by: HyukjinKwon <[email protected]>
---
 sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 2b847fb..edbd02b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -467,7 +467,8 @@ class SparkSession private(
   @Experimental
   @InterfaceStability.Evolving
   def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
-    val enc = encoderFor[T]
+    // `ExpressionEncoder` is not thread-safe, here we create a new encoder.
+    val enc = encoderFor[T].copy()
     val attributes = enc.schema.toAttributes
     val encoded = data.map(d => enc.toRow(d).copy())
     val plan = new LocalRelation(attributes, encoded)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to