Hi

I managed to find the time to put together a PR on this: 
https://github.com/apache/incubator-spark/pull/263




Josh has had a look over it - if anyone else with an interest could give some 
feedback that would be great.




As mentioned in the PR it's more of an RFC and certainly still needs a bit of 
clean up work, and I need to add the concept of "wrapper functions" to 
deserialize classes that MsgPack can't handle out the box.




N
—
Sent from Mailbox for iPhone

On Fri, Nov 8, 2013 at 12:20 PM, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> Wow Josh, that looks great. I've been a bit swamped this week but as soon
> as I get a chance I'll test out the PR in more detail and port over the
> InputFormat stuff to use the new framework (including the changes you
> suggested).
> I can then look deeper into the MsgPack functionality to see if it can be
> made to work in a generic enough manner without requiring huge amounts of
> custom Templates to be written by users.
> Will feed back asap.
> N
> On Thu, Nov 7, 2013 at 5:03 AM, Josh Rosen <rosenvi...@gmail.com> wrote:
>> I opened a pull request to add custom serializer support to PySpark:
>> https://github.com/apache/incubator-spark/pull/146
>>
>> My pull request adds the plumbing for transferring data from Java to Python
>> using formats other than Pickle.  For example, look at how textFile() uses
>> MUTF8Deserializer to read strings from Java.  Hopefully this provides all
>> of the functionality needed to support MsgPack.
>>
>> - Josh
>>
>>
>> On Thu, Oct 31, 2013 at 11:11 AM, Josh Rosen <rosenvi...@gmail.com> wrote:
>>
>> > Hi Nick,
>> >
>> > This is a nice start.  I'd prefer to keep the Java sequenceFileAsText()
>> > and newHadoopFileAsText() methods inside PythonRDD instead of adding them
>> > to JavaSparkContext, since I think these methods are unlikely to be used
>> > directly by Java users (you can add these methods to the PythonRDD
>> > companion object, which is how readRDDFromPickleFile is implemented:
>> >
>> https://github.com/apache/incubator-spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L255
>> > )
>> >
>> > For MsgPack, the UnpicklingError is because the Python worker expects to
>> > receive its input in a pickled format.  In my prototype of custom
>> > serializers, I modified the PySpark worker to receive its
>> > serialization/deserialization function as input (
>> >
>> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/worker.py#L41
>> )
>> > and added logic to pass the appropriate serializers based on each stage's
>> > input and output formats (
>> >
>> https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/rdd.py#L42
>> > ).
>> >
>> > At some point, I'd like to port my custom serializers code to PySpark; if
>> > anyone's interested in helping, I'd be glad to write up some additional
>> > notes on how this should work.
>> >
>> > - Josh
>> >
>> > On Wed, Oct 30, 2013 at 2:25 PM, Nick Pentreath <
>> nick.pentre...@gmail.com>wrote:
>> >
>> >> Thanks Josh, Patrick for the feedback.
>> >>
>> >> Based on Josh's pointers I have something working for JavaPairRDD ->
>> >> PySpark RDD[(String, String)]. This just calls the toString method on
>> each
>> >> key and value as before, but without the need for a delimiter. For
>> >> SequenceFile, it uses SequenceFileAsTextInputFormat which itself calls
>> >> toString to convert to Text for keys and values. We then call toString
>> >> (again) ourselves to get Strings to feed to writeAsPickle.
>> >>
>> >> Details here: https://gist.github.com/MLnick/7230588
>> >>
>> >> This also illustrates where the "wrapper function" api would fit in. All
>> >> that is required is to define a T => String for key and value.
>> >>
>> >> I started playing around with MsgPack and can sort of get things to work
>> >> in
>> >> Scala, but am struggling with getting the raw bytes to be written
>> properly
>> >> in PythonRDD (I think it is treating them as pickled byte arrays when
>> they
>> >> are not, but when I removed the 'stripPickle' calls and amended the
>> length
>> >> (-6) I got "UnpicklingError: invalid load key, ' '. ").
>> >>
>> >> Another issue is that MsgPack does well at writing "structures" - like
>> >> Java
>> >> classes with public fields that are fairly simple - but for example the
>> >> Writables have private fields so you end up with nothing being written.
>> >> This looks like it would require custom "Templates" (serialization
>> >> functions effectively) for many classes, which means a lot of custom
>> code
>> >> for a user to write to use it. Fortunately for most of the common
>> >> Writables
>> >> a toString does the job. Will keep looking into it though.
>> >>
>> >> Anyway, Josh if you have ideas or examples on the "Wrapper API from
>> >> Python"
>> >> that you mentioned, I'd be interested to hear them.
>> >>
>> >> If you think this is worth working up as a Pull Request covering
>> >> SequenceFiles and custom InputFormats with default toString conversions
>> >> and
>> >> the ability to specify Wrapper functions, I can clean things up more,
>> add
>> >> some functionality and tests, and also test to see if common things like
>> >> the "normal" Writables and reading from things like HBase and Cassandra
>> >> can
>> >> be made to work nicely (any other common use cases that you think make
>> >> sense?).
>> >>
>> >> Thoughts, comments etc welcome.
>> >>
>> >> Nick
>> >>
>> >>
>> >>
>> >> On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <pwend...@gmail.com
>> >> >wrote:
>> >>
>> >> > As a starting point, a version where people just write their own
>> >> "wrapper"
>> >> > functions to convert various HadoopFiles into String <K, V> files
>> could
>> >> go
>> >> > a long way. We could even have a few built-in versions, such as
>> dealing
>> >> > with Sequence files that are <String, String>. Basically, the user
>> >> needs to
>> >> > write a translator in Java/Scala that produces textual records from
>> >> > whatever format that want. Then, they make sure this is included in
>> the
>> >> > classpath when running PySpark.
>> >> >
>> >> > As Josh is saying, I'm pretty sure this is already possible, but we
>> may
>> >> > want to document it for users. In many organizations they might have
>> 1-2
>> >> > people who can write the Java/Scala to do this but then many more
>> people
>> >> > who are comfortable using python once it's setup.
>> >> >
>> >> > - Patrick
>> >> >
>> >> > On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <rosenvi...@gmail.com>
>> >> wrote:
>> >> >
>> >> > > Hi Nick,
>> >> > >
>> >> > > I've seen several requests for SequenceFile support in PySpark, so
>> >> > there's
>> >> > > definitely demand for this feature.
>> >> > >
>> >> > > I like the idea of passing MsgPack'ed data (or some other structured
>> >> > > format) from Java to the Python workers.  My early prototype of
>> custom
>> >> > > serializers (described at
>> >> > >
>> >> > >
>> >> >
>> >>
>> https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers
>> >> > > )
>> >> > > might be useful for implementing this.  Proper custom serializer
>> >> support
>> >> > > would handle the bookkeeping for tracking each stage's input and
>> >> output
>> >> > > formats and supplying the appropriate deserialization functions to
>> the
>> >> > > Python worker, so the Python worker would be able to directly read
>> the
>> >> > > MsgPack'd data that's sent to it.
>> >> > >
>> >> > > Regarding a wrapper API, it's actually possible to initially
>> transform
>> >> > data
>> >> > > using Scala/Java and perform the remainder of the processing in
>> >> PySpark.
>> >> > >  This involves adding the appropriate compiled to the Java classpath
>> >> and
>> >> > a
>> >> > > bit of work in Py4J to create the Java/Scala RDD and wrap it for use
>> >> by
>> >> > > PySpark.  I can hack together a rough example of this if anyone's
>> >> > > interested, but it would need some work to be developed into a
>> >> > > user-friendly API.
>> >> > >
>> >> > > If you wanted to extend your proof-of-concept to handle the cases
>> >> where
>> >> > > keys and values have parseable toString() values, I think you could
>> >> > remove
>> >> > > the need for a delimiter by creating a PythonRDD from the
>> >> newHadoopFile
>> >> > > JavaPairRDD and adding a new method to writeAsPickle (
>> >> > >
>> >> > >
>> >> >
>> >>
>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224
>> >> > > )
>> >> > > to dump its contents as a pickled pair of strings.  (Aside: most of
>> >> > > writeAsPickle() would probably need be eliminated or refactored when
>> >> > adding
>> >> > > general custom serializer support).
>> >> > >
>> >> > > - Josh
>> >> > >
>> >> > > On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath
>> >> > > <nick.pentre...@gmail.com>wrote:
>> >> > >
>> >> > > > Hi Spark Devs
>> >> > > >
>> >> > > > I was wondering what appetite there may be to add the ability for
>> >> > PySpark
>> >> > > > users to create RDDs from (somewhat) arbitrary Hadoop
>> InputFormats.
>> >> > > >
>> >> > > > In my data pipeline for example, I'm currently just using Scala
>> >> (partly
>> >> > > > because I love it but also because I am heavily reliant on quite
>> >> custom
>> >> > > > Hadoop InputFormats for reading data). However, many users may
>> >> prefer
>> >> > to
>> >> > > > use PySpark as much as possible (if not for everything). Reasons
>> >> might
>> >> > > > include the need to use some Python library. While I don't do it
>> >> yet, I
>> >> > > can
>> >> > > > certainly see an attractive use case for using say scikit-learn /
>> >> numpy
>> >> > > to
>> >> > > > do data analysis & machine learning in Python. Added to this my
>> >> > cofounder
>> >> > > > knows Python well but not Scala so it can be very beneficial to
>> do a
>> >> > lot
>> >> > > of
>> >> > > > stuff in Python.
>> >> > > >
>> >> > > > For text-based data this is fine, but reading data in from more
>> >> complex
>> >> > > > Hadoop formats is an issue.
>> >> > > >
>> >> > > > The current approach would of course be to write an ETL-style
>> >> > Java/Scala
>> >> > > > job and then process in Python. Nothing wrong with this, but I was
>> >> > > thinking
>> >> > > > about ways to allow Python to access arbitrary Hadoop
>> InputFormats.
>> >> > > >
>> >> > > > Here is a quick proof of concept:
>> >> > https://gist.github.com/MLnick/7150058
>> >> > > >
>> >> > > > This works for simple stuff like SequenceFile with simple Writable
>> >> > > > key/values.
>> >> > > >
>> >> > > > To work with more complex files, perhaps an approach is to
>> >> manipulate
>> >> > > > Hadoop JobConf via Python and pass that in. The one downside is of
>> >> > course
>> >> > > > that the InputFormat (well actually the Key/Value classes) must
>> >> have a
>> >> > > > toString that makes sense so very custom stuff might not work.
>> >> > > >
>> >> > > > I wonder if it would be possible to take the objects that are
>> >> yielded
>> >> > via
>> >> > > > the InputFormat and convert them into some representation like
>> >> > ProtoBuf,
>> >> > > > MsgPack, Avro, JSON, that can be read relatively more easily from
>> >> > Python?
>> >> > > >
>> >> > > > Another approach could be to allow a simple "wrapper API" such
>> that
>> >> one
>> >> > > can
>> >> > > > write a wrapper function T => String and pass that into an
>> >> > > > InputFormatWrapper that takes an arbitrary InputFormat and yields
>> >> > Strings
>> >> > > > for the keys and values. Then all that is required is to compile
>> >> that
>> >> > > > function and add it to the SPARK_CLASSPATH and away you go!
>> >> > > >
>> >> > > > Thoughts?
>> >> > > >
>> >> > > > Nick
>> >> > > >
>> >> > >
>> >> >
>> >>
>> >
>> >
>>

Reply via email to