@Gleb, I'll also take a look at ExpressionEncoder; thanks for the pointer to
typelevel/frameless.
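As a concrete starting point, here is a minimal sketch (not runner code; the Event POJO is made
up) of the two encoder strategies being compared in this thread: a schema-aware encoder derived
by Spark versus an opaque binary one.

    import java.util.Collections;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    public class EncoderComparisonSketch {
      // Hypothetical element type, used only for illustration.
      public static class Event implements java.io.Serializable {
        private String user;
        private long count;
        public String getUser() { return user; }
        public void setUser(String user) { this.user = user; }
        public long getCount() { return count; }
        public void setCount(long count) { this.count = count; }
      }

      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder().master("local[1]").appName("encoders").getOrCreate();

        // Schema-aware: Spark derives an ExpressionEncoder from the bean, so the
        // Dataset gets real columns (user: string, count: bigint) Catalyst can use.
        Encoder<Event> beanEncoder = Encoders.bean(Event.class);

        // Opaque: the whole object is stored in a single binary column, which is
        // essentially what serializing elements with a Beam coder gives us.
        Encoder<Event> kryoEncoder = Encoders.kryo(Event.class);

        spark.createDataset(Collections.<Event>emptyList(), beanEncoder).printSchema();
        spark.createDataset(Collections.<Event>emptyList(), kryoEncoder).printSchema();
      }
    }
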
Etienne
On Wednesday, January 23, 2019 at 17:06 +0100, Etienne Chauchot wrote:
> Hi all,
> Thanks for your feedback! I was indeed thinking about Reuven's work around schema
> PCollections, hence my email to the community. I don't see how it fits considering that, as
> I'm wrapping a source, I need to store both the timestamp and the value, hence the use of
> WindowedValue<T> (as the other runners do). Yet, WindowedValue<T> might be overkill because I
> obviously have no windowing as I'm at the input of the pipeline. Hence the idea to create a
> Beam schema that has the fields of T plus a long for the timestamp, and to write a converter
> between the Beam schema and the Spark schema. But I'm not sure it would be more performant
> than simply serializing the WindowedValue object to bytes. I guess schema PCollections are a
> tool more useful for the SDK part than for the runner part of Beam. Am I missing something,
> @Reuven?
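> As an illustration of the schema-plus-timestamp idea above (field names are made up; this is
> not code from the branch), the pairing would be a Beam schema carrying the fields of T plus
> the timestamp, and the Spark StructType a converter would have to produce from it:
>
>     import org.apache.beam.sdk.schemas.Schema;
>     import org.apache.spark.sql.types.DataTypes;
>     import org.apache.spark.sql.types.StructType;
>
>     public class TimestampedSchemaSketch {
>       // Beam side: the fields of T (here a single "word" field) plus the event timestamp.
>       static final Schema BEAM_SCHEMA =
>           Schema.builder()
>               .addStringField("word")
>               .addInt64Field("timestampMillis")
>               .build();
>
>       // Spark side: the equivalent StructType a Beam-to-Spark schema converter would emit.
>       static final StructType SPARK_SCHEMA =
>           new StructType()
>               .add("word", DataTypes.StringType)
>               .add("timestampMillis", DataTypes.LongType);
>     }
>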
> For the same source-wrapping problem, both the current Spark and Flink runners also store
> WindowedValue<T> but do not enforce any schema in their Dataset-equivalent structures, so
> they don't have this problem.
> @Manu, regarding your concerns about the serialization/deserialization roundtrip: artificial
> roundtrips (not triggered by the Spark framework) only happen once, at source execution time;
> downstream we have a Dataset<WindowedValue<T>>. But, indeed, if we apply a ParDo it gets
> wrapped into a mapPartition for which Spark will require an encoder (similar to a Beam coder)
> for serialization, and indeed we provide a bytes encoder. But once again, if we had a schema,
> Spark would still serde, and I'm not sure a bean/schema encoder would be more performant than
> a binary one.
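> To make the mapPartition roundtrip concrete, here is a rough sketch (not the actual runner
> code; the element type and the user function are placeholders) of wrapping a ParDo-like step
> with a bytes encoder: every element is decoded and re-encoded with Beam's
> FullWindowedValueCoder around the user code, and Spark only ever sees a single binary column.
>
>     import java.util.ArrayList;
>     import java.util.List;
>     import org.apache.beam.sdk.coders.StringUtf8Coder;
>     import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
>     import org.apache.beam.sdk.util.CoderUtils;
>     import org.apache.beam.sdk.util.WindowedValue;
>     import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
>     import org.apache.spark.api.java.function.MapPartitionsFunction;
>     import org.apache.spark.sql.Dataset;
>     import org.apache.spark.sql.Encoders;
>
>     public class BytesEncoderSketch {
>       static Dataset<byte[]> applyUserFn(Dataset<byte[]> input) {
>         FullWindowedValueCoder<String> coder =
>             FullWindowedValueCoder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
>         return input.mapPartitions(
>             (MapPartitionsFunction<byte[], byte[]>)
>                 partition -> {
>                   List<byte[]> output = new ArrayList<>();
>                   while (partition.hasNext()) {
>                     // Decode the WindowedValue produced by the source (or the previous step).
>                     WindowedValue<String> element =
>                         CoderUtils.decodeFromByteArray(coder, partition.next());
>                     // Apply the black-box user function on the value only.
>                     WindowedValue<String> result =
>                         element.withValue(element.getValue().toUpperCase());
>                     // Re-encode so the result still fits the binary Dataset schema.
>                     output.add(CoderUtils.encodeToByteArray(coder, result));
>                   }
>                   return output.iterator();
>                 },
>             Encoders.BINARY()); // a single binary column: no structure for Spark to exploit
>       }
>     }
>
> A schema-based alternative would replace Encoders.BINARY() with an encoder derived from the
> element schema, but Spark would still serialize, just into columns instead of bytes.
>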
> Side note: @Gleb, yes, using a schema would allow us to use the pushdown predicates that are
> included in the Spark DataSourceV2 API. But such predicates would depend on the backend IO
> technology, which we don't know in advance (e.g. a filter on a column which is not a
> primary/clustering column in Cassandra could not be pushed down). We would have to translate
> differently depending on the IO, instead of translating only BoundedSource and
> UnboundedSource.
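> For reference, this is roughly the Spark-side hook involved (a hypothetical reader sketch,
> not Beam code): a DataSourceV2 reader advertises which filters it accepted via
> SupportsPushDownFilters, and a Cassandra-like backend could only accept, say, equality on the
> partition key.
>
>     import java.util.ArrayList;
>     import java.util.Collections;
>     import java.util.List;
>     import org.apache.spark.sql.catalyst.InternalRow;
>     import org.apache.spark.sql.sources.EqualTo;
>     import org.apache.spark.sql.sources.Filter;
>     import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
>     import org.apache.spark.sql.sources.v2.reader.InputPartition;
>     import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
>     import org.apache.spark.sql.types.DataTypes;
>     import org.apache.spark.sql.types.StructType;
>
>     // Hypothetical reader for a backend where only equality filters on the partition
>     // key can be pushed down; everything else is handed back to Spark.
>     public class PushdownAwareReaderSketch implements DataSourceReader, SupportsPushDownFilters {
>       private static final String PARTITION_KEY = "userId"; // made-up column name
>       private final List<Filter> pushed = new ArrayList<>();
>
>       @Override
>       public Filter[] pushFilters(Filter[] filters) {
>         List<Filter> rejected = new ArrayList<>();
>         for (Filter filter : filters) {
>           if (filter instanceof EqualTo && ((EqualTo) filter).attribute().equals(PARTITION_KEY)) {
>             pushed.add(filter); // the backend query can evaluate this one
>           } else {
>             rejected.add(filter); // Spark must still apply this one itself
>           }
>         }
>         return rejected.toArray(new Filter[0]);
>       }
>
>       @Override
>       public Filter[] pushedFilters() {
>         return pushed.toArray(new Filter[0]);
>       }
>
>       @Override
>       public StructType readSchema() {
>         return new StructType()
>             .add("userId", DataTypes.StringType)
>             .add("value", DataTypes.BinaryType);
>       }
>
>       @Override
>       public List<InputPartition<InternalRow>> planInputPartitions() {
>         return Collections.emptyList(); // partition planning elided in this sketch
>       }
>     }
>
> Everything the reader hands back to Spark is evaluated after the read, so the benefit really
> depends on the IO.
>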
> Best,
> Etienne
> On Friday, January 18, 2019 at 18:33 +0100, Gleb Kanterov wrote:
> > Agree with Kenn. It should be possible; Spark has a similar concept called
> > ExpressionEncoder, and I was doing a similar derivation using a Scala macro in
> > typelevel/frameless.
> >
> > Most of the code in Beam is a black-box function in a ParDo, and the only way to translate
> > it is using `mapPartition`. However, we could override the behavior for known transforms
> > from beam-java-core, for instance Group and Select, and use FieldAccessDescriptor to push
> > down projections. There is a bigger opportunity for Beam SQL, which translates into
> > transforms that fit the Spark DataFrame model more closely.
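> >
> > For instance (a sketch only, with made-up field names), a schema-aware Select could expose
> > a FieldAccessDescriptor and be translated into a plain column projection instead of an
> > opaque `mapPartition`:
> >
> >     import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
> >     import org.apache.spark.sql.Dataset;
> >     import org.apache.spark.sql.Row;
> >
> >     public class ProjectionPushdownSketch {
> >       // The field names the Beam transform declares it needs (made-up names).
> >       static final String[] ACCESSED_FIELDS = {"userId", "amount"};
> >
> >       // Beam side: what a schema-aware transform such as Select would expose.
> >       static final FieldAccessDescriptor FIELD_ACCESS =
> >           FieldAccessDescriptor.withFieldNames(ACCESSED_FIELDS);
> >
> >       // Spark side: translate the known transform into a projection that Catalyst
> >       // can push down to the source, instead of a black-box mapPartitions call.
> >       static Dataset<Row> translateSelect(Dataset<Row> input) {
> >         return input.selectExpr(ACCESSED_FIELDS);
> >       }
> >     }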
> >
> > Gleb
> >
> >
> >
> > On Fri, Jan 18, 2019 at 3:25 PM Kenneth Knowles <[email protected]> wrote:
> > > I wonder if this could tie in with Reuven's recent work. He's basically
> > > making it so every type with an "obvious"
> > > schema automatically converts to/from Row whenever needed. Sounds like a
> > > similar need, superficially.
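> > > If I understand that work correctly, the kind of type it covers looks roughly like this
> > > (a minimal sketch with a made-up Purchase class): the schema is inferred from the public
> > > fields, and the conversion to Row comes for free.
> > >
> > >     import org.apache.beam.sdk.schemas.JavaFieldSchema;
> > >     import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
> > >     import org.apache.beam.sdk.schemas.transforms.Convert;
> > >     import org.apache.beam.sdk.values.PCollection;
> > >     import org.apache.beam.sdk.values.Row;
> > >
> > >     public class ObviousSchemaSketch {
> > >       // A type with an "obvious" schema: public fields, inferred by JavaFieldSchema.
> > >       @DefaultSchema(JavaFieldSchema.class)
> > >       public static class Purchase {
> > >         public String userId;
> > >         public double amount;
> > >       }
> > >
> > >       // No hand-written coder or conversion: the registered schema drives the Row mapping.
> > >       static PCollection<Row> toRows(PCollection<Purchase> purchases) {
> > >         return purchases.apply(Convert.toRows());
> > >       }
> > >     }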
> > > Kenn
> > > On Fri, Jan 18, 2019, 02:36 Manu Zhang <[email protected]> wrote:
> > > > Hi Etienne,
> > > > I see your point. I'm a bit worried that every ParDo has to be wrapped in a
> > > > `mapPartition`, which introduces serde cost and forgoes the benefits of the Dataset API.
> > > > Maybe the Dataset API is not the best way to integrate Beam with Spark. Just my $0.02.
> > > >
> > > > Manu
> > > >
> > > > On Thu, Jan 17, 2019 at 10:44 PM Etienne Chauchot
> > > > <[email protected]> wrote:
> > > > > Hi Manu,
> > > > > Yes, a JSON schema can make its way to the Spark source with no difficulty, but we
> > > > > still need to store the WindowedValue, not only the elements that would comply with
> > > > > this schema. The problem is that Spark will try to match the element (WindowedValue)
> > > > > to the schema of the source at any element-wise processing (and downstream it will
> > > > > auto-guess the schema from the content of the Dataset; for example, if I extract the
> > > > > timestamp in a ParDo, I get a Long schema in the output Dataset). The problem is that
> > > > > WindowedValue is complex and has many subclasses. Maybe bytes serialization is still
> > > > > the best way to go, but then we don't leverage schema PCollections.
> > > > > Best,
> > > > > Etienne
> > > > > On Thursday, January 17, 2019 at 21:52 +0800, Manu Zhang wrote:
> > > > > > Nice try, Etienne! Is it possible to pass in the schema through pipeline options?
> > > > > > Manu
> > > > > > On Thu, Jan 17, 2019 at 5:25 PM Etienne Chauchot
> > > > > > <[email protected]> wrote:
> > > > > > > Hi Kenn,
> > > > > > > Sure, in Spark DataSourceV2 providing a schema is mandatory:
> > > > > > > - if I set it to null, I obviously get an NPE
> > > > > > > - if I set it empty, I get an array-out-of-bounds exception
> > > > > > > - if I set it to DataTypes.NullType, null is stored as actual elements
> > > > > > > => Consequently, I set it to binary.
> > > > > > > As the Beam reader is responsible for reading both the element and the timestamp,
> > > > > > > the source outputs a Dataset<WindowedValue>. So the solution I found, for which I
> > > > > > > asked your opinion, is to serialize the WindowedValue to bytes using the Beam
> > > > > > > FullWindowedValueCoder in reader.get(), and to deserialize the whole dataset once
> > > > > > > the source is done, using a map to get the WindowedValue back and give it to the
> > > > > > > transforms downstream.
> > > > > > > I am aware that this is not optimal because of the bytes serialization roundtrip,
> > > > > > > and I wanted your suggestions around that.
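> > > > > > > For what it's worth, a stripped-down sketch of those two pieces (the column name
> > > > > > > and the element type are made up; this is not the branch code): the binary-only
> > > > > > > schema the source declares, and the encoding that reader.get() performs.
> > > > > > >
> > > > > > >     import org.apache.beam.sdk.coders.CoderException;
> > > > > > >     import org.apache.beam.sdk.coders.StringUtf8Coder;
> > > > > > >     import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
> > > > > > >     import org.apache.beam.sdk.util.CoderUtils;
> > > > > > >     import org.apache.beam.sdk.util.WindowedValue;
> > > > > > >     import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
> > > > > > >     import org.apache.spark.sql.catalyst.InternalRow;
> > > > > > >     import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
> > > > > > >     import org.apache.spark.sql.types.DataTypes;
> > > > > > >     import org.apache.spark.sql.types.StructType;
> > > > > > >
> > > > > > >     public class BinarySourceSketch {
> > > > > > >       // The only schema the source declares: a single binary column.
> > > > > > >       static StructType binarySchema() {
> > > > > > >         return new StructType().add("windowedValueBytes", DataTypes.BinaryType);
> > > > > > >       }
> > > > > > >
> > > > > > >       // What reader.get() returns for each element: the WindowedValue
> > > > > > >       // (element + timestamp) encoded to bytes with a Beam coder.
> > > > > > >       static InternalRow encode(WindowedValue<String> windowedValue) throws CoderException {
> > > > > > >         FullWindowedValueCoder<String> coder =
> > > > > > >             FullWindowedValueCoder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
> > > > > > >         byte[] bytes = CoderUtils.encodeToByteArray(coder, windowedValue);
> > > > > > >         return new GenericInternalRow(new Object[] {bytes});
> > > > > > >       }
> > > > > > >     }
> > > > > > >
> > > > > > > Downstream, a single map with the same coder turns the Dataset of bytes back into
> > > > > > > a Dataset<WindowedValue<T>> for the rest of the pipeline.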
> > > > > > > Thanks,
> > > > > > > Etienne
> > > > > > >
> > > > > > > On Wednesday, January 16, 2019 at 19:04 -0800, Kenneth Knowles wrote:
> > > > > > > > Cool!
> > > > > > > > I don't quite understand the issue in "bytes serialization to comply with
> > > > > > > > Spark Dataset schemas to store WindowedValues". Can you say a little more?
> > > > > > > >
> > > > > > > > Kenn
> > > > > > > > On Tue, Jan 15, 2019 at 8:54 AM Etienne Chauchot
> > > > > > > > <[email protected]> wrote:
> > > > > > > > > Hi guys,
> > > > > > > > > Regarding the new (made-from-scratch) Spark runner POC based on the Dataset
> > > > > > > > > API, I was able to make a big step forward: it can now run a first batch
> > > > > > > > > pipeline with a source!
> > > > > > > > >
> > > > > > > > > See
> > > > > > > > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
> > > > > > > > >
> > > > > > > > > There are no test facilities for now; test mode is enabled and it just
> > > > > > > > > prints the output PCollection.
> > > > > > > > >
> > > > > > > > > I made some workarounds, especially String serialization to pass Beam
> > > > > > > > > objects (I was forced to) and also bytes serialization to comply with Spark
> > > > > > > > > Dataset schemas to store WindowedValues.
> > > > > > > > >
> > > > > > > > > Can you give me your thoughts, especially regarding these last two matters?
> > > > > > > > >
> > > > > > > > > The other parts are not ready for showing yet.
> > > > > > > > >
> > > > > > > > > Here is the whole branch:
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Etienne
> > > > > > > > >
> > > > > > > > >
> >
> >