[ 
https://issues.apache.org/jira/browse/SPARK-4868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249525#comment-14249525
 ] 

Aaron Davidson commented on SPARK-4868:
---------------------------------------

The REPL has funny namespacing going on. What happens if you put your no-op 
function inside an object, a la

object Test {
  def noop(a: Any) = {}
}

and then invoke it statically, as Test.noop(t)? Make sure to restart the REPL 
between attempts to avoid polluted namespaces.

If that doesn't work, you can always mark the StreamingContext as @transient, 
but that never feels quite right.
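
To make the two suggestions concrete, here is a minimal sketch that needs no 
Spark at all. It assumes the REPL behaves roughly like a wrapper class that 
holds both the context and the helper method; FakeContext, ReplLine, 
ReplLineTransient and Demo are hypothetical names invented for this 
illustration, not Spark or REPL classes. I would expect the first check to 
fail and the other two to pass.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable handle such as StreamingContext.
class FakeContext

// Hypothetical stand-in for a REPL wrapper holding both the context and the helper.
class ReplLine extends Serializable {
  val ctx = new FakeContext
  def noop(a: Any): Any = a
  // noop(t) is really this.noop(t), so the function captures `this` and, with it, ctx.
  val mapper: String => Any = t => noop(t)
}

// Same wrapper, but Java serialization skips the context field.
class ReplLineTransient extends Serializable {
  @transient val ctx = new FakeContext
  def noop(a: Any): Any = a
  val mapper: String => Any = t => noop(t)
}

// The helper lives in a standalone object, so calling it needs no enclosing instance.
object Test {
  def noop(a: Any): Any = a
}

object Demo {
  // True if plain Java serialization accepts the value.
  def serializes(value: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Captures nothing: Test is reached statically.
  val viaObject: String => Any = t => Test.noop(t)

  def main(args: Array[String]): Unit = {
    println(s"captures wrapper:   ${serializes(new ReplLine().mapper)}")
    println(s"transient context:  ${serializes(new ReplLineTransient().mapper)}")
    println(s"helper in object:   ${serializes(viaObject)}")
  }
}
```

The real shell nests several generated wrappers, so this only approximates 
what the closure cleaner sees there.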

> Twitter DStream.map() throws "Task not serializable"
> ----------------------------------------------------
>
>                 Key: SPARK-4868
>                 URL: https://issues.apache.org/jira/browse/SPARK-4868
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell, Streaming
>    Affects Versions: 1.1.1
>         Environment: * Spark 1.1.1
> * EC2 cluster with 1 slave spun up using {{spark-ec2}}
> * twitter4j 3.0.3
> * {{spark-shell}} called with {{--jars}} argument to load 
> {{spark-streaming-twitter_2.10-1.0.0.jar}} as well as all the twitter4j jars.
>            Reporter: Nicholas Chammas
>            Priority: Minor
>
> _(Continuing the discussion [started here on the Spark user 
> list|http://apache-spark-user-list.1001560.n3.nabble.com/NotSerializableException-in-Spark-Streaming-td5725.html].)_
> The following Spark Streaming code throws a serialization exception I do not 
> understand.
> {code}
> import twitter4j.auth.{Authorization, OAuthAuthorization}
> import twitter4j.conf.ConfigurationBuilder 
> import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
> import org.apache.spark.streaming.twitter.TwitterUtils
> def getAuth(): Option[Authorization] = {
>   System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
>   System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
>   System.setProperty("twitter4j.oauth.accessToken", "accessToken") 
>   System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
>   Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
> } 
> def noop(a: Any): Any = {
>   a
> }
> val ssc = new StreamingContext(sc, Seconds(5))
> val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
> val liveTweets = liveTweetObjects.map(_.getText)
> liveTweets.map(t => noop(t)).print()  // exception here
> ssc.start()
> {code}
> So before I even start the StreamingContext, I get the following stack trace:
> {code}
> scala> liveTweets.map(t => noop(t)).print()
> org.apache.spark.SparkException: Task not serializable
>       at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>       at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
>       at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
>       at $iwC$$iwC$$iwC.<init>(<console>:32)
>       at $iwC$$iwC.<init>(<console>:34)
>       at $iwC.<init>(<console>:36)
>       at <init>(<console>:38)
>       at .<init>(<console>:42)
>       at .<clinit>(<console>)
>       at .<init>(<console>:7)
>       at .<clinit>(<console>)
>       at $print(<console>)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
>       at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
>       at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
>       at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
>       at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
>       at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
>       at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
>       at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
>       at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
>       at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
>       at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
>       at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
>       at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
>       at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
>       at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>       at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
>       at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
>       at org.apache.spark.repl.Main$.main(Main.scala:31)
>       at org.apache.spark.repl.Main.main(Main.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
>       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>       at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>       at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>       at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>       ... 43 more
> {code}
> What I'm really trying to do is use Spark Streaming via the interactive shell 
> to filter Tweets using a trained KMeans model. I got errors trying that, and 
> I boiled it down to this repro.


