[ https://issues.apache.org/jira/browse/SPARK-8509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14595163#comment-14595163 ]
Joseph K. Bradley commented on SPARK-8509:
------------------------------------------
I believe the bug is here; you need to be careful about what is a DStream vs.
an RDD vs. local data:
{code}
testRDD = lines.groupByKeyAndWindow(4, 2).map(lambda line: (str(line[0]), line[1]))\
               .transform(lambda rdd: rdd.leftOuterJoin(modelRDD))
testRDD.pprint(20)
{code}
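The lambda passed to transform() captures modelRDD in its closure, so when the
StreamingContext checkpoint serializes the transform function with cloudpickle,
it tries to pickle the RDD itself, which PySpark rejects (that is the
SPARK-5063 guard in rdd.py raising in your trace). The same failure can be
reproduced without streaming; a minimal sketch, where rdd1 and rdd2 are
hypothetical names, not from your program:
{code}
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([10, 20, 30])

# Invalid: the closure captures rdd2, so serializing the task hits the
# SPARK-5063 guard and raises the same Exception shown in the trace below.
# rdd1.map(lambda x: rdd2.count() * x).collect()

# Valid: run the action on the driver first and capture the local result.
n = rdd2.count()
rdd1.map(lambda x: n * x).collect()  # [3, 6, 9]
{code}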
I believe this is a bug in your code, not in Spark, so please post on the user
list instead of JIRA. I'll close this for now.
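For reference, a common workaround in this situation is to turn the model into
local data on the driver and broadcast it, replacing the join with a dictionary
lookup. A minimal sketch, assuming the trained model is small enough to collect
to the driver (modelMap is a name introduced here for illustration):
{code}
# Materialize the model as a plain dict on the driver and broadcast it, so the
# DStream closure captures a Broadcast variable instead of an RDD.
modelMap = sc.broadcast(dict(modelRDD.collect()))

testStream = lines.groupByKeyAndWindow(4, 2)\
                  .map(lambda kv: (str(kv[0]),
                                   (list(kv[1]), modelMap.value.get(str(kv[0])))))
testStream.pprint(20)
{code}
Like the left outer join, keys with no trained model simply map to None.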
> Failed to JOIN in pyspark
> -------------------------
>
> Key: SPARK-8509
> URL: https://issues.apache.org/jira/browse/SPARK-8509
> Project: Spark
> Issue Type: Bug
> Reporter: afancy
>
> Hi,
> I am writing a PySpark streaming program. I compute a regression model from a
> training data set, and I want to test the model against a streaming data set.
> So I join the model RDD with the stream's RDDs, but I get the exception below.
> My source code and the exception follow. Any help is appreciated. Thanks.
> Regards,
> Afancy
> --------------------
> {code}
> from __future__ import print_function
> import sys, os, datetime
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.sql.context import SQLContext
> from pyspark.resultiterable import ResultIterable
> from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
> import numpy as np
> import statsmodels.api as sm
>
> def splitLine(line, delimiter='|'):
>     values = line.split(delimiter)
>     st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
>     return (values[0], st.hour), values[2:]
>
> def reg_m(y, x):
>     ones = np.ones(len(x[0]))
>     X = sm.add_constant(np.column_stack((x[0], ones)))
>     for ele in x[1:]:
>         X = sm.add_constant(np.column_stack((ele, X)))
>     results = sm.OLS(y, X).fit()
>     return results
>
> def train(line):
>     y, x = [], [[], [], [], [], [], []]
>     reading_tmp, temp_tmp = [], []
>     i = 0
>     for reading, temperature in line[1]:
>         if i % 4 == 0 and len(reading_tmp) == 4:
>             y.append(reading_tmp.pop())
>             x[0].append(reading_tmp.pop())
>             x[1].append(reading_tmp.pop())
>             x[2].append(reading_tmp.pop())
>             temp = float(temp_tmp[0])
>             del temp_tmp[:]
>             x[3].append(temp - 20.0 if temp > 20.0 else 0.0)
>             x[4].append(16.0 - temp if temp < 16.0 else 0.0)
>             x[5].append(5.0 - temp if temp < 5.0 else 0.0)
>         reading_tmp.append(float(reading))
>         temp_tmp.append(float(temperature))
>         i = i + 1
>     return str(line[0]), reg_m(y, x).params.tolist()
>
> if __name__ == "__main__":
>     if len(sys.argv) != 4:
>         print("Usage: regression.py <checkpointDir> <trainingDataDir> <streamDataDir>", file=sys.stderr)
>         exit(-1)
>     checkpoint, trainingInput, streamInput = sys.argv[1:]
>     sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")
>     trainingLines = sc.textFile(trainingInput)
>     modelRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
>                             .groupByKey()\
>                             .map(lambda line: train(line))\
>                             .cache()
>
>     ssc = StreamingContext(sc, 2)
>     ssc.checkpoint(checkpoint)
>     lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, "|"))
>     testRDD = lines.groupByKeyAndWindow(4, 2)\
>                    .map(lambda line: (str(line[0]), line[1]))\
>                    .transform(lambda rdd: rdd.leftOuterJoin(modelRDD))
>     testRDD.pprint(20)
>     ssc.start()
>     ssc.awaitTermination()
> {code}
> ------------------------
> {code}
> 15/06/18 12:25:37 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@15b81ee6
> Traceback (most recent call last):
>   File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", line 90, in dumps
>     return bytearray(self.serializer.dumps((func.func, func.deserializers)))
>   File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 427, in dumps
>     return cloudpickle.dumps(obj, 2)
>   File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 622, in dumps
>     cp.dump(obj)
>   File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
>     return Pickler.dump(self, obj)
>   File "/usr/lib/python2.7/pickle.py", line 224, in dump
>     self.save(obj)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>     save(element)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 193, in save_function
>     self.save_function_tuple(obj)
>   File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 236, in save_function_tuple
>     save((code, closure, base_globals))
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>     save(element)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>     self._batch_appends(iter(obj))
>   File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
>     save(x)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 193, in save_function
>     self.save_function_tuple(obj)
>   File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 236, in save_function_tuple
>     save((code, closure, base_globals))
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>     save(element)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>     self._batch_appends(iter(obj))
>   File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
>     save(tmp[0])
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 193, in save_function
>     self.save_function_tuple(obj)
>   File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>     save(f_globals)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
>     self._batch_setitems(obj.iteritems())
>   File "/usr/lib/python2.7/pickle.py", line 686, in _batch_setitems
>     save(v)
>   File "/usr/lib/python2.7/pickle.py", line 306, in save
>     rv = reduce(self.proto)
>   File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", line 193, in __getnewargs__
>     "It appears that you are attempting to broadcast an RDD or reference an RDD from an "
> Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
> 15/06/18 12:25:37 ERROR StreamingContext: Error starting the context, marking it as stopped
> java.io.IOException: java.lang.NullPointerException
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
>     at org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
>     at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:113)
>     at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
>     at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>     at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:114)
>     at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:547)
>     at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>     at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>     at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>     at py4j.Gateway.invoke(Gateway.java:259)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
>     ... 61 more
> Traceback (most recent call last):
>   File "/home/xiuli/PycharmProjects/benchmark/regression.py", line 103, in <module>
>     ssc.start()
>   File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/context.py", line 184, in start
>   File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>   File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o42.start.
> : java.io.IOException: java.lang.NullPointerException
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
>     at org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
>     at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:113)
>     at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
>     at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:113)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>     at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:114)
>     at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:547)
>     at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>     at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>     at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>     at py4j.Gateway.invoke(Gateway.java:259)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
>     ... 61 more
> 15/06/18 12:25:37 INFO SparkContext: Invoking stop() from shutdown hook
> 15/06/18 12:25:37 INFO SparkUI: Stopped Spark web UI at http://10.41.27.11:4040
> {code}