Amit,

Thanks for this pointer as well, CoderHelpers helps indeed!

Thomas
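
A minimal sketch of the CoderHelpers idea, assuming the runner only ever hands a byte[] to its own serialization layer. Each element is encoded with its coder before it reaches Kryo and decoded with the same coder on the receiving side, so Kryo never has to instantiate the element class. It uses only the JDK: `SimpleCoder`, `TimestampedString`, `TimestampedStringCoder` and `CoderBytes.toByteArray`/`fromByteArray` are hypothetical stand-ins for a Beam Coder, an element type without a no-arg constructor, and the CoderHelpers helpers. They are not the real Beam signatures.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical stand-in for a Beam Coder: writes one value to a stream and reads it back.
interface SimpleCoder<T> {
  void encode(T value, OutputStream out) throws IOException;

  T decode(InputStream in) throws IOException;
}

// Hypothetical element type with no no-arg constructor, like WindowedValue's inner classes.
final class TimestampedString {
  final String value;
  final long timestampMillis;

  private TimestampedString(String value, long timestampMillis) {
    this.value = value;
    this.timestampMillis = timestampMillis;
  }

  static TimestampedString of(String value, long timestampMillis) {
    return new TimestampedString(value, timestampMillis);
  }
}

// The coder rebuilds the element through its factory method, so no no-arg constructor is needed.
final class TimestampedStringCoder implements SimpleCoder<TimestampedString> {
  @Override
  public void encode(TimestampedString element, OutputStream out) throws IOException {
    DataOutputStream data = new DataOutputStream(out);
    data.writeUTF(element.value);
    data.writeLong(element.timestampMillis);
    data.flush();
  }

  @Override
  public TimestampedString decode(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    String value = data.readUTF();
    long timestampMillis = data.readLong();
    return TimestampedString.of(value, timestampMillis);
  }
}

// Hypothetical helpers: only the resulting byte[] is handed to Kryo (or any other transport).
final class CoderBytes {
  static <T> byte[] toByteArray(T value, SimpleCoder<T> coder) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try {
      coder.encode(value, bytes);
    } catch (IOException e) {
      throw new IllegalStateException("Error encoding value", e);
    }
    return bytes.toByteArray();
  }

  static <T> T fromByteArray(byte[] serialized, SimpleCoder<T> coder) {
    try {
      return coder.decode(new ByteArrayInputStream(serialized));
    } catch (IOException e) {
      throw new IllegalStateException("Error decoding value", e);
    }
  }
}
```

With Beam itself, the coder would be the one derived from the PCollection, as Thomas Groh describes in the quoted reply, and the byte[] is what crosses the wire.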

On Thu, Jun 2, 2016 at 12:51 PM, Amit Sela <[email protected]> wrote:

> Oh sorry, of course I meant Thomas Groh in my previous email. But @Thomas
> Weise, this example
> <https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java#L108>
> might help; it is how the Spark runner uses Coders as Thomas Groh described.
>
> And I agree that we should consider making PipelineOptions Serializable or
> providing a generic solution for Runners.
>
> Hope this helps,
> Amit
>
> On Thu, Jun 2, 2016 at 10:35 PM Amit Sela <[email protected]> wrote:
>
> > Thomas is right, though in my case I encountered this issue when using
> > Spark's new API, which uses Encoders
> > <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala>
> > not just for serialization but also for "translating" the object into a
> > schema for optimized execution with Tungsten
> > <https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html>.
> >
> > In this case I'm using Kryo, and I've solved this by registering (in Spark,
> > not Beam) custom serializers from
> > https://github.com/magro/kryo-serializers
> > In the future I would consider implementing Encoders with the help of
> > Coders, but I still haven't wrapped my mind around this.
> >
> > On Thu, Jun 2, 2016 at 9:59 PM Thomas Groh <[email protected]> wrote:
> >
> >> The Beam Model ensures that all PCollections have a Coder; the PCollection
> >> Coder is the standard way to materialize the elements of a
> >> PCollection[1][2]. Most SDK-provided classes that will need to be
> >> transferred across the wire have an associated coder, and some additional
> >> default datatypes have coders associated with them (in the CoderRegistry[3]).
> >>
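The CoderRegistry part can be pictured as a lookup from a Java class to its default coder. The JDK-only sketch below assumes nothing about Beam's real API. `SimpleCoder`, `LongCoder`, `SimpleCoderRegistry`, `registerCoder` and `getDefaultCoder` are hypothetical names, and the real registry is richer than a flat map.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Beam Coder.
interface SimpleCoder<T> {
  void encode(T value, OutputStream out) throws IOException;

  T decode(InputStream in) throws IOException;
}

// Example coder for a boxed long: 8 big-endian bytes.
final class LongCoder implements SimpleCoder<Long> {
  @Override
  public void encode(Long value, OutputStream out) throws IOException {
    DataOutputStream data = new DataOutputStream(out);
    data.writeLong(value);
    data.flush();
  }

  @Override
  public Long decode(InputStream in) throws IOException {
    return new DataInputStream(in).readLong();
  }
}

// Hypothetical registry: maps a Java class to the coder used by default for it.
final class SimpleCoderRegistry {
  private final Map<Class<?>, SimpleCoder<?>> defaults = new HashMap<>();

  <T> void registerCoder(Class<T> type, SimpleCoder<T> coder) {
    defaults.put(type, coder);
  }

  // The cast is safe because registerCoder only pairs a Class<T> with a SimpleCoder<T>.
  @SuppressWarnings("unchecked")
  <T> SimpleCoder<T> getDefaultCoder(Class<T> type) {
    SimpleCoder<?> coder = defaults.get(type);
    if (coder == null) {
      throw new IllegalArgumentException("No default coder registered for " + type.getName());
    }
    return (SimpleCoder<T>) coder;
  }
}
```

Failing loudly on an unregistered type is a choice of this sketch, not a statement about Beam's behavior.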
> >> FullWindowedValueCoder[4] is capable of encoding and decoding the entirety
> >> of a WindowedValue, and is constructed from a ValueCoder (obtained from the
> >> PCollection) and a WindowCoder (obtained from the WindowFn of the
> >> WindowingStrategy of the PCollection). Given an input PCollection `pc`, you
> >> can construct the FullWindowedValueCoder with the following code snippet:
> >>
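> >> ```
> >> // pc is a PCollection<T>; FullWindowedValueCoder is nested in WindowedValue.
> >> WindowedValue.FullWindowedValueCoder<T> coder =
> >>     WindowedValue.FullWindowedValueCoder.of(
> >>         pc.getCoder(),
> >>         pc.getWindowingStrategy().getWindowFn().windowCoder());
> >> ```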
> >>
> >> [1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
> >> [2] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java#L130
> >> [3] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java#L94
> >> [4] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L515
> >>
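The composition described in that reply (one coder for the value plus one for the window, wrapped by a coder for the whole windowed element) can be sketched without Beam as follows. It is a simplification under stated assumptions: one window per element, a long millisecond timestamp, and no pane info, whereas the real FullWindowedValueCoder also deals with a collection of windows and pane information. All class names (`SimpleCoder`, `Window`, `Windowed`, `StringCoder`, `WindowCoder`, `WindowedCoder`) are hypothetical.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical stand-in for a Beam Coder.
interface SimpleCoder<T> {
  void encode(T value, OutputStream out) throws IOException;

  T decode(InputStream in) throws IOException;
}

// Simplified window: the interval [start, end) in epoch millis.
final class Window {
  final long start;
  final long end;

  Window(long start, long end) { this.start = start; this.end = end; }
}

// Simplified windowed element: exactly one window and no pane info.
final class Windowed<T> {
  final T value;
  final long timestampMillis;
  final Window window;

  Windowed(T value, long timestampMillis, Window window) {
    this.value = value;
    this.timestampMillis = timestampMillis;
    this.window = window;
  }
}

final class StringCoder implements SimpleCoder<String> {
  @Override
  public void encode(String value, OutputStream out) throws IOException {
    new DataOutputStream(out).writeUTF(value);
  }

  @Override
  public String decode(InputStream in) throws IOException {
    return new DataInputStream(in).readUTF();
  }
}

final class WindowCoder implements SimpleCoder<Window> {
  @Override
  public void encode(Window window, OutputStream out) throws IOException {
    DataOutputStream data = new DataOutputStream(out);
    data.writeLong(window.start);
    data.writeLong(window.end);
  }

  @Override
  public Window decode(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    long start = data.readLong();
    long end = data.readLong();
    return new Window(start, end);
  }
}

// Composite coder: value, then timestamp, then window; decoding reads them back in the same order.
final class WindowedCoder<T> implements SimpleCoder<Windowed<T>> {
  private final SimpleCoder<T> valueCoder;
  private final SimpleCoder<Window> windowCoder;

  private WindowedCoder(SimpleCoder<T> valueCoder, SimpleCoder<Window> windowCoder) {
    this.valueCoder = valueCoder;
    this.windowCoder = windowCoder;
  }

  static <T> WindowedCoder<T> of(SimpleCoder<T> valueCoder, SimpleCoder<Window> windowCoder) {
    return new WindowedCoder<>(valueCoder, windowCoder);
  }

  @Override
  public void encode(Windowed<T> element, OutputStream out) throws IOException {
    valueCoder.encode(element.value, out);
    new DataOutputStream(out).writeLong(element.timestampMillis);
    windowCoder.encode(element.window, out);
  }

  @Override
  public Windowed<T> decode(InputStream in) throws IOException {
    T value = valueCoder.decode(in);
    long timestampMillis = new DataInputStream(in).readLong();
    Window window = windowCoder.decode(in);
    return new Windowed<>(value, timestampMillis, window);
  }
}
```

The `of(valueCoder, windowCoder)` factory mirrors the shape of the `FullWindowedValueCoder.of(...)` call quoted above, where the two component coders come from `pc.getCoder()` and the WindowFn's `windowCoder()`.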
> >> On Thu, Jun 2, 2016 at 10:41 AM, Thomas Weise <[email protected]> wrote:
> >>
> >> > Hi Amit,
> >> >
> >> > Thanks for the help. I implemented the same serialization workaround for
> >> > the PipelineOptions. Since every distributed runner will have to solve
> >> > this, would it make sense to provide the serialization support along with
> >> > the interface proxy?
> >> >
> >> > Here is the exception I get with WindowedValue:
> >> >
> >> > com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
> >> >     at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
> >> >     at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
> >> >     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
> >> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
> >> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
> >> >     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> >> >
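Per the exception message, Kryo's default instantiation needs a no-arg constructor, and, as the original question notes, plain Java serialization is no fallback either. The JDK-only sketch below reproduces both dead ends on a hypothetical class shaped like WindowedValue's inner classes (`ValueInGlobalWindow` and `SerializationProbe` are made-up names). It only illustrates the cause and does not call Kryo.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

// Hypothetical class shaped like WindowedValue's inner classes:
// only a constructor with arguments, and no Serializable marker.
final class ValueInGlobalWindow {
  private final Object value;
  private final long timestampMillis;

  ValueInGlobalWindow(Object value, long timestampMillis) {
    this.value = value;
    this.timestampMillis = timestampMillis;
  }
}

final class SerializationProbe {
  // True if the class declares a no-arg constructor (any visibility).
  static boolean hasNoArgConstructor(Class<?> type) {
    try {
      type.getDeclaredConstructor();
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  // True if java.io serialization accepts the object.
  static boolean isJavaSerializable(Object obj) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Both probes fail for such a class, which is why encoding the element through its coder (rather than field-by-field reflection) sidesteps the problem.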
> >> > Thanks,
> >> > Thomas
> >> >
> >> >
> >> > On Wed, Jun 1, 2016 at 12:45 AM, Amit Sela <[email protected]> wrote:
> >> >
> >> > > Hi Thomas,
> >> > >
> >> > > Spark and the Spark runner use Kryo for serialization, and it seems to
> >> > > work just fine. What is your exact problem? Do you have a stack trace or
> >> > > message?
> >> > > I've hit an issue with Guava's ImmutableList/Map etc. and used
> >> > > https://github.com/magro/kryo-serializers for that.
> >> > >
> >> > > For PipelineOptions you can take a look at the Spark runner code here:
> >> > > https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java#L73
> >> > >
> >> > > I'd be happy to assist with Kryo.
> >> > >
> >> > > Thanks,
> >> > > Amit
> >> > >
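The PipelineOptions workaround asked about below ("disassemble/reassemble") boils down to shipping a serialized string form of the options and rebuilding them lazily on the worker; the linked SparkRuntimeContext appears to do this with JSON. The sketch below is a hedged, JDK-only stand-in: it swaps in `java.util.Properties` over a plain map of option names to values instead of the real PipelineOptions proxy, and `SerializableOptions` is a hypothetical class.

```java
import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Map;
import java.util.Properties;

// Hypothetical holder: only the string form is Java-serialized; the parsed view is rebuilt lazily.
final class SerializableOptions implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String serialized;
  // transient: never sent over the wire, recreated on first access after deserialization.
  private transient Properties options;

  SerializableOptions(Map<String, String> values) {
    Properties props = new Properties();
    props.putAll(values);
    StringWriter writer = new StringWriter();
    try {
      props.store(writer, null);
    } catch (IOException e) {
      throw new IllegalStateException("Failed to serialize options", e);
    }
    this.serialized = writer.toString();
  }

  // Returns the option value, or null if it is not set.
  String get(String name) {
    if (options == null) {
      Properties props = new Properties();
      try {
        props.load(new StringReader(serialized));
      } catch (IOException e) {
        throw new IllegalStateException("Failed to deserialize options", e);
      }
      options = props;
    }
    return options.getProperty(name);
  }
}
```

Keeping the parsed view `transient` means only the string crosses the wire, and each worker pays the parsing cost once, on first access.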
> >> > > On Wed, Jun 1, 2016 at 7:10 AM Thomas Weise <[email protected]> wrote:
> >> > >
> >> > > > Hi,
> >> > > >
> >> > > > I'm working on putting together a basic runner for Apache Apex.
> >> > > >
> >> > > > I'm hitting a couple of serialization-related issues when running tests.
> >> > > > Apex uses Kryo for serialization by default (and Kryo can delegate to
> >> > > > other serialization frameworks).
> >> > > >
> >> > > > The inner classes of WindowedValue are private and have no default
> >> > > > constructor, which the Kryo field serializer does not like. Also, these
> >> > > > classes are not Java serializable, so that's not a fallback option (not
> >> > > > that it would be efficient anyway).
> >> > > >
> >> > > > What's the recommended technique to move WindowedValues over the wire?
> >> > > >
> >> > > > Also, PipelineOptions aren't serializable, while most other classes are.
> >> > > > They are needed, for example, by DoFnRunnerBase, so what's the
> >> > > > recommended way to distribute them? Disassemble/reassemble? :)
> >> > > >
> >> > > > Thanks,
> >> > > > Thomas
> >> > > >
> >> > >
> >> >
> >>
> >
>
