Agree with Kenn. It should be possible; Spark has a similar concept called
ExpressionEncoder, and I was doing a similar derivation using Scala macros in
typelevel/frameless <https://github.com/typelevel/frameless>.

Most of the code in Beam is a black-box function inside a ParDo, and the only
way to translate it is via `mapPartition`. However, we could override the
behavior for known transforms from beam-java-core, for instance Group and
Select, and use FieldAccessDescriptor to push down projections. There is a
bigger opportunity for Beam SQL, which translates into transforms that fit the
Spark DataFrame model more closely.
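
For illustration, a rough sketch of that fallback path (all names here are
hypothetical, not actual runner code): the DoFn stays a black box over
coder-encoded elements and is simply applied per partition.

  import java.util.Iterator;

  import org.apache.spark.api.java.function.MapPartitionsFunction;
  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Encoders;

  class BlackBoxParDo {

    // Hypothetical adapter that decodes each element, invokes the DoFn's
    // @ProcessElement, and re-encodes the outputs. It has to be
    // Serializable so Spark can ship it to executors.
    interface DoFnPartitionRunner extends java.io.Serializable {
      Iterator<byte[]> run(Iterator<byte[]> encodedElements);
    }

    // Fallback translation: run the opaque DoFn over each partition of
    // coder-encoded bytes; nothing is pushed down or optimized here.
    static Dataset<byte[]> translate(Dataset<byte[]> input,
                                     DoFnPartitionRunner runner) {
      return input.mapPartitions(
          (MapPartitionsFunction<byte[], byte[]>) runner::run,
          Encoders.BINARY());
    }
  }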

Gleb


On Fri, Jan 18, 2019 at 3:25 PM Kenneth Knowles <[email protected]> wrote:

> I wonder if this could tie in with Reuven's recent work. He's basically
> making it so every type with an "obvious" schema automatically converts
> to/from Row whenever needed. Sounds like a similar need, superficially.
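>
> For reference, that schema work surfaces in the Java SDK roughly like this
> (a minimal sketch; the POJO, its fields, and the helper class are just
> examples, not anything from Reuven's actual patches):
>
>   import org.apache.beam.sdk.schemas.JavaFieldSchema;
>   import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
>   import org.apache.beam.sdk.schemas.transforms.Convert;
>   import org.apache.beam.sdk.values.PCollection;
>   import org.apache.beam.sdk.values.Row;
>
>   // A POJO whose schema Beam infers from its public fields.
>   @DefaultSchema(JavaFieldSchema.class)
>   class Purchase {
>     public String userId;
>     public double amount;
>   }
>
>   class SchemaExample {
>     // The inferred schema lets Beam convert to/from Row without a
>     // hand-written coder.
>     static PCollection<Row> toRows(PCollection<Purchase> purchases) {
>       return purchases.apply(Convert.toRows());
>     }
>   }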
>
> Kenn
>
> On Fri, Jan 18, 2019, 02:36 Manu Zhang <[email protected]> wrote:
>
>> Hi Etienne,
>>
>> I see your point. I'm a bit worried that every ParDo has to be wrapped in
>> a `mapPartition`, which introduces the cost of serde and forgoes the
>> benefits of the Dataset API.
>> Maybe the Dataset API is not the best way to integrate Beam with Spark.
>> Just my $0.02.
>>
>> Manu
>>
>>
>> On Thu, Jan 17, 2019 at 10:44 PM Etienne Chauchot <[email protected]>
>> wrote:
>>
>>> Hi Manu,
>>> Yes, a JSON schema can make its way to the Spark source with no
>>> difficulty, but we still need to store the whole windowedValue, not only
>>> the elements that would comply with that schema. The problem is that Spark
>>> will try to match the element (the windowedValue) against the source
>>> schema at every element-wise processing step (and downstream it will
>>> auto-guess the schema from the content of the dataset; for example, if I
>>> extract the timestamp in a ParDo, I get a Long schema in the output
>>> dataset). And windowedValue is complex and has many subclasses. Maybe
>>> bytes serialization is still the best way to go, but then we don't
>>> leverage schema PCollections.
>>> Best
>>> Etienne
>>>
>>> On Thursday, January 17, 2019 at 21:52 +0800, Manu Zhang wrote:
>>>
>>> Nice try, Etienne! Is it possible to pass in the schema through pipeline
>>> options?
>>>
>>> Manu
>>>
>>> On Thu, Jan 17, 2019 at 5:25 PM Etienne Chauchot <[email protected]>
>>> wrote:
>>>
>>> Hi Kenn,
>>>
>>> Sure, in Spark DataSourceV2 providing a schema is mandatory:
>>> - if I set it to null, I obviously get an NPE
>>> - if I set it to empty, I get an array-out-of-bounds exception
>>> - if I set it to Datatype.Null, null is stored as the actual elements
>>> => Consequently, I set it to binary.
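>>>
>>> For example, the schema is essentially a single opaque binary column (a
>>> simplified sketch; the field and class names are arbitrary):
>>>
>>>   import org.apache.spark.sql.types.DataTypes;
>>>   import org.apache.spark.sql.types.StructType;
>>>
>>>   class BinarySchema {
>>>     // One opaque binary column: Spark never looks inside the bytes, so
>>>     // any WindowedValue subclass can be stored once it is coder-encoded.
>>>     static StructType schema() {
>>>       return new StructType().add("value", DataTypes.BinaryType);
>>>     }
>>>   }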
>>>
>>> As the Beam reader is responsible for reading both the element and its
>>> timestamp, the source outputs a Dataset<WindowedValue>. So the solution I
>>> found, and on which I was asking your opinion, is to serialize the
>>> windowedValue to bytes using Beam's FullWindowedValueCoder in
>>> reader.get(), and then, once the source is done, deserialize the whole
>>> dataset with a map to get the windowedValue back and hand it to the
>>> downstream transforms.
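>>>
>>> In code, the round trip looks roughly like this (a sketch; apart from the
>>> coder calls, the surrounding plumbing is simplified):
>>>
>>>   import org.apache.beam.sdk.coders.Coder;
>>>   import org.apache.beam.sdk.coders.CoderException;
>>>   import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
>>>   import org.apache.beam.sdk.util.CoderUtils;
>>>   import org.apache.beam.sdk.util.WindowedValue;
>>>   import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
>>>
>>>   class WindowedValueBytes {
>>>
>>>     // In the source's reader.get(): encode element + timestamp + window
>>>     // into the byte[] that fits the single binary column of the schema.
>>>     // GlobalWindow's coder is assumed here for simplicity; the real code
>>>     // would use the window coder of the pipeline's windowing strategy.
>>>     static <T> byte[] encode(WindowedValue<T> value, Coder<T> elementCoder)
>>>         throws CoderException {
>>>       FullWindowedValueCoder<T> coder =
>>>           FullWindowedValueCoder.of(elementCoder, GlobalWindow.Coder.INSTANCE);
>>>       return CoderUtils.encodeToByteArray(coder, value);
>>>     }
>>>
>>>     // Once the source is done, a map over the Dataset<byte[]> turns the
>>>     // bytes back into WindowedValue<T> for the downstream transforms.
>>>     static <T> WindowedValue<T> decode(byte[] bytes, Coder<T> elementCoder)
>>>         throws CoderException {
>>>       FullWindowedValueCoder<T> coder =
>>>           FullWindowedValueCoder.of(elementCoder, GlobalWindow.Coder.INSTANCE);
>>>       return CoderUtils.decodeFromByteArray(coder, bytes);
>>>     }
>>>   }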
>>>
>>> I am aware that this is not optimal because of the bytes serialization
>>> roundtrip, and I wanted your suggestions around that.
>>>
>>> Thanks
>>> Etienne
>>>
>>>
>>> On Wednesday, January 16, 2019 at 19:04 -0800, Kenneth Knowles wrote:
>>>
>>> Cool!
>>>
>>> I don't quite understand the issue in "bytes serialization to comply with
>>> Spark dataset schemas to store windowedValues". Can you say a little more?
>>>
>>> Kenn
>>>
>>> On Tue, Jan 15, 2019 at 8:54 AM Etienne Chauchot <[email protected]>
>>> wrote:
>>>
>>> Hi guys,
>>> Regarding the new (made-from-scratch) Spark runner POC based on the
>>> Dataset API, I was able to make a big step forward: it can now run a first
>>> batch pipeline with a source!
>>>
>>> See
>>> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
>>>
>>> There are no test facilities for now; test mode is enabled and it just
>>> prints the output PCollection.
>>>
>>> I made some workarounds, especially String serialization to pass Beam
>>> objects (I was forced to), and also bytes serialization to comply with
>>> Spark dataset schemas to store windowedValues.
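>>>
>>> (For the String serialization: DataSourceV2 options are plain String
>>> key/value pairs, so a Beam object has to be turned into a String to reach
>>> the source. A rough sketch of one way to do it, not necessarily what the
>>> branch does:)
>>>
>>>   import java.io.ByteArrayInputStream;
>>>   import java.io.ByteArrayOutputStream;
>>>   import java.io.IOException;
>>>   import java.io.ObjectInputStream;
>>>   import java.io.ObjectOutputStream;
>>>   import java.io.Serializable;
>>>   import java.util.Base64;
>>>
>>>   class StringSerialization {
>>>
>>>     // Java-serialize a Beam object and Base64-encode it so it can travel
>>>     // through the String-only options of a Spark DataSourceV2 source.
>>>     static String serialize(Serializable beamObject) throws IOException {
>>>       ByteArrayOutputStream bytes = new ByteArrayOutputStream();
>>>       try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
>>>         out.writeObject(beamObject);
>>>       }
>>>       return Base64.getEncoder().encodeToString(bytes.toByteArray());
>>>     }
>>>
>>>     // Rebuild the object on the executor side.
>>>     static Object deserialize(String encoded)
>>>         throws IOException, ClassNotFoundException {
>>>       byte[] bytes = Base64.getDecoder().decode(encoded);
>>>       try (ObjectInputStream in =
>>>           new ObjectInputStream(new ByteArrayInputStream(bytes))) {
>>>         return in.readObject();
>>>       }
>>>     }
>>>   }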
>>>
>>> Can you give me your thoughts, especially regarding these last two
>>> matters?
>>>
>>> The other parts are not ready for showing yet.
>>>
>>> Here is the whole branch:
>>>
>>>
>>> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming
>>>
>>> Thanks,
>>>
>>> Etienne
>>>
>>>

-- 
Cheers,
Gleb
