[ https://issues.apache.org/jira/browse/SPARK-5035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tathagata Das resolved SPARK-5035.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 1.2.1
                   1.1.2
                   1.3.0
                   1.0.3

> Streaming ReceiverMessage trait should extend Serializable
> ----------------------------------------------------------
>
>                 Key: SPARK-5035
>                 URL: https://issues.apache.org/jira/browse/SPARK-5035
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.0.2, 1.3.0, 1.1.2, 1.2.1
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Critical
>             Fix For: 1.0.3, 1.3.0, 1.1.2, 1.2.1
>
>
> Spark Streaming's {{ReceiverMessage}} trait should extend Serializable in 
> order to fix a subtle bug that only occurs when running on a real cluster:
> If you attempt to send a fire-and-forget message to a remote Akka actor and 
> that message cannot be serialized, then this seems to lead to more-or-less 
> silent failures.  As an optimization, Akka skips message serialization for 
> messages sent within the same JVM.  As a result, Spark's unit tests will 
> never fail due to non-serializable Akka messages, but these will cause 
> mostly-silent failures when running on a real cluster.
> Here's the current code for ReceiverMessage:
> {code}
> /** Messages sent to the NetworkReceiver. */
> private[streaming] sealed trait ReceiverMessage
> private[streaming] object StopReceiver extends ReceiverMessage
> {code}
> Since {{ReceiverMessage}} does not extend Serializable and {{StopReceiver}} 
> is a regular {{object}}, not a {{case object}}, serializing {{StopReceiver}} 
> fails with a serialization error.  As a result, graceful receiver shutdown is broken on 
> real clusters but works in {{local}} and {{local-cluster}} modes.  If you 
> want to reproduce this, try running the word count example from the Streaming 
> Programming Guide in the Spark shell:
> {code}
> import org.apache.spark._
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.StreamingContext._
> val ssc = new StreamingContext(sc, Seconds(10))
> // Create a DStream that will connect to hostname:port, like localhost:9999
> val lines = ssc.socketTextStream("localhost", 9999)
> // Split each line into words
> val words = lines.flatMap(_.split(" "))
> // Count each word in each batch
> val pairs = words.map(word => (word, 1))
> val wordCounts = pairs.reduceByKey(_ + _)
> // Print the first ten elements of each RDD generated in this DStream to the console
> wordCounts.print()
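> ssc.start()
> // Let at least one 10-second batch run before shutting down
> Thread.sleep(10000)
> // Graceful shutdown: the path the report says is broken on real clusters
> ssc.stop(stopSparkContext = true, stopGracefully = true)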
> {code}
> This will work correctly in local mode but fail when running against a real 
> cluster (try sbin/start-all.sh to test this locally).
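A minimal, self-contained sketch of the fix named in the issue title, assuming that Akka's default Java serialization is what a remote send exercises. It is not the committed patch: `BrokenReceiverMessage`, `FixedReceiverMessage`, `CaseStopReceiver` and the `SerializationCheck` helper are hypothetical names for illustration. Writing each message through `ObjectOutputStream` forces the serialization step that Akka skips for same-JVM sends, which shows why the plain `object` fails while a `Serializable` trait or a `case object` works.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical copy of the hierarchy quoted in the report: a plain `object`
// under a trait that does not extend Serializable.
sealed trait BrokenReceiverMessage
object BrokenStopReceiver extends BrokenReceiverMessage

// A `case object` is Serializable by itself, even under the same trait.
case object CaseStopReceiver extends BrokenReceiverMessage

// Hypothetical fixed hierarchy: the trait extends Serializable, so a plain
// `object` that extends it is serializable too.
sealed trait FixedReceiverMessage extends Serializable
object FixedStopReceiver extends FixedReceiverMessage

object SerializationCheck {
  // Writes the value with Java serialization, the step a same-JVM Akka send
  // skips. Returns false if the value (or anything it references) is not
  // Serializable.
  def isSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"plain object, non-Serializable trait: ${isSerializable(BrokenStopReceiver)}")
    println(s"case object, non-Serializable trait:  ${isSerializable(CaseStopReceiver)}")
    println(s"plain object, Serializable trait:     ${isSerializable(FixedStopReceiver)}")
  }
}
```

Running `main` should print `false`, `true`, `true` for the three cases. A check like `isSerializable` in a unit test, or Akka's testing setting `akka.actor.serialize-messages = on` (which forces serialization of local messages as well), is one way to surface this class of bug without a real cluster.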



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
