Wow Josh, that looks great. I've been a bit swamped this week but as soon
as I get a chance I'll test out the PR in more detail and port over the
InputFormat stuff to use the new framework (including the changes you
suggested).

I can then look deeper into the MsgPack functionality to see if it can be
made to work in a generic enough manner without requiring huge amounts of
custom Templates to be written by users.

Will feed back asap.
N


On Thu, Nov 7, 2013 at 5:03 AM, Josh Rosen <rosenvi...@gmail.com> wrote:

> I opened a pull request to add custom serializer support to PySpark:
> https://github.com/apache/incubator-spark/pull/146
>
> My pull request adds the plumbing for transferring data from Java to Python
> using formats other than Pickle.  For example, look at how textFile() uses
> MUTF8Deserializer to read strings from Java.  Hopefully this provides all
> of the functionality needed to support MsgPack.
>
> - Josh
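
One way to picture "formats other than Pickle" is a stream of length-prefixed records that the Python side decodes with a plain function instead of pickle. The sketch below is only an illustration under that assumption: the helper names `write_framed_utf8` and `read_framed_utf8` are hypothetical, and the framing (4-byte big-endian length, then UTF-8 bytes) is not claimed to match what the pull request or MUTF8Deserializer actually does.

```python
import io
import struct


def write_framed_utf8(stream, strings):
    """Write each string as a 4-byte big-endian length followed by UTF-8 bytes."""
    for s in strings:
        payload = s.encode("utf-8")
        stream.write(struct.pack(">i", len(payload)))
        stream.write(payload)


def read_framed_utf8(stream):
    """Yield the strings back from a stream produced by write_framed_utf8."""
    while True:
        header = stream.read(4)
        if len(header) < 4:
            return  # no complete length header left: end of stream
        (length,) = struct.unpack(">i", header)
        yield stream.read(length).decode("utf-8")


# Round trip through an in-memory buffer standing in for the Java-to-Python pipe.
buf = io.BytesIO()
write_framed_utf8(buf, ["key1", "värde"])
buf.seek(0)
round_tripped = list(read_framed_utf8(buf))
```

A MsgPack-based variant would presumably keep the same framing and swap only the per-record encode and decode steps.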
>
>
> On Thu, Oct 31, 2013 at 11:11 AM, Josh Rosen <rosenvi...@gmail.com> wrote:
>
> > Hi Nick,
> >
> > This is a nice start.  I'd prefer to keep the Java sequenceFileAsText()
> > and newHadoopFileAsText() methods inside PythonRDD instead of adding them
> > to JavaSparkContext, since I think these methods are unlikely to be used
> > directly by Java users (you can add these methods to the PythonRDD
> > companion object, which is how readRDDFromPickleFile is implemented:
> > https://github.com/apache/incubator-spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L255
> > )
> >
> > For MsgPack, the UnpicklingError is because the Python worker expects to
> > receive its input in a pickled format.  In my prototype of custom
> > serializers, I modified the PySpark worker to receive its
> > serialization/deserialization function as input (
> > https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/worker.py#L41
> > )
> > and added logic to pass the appropriate serializers based on each stage's
> > input and output formats (
> > https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/rdd.py#L42
> > ).
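
The failure mode described here can be reproduced outside Spark. The sketch below uses hand-written bytes as a stand-in for a MsgPack payload (0xa2 is MsgPack's marker for a 2-character string) and feeds them to pickle, the way a worker that assumes pickled input would. The helper `try_unpickle` is hypothetical and is not part of PySpark.

```python
import pickle

# Stand-in for a MsgPack-encoded "hi": marker byte 0xa2, then the two characters.
not_pickled = b"\xa2hi"


def try_unpickle(data):
    """Return ("ok", value) if data unpickles, else ("error", message)."""
    try:
        return ("ok", pickle.loads(data))
    except pickle.UnpicklingError as exc:
        return ("error", str(exc))


# Genuinely pickled input loads fine; the MsgPack-style bytes do not,
# because their first byte is not a valid pickle opcode.
pickled_result = try_unpickle(pickle.dumps("hi"))
raw_result = try_unpickle(not_pickled)
```

Passing the deserialization function to the worker, as in the prototype, avoids this because the worker no longer assumes pickle.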
> >
> > At some point, I'd like to port my custom serializers code to PySpark; if
> > anyone's interested in helping, I'd be glad to write up some additional
> > notes on how this should work.
> >
> > - Josh
> >
> > On Wed, Oct 30, 2013 at 2:25 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
> >
> >> Thanks Josh, Patrick for the feedback.
> >>
> >> Based on Josh's pointers I have something working for JavaPairRDD ->
> >> PySpark RDD[(String, String)]. This just calls the toString method on
> >> each key and value as before, but without the need for a delimiter. For
> >> SequenceFile, it uses SequenceFileAsTextInputFormat which itself calls
> >> toString to convert to Text for keys and values. We then call toString
> >> (again) ourselves to get Strings to feed to writeAsPickle.
> >>
> >> Details here: https://gist.github.com/MLnick/7230588
> >>
> >> This also illustrates where the "wrapper function" api would fit in. All
> >> that is required is to define a T => String for key and value.
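
The shape of the "wrapper function" idea can be sketched in a few lines. The version discussed in this thread would be written in Scala/Java and run on the JVM side; the Python below only illustrates the contract (one T => String per key and value, defaulting to a toString-like conversion). `convert_pairs`, `Click` and `click_to_string` are hypothetical names.

```python
from dataclasses import dataclass


def convert_pairs(records, key_to_string=str, value_to_string=str):
    """Turn (key, value) records into (str, str) pairs using the given converters."""
    for key, value in records:
        yield key_to_string(key), value_to_string(value)


@dataclass
class Click:
    """Stand-in for a custom value class whose default string form is not parseable."""
    user_id: int
    url: str


def click_to_string(click):
    """User-supplied wrapper: a tab-separated form that is easy to split in Python."""
    return f"{click.user_id}\t{click.url}"


records = [(1, Click(42, "/home")), (2, Click(7, "/about"))]
converted = list(convert_pairs(records, value_to_string=click_to_string))
```

The default converters cover the common case where toString is already good enough, so a user only writes a wrapper for the types that need one.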
> >>
> >> I started playing around with MsgPack and can sort of get things to work
> >> in Scala, but am struggling with getting the raw bytes to be written
> >> properly in PythonRDD (I think it is treating them as pickled byte arrays
> >> when they are not, but when I removed the 'stripPickle' calls and amended
> >> the length (-6) I got "UnpicklingError: invalid load key, ' '. ").
> >>
> >> Another issue is that MsgPack does well at writing "structures" - like
> >> Java classes with public fields that are fairly simple - but for example
> >> the Writables have private fields so you end up with nothing being
> >> written. This looks like it would require custom "Templates"
> >> (serialization functions effectively) for many classes, which means a
> >> lot of custom code for a user to write to use it. Fortunately for most of
> >> the common Writables a toString does the job. Will keep looking into it
> >> though.
> >>
> >> Anyway, Josh if you have ideas or examples on the "Wrapper API from
> >> Python" that you mentioned, I'd be interested to hear them.
> >>
> >> If you think this is worth working up as a Pull Request covering
> >> SequenceFiles and custom InputFormats with default toString conversions
> >> and the ability to specify Wrapper functions, I can clean things up more,
> >> add some functionality and tests, and also test to see if common things
> >> like the "normal" Writables and reading from things like HBase and
> >> Cassandra can be made to work nicely (any other common use cases that you
> >> think make sense?).
> >>
> >> Thoughts, comments etc welcome.
> >>
> >> Nick
> >>
> >>
> >>
> >> On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <pwend...@gmail.com> wrote:
> >>
> >> > As a starting point, a version where people just write their own
> >> > "wrapper" functions to convert various HadoopFiles into String <K, V>
> >> > files could go a long way. We could even have a few built-in versions,
> >> > such as dealing with Sequence files that are <String, String>.
> >> > Basically, the user needs to write a translator in Java/Scala that
> >> > produces textual records from whatever format they want. Then, they
> >> > make sure this is included in the classpath when running PySpark.
> >> >
> >> > As Josh is saying, I'm pretty sure this is already possible, but we
> >> > may want to document it for users. Many organizations might have 1-2
> >> > people who can write the Java/Scala to do this but then many more
> >> > people who are comfortable using Python once it's set up.
> >> >
> >> > - Patrick
> >> >
> >> > On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <rosenvi...@gmail.com> wrote:
> >> >
> >> > > Hi Nick,
> >> > >
> >> > > I've seen several requests for SequenceFile support in PySpark, so
> >> > > there's definitely demand for this feature.
> >> > >
> >> > > I like the idea of passing MsgPack'ed data (or some other structured
> >> > > format) from Java to the Python workers.  My early prototype of
> >> > > custom serializers (described at
> >> > > https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers
> >> > > )
> >> > > might be useful for implementing this.  Proper custom serializer
> >> > > support would handle the bookkeeping for tracking each stage's input
> >> > > and output formats and supplying the appropriate deserialization
> >> > > functions to the Python worker, so the Python worker would be able to
> >> > > directly read the MsgPack'd data that's sent to it.
> >> > >
> >> > > Regarding a wrapper API, it's actually possible to initially
> >> > > transform data using Scala/Java and perform the remainder of the
> >> > > processing in PySpark.  This involves adding the appropriate compiled
> >> > > classes to the Java classpath and a bit of work in Py4J to create the
> >> > > Java/Scala RDD and wrap it for use by PySpark.  I can hack together a
> >> > > rough example of this if anyone's interested, but it would need some
> >> > > work to be developed into a user-friendly API.
> >> > >
> >> > > If you wanted to extend your proof-of-concept to handle the cases
> >> > > where keys and values have parseable toString() values, I think you
> >> > > could remove the need for a delimiter by creating a PythonRDD from the
> >> > > newHadoopFile JavaPairRDD and adding a new method to writeAsPickle (
> >> > > https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224
> >> > > )
> >> > > to dump its contents as a pickled pair of strings.  (Aside: most of
> >> > > writeAsPickle() would probably need to be eliminated or refactored
> >> > > when adding general custom serializer support.)
> >> > >
> >> > > - Josh
> >> > >
> >> > > On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
> >> > >
> >> > > > Hi Spark Devs
> >> > > >
> >> > > > I was wondering what appetite there may be to add the ability for
> >> > > > PySpark users to create RDDs from (somewhat) arbitrary Hadoop
> >> > > > InputFormats.
> >> > > >
> >> > > > In my data pipeline for example, I'm currently just using Scala
> >> > > > (partly because I love it but also because I am heavily reliant on
> >> > > > quite custom Hadoop InputFormats for reading data). However, many
> >> > > > users may prefer to use PySpark as much as possible (if not for
> >> > > > everything). Reasons might include the need to use some Python
> >> > > > library. While I don't do it yet, I can certainly see an attractive
> >> > > > use case for using say scikit-learn / numpy to do data analysis &
> >> > > > machine learning in Python. Added to this my cofounder knows Python
> >> > > > well but not Scala so it can be very beneficial to do a lot of
> >> > > > stuff in Python.
> >> > > >
> >> > > > For text-based data this is fine, but reading data in from more
> >> > > > complex Hadoop formats is an issue.
> >> > > >
> >> > > > The current approach would of course be to write an ETL-style
> >> > > > Java/Scala job and then process in Python. Nothing wrong with this,
> >> > > > but I was thinking about ways to allow Python to access arbitrary
> >> > > > Hadoop InputFormats.
> >> > > >
> >> > > > Here is a quick proof of concept:
> >> > > > https://gist.github.com/MLnick/7150058
> >> > > >
> >> > > > This works for simple stuff like SequenceFile with simple Writable
> >> > > > key/values.
> >> > > >
> >> > > > To work with more complex files, perhaps an approach is to
> >> > > > manipulate Hadoop JobConf via Python and pass that in. The one
> >> > > > downside is of course that the InputFormat (well actually the
> >> > > > Key/Value classes) must have a toString that makes sense so very
> >> > > > custom stuff might not work.
> >> > > >
> >> > > > I wonder if it would be possible to take the objects that are
> >> > > > yielded via the InputFormat and convert them into some
> >> > > > representation like ProtoBuf, MsgPack, Avro, JSON, that can be read
> >> > > > relatively more easily from Python?
> >> > > >
> >> > > > Another approach could be to allow a simple "wrapper API" such that
> >> > > > one can write a wrapper function T => String and pass that into an
> >> > > > InputFormatWrapper that takes an arbitrary InputFormat and yields
> >> > > > Strings for the keys and values. Then all that is required is to
> >> > > > compile that function and add it to the SPARK_CLASSPATH and away you
> >> > > > go!
> >> > > >
> >> > > > Thoughts?
> >> > > >
> >> > > > Nick
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>
