Hi Manu,Yes a json schema can make its way to the spark source with no
difficulty. but still we need to store
windowedValue not only the elements that would comply with this schema. The
problem is that spark will try to match the
element (windowedValue) to the schema of the source at any element wise
processing. (and downstream it will auto guess
the schema with the content of dataset. For example if I extract timestamp in a
pardo I get a Long schema in the output
dataset). The problem is that windowedValue is complex and has many subclasses.
Maybe bytes serialization is still the
best way to go, but we don't leverage schema PCollections. BestEtienne
Le jeudi 17 janvier 2019 à 21:52 +0800, Manu Zhang a écrit :
> Nice Try, Etienne ! Is it possible to pass in the schema through pipeline
> options ?
> Manu
> On Thu, Jan 17, 2019 at 5:25 PM Etienne Chauchot <[email protected]> wrote:
> > Hi Kenn,
> > Sure, in spark DataSourceV2 providing a schema is mandatory:- if I set it
> > to null, I obviously get a NPE- if I set
> > it empty: I get an array out of bounds exception- if I set it to
> > Datatype.Null, null is stored as actual elements =>
> > Consequently I set it to binary.
> > As the beam reader is responsible for reading both the element and the
> > timestamp, the source outputs a
> > Dataset<WindowedValue>. So, the solution I found, for which I asked your
> > opinion, is to serialize windowedValue to
> > bytes using beam FullWindowedValueCoder in reader.get() and deserialize the
> > whole dataset once the source is done
> > using a map to get the windowedValue back and give it to the transforms
> > downstream.
> > I am aware that this is not optimal because of the bytes serialization
> > roundtrip, and I wanted your suggestions
> > around that.
> > ThanksEtienne
> >
> > Le mercredi 16 janvier 2019 à 19:04 -0800, Kenneth Knowles a écrit :
> > > Cool!
> > > I don't quite understand the issue in "bytes serialization to comply to
> > > spark dataset schemas to store
> > > windowedValues". Can you say a little more?
> > >
> > > Kenn
> > > On Tue, Jan 15, 2019 at 8:54 AM Etienne Chauchot <[email protected]>
> > > wrote:
> > > > Hi guys,
> > > > regarding the new (made from scratch) spark runner POC based on the
> > > > dataset API, I was able to make a big step
> > > > forward: it can now run a first batch pipeline with a source !
> > > >
> > > > See
> > > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
> > > >
> > > > there is no test facilities for now, testmode is enabled and it just
> > > > prints the output PCollection .
> > > >
> > > > I made some workarounds especially String serialization to pass beam
> > > > objects (was forced to) and also bytes
> > > > serialization to comply to spark dataset schemas to store
> > > > windowedValues.
> > > >
> > > > Can you give me your thoughts especially regarding these last 2 matters?
> > > >
> > > > The other parts are not ready for showing yet
> > > >
> > > > Here is the whole branch:
> > > >
> > > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming
> > > >
> > > > Thanks,
> > > >
> > > > Etienne
> > > >
> > > >