Wow Josh, that looks great. I've been a bit swamped this week, but as soon as I get a chance I'll test out the PR in more detail and port over the InputFormat stuff to use the new framework (including the changes you suggested).
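For the SequenceFile piece, here is roughly the shape I have in mind (an untested sketch with placeholder names; per your earlier suggestion it would sit on the PythonRDD companion object next to readRDDFromPickleFile rather than on JavaSparkContext):

    import org.apache.hadoop.io.Text
    import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat
    import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}

    // Untested sketch only; object/method names are placeholders, not a final API.
    // In the real patch this would hang off the PythonRDD companion object.
    object SequenceFileAsTextHelper {
      def sequenceFileAsText(jsc: JavaSparkContext, path: String): JavaPairRDD[String, String] = {
        // SequenceFileAsTextInputFormat already converts keys and values to Text via
        // toString; one more toString gives plain Strings for the Python side to pickle.
        val rdd = jsc.sc.hadoopFile(path, classOf[SequenceFileAsTextInputFormat],
          classOf[Text], classOf[Text])
        JavaPairRDD.fromRDD(rdd.map { case (k, v) => (k.toString, v.toString) })
      }
    }

The double toString looks redundant, but the input format hands back Text, so the second call is just to get plain Strings to feed to the pickling path.
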
I can then look deeper into the MsgPack functionality to see if it can be made to work in a generic enough manner, without requiring huge amounts of custom Templates to be written by users. Will feed back asap.

N

On Thu, Nov 7, 2013 at 5:03 AM, Josh Rosen <rosenvi...@gmail.com> wrote:

> I opened a pull request to add custom serializer support to PySpark:
> https://github.com/apache/incubator-spark/pull/146
>
> My pull request adds the plumbing for transferring data from Java to Python using formats other than Pickle. For example, look at how textFile() uses MUTF8Deserializer to read strings from Java. Hopefully this provides all of the functionality needed to support MsgPack.
>
> - Josh
>
> On Thu, Oct 31, 2013 at 11:11 AM, Josh Rosen <rosenvi...@gmail.com> wrote:
>
> > Hi Nick,
> >
> > This is a nice start. I'd prefer to keep the Java sequenceFileAsText() and newHadoopFileAsText() methods inside PythonRDD instead of adding them to JavaSparkContext, since I think these methods are unlikely to be used directly by Java users (you can add these methods to the PythonRDD companion object, which is how readRDDFromPickleFile is implemented:
> > https://github.com/apache/incubator-spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L255 )
> >
> > For MsgPack, the UnpicklingError is because the Python worker expects to receive its input in a pickled format. In my prototype of custom serializers, I modified the PySpark worker to receive its serialization/deserialization function as input (
> > https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/worker.py#L41 )
> > and added logic to pass the appropriate serializers based on each stage's input and output formats (
> > https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/rdd.py#L42 ).
> >
> > At some point, I'd like to port my custom serializers code to PySpark; if anyone's interested in helping, I'd be glad to write up some additional notes on how this should work.
> >
> > - Josh
> >
> > On Wed, Oct 30, 2013 at 2:25 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
> >
> >> Thanks Josh, Patrick for the feedback.
> >>
> >> Based on Josh's pointers I have something working for JavaPairRDD -> PySpark RDD[(String, String)]. This just calls the toString method on each key and value as before, but without the need for a delimiter. For SequenceFile, it uses SequenceFileAsTextInputFormat, which itself calls toString to convert keys and values to Text. We then call toString (again) ourselves to get Strings to feed to writeAsPickle.
> >>
> >> Details here: https://gist.github.com/MLnick/7230588
> >>
> >> This also illustrates where the "wrapper function" API would fit in. All that is required is to define a T => String for key and value.
> >>
> >> I started playing around with MsgPack and can sort of get things to work in Scala, but am struggling with getting the raw bytes to be written properly in PythonRDD (I think it is treating them as pickled byte arrays when they are not; when I removed the 'stripPickle' calls and amended the length (-6) I got "UnpicklingError: invalid load key, ' '.").
> >>
> >> Another issue is that MsgPack does well at writing "structures" - like Java classes with public fields that are fairly simple - but, for example, the Writables have private fields, so you end up with nothing being written. This looks like it would require custom "Templates" (serialization functions, effectively) for many classes, which means a lot of custom code for a user to write to use it. Fortunately, for most of the common Writables a toString does the job. Will keep looking into it though.
> >>
> >> Anyway, Josh, if you have ideas or examples on the "Wrapper API from Python" that you mentioned, I'd be interested to hear them.
> >>
> >> If you think this is worth working up as a Pull Request covering SequenceFiles and custom InputFormats with default toString conversions and the ability to specify Wrapper functions, I can clean things up more, add some functionality and tests, and also test to see if common things like the "normal" Writables and reading from things like HBase and Cassandra can be made to work nicely (any other common use cases that you think make sense?).
> >>
> >> Thoughts, comments etc welcome.
> >>
> >> Nick
> >>
> >> On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <pwend...@gmail.com> wrote:
> >>
> >> > As a starting point, a version where people just write their own "wrapper" functions to convert various HadoopFiles into String <K, V> files could go a long way. We could even have a few built-in versions, such as dealing with Sequence files that are <String, String>. Basically, the user needs to write a translator in Java/Scala that produces textual records from whatever format they want. Then, they make sure this is included in the classpath when running PySpark.
> >> >
> >> > As Josh is saying, I'm pretty sure this is already possible, but we may want to document it for users. In many organizations they might have 1-2 people who can write the Java/Scala to do this, but then many more people who are comfortable using Python once it's set up.
> >> >
> >> > - Patrick
> >> >
> >> > On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <rosenvi...@gmail.com> wrote:
> >> >
> >> > > Hi Nick,
> >> > >
> >> > > I've seen several requests for SequenceFile support in PySpark, so there's definitely demand for this feature.
> >> > >
> >> > > I like the idea of passing MsgPack'ed data (or some other structured format) from Java to the Python workers. My early prototype of custom serializers (described at
> >> > > https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers )
> >> > > might be useful for implementing this. Proper custom serializer support would handle the bookkeeping for tracking each stage's input and output formats and supplying the appropriate deserialization functions to the Python worker, so the Python worker would be able to directly read the MsgPack'd data that's sent to it.
> >> > >
> >> > > Regarding a wrapper API, it's actually possible to initially transform data using Scala/Java and perform the remainder of the processing in PySpark.
> >> > > This involves adding the appropriate compiled classes to the Java classpath and a bit of work in Py4J to create the Java/Scala RDD and wrap it for use by PySpark. I can hack together a rough example of this if anyone's interested, but it would need some work to be developed into a user-friendly API.
> >> > >
> >> > > If you wanted to extend your proof-of-concept to handle the cases where keys and values have parseable toString() values, I think you could remove the need for a delimiter by creating a PythonRDD from the newHadoopFile JavaPairRDD and adding a new method to writeAsPickle (
> >> > > https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224 )
> >> > > to dump its contents as a pickled pair of strings. (Aside: most of writeAsPickle() would probably need to be eliminated or refactored when adding general custom serializer support.)
> >> > >
> >> > > - Josh
> >> > >
> >> > > On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
> >> > >
> >> > > > Hi Spark Devs
> >> > > >
> >> > > > I was wondering what appetite there may be to add the ability for PySpark users to create RDDs from (somewhat) arbitrary Hadoop InputFormats.
> >> > > >
> >> > > > In my data pipeline, for example, I'm currently just using Scala (partly because I love it but also because I am heavily reliant on quite custom Hadoop InputFormats for reading data). However, many users may prefer to use PySpark as much as possible (if not for everything). Reasons might include the need to use some Python library. While I don't do it yet, I can certainly see an attractive use case for using, say, scikit-learn / numpy to do data analysis & machine learning in Python. Added to this, my cofounder knows Python well but not Scala, so it can be very beneficial to do a lot of stuff in Python.
> >> > > >
> >> > > > For text-based data this is fine, but reading data in from more complex Hadoop formats is an issue.
> >> > > >
> >> > > > The current approach would of course be to write an ETL-style Java/Scala job and then process in Python. Nothing wrong with this, but I was thinking about ways to allow Python to access arbitrary Hadoop InputFormats.
> >> > > >
> >> > > > Here is a quick proof of concept: https://gist.github.com/MLnick/7150058
> >> > > >
> >> > > > This works for simple stuff like SequenceFile with simple Writable key/values.
> >> > > >
> >> > > > To work with more complex files, perhaps an approach is to manipulate Hadoop JobConf via Python and pass that in. The one downside is of course that the InputFormat (well, actually the Key/Value classes) must have a toString that makes sense, so very custom stuff might not work.
> >> > > >
> >> > > > I wonder if it would be possible to take the objects that are yielded via the InputFormat and convert them into some representation like ProtoBuf, MsgPack, Avro, JSON, that can be read relatively more easily from Python?
> >> > > >
> >> > > > Another approach could be to allow a simple "wrapper API" such that one can write a wrapper function T => String and pass that into an InputFormatWrapper that takes an arbitrary InputFormat and yields Strings for the keys and values. Then all that is required is to compile that function and add it to the SPARK_CLASSPATH and away you go!
> >> > > >
> >> > > > Thoughts?
> >> > > >
> >> > > > Nick
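
For reference, a minimal sketch of the wrapper-function idea discussed in this thread, assuming user-supplied K => String and V => String converters; the names (InputFormatWrapper, hadoopFileAsText, keyConverter, valueConverter) are illustrative only, not an existing Spark API:

    import org.apache.hadoop.mapred.InputFormat
    import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}

    // Illustrative sketch of the "wrapper API" idea: placeholder names, not real Spark API.
    object InputFormatWrapper {
      // Load any old-API Hadoop InputFormat and convert keys/values to Strings
      // using caller-supplied converter functions.
      def hadoopFileAsText[K, V](
          jsc: JavaSparkContext,
          path: String,
          inputFormatClass: Class[_ <: InputFormat[K, V]],
          keyClass: Class[K],
          valueClass: Class[V],
          keyConverter: K => String,
          valueConverter: V => String): JavaPairRDD[String, String] = {
        val rdd = jsc.sc.hadoopFile(path, inputFormatClass, keyClass, valueClass)
        JavaPairRDD.fromRDD(rdd.map { case (k, v) => (keyConverter(k), valueConverter(v)) })
      }
    }

Under this scheme a user would only need to compile the two converter functions (for example, for a custom Writable whose toString is not useful), add the jar to SPARK_CLASSPATH, and PySpark would see the result as an RDD of string pairs.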