I wonder if this could tie in with Reuven's recent work. He's basically
making it so every type with an "obvious" schema automatically converts
to/from Row whenever needed. Sounds like a similar need, superficially.

Kenn

On Fri, Jan 18, 2019, 02:36 Manu Zhang <owenzhang1...@gmail.com> wrote:

> Hi Etienne,
>
> I see your point. I'm a bit worried that every ParDo has to be wrapped in
> a `mapPartitions`, which introduces serde cost and forgoes the benefits of
> the Dataset API.
> Maybe Dataset is not the best way to integrate Beam with Spark. Just my
> $0.02.
>
> Manu
>
>
> On Thu, Jan 17, 2019 at 10:44 PM Etienne Chauchot <echauc...@apache.org>
> wrote:
>
>> Hi Manu,
>> Yes, a JSON schema can make its way to the Spark source with no
>> difficulty, but we still need to store the WindowedValue, not only the
>> elements that would comply with this schema. The problem is that Spark
>> will try to match the element (WindowedValue) to the schema of the source
>> in any element-wise processing. (Downstream, it will infer the schema from
>> the content of the dataset. For example, if I extract the timestamp in a
>> ParDo, I get a Long schema in the output dataset.) WindowedValue is also
>> complex and has many subclasses. Maybe bytes serialization is still the
>> best way to go, but then we don't leverage schema PCollections.
>> Best
>> Etienne
>>
>> On Thursday, January 17, 2019 at 21:52 +0800, Manu Zhang wrote:
>>
>> Nice try, Etienne! Is it possible to pass in the schema through pipeline
>> options?
>>
>> Manu
>>
>> On Thu, Jan 17, 2019 at 5:25 PM Etienne Chauchot <echauc...@apache.org>
>> wrote:
>>
>> Hi Kenn,
>>
>> Sure. In Spark DataSourceV2, providing a schema is mandatory:
>> - if I set it to null, I obviously get an NPE
>> - if I set it to empty, I get an array-out-of-bounds exception
>> - if I set it to DataTypes.NullType, null is stored as the actual elements
>> => Consequently, I set it to binary.
>>
>> As the Beam reader is responsible for reading both the element and the
>> timestamp, the source outputs a Dataset<WindowedValue>. So the solution I
>> found, and on which I asked for your opinion, is to serialize the
>> WindowedValue to bytes using Beam's FullWindowedValueCoder in
>> reader.get() and then, once the source is done, to deserialize the whole
>> dataset with a map to get the WindowedValue back and pass it to the
>> downstream transforms.
>>
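
The round trip described above can be sketched with the JDK alone. This is
only an illustration under stated assumptions: `StampedValue`, `encode` and
`decode` are hypothetical names, the value carries just an element and a
timestamp (no windows or pane info), and plain `DataOutputStream` stands in
for Beam's FullWindowedValueCoder. It is not the code on the branch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class BinaryRoundTrip {

    // Hypothetical stand-in for a WindowedValue<String>: element plus timestamp.
    // Not Beam's class; windows and pane info are left out to keep this small.
    static final class StampedValue {
        final String element;
        final long timestampMillis;

        StampedValue(String element, long timestampMillis) {
            this.element = element;
            this.timestampMillis = timestampMillis;
        }
    }

    // Source side: flatten the whole value into one opaque byte[] so that it
    // fits a dataset whose schema is a single binary column.
    static byte[] encode(StampedValue value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeLong(value.timestampMillis);
            out.writeUTF(value.element);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Downstream side: the function a map step would apply to each row to
    // rebuild the value before handing it to the next transform.
    static StampedValue decode(byte[] encoded) {
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(encoded))) {
            long timestampMillis = in.readLong();
            String element = in.readUTF();
            return new StampedValue(element, timestampMillis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Every element pays one encode at the source and one decode in the map step,
which is the serialization cost mentioned in the next paragraph.
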
>> I am aware that this is not optimal because of the bytes serialization
>> roundtrip, and I wanted your suggestions around that.
>>
>> Thanks
>> Etienne
>>
>>
>> On Wednesday, January 16, 2019 at 19:04 -0800, Kenneth Knowles wrote:
>>
>> Cool!
>>
>> I don't quite understand the issue in "bytes serialization to comply to
>> spark dataset schemas to store windowedValues". Can you say a little more?
>>
>> Kenn
>>
>> On Tue, Jan 15, 2019 at 8:54 AM Etienne Chauchot <echauc...@apache.org>
>> wrote:
>>
>> Hi guys,
>> Regarding the new (made from scratch) Spark runner POC based on the
>> Dataset API, I was able to make a big step forward: it can now run a first
>> batch pipeline with a source!
>>
>> See
>> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
>>
>> There are no test facilities for now: test mode is enabled and it just
>> prints the output PCollection.
>>
>> I made some workarounds, especially String serialization to pass Beam
>> objects (I was forced to), and also bytes serialization to comply to spark
>> dataset schemas to store windowedValues.
>>
>> Can you give me your thoughts, especially regarding these last 2 matters?
>>
>> The other parts are not ready to be shown yet.
>>
>> Here is the whole branch:
>>
>>
>> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming
>>
>> Thanks,
>>
>> Etienne
>>
>>
