Hi
I managed to find the time to put together a PR on this: https://github.com/apache/incubator-spark/pull/263

Josh has had a look over it - if anyone else with an interest could give some feedback, that would be great. As mentioned in the PR it's more of an RFC and certainly still needs a bit of clean-up work, and I need to add the concept of "wrapper functions" to deserialize classes that MsgPack can't handle out of the box.

N

— Sent from Mailbox for iPhone

On Fri, Nov 8, 2013 at 12:20 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:

> Wow Josh, that looks great. I've been a bit swamped this week, but as soon as I get a chance I'll test out the PR in more detail and port over the InputFormat stuff to use the new framework (including the changes you suggested).
>
> I can then look deeper into the MsgPack functionality to see if it can be made to work in a generic enough manner without requiring huge amounts of custom Templates to be written by users.
>
> Will feed back asap.
>
> N
>
> On Thu, Nov 7, 2013 at 5:03 AM, Josh Rosen <rosenvi...@gmail.com> wrote:
>
>> I opened a pull request to add custom serializer support to PySpark:
>> https://github.com/apache/incubator-spark/pull/146
>>
>> My pull request adds the plumbing for transferring data from Java to Python using formats other than Pickle. For example, look at how textFile() uses MUTF8Deserializer to read strings from Java. Hopefully this provides all of the functionality needed to support MsgPack.
>>
>> - Josh
>>
>> On Thu, Oct 31, 2013 at 11:11 AM, Josh Rosen <rosenvi...@gmail.com> wrote:
>>
>> > Hi Nick,
>> >
>> > This is a nice start. I'd prefer to keep the Java sequenceFileAsText() and newHadoopFileAsText() methods inside PythonRDD instead of adding them to JavaSparkContext, since I think these methods are unlikely to be used directly by Java users (you can add these methods to the PythonRDD companion object, which is how readRDDFromPickleFile is implemented: https://github.com/apache/incubator-spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L255).
>> >
>> > For MsgPack, the UnpicklingError is because the Python worker expects to receive its input in a pickled format. In my prototype of custom serializers, I modified the PySpark worker to receive its serialization/deserialization function as input (https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/worker.py#L41) and added logic to pass the appropriate serializers based on each stage's input and output formats (https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/rdd.py#L42).
>> >
>> > At some point, I'd like to port my custom serializers code to PySpark; if anyone's interested in helping, I'd be glad to write up some additional notes on how this should work.
>> >
>> > - Josh
>> >
>> > On Wed, Oct 30, 2013 at 2:25 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>> >
>> >> Thanks Josh, Patrick for the feedback.
>> >>
>> >> Based on Josh's pointers I have something working for JavaPairRDD -> PySpark RDD[(String, String)]. This just calls the toString method on each key and value as before, but without the need for a delimiter. For SequenceFile, it uses SequenceFileAsTextInputFormat, which itself calls toString to convert keys and values to Text. We then call toString (again) ourselves to get Strings to feed to writeAsPickle.
>> >>
>> >> Details here: https://gist.github.com/MLnick/7230588
>> >>
>> >> This also illustrates where the "wrapper function" API would fit in. All that is required is to define a T => String for the key and value.
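>> >>
>> >> To make the "wrapper function" idea a bit more concrete, here is a rough sketch of the shape I have in mind (ToStringWrappers and the wrapper names are purely illustrative - this is not the API in the gist or the PR):
>> >>
>> >>     import org.apache.hadoop.io.IntWritable
>> >>
>> >>     object ToStringWrappers {
>> >>       // Default wrapper: just rely on toString, which is good enough for
>> >>       // the common Writables (Text, IntWritable, LongWritable, ...).
>> >>       def default[T]: T => String = (t: T) => t.toString
>> >>
>> >>       // Example of a custom wrapper for a type whose toString isn't useful;
>> >>       // the user compiles this and adds it to the classpath.
>> >>       val intPairWrapper: ((IntWritable, IntWritable)) => String = {
>> >>         case (a, b) => a.get + "," + b.get
>> >>       }
>> >>     }
>> >>
>> >>     // The wrappers would then be applied to each record on the Scala side
>> >>     // before the resulting strings are handed to writeAsPickle, roughly:
>> >>     //   rdd.map { case (k, v) => (keyWrapper(k), valueWrapper(v)) }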
>> >>
>> >> I started playing around with MsgPack and can sort of get things to work in Scala, but am struggling with getting the raw bytes to be written properly in PythonRDD (I think it is treating them as pickled byte arrays when they are not, but when I removed the 'stripPickle' calls and amended the length (-6) I got "UnpicklingError: invalid load key, ' '.").
>> >>
>> >> Another issue is that MsgPack does well at writing "structures" - like Java classes with public fields that are fairly simple - but the Writables, for example, have private fields, so you end up with nothing being written. This looks like it would require custom "Templates" (effectively serialization functions) for many classes, which means a lot of custom code for a user to write just to use it. Fortunately, for most of the common Writables a toString does the job. Will keep looking into it though.
>> >>
>> >> Anyway, Josh, if you have ideas or examples on the "wrapper API from Python" that you mentioned, I'd be interested to hear them.
>> >>
>> >> If you think this is worth working up as a pull request covering SequenceFiles and custom InputFormats, with default toString conversions and the ability to specify wrapper functions, I can clean things up more, add some functionality and tests, and also test whether common things like the "normal" Writables and reading from the likes of HBase and Cassandra can be made to work nicely (any other common use cases that you think make sense?).
>> >>
>> >> Thoughts, comments etc. welcome.
>> >>
>> >> Nick
>> >>
>> >> On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <pwend...@gmail.com> wrote:
>> >>
>> >> > As a starting point, a version where people just write their own "wrapper" functions to convert various HadoopFiles into String <K, V> files could go a long way. We could even have a few built-in versions, such as dealing with Sequence files that are <String, String>. Basically, the user needs to write a translator in Java/Scala that produces textual records from whatever format they want. Then, they make sure this is included in the classpath when running PySpark.
>> >> >
>> >> > As Josh is saying, I'm pretty sure this is already possible, but we may want to document it for users. In many organizations they might have 1-2 people who can write the Java/Scala to do this, but then many more people who are comfortable using Python once it's set up.
>> >> >
>> >> > - Patrick
>> >> >
>> >> > On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <rosenvi...@gmail.com> wrote:
>> >> >
>> >> > > Hi Nick,
>> >> > >
>> >> > > I've seen several requests for SequenceFile support in PySpark, so there's definitely demand for this feature.
>> >> > >
>> >> > > I like the idea of passing MsgPack'ed data (or some other structured format) from Java to the Python workers. My early prototype of custom serializers (described at https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers) might be useful for implementing this. Proper custom serializer support would handle the bookkeeping for tracking each stage's input and output formats and supplying the appropriate deserialization functions to the Python worker, so the Python worker would be able to directly read the MsgPack'd data that's sent to it.
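>> >> > >
>> >> > > Roughly, the Java side would just frame whatever bytes the chosen serializer produces and leave interpreting them to the Python worker. A sketch of that framing only (illustrative - RawFraming.writeRecords is not the actual PythonRDD code):
>> >> > >
>> >> > >     import java.io.DataOutputStream
>> >> > >
>> >> > >     object RawFraming {
>> >> > >       // Write already-serialized records (MsgPack, Avro, whatever) to the
>> >> > >       // worker's stream as length-prefixed byte arrays; the Python side
>> >> > >       // reads <length><payload> pairs and deserializes them with the
>> >> > >       // matching library instead of cPickle.
>> >> > >       def writeRecords(records: Iterator[Array[Byte]], out: DataOutputStream): Unit = {
>> >> > >         for (bytes <- records) {
>> >> > >           out.writeInt(bytes.length)
>> >> > >           out.write(bytes)
>> >> > >         }
>> >> > >         out.flush()
>> >> > >       }
>> >> > >     }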
>> >> > >
>> >> > > Regarding a wrapper API, it's actually possible to initially transform data using Scala/Java and perform the remainder of the processing in PySpark. This involves adding the appropriate compiled classes to the Java classpath and a bit of work in Py4J to create the Java/Scala RDD and wrap it for use by PySpark. I can hack together a rough example of this if anyone's interested, but it would need some work to be developed into a user-friendly API.
>> >> > >
>> >> > > If you wanted to extend your proof-of-concept to handle the cases where keys and values have parseable toString() values, I think you could remove the need for a delimiter by creating a PythonRDD from the newHadoopFile JavaPairRDD and adding a new method to writeAsPickle (https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224) to dump its contents as a pickled pair of strings. (Aside: most of writeAsPickle() would probably need to be eliminated or refactored when adding general custom serializer support.)
>> >> > >
>> >> > > - Josh
>> >> > >
>> >> > > On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>> >> > >
>> >> > > > Hi Spark Devs,
>> >> > > >
>> >> > > > I was wondering what appetite there may be to add the ability for PySpark users to create RDDs from (somewhat) arbitrary Hadoop InputFormats.
>> >> > > >
>> >> > > > In my data pipeline, for example, I'm currently just using Scala (partly because I love it, but also because I am heavily reliant on quite custom Hadoop InputFormats for reading data). However, many users may prefer to use PySpark as much as possible (if not for everything). Reasons might include the need to use some Python library. While I don't do it yet, I can certainly see an attractive use case for using, say, scikit-learn / numpy to do data analysis & machine learning in Python. Added to this, my cofounder knows Python well but not Scala, so it can be very beneficial to do a lot of stuff in Python.
>> >> > > >
>> >> > > > For text-based data this is fine, but reading data in from more complex Hadoop formats is an issue.
>> >> > > >
>> >> > > > The current approach would of course be to write an ETL-style Java/Scala job and then process in Python. Nothing wrong with this, but I was thinking about ways to allow Python to access arbitrary Hadoop InputFormats.
>> >> > > >
>> >> > > > Here is a quick proof of concept: https://gist.github.com/MLnick/7150058
>> >> > > >
>> >> > > > This works for simple stuff like SequenceFile with simple Writable key/values.
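>> >> > > >
>> >> > > > The basic idea is just to read the file through the normal Hadoop API on the Scala side and stringify keys and values before they cross over to Python, roughly along these lines in the Spark shell (a sketch of the idea, not the exact gist code; the path and app name are placeholders):
>> >> > > >
>> >> > > >     import org.apache.hadoop.io.Text
>> >> > > >     import org.apache.spark.SparkContext
>> >> > > >
>> >> > > >     val sc = new SparkContext("local", "sequencefile-poc")
>> >> > > >
>> >> > > >     // Read a SequenceFile of Text keys/values and turn each record into a
>> >> > > >     // pair of Strings, which the existing PySpark plumbing can handle.
>> >> > > >     val strings = sc
>> >> > > >       .sequenceFile("/path/to/data", classOf[Text], classOf[Text])
>> >> > > >       .map { case (k, v) => (k.toString, v.toString) }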
>> >> > > >
>> >> > > > To work with more complex files, perhaps an approach is to manipulate the Hadoop JobConf via Python and pass that in. The one downside, of course, is that the InputFormat (well, actually the key/value classes) must have a toString that makes sense, so very custom stuff might not work.
>> >> > > >
>> >> > > > I wonder if it would be possible to take the objects that are yielded via the InputFormat and convert them into some representation like ProtoBuf, MsgPack, Avro or JSON that can be read relatively easily from Python?
>> >> > > >
>> >> > > > Another approach could be to allow a simple "wrapper API" such that one can write a wrapper function T => String and pass that into an InputFormatWrapper that takes an arbitrary InputFormat and yields Strings for the keys and values (see the PS below for a rough sketch). Then all that is required is to compile that function, add it to the SPARK_CLASSPATH, and away you go!
>> >> > > >
>> >> > > > Thoughts?
>> >> > > >
>> >> > > > Nick
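>> >> > > >
>> >> > > > PS: to make the InputFormatWrapper idea a bit more concrete, here is a rough sketch of the shape it could take (illustrative only - nothing like this exists yet, and it uses the old mapred API purely for brevity):
>> >> > > >
>> >> > > >     import org.apache.hadoop.io.Text
>> >> > > >     import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf, RecordReader, Reporter}
>> >> > > >
>> >> > > >     // Wraps an arbitrary InputFormat[K, V] and exposes it as an
>> >> > > >     // InputFormat[Text, Text] by running user-supplied K => String and
>> >> > > >     // V => String wrapper functions over every record.
>> >> > > >     class InputFormatWrapper[K, V](
>> >> > > >         underlying: InputFormat[K, V],
>> >> > > >         keyToString: K => String,
>> >> > > >         valueToString: V => String) extends InputFormat[Text, Text] {
>> >> > > >
>> >> > > >       def getSplits(conf: JobConf, numSplits: Int): Array[InputSplit] =
>> >> > > >         underlying.getSplits(conf, numSplits)
>> >> > > >
>> >> > > >       def getRecordReader(split: InputSplit, conf: JobConf,
>> >> > > >           reporter: Reporter): RecordReader[Text, Text] = {
>> >> > > >         val reader = underlying.getRecordReader(split, conf, reporter)
>> >> > > >         new RecordReader[Text, Text] {
>> >> > > >           def next(key: Text, value: Text): Boolean = {
>> >> > > >             val k = reader.createKey()
>> >> > > >             val v = reader.createValue()
>> >> > > >             if (reader.next(k, v)) {
>> >> > > >               key.set(keyToString(k))
>> >> > > >               value.set(valueToString(v))
>> >> > > >               true
>> >> > > >             } else false
>> >> > > >           }
>> >> > > >           def createKey(): Text = new Text()
>> >> > > >           def createValue(): Text = new Text()
>> >> > > >           def getPos(): Long = reader.getPos
>> >> > > >           def getProgress(): Float = reader.getProgress
>> >> > > >           def close(): Unit = reader.close()
>> >> > > >         }
>> >> > > >       }
>> >> > > >     }
>> >> > > >
>> >> > > > A user would then compile their K => String / V => String functions, add the jar to SPARK_CLASSPATH, and point the Python side at the wrapped InputFormat.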