Hi Etienne,

I see your point. I'm a bit worried that every ParDo has to be wrapped in a
`mapPartition`, which introduces serde cost on each element and forgoes the
benefits of the Dataset API.
Maybe Dataset is not the best way to integrate Beam with Spark. Just my
$0.02.
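
To make the serde concern concrete, here is a minimal sketch of what a
wrapped ParDo would roughly have to do per element if the dataset only holds
bytes: decode, apply the user function, re-encode. It uses only JDK classes.
`TimestampedElement`, `encode`, `decode` and `processPartition` are
hypothetical stand-ins for WindowedValue, the coder, and the function handed
to `mapPartition`; none of them are Beam or Spark APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch, JDK only: this is NOT Beam or Spark code. */
public class SerdeRoundTripSketch {

  /** Assumed stand-in for WindowedValue: a value plus its timestamp. */
  static final class TimestampedElement {
    final String value;
    final long timestampMillis;

    TimestampedElement(String value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  /** Assumed stand-in for a coder's encode step: element to one binary row. */
  static byte[] encode(TimestampedElement element) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeLong(element.timestampMillis);
      out.writeUTF(element.value);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Assumed stand-in for a coder's decode step: binary row back to element. */
  static TimestampedElement decode(byte[] row) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(row));
      long timestampMillis = in.readLong();
      String value = in.readUTF();
      return new TimestampedElement(value, timestampMillis);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /**
   * Mimics a per-partition function wrapping a user function: every element
   * is decoded, processed, and re-encoded, so the serde cost is paid twice
   * per element and per wrapped transform.
   */
  static Iterator<byte[]> processPartition(
      Iterator<byte[]> partition,
      Function<TimestampedElement, TimestampedElement> userFn) {
    List<byte[]> output = new ArrayList<>();
    while (partition.hasNext()) {
      TimestampedElement input = decode(partition.next());
      output.add(encode(userFn.apply(input)));
    }
    return output.iterator();
  }

  public static void main(String[] args) {
    List<byte[]> partition = Arrays.asList(
        encode(new TimestampedElement("a", 1L)),
        encode(new TimestampedElement("b", 2L)));

    // The user function keeps the timestamp and changes only the value.
    Iterator<byte[]> result = processPartition(
        partition.iterator(),
        e -> new TimestampedElement(e.value + "!", e.timestampMillis));

    while (result.hasNext()) {
      TimestampedElement e = decode(result.next());
      System.out.println(e.value + " @ " + e.timestampMillis);
    }
  }
}
```

With N chained ParDos this pattern would mean roughly 2N encode/decode calls
per element, which is the cost I am worried about.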

Manu


On Thu, Jan 17, 2019 at 10:44 PM Etienne Chauchot <[email protected]>
wrote:

> Hi Manu,
> Yes, a JSON schema can make its way to the Spark source with no difficulty,
> but we still need to store WindowedValue, not only the elements that would
> comply with this schema. The problem is that Spark will try to match the
> element (WindowedValue) to the schema of the source at any element-wise
> processing (and downstream it will infer the schema from the content of
> the dataset; for example, if I extract the timestamp in a ParDo, I get a
> Long schema in the output dataset). WindowedValue is complex and has many
> subclasses. Maybe bytes serialization is still the best way to go, but then
> we don't leverage schema PCollections.
> Best
> Etienne
>
> On Thursday, January 17, 2019 at 21:52 +0800, Manu Zhang wrote:
>
> Nice try, Etienne! Is it possible to pass in the schema through pipeline
> options?
>
> Manu
>
> On Thu, Jan 17, 2019 at 5:25 PM Etienne Chauchot <[email protected]>
> wrote:
>
> Hi Kenn,
>
> Sure. In Spark DataSourceV2, providing a schema is mandatory:
> - if I set it to null, I obviously get an NPE
> - if I set it to empty, I get an array out of bounds exception
> - if I set it to Datatype.Null, null is stored as the actual elements
> => Consequently, I set it to binary.
>
> As the Beam reader is responsible for reading both the element and the
> timestamp, the source outputs a Dataset<WindowedValue>. So the solution I
> found, and on which I asked your opinion, is to serialize WindowedValue to
> bytes using Beam's FullWindowedValueCoder in reader.get(), and then, once
> the source is done, to deserialize the whole dataset with a map to get the
> WindowedValue back and pass it to the transforms downstream.
>
> I am aware that this is not optimal because of the bytes serialization
> roundtrip, and I wanted your suggestions around that.
>
> Thanks
> Etienne
>
>
> On Wednesday, January 16, 2019 at 19:04 -0800, Kenneth Knowles wrote:
>
> Cool!
>
> I don't quite understand the issue in "bytes serialization to comply to
> spark dataset schemas to store windowedValues". Can you say a little more?
>
> Kenn
>
> On Tue, Jan 15, 2019 at 8:54 AM Etienne Chauchot <[email protected]>
> wrote:
>
> Hi guys,
> Regarding the new (made from scratch) Spark runner POC based on the
> Dataset API, I was able to make a big step forward: it can now run a first
> batch pipeline with a source!
>
> See
> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
>
> There are no test facilities for now; test mode is enabled and it just
> prints the output PCollection.
>
> I made some workarounds, especially String serialization to pass Beam
> objects (I was forced to) and also bytes serialization to comply with Spark
> dataset schemas to store WindowedValues.
>
> Can you give me your thoughts, especially regarding these last two matters?
>
> The other parts are not ready to be shown yet.
>
> Here is the whole branch:
>
>
> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming
>
> Thanks,
>
> Etienne
>
>
