This seems to be a bug — could you file a JIRA for it? RDDs should be serializable for streaming jobs.
On Thu, Jun 18, 2015 at 4:25 AM, Groupme <grou...@gmail.com> wrote: > Hi, > > > I am writing pyspark stream program. I have the training data set to compute > the regression model. I want to use the stream data set to test the model. > So, I join with RDD with the StreamRDD, but i got the exception. Following > are my source code, and the exception I got. Any help is appreciated. Thanks > > > Regards, > > Afancy > > -------------------- > > > from __future__ import print_function > > import sys,os,datetime > > from pyspark import SparkContext > from pyspark.streaming import StreamingContext > from pyspark.sql.context import SQLContext > from pyspark.resultiterable import ResultIterable > from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD > import numpy as np > import statsmodels.api as sm > > > def splitLine(line, delimiter='|'): > values = line.split(delimiter) > st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S') > return (values[0],st.hour), values[2:] > > def reg_m(y, x): > ones = np.ones(len(x[0])) > X = sm.add_constant(np.column_stack((x[0], ones))) > for ele in x[1:]: > X = sm.add_constant(np.column_stack((ele, X))) > results = sm.OLS(y, X).fit() > return results > > def train(line): > y,x = [],[] > y, x = [],[[],[],[],[],[],[]] > reading_tmp,temp_tmp = [],[] > i = 0 > for reading, temperature in line[1]: > if i%4==0 and len(reading_tmp)==4: > y.append(reading_tmp.pop()) > x[0].append(reading_tmp.pop()) > x[1].append(reading_tmp.pop()) > x[2].append(reading_tmp.pop()) > temp = float(temp_tmp[0]) > del temp_tmp[:] > x[3].append(temp-20.0 if temp>20.0 else 0.0) > x[4].append(16.0-temp if temp<16.0 else 0.0) > x[5].append(5.0-temp if temp<5.0 else 0.0) > reading_tmp.append(float(reading)) > temp_tmp.append(float(temperature)) > i = i + 1 > return str(line[0]),reg_m(y, x).params.tolist() > > if __name__ == "__main__": > if len(sys.argv) != 4: > print("Usage: regression.py <checkpointDir> <trainingDataDir> > <streamDataDir>", 
file=sys.stderr) > exit(-1) > > checkpoint, trainingInput, streamInput = sys.argv[1:] > sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming") > > trainingLines = sc.textFile(trainingInput) > modelRDD = trainingLines.map(lambda line: splitLine(line, "|"))\ > .groupByKey()\ > .map(lambda line: train(line))\ > .cache() > > > ssc = StreamingContext(sc, 2) > ssc.checkpoint(checkpoint) > lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, > "|")) > > > testRDD = lines.groupByKeyAndWindow(4,2).map(lambda line:(str(line[0]), > line[1])).transform(lambda rdd: rdd.leftOuterJoin(modelRDD)) > testRDD.pprint(20) > > ssc.start() > ssc.awaitTermination() > > > ------------------------ > > 15/06/18 12:25:37 INFO FileInputDStream: Duration for remembering RDDs set > to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@15b81ee6 > Traceback (most recent call last): > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", > line 90, in dumps > return bytearray(self.serializer.dumps((func.func, func.deserializers))) > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", > line 427, in dumps > return cloudpickle.dumps(obj, 2) > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 622, in dumps > cp.dump(obj) > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 107, in dump > return Pickler.dump(self, obj) > File "/usr/lib/python2.7/pickle.py", line 224, in dump > self.save(obj) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple > save(element) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 193, in 
save_function > self.save_function_tuple(obj) > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 236, in save_function_tuple > save((code, closure, base_globals)) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple > save(element) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/lib/python2.7/pickle.py", line 600, in save_list > self._batch_appends(iter(obj)) > File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends > save(x) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 193, in save_function > self.save_function_tuple(obj) > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 236, in save_function_tuple > save((code, closure, base_globals)) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple > save(element) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/lib/python2.7/pickle.py", line 600, in save_list > self._batch_appends(iter(obj)) > File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends > save(tmp[0]) > File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 193, in save_function > self.save_function_tuple(obj) > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", > line 241, in save_function_tuple > save(f_globals) > 
File "/usr/lib/python2.7/pickle.py", line 286, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/lib/python2.7/pickle.py", line 649, in save_dict > self._batch_setitems(obj.iteritems()) > File "/usr/lib/python2.7/pickle.py", line 686, in _batch_setitems > save(v) > File "/usr/lib/python2.7/pickle.py", line 306, in save > rv = reduce(self.proto) > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", line > 193, in __getnewargs__ > "It appears that you are attempting to broadcast an RDD or reference an > RDD from an " > Exception: It appears that you are attempting to broadcast an RDD or > reference an RDD from an action or transformation. RDD transformations and > actions can only be invoked by the driver, not inside of other > transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is > invalid because the values transformation and count action cannot be > performed inside of the rdd1.map transformation. For more information, see > SPARK-5063. 
> 15/06/18 12:25:37 ERROR StreamingContext: Error starting the context, > marking it as stopped > java.io.IOException: java.lang.NullPointerException > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242) > at > org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) > at > org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:113) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285) > at > org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:114) > at > 
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:547) > at > org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587) > at > org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586) > at > org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) > at > py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) > at py4j.Gateway.invoke(Gateway.java:259) > at > py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:207) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at > org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) > ... 61 more > Traceback (most recent call last): > File "/home/xiuli/PycharmProjects/benchmark/regression.py", line 103, in > <module> > ssc.start() > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/context.py", > line 184, in start > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", > line 538, in __call__ > File > "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", > line 300, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling o42.start. 
> : java.io.IOException: java.lang.NullPointerException > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242) > at > org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) > at > org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:113) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285) > at > org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:114) > at > org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:547) > at > 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587) > at > org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586) > at > org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) > at > py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) > at py4j.Gateway.invoke(Gateway.java:259) > at > py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:207) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at > org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) > ... 61 more > > 15/06/18 12:25:37 INFO SparkContext: Invoking stop() from shutdown hook > 15/06/18 12:25:37 INFO SparkUI: Stopped Spark web UI at > http://10.41.27.11:4040 > > --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org