Amit,

Thanks for this pointer as well, CoderHelpers helps indeed!
Thomas

On Thu, Jun 2, 2016 at 12:51 PM, Amit Sela wrote:

> Oh sorry, of course I meant Thomas Groh in my previous email. But @Thomas
> Weise, this example
> <https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java#L108>
> might help; this is how the Spark runner uses Coders as Thomas Groh described.
>
> And I agree that we should consider making PipelineOptions Serializable or
> providing a generic solution for runners.
>
> Hope this helps,
> Amit
>
> On Thu, Jun 2, 2016 at 10:35 PM Amit Sela wrote:
>
> > Thomas is right, though in my case I encountered this issue when using
> > Spark's new API, which uses Encoders
> > <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala>
> > not just for serialization but also for "translating" the object into a
> > schema for optimized execution with Tungsten
> > <https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html>.
> >
> > In this case I'm using Kryo, and I've solved this by registering (in Spark,
> > not Beam) custom serializers from
> > https://github.com/magro/kryo-serializers
> > In the future I would consider implementing Encoders with the help of
> > Coders, but I haven't wrapped my mind around this yet.
> >
> > On Thu, Jun 2, 2016 at 9:59 PM Thomas Groh wrote:
> >
> >> The Beam Model ensures that all PCollections have a Coder; the PCollection
> >> Coder is the standard way to materialize the elements of a
> >> PCollection[1][2]. Most SDK-provided classes that will need to be
> >> transferred across the wire have an associated coder, and some additional
> >> default datatypes have coders associated with them (in the
> >> CoderRegistry[3]).
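To make "the Coder is the wire format" concrete, here is a minimal sketch that needs nothing beyond the JDK. `SimpleCoder`, `Utf8Coder`, and `CoderBytes` are hypothetical names: `SimpleCoder` is a simplified stand-in for Beam's `Coder` (it omits details of the real interface, such as its `Context` parameter), and `CoderBytes` only mirrors the spirit of the Spark runner's CoderHelpers mentioned above. The point is that the coder, not reflection, decides how an element becomes bytes, so private classes without a no-arg constructor are not a problem.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for Beam's Coder (not the SDK interface).
interface SimpleCoder<T> {
  void encode(T value, OutputStream out) throws IOException;

  T decode(InputStream in) throws IOException;
}

// Encodes a String as a 4-byte length prefix followed by its UTF-8 bytes.
final class Utf8Coder implements SimpleCoder<String> {
  @Override
  public void encode(String value, OutputStream out) throws IOException {
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    DataOutputStream data = new DataOutputStream(out);
    data.writeInt(bytes.length);
    data.write(bytes);
    data.flush();
  }

  @Override
  public String decode(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    byte[] bytes = new byte[data.readInt()];
    data.readFully(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

// Hypothetical byte-array helpers a runner could use to ship elements over the wire.
final class CoderBytes {
  private CoderBytes() {}

  static <T> byte[] toBytes(SimpleCoder<T> coder, T value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      coder.encode(value, out);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  static <T> T fromBytes(SimpleCoder<T> coder, byte[] bytes) {
    try {
      return coder.decode(new ByteArrayInputStream(bytes));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

For example, `CoderBytes.toBytes(new Utf8Coder(), "hello")` yields 9 bytes (a 4-byte length plus 5 UTF-8 bytes), and `CoderBytes.fromBytes` with the same coder returns `"hello"`. Both sides only need to agree on the coder, which is exactly what the PCollection's Coder provides.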
> >>
> >> FullWindowedValueCoder[4] is capable of encoding and decoding the entirety
> >> of a WindowedValue, and is constructed from a ValueCoder (obtained from the
> >> PCollection) and a WindowCoder (obtained from the WindowFn of the
> >> WindowingStrategy of the PCollection). Given an input PCollection `pc`, you
> >> can construct the FullWindowedValueCoder with the following code snippet:
> >>
> >> ```
> >> FullWindowedValueCoder.of(pc.getCoder(),
> >>     pc.getWindowingStrategy().getWindowFn().windowCoder())
> >> ```
> >>
> >> [1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
> >> [2] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java#L130
> >> [3] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java#L94
> >> [4] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L515
> >>
> >> On Thu, Jun 2, 2016 at 10:41 AM, Thomas Weise wrote:
> >>
> >> > Hi Amit,
> >> >
> >> > Thanks for the help. I implemented the same serialization workaround for
> >> > the PipelineOptions. Since every distributed runner will have to solve
> >> > this, would it make sense to provide the serialization support along with
> >> > the interface proxy?
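The "disassemble/reassemble" idea for an interface proxy can be sketched with the JDK's `java.lang.reflect.Proxy`: serialize only the proxy's backing state, then wrap that state in a fresh proxy on the receiving side. This sketch assumes, as a simplification, that every option is a String held in a map; `ApexTestOptions`, `MapBackedOptions`, and `OptionsWire` are hypothetical names, and Beam's real PipelineOptions proxy tracks considerably more (defaults, validation, typed values).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;

// Hypothetical options interface with getter/setter pairs, in the PipelineOptions style.
interface ApexTestOptions {
  String getJobName();
  void setJobName(String value);
  String getTempLocation();
  void setTempLocation(String value);
}

// Routes every getter/setter of the interface to a HashMap; only this map crosses the wire.
final class MapBackedOptions implements InvocationHandler {
  final HashMap<String, String> values;

  MapBackedOptions(HashMap<String, String> values) {
    this.values = values;
  }

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) {
    String name = method.getName();
    int argCount = args == null ? 0 : args.length;
    if (method.getDeclaringClass() == Object.class) {
      // equals, hashCode and toString are dispatched through the handler too.
      if (name.equals("equals")) {
        return proxy == args[0];
      }
      if (name.equals("hashCode")) {
        return System.identityHashCode(proxy);
      }
      return "Options" + values;
    }
    if (name.startsWith("get") && argCount == 0) {
      return values.get(propertyName(name));
    }
    if (name.startsWith("set") && argCount == 1) {
      values.put(propertyName(name), (String) args[0]);
      return null;
    }
    throw new UnsupportedOperationException(method.toString());
  }

  // "getJobName" and "setJobName" both map to the key "jobName".
  private static String propertyName(String accessor) {
    return Character.toLowerCase(accessor.charAt(3)) + accessor.substring(4);
  }
}

final class OptionsWire {
  private OptionsWire() {}

  static <T> T create(Class<T> iface) {
    return wrap(iface, new HashMap<>());
  }

  // Disassemble: serialize only the backing map, never the proxy object itself.
  static byte[] toBytes(Object options) {
    MapBackedOptions handler = (MapBackedOptions) Proxy.getInvocationHandler(options);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(handler.values);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Reassemble: read the map back and wrap it in a fresh proxy for the same interface.
  @SuppressWarnings("unchecked")
  static <T> T fromBytes(Class<T> iface, byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return wrap(iface, (HashMap<String, String>) in.readObject());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  private static <T> T wrap(Class<T> iface, HashMap<String, String> values) {
    Object proxy = Proxy.newProxyInstance(
        iface.getClassLoader(), new Class<?>[] {iface}, new MapBackedOptions(values));
    return iface.cast(proxy);
  }
}
```

A runner would call `OptionsWire.toBytes(options)` when shipping an operator and `OptionsWire.fromBytes(ApexTestOptions.class, bytes)` on the worker, getting back a new proxy whose getters return the same values. Keeping the state in a plain `HashMap` is what makes it Java-serializable, since the proxy instance itself never goes over the wire.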
> >> >
> >> > Here is the exception I get with WindowedValue:
> >> >
> >> > com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
> >> >     at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
> >> >     at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
> >> >     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
> >> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
> >> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
> >> >     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> >> >
> >> > Thanks,
> >> > Thomas
> >> >
> >> > On Wed, Jun 1, 2016 at 12:45 AM, Amit Sela wrote:
> >> >
> >> > > Hi Thomas,
> >> > >
> >> > > Spark and the Spark runner use Kryo for serialization, and it seems to
> >> > > work just fine. What is your exact problem? Stack trace/message?
> >> > > I've hit an issue with Guava's ImmutableList/Map etc. and used
> >> > > https://github.com/magro/kryo-serializers for that.
> >> > >
> >> > > For PipelineOptions, you can take a look at the Spark runner code here:
> >> > > https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java#L73
> >> > >
> >> > > I'd be happy to assist with Kryo.
> >> > >
> >> > > Thanks,
> >> > > Amit
> >> > >
> >> > > On Wed, Jun 1, 2016 at 7:10 AM Thomas Weise wrote:
> >> > >
> >> > > > Hi,
> >> > > >
> >> > > > I'm working on putting together a basic runner for Apache Apex.
> >> > > >
> >> > > > I'm hitting a couple of serialization-related issues when running tests.
> >> > > > Apex uses Kryo for serialization by default (and Kryo can delegate to
> >> > > > other serialization frameworks).
> >> > > >
> >> > > > The inner classes of WindowedValue are private and have no default
> >> > > > constructor, which the Kryo field serializer does not like. Also, these
> >> > > > classes are not Java serializable, so that's not a fallback option (not
> >> > > > that it would be efficient anyway).
> >> > > >
> >> > > > What's the recommended technique to move the WindowedValues over the
> >> > > > wire?
> >> > > >
> >> > > > Also, PipelineOptions aren't serializable, while most other classes are.
> >> > > > They are needed, for example, with DoFnRunnerBase, so what's the
> >> > > > recommended way to distribute them? Disassemble/reassemble? :)
> >> > > >
> >> > > > Thanks,
> >> > > > Thomas
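Following Thomas Groh's answer, the WindowedValue question can be handled by composing coders instead of relying on Kryo's field serializer. The sketch below mimics only the shape of `FullWindowedValueCoder.of(valueCoder, windowCoder)` using the JDK: the value, the timestamp, and each window are written through their own coders. `DataCoder`, `IntervalWindow`, `Windowed`, `LongValueCoder`, `IntervalWindowCoder`, and `WindowedCoder` are hypothetical simplifications, the byte layout is an assumption, and the real coder also encodes pane information, which is omitted here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified coder contract over Data streams (not Beam's Coder).
interface DataCoder<T> {
  void encode(T value, DataOutputStream out) throws IOException;
  T decode(DataInputStream in) throws IOException;

  // Convenience round trip through a byte array, e.g. for a runner's wire format.
  default byte[] toBytes(T value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      encode(value, out);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  default T fromBytes(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      return decode(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

// Hypothetical window: the interval [start, end) in epoch milliseconds.
final class IntervalWindow {
  final long start, end;
  IntervalWindow(long start, long end) { this.start = start; this.end = end; }
}

// Hypothetical stand-in for WindowedValue: value, timestamp and windows (pane info omitted).
final class Windowed<T> {
  final T value;
  final long timestampMillis;
  final List<IntervalWindow> windows;
  Windowed(T value, long timestampMillis, List<IntervalWindow> windows) {
    this.value = value;
    this.timestampMillis = timestampMillis;
    this.windows = windows;
  }
}

final class LongValueCoder implements DataCoder<Long> {
  public void encode(Long value, DataOutputStream out) throws IOException { out.writeLong(value); }
  public Long decode(DataInputStream in) throws IOException { return in.readLong(); }
}

final class IntervalWindowCoder implements DataCoder<IntervalWindow> {
  public void encode(IntervalWindow w, DataOutputStream out) throws IOException {
    out.writeLong(w.start);
    out.writeLong(w.end);
  }
  public IntervalWindow decode(DataInputStream in) throws IOException {
    return new IntervalWindow(in.readLong(), in.readLong()); // start, then end (left to right)
  }
}

// Composes a value coder and a window coder, analogous in shape to FullWindowedValueCoder.of(...).
final class WindowedCoder<T> implements DataCoder<Windowed<T>> {
  private final DataCoder<T> valueCoder;
  private final DataCoder<IntervalWindow> windowCoder;
  private WindowedCoder(DataCoder<T> valueCoder, DataCoder<IntervalWindow> windowCoder) {
    this.valueCoder = valueCoder;
    this.windowCoder = windowCoder;
  }
  static <T> WindowedCoder<T> of(DataCoder<T> valueCoder, DataCoder<IntervalWindow> windowCoder) {
    return new WindowedCoder<>(valueCoder, windowCoder);
  }
  // Assumed layout: value, timestamp, window count, then each window.
  public void encode(Windowed<T> wv, DataOutputStream out) throws IOException {
    valueCoder.encode(wv.value, out);
    out.writeLong(wv.timestampMillis);
    out.writeInt(wv.windows.size());
    for (IntervalWindow w : wv.windows) {
      windowCoder.encode(w, out);
    }
  }
  public Windowed<T> decode(DataInputStream in) throws IOException {
    T value = valueCoder.decode(in);
    long timestamp = in.readLong();
    int count = in.readInt();
    List<IntervalWindow> windows = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      windows.add(windowCoder.decode(in));
    }
    return new Windowed<>(value, timestamp, windows);
  }
}
```

With a `Long` value, one timestamp, and a single window, this layout takes 36 bytes: 8 for the value, 8 for the timestamp, 4 for the window count, and 16 for the window. A Kryo-based runner could register a custom serializer that simply delegates to such a coder, which sidesteps the missing no-arg constructor entirely.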
