Hi Andrew,

Sorry, but to me it seems S3 is the culprit... I downloaded your JSON file and stored it locally, then wrote this simple app (a subset of what you have in your GitHub; sorry, I'm a little bit rusty on how to create a new column out of existing ones) which basically reads the JSON file. It's in Scala, but the Python equivalent shouldn't be difficult; a rough PySpark sketch is below, followed by the Scala I actually ran. I noticed that in your schema you forgot the timezone column... was that intentional?

I ran the Scala version with Spark 2.0 and, similarly, with 1.6, and found no issues reading the data. If I have some time I'll try to store your JSON in one of my S3 buckets and read it via Spark from EC2.
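Since you are on PySpark, here is roughly the Python equivalent first. I have not run this version myself, so treat it as a sketch; it assumes the same local path and the same columns as the Scala program that follows:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType

conf = SparkConf().setAppName("Simple Application").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# no schema: let Spark infer it from the file
json_no_schema = sqlContext.read.json("file:///c:/tmp/1973-01-11.json")
json_no_schema.printSchema()
print("The JSON content with no schema has %d rows" % json_no_schema.count())

# with an explicit schema (note the timezone column)
schema = StructType([
    StructField("hour", StringType()),
    StructField("month", StringType()),
    StructField("second", StringType()),
    StructField("year", StringType()),
    StructField("timezone", StringType()),
    StructField("day", StringType()),
    StructField("minute", StringType()),
])
json_rdd = sc.textFile("file:///c:/tmp/1973-01-11.json")
json_with_schema = sqlContext.read.schema(schema).json(json_rdd)
print("----- And the JSON with schema has %d rows" % json_with_schema.count())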
And the Scala version I ran:

def main(args: Array[String]) = {
  import org.apache.spark.SparkConf
  import org.apache.spark.SparkContext
  import org.apache.spark.sql._
  import org.apache.spark.sql.types._

  val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  // no schema: let Spark infer it from the file
  val jsonContentNoSchema = sqlContext.read.json("file:///c:/tmp/1973-01-11.json")
  jsonContentNoSchema.printSchema()
  println(s"The JSON content with no schema has ${jsonContentNoSchema.count()} rows")

  // with an explicit schema, including the timezone column
  val jsonRdd = sc.textFile("file:///c:/tmp/1973-01-11.json")
  val schema = (new StructType).add("hour", StringType).add("month", StringType)
    .add("second", StringType).add("year", StringType)
    .add("timezone", StringType).add("day", StringType)
    .add("minute", StringType)
  // jsonRDD is deprecated; sqlContext.read.schema(schema).json(jsonRdd) is the newer equivalent
  val jsonContentWithSchema = sqlContext.jsonRDD(jsonRdd, schema)
  println(s"----- And the JSON with schema has ${jsonContentWithSchema.count()} rows")
}

Hope this helps,
kr
marco

On Mon, Nov 28, 2016 at 2:48 PM, Andrew Holway <andrew.hol...@otternetworks.de> wrote:

> I extracted out the boto bits and tested them in vanilla Python on the nodes.
> I am pretty sure that the data from S3 is OK. I've applied a public policy to
> the bucket s3://time-waits-for-no-man. There is a publicly available object
> here: https://s3-eu-west-1.amazonaws.com/time-waits-for-no-man/1973-01-11
>
> I'm using boto because using proxies with Spark and Hadoop in general is a
> bit of a black art.
>
> [centos@hadoop002 ~]$ python s3_test.py
> object key
> 1973-01-11
> Length of List
> 86400
> First row
> {u'hour': u'00', 'timestamp': 95558400, u'month': u'01', u'second': u'00',
> u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute': u'00'}
> Last row
> {u'hour': u'23', 'timestamp': 95644799, u'month': u'01', u'second': u'59',
> u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute': u'59'}
> [centos@hadoop002 ~]$ cat s3_test.py
> import boto3
> import ujson
> import arrow
> import sys
> import os
> import getpass
>
> os.environ['HTTPS_PROXY'] = 'https://webproxy:8080'
>
> def add_timestamp(dict):
>     dict['timestamp'] = arrow.get(
>         int(dict['year']),
>         int(dict['month']),
>         int(dict['day']),
>         int(dict['hour']),
>         int(dict['minute']),
>         int(dict['second'])
>     ).timestamp
>     return dict
>
> s3_list = []
> s3 = boto3.resource('s3')
> my_bucket = s3.Bucket('time-waits-for-no-man')
> for object in my_bucket.objects.filter(Prefix='1973-01-11'):
>     s3_list.append(object.key)
>
> print("object key")
> print(s3_list[0])
>
> s3obj = boto3.resource('s3').Object(bucket_name='time-waits-for-no-man', key=s3_list[0])
> contents = s3obj.get()['Body'].read().decode()
> meow = contents.splitlines()
> result_wo_timestamp = map(ujson.loads, meow)
> result_wo_timestamp[0]
> result_wi_timestamp = map(add_timestamp, result_wo_timestamp)
>
> print("Length of List")
> print(len(result_wi_timestamp))
> print("First row")
> print(result_wi_timestamp[0])
> print("Last row")
> print(result_wi_timestamp[86399])
>
> On Sun, Nov 27, 2016 at 7:11 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
>
>> Hi,
>>
>> Pickle errors normally point to a serialisation issue. I suspect
>> something wrong with your S3 data, but that is just a wild guess...
>>
>> Is your S3 object publicly available?
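>>
>> Purely as an illustration (this is not your code): the classic way to hit
>> exactly this "Method __getnewargs__([]) does not exist" pickle error is to
>> capture a py4j-backed object, such as a DataFrame, inside a function that
>> Spark has to pickle and ship to the workers. A sketch:
>>
>> df = sqlContext.read.json("file:///c:/tmp/1973-01-11.json")
>> rdd = sc.parallelize(range(10))
>> # broken: df lives in the driver JVM; pickling this closure fails
>> broken = rdd.map(lambda x: df.count())
>> # fine: the shipped function only touches plain Python objects
>> ok = rdd.map(lambda x: x + 1)
>>
>> It might be worth checking tick.py for anything of that shape.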
>>
>> A few suggestions to nail down the problem:
>>
>> 1 - Try to see if you can read your object from S3 using the boto3 library
>> 'offline', meaning not in Spark code.
>>
>> 2 - Try to replace your distributedJsonRead: instead of reading from S3,
>> generate a string out of a snippet of your JSON object.
>>
>> 3 - Spark can read data from S3 as well; just do
>> sc.textFile('s3://....') ==>
>> http://www.sparktutorials.net/reading-and-writing-s3-data-with-apache-spark
>> Try to use Spark entirely to read and process the data, rather than going
>> via boto3, which adds extra complexity you don't need. There is a sketch
>> right after this list.
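>>
>> A rough, untested sketch of option 3 (assuming the hadoop-aws S3 connector
>> and your AWS credentials are configured on the cluster; depending on the
>> setup the scheme may need to be s3a:// or s3n:// rather than s3://):
>>
>> # read the raw lines from S3, then let Spark parse the JSON
>> raw = sc.textFile("s3a://time-waits-for-no-man/1973-01-11")
>> df = sqlContext.read.json(raw)
>> df.printSchema()
>> print("rows: %d" % df.count())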
>>> File "/usr/lib64/python2.7/pickle.py", line 600, in save_list >>> self._batch_appends(iter(obj)) >>> File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends >>> save(x) >>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>> f(self, obj) # Call unbound method with explicit self >>> File >>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", >>> line 204, in save_function >>> File >>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", >>> line 241, in save_function_tuple >>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>> f(self, obj) # Call unbound method with explicit self >>> File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple >>> save(element) >>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>> f(self, obj) # Call unbound method with explicit self >>> File "/usr/lib64/python2.7/pickle.py", line 600, in save_list >>> self._batch_appends(iter(obj)) >>> File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends >>> save(x) >>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>> f(self, obj) # Call unbound method with explicit self >>> File >>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", >>> line 204, in save_function >>> File >>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", >>> line 241, in save_function_tuple >>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>> f(self, obj) # Call unbound method with explicit self >>> File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple >>> save(element) >>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>> f(self, obj) # Call unbound method with explicit self >>> File "/usr/lib64/python2.7/pickle.py", line 600, in save_list >>> self._batch_appends(iter(obj)) >>> File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends >>> save(tmp[0]) >>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>> f(self, obj) # Call unbound method with explicit self >>> File >>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", >>> line 198, in save_function >>> File >>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", >>> line 246, in save_function_tuple >>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>> f(self, obj) # Call unbound method with explicit self >>> File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict >>> self._batch_setitems(obj.iteritems()) >>> File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems >>> save(v) >>> File "/usr/lib64/python2.7/pickle.py", line 306, in save >>> rv = reduce(self.proto) >>> File >>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", >>> line 933, in __call__ >>> File >>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", >>> line 63, in deco >>> File >>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", >>> line 316, in get_return_value >>> py4j.protocol.Py4JError: An error occurred while calling >>> o33.__getnewargs__. 
>>> Trace:
>>> py4j.Py4JException: Method __getnewargs__([]) does not exist
>>>     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
>>>     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
>>>     at py4j.Gateway.invoke(Gateway.java:272)
>>>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>     at py4j.GatewayConnection.run(GatewayConnection.java:211)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> On Sun, Nov 27, 2016 at 8:32 PM, Andrew Holway <andrew.hol...@otternetworks.de> wrote:
>>>
>>>> Hi,
>>>>
>>>> Can anyone tell me what is causing this error?
>>>> Spark 2.0.0
>>>> Python 2.7.5
>>>>
>>>> df = sqlContext.createDataFrame(foo, schema)
>>>> https://gist.github.com/mooperd/368e3453c29694c8b2c038d6b7b4413a
>>>>
>>>> Traceback (most recent call last):
>>>>   File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py", line 61, in <module>
>>>>     df = sqlContext.createDataFrame(foo, schema)
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py", line 299, in createDataFrame
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 523, in createDataFrame
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2220, in _to_java_object_rdd
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2403, in _jrdd
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2336, in _wrap_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2315, in _prepare_for_python_RDD
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 657, in dumps
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
>>>>   File "/usr/lib64/python2.7/pickle.py", line 224, in dump
>>>>     self.save(obj)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>     self._batch_appends(iter(obj))
>>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>>     save(x)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>     self._batch_appends(iter(obj))
>>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>>     save(x)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>     self._batch_appends(iter(obj))
>>>>   File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
>>>>     save(tmp[0])
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 198, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 246, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
>>>>     self._batch_setitems(obj.iteritems())
>>>>   File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
>>>>     save(v)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 306, in save
>>>>     rv = reduce(self.proto)
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 316, in get_return_value
>>>> py4j.protocol.Py4JError: An error occurred while calling o33.__getnewargs__. Trace:
>>>> py4j.Py4JException: Method __getnewargs__([]) does not exist
>>>>     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
>>>>     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
>>>>     at py4j.Gateway.invoke(Gateway.java:272)
>>>>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>>>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>     at py4j.GatewayConnection.run(GatewayConnection.java:211)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> --
>>>> Otter Networks UG
>>>> http://otternetworks.de
>>>> Gotenstraße 17
>>>> 10829 Berlin
>>>
>>> --
>>> Otter Networks UG
>>> http://otternetworks.de
>>> Gotenstraße 17
>>> 10829 Berlin
>
> --
> Otter Networks UG
> http://otternetworks.de
> Gotenstraße 17
> 10829 Berlin