Agree with Kenn. It should be possible; Spark has a similar concept called ExpressionEncoder, and I did a similar derivation using Scala macros in typelevel/frameless <https://github.com/typelevel/frameless>.
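For readers coming from the Beam side, here is a minimal, hypothetical illustration of the kind of encoder derivation mentioned above, using Spark's Java API (Encoders.bean). The Event class is a made-up example, not anything from Beam or frameless:

// Illustration only: Encoders.bean derives a Spark ExpressionEncoder for a
// JavaBean, so the Dataset gets a real columnar schema (user: string,
// timestamp: bigint) instead of opaque serialized bytes.
import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class EncoderExample {

  // Made-up element type; any JavaBean with getters/setters works.
  public static class Event implements Serializable {
    private String user;
    private long timestamp;

    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
  }

  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[*]").appName("encoder-example").getOrCreate();

    Encoder<Event> encoder = Encoders.bean(Event.class);

    Event event = new Event();
    event.setUser("alice");
    event.setTimestamp(1547510400000L);

    // Catalyst sees real columns here, so projections and filters can be optimized.
    Dataset<Event> events = spark.createDataset(Arrays.asList(event), encoder);
    events.printSchema();
    events.show();

    spark.stop();
  }
}

The point is that the derived encoder gives the Dataset a real schema that Catalyst can reason about, rather than an opaque blob.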
Most of the code in Beam is a black-box function in a ParDo, and the only way to translate it is using `mapPartition`. However, we could override the behavior for known transforms from beam-java-core, for instance Group and Select, and use FieldAccessDescriptor to push down projections (a rough sketch of such an override follows the quoted thread below). There is a bigger opportunity for Beam SQL, which translates into transforms that fit the Spark DataFrame model better.

Gleb

On Fri, Jan 18, 2019 at 3:25 PM Kenneth Knowles <[email protected]> wrote:

> I wonder if this could tie in with Reuven's recent work. He's basically
> making it so every type with an "obvious" schema automatically converts
> to/from Row whenever needed. Sounds like a similar need, superficially.
>
> Kenn
>
> On Fri, Jan 18, 2019, 02:36 Manu Zhang <[email protected]> wrote:
>
>> Hi Etienne,
>>
>> I see your point. I'm a bit worried that every ParDo has to be wrapped in
>> a `mapPartition`, which introduces serde cost and forgoes the benefits of
>> the Dataset API.
>> Maybe Dataset is not the best way to integrate Beam with Spark. Just my
>> $0.02.
>>
>> Manu
>>
>> On Thu, Jan 17, 2019 at 10:44 PM Etienne Chauchot <[email protected]>
>> wrote:
>>
>>> Hi Manu,
>>> Yes, a JSON schema can make its way to the Spark source with no
>>> difficulty, but we still need to store the WindowedValue, not only the
>>> elements that would comply with this schema. The problem is that Spark
>>> will try to match the element (WindowedValue) to the schema of the source
>>> at any element-wise processing (and downstream it will auto-guess the
>>> schema from the content of the dataset; for example, if I extract the
>>> timestamp in a ParDo I get a Long schema in the output dataset). The
>>> problem is that WindowedValue is complex and has many subclasses. Maybe
>>> bytes serialization is still the best way to go, but then we don't
>>> leverage schema PCollections.
>>> Best
>>> Etienne
>>>
>>> On Thursday, 17 January 2019 at 21:52 +0800, Manu Zhang wrote:
>>>
>>> Nice try, Etienne! Is it possible to pass in the schema through
>>> pipeline options?
>>>
>>> Manu
>>>
>>> On Thu, Jan 17, 2019 at 5:25 PM Etienne Chauchot <[email protected]>
>>> wrote:
>>>
>>> Hi Kenn,
>>>
>>> Sure, in Spark DataSourceV2 providing a schema is mandatory:
>>> - if I set it to null, I obviously get an NPE
>>> - if I set it to empty, I get an array-out-of-bounds exception
>>> - if I set it to Datatype.Null, null is stored as the actual elements
>>> => Consequently, I set it to binary.
>>>
>>> As the Beam reader is responsible for reading both the element and the
>>> timestamp, the source outputs a Dataset<WindowedValue>. So the solution I
>>> found, for which I asked your opinion, is to serialize the WindowedValue
>>> to bytes using Beam's FullWindowedValueCoder in reader.get() and, once the
>>> source is done, deserialize the whole dataset using a map to get the
>>> WindowedValue back and give it to the transforms downstream.
>>>
>>> I am aware that this is not optimal because of the bytes serialization
>>> round trip, and I wanted your suggestions around that.
>>>
>>> Thanks
>>> Etienne
>>>
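For concreteness, a rough, self-contained sketch of the round trip Etienne describes above: encode the WindowedValue with FullWindowedValueCoder so it fits a single-binary-column Spark schema, then map the Dataset back to WindowedValue afterwards. The wiring below (a local SparkSession and a hand-built Dataset<Row>) is illustrative only and is not the code in the branch:

// Rough sketch (not the actual branch code): WindowedValue <-> bytes round
// trip so the elements fit a Spark schema with one binary column.
import java.util.Collections;

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class WindowedValueRoundTrip {

  @SuppressWarnings("unchecked")
  public static void main(String[] args) throws Exception {
    FullWindowedValueCoder<String> coder =
        FullWindowedValueCoder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);

    // What the source declares to Spark: a schema with a single binary field.
    StructType binarySchema =
        DataTypes.createStructType(
            new StructField[] {DataTypes.createStructField("binary", DataTypes.BinaryType, true)});

    // What the reader would produce: the WindowedValue encoded to bytes.
    byte[] bytes =
        CoderUtils.encodeToByteArray(coder, WindowedValue.valueInGlobalWindow("hello"));

    SparkSession spark =
        SparkSession.builder().master("local[*]").appName("windowed-value-roundtrip").getOrCreate();
    Dataset<Row> rows =
        spark.createDataFrame(Collections.singletonList(RowFactory.create((Object) bytes)), binarySchema);

    // Map the whole dataset back to WindowedValue for the downstream transforms.
    // This bytes round trip is the non-optimal part being discussed.
    Dataset<WindowedValue<String>> windowedValues =
        rows.map(
            (MapFunction<Row, WindowedValue<String>>)
                row -> CoderUtils.decodeFromByteArray(coder, row.<byte[]>getAs(0)),
            Encoders.kryo((Class<WindowedValue<String>>) (Class<?>) WindowedValue.class));

    windowedValues.show();
    spark.stop();
  }
}

In the actual runner, the encode step would live in reader.get() and the decode map would run once the source is done, as Etienne describes.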
>>> On Wednesday, 16 January 2019 at 19:04 -0800, Kenneth Knowles wrote:
>>>
>>> Cool!
>>>
>>> I don't quite understand the issue in "bytes serialization to comply with
>>> Spark dataset schemas to store WindowedValues". Can you say a little more?
>>>
>>> Kenn
>>>
>>> On Tue, Jan 15, 2019 at 8:54 AM Etienne Chauchot <[email protected]>
>>> wrote:
>>>
>>> Hi guys,
>>> Regarding the new (made from scratch) Spark runner POC based on the
>>> Dataset API, I was able to make a big step forward: it can now run a first
>>> batch pipeline with a source!
>>>
>>> See
>>> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
>>>
>>> There are no test facilities for now; test mode is enabled and it just
>>> prints the output PCollection.
>>>
>>> I made some workarounds, especially String serialization to pass Beam
>>> objects (I was forced to) and also bytes serialization to comply with
>>> Spark dataset schemas to store WindowedValues.
>>>
>>> Can you give me your thoughts, especially regarding these last 2 matters?
>>>
>>> The other parts are not ready for showing yet.
>>>
>>> Here is the whole branch:
>>>
>>> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming
>>>
>>> Thanks,
>>>
>>> Etienne
>>>
--
Cheers,
Gleb
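To make the suggestion at the top of this mail more concrete, here is a hypothetical sketch of such an override: when the translator recognizes a schema projection (what Beam's Select transform and FieldAccessDescriptor express), it can stay in the Row/Catalyst world with Dataset.select, instead of the black-box mapPartitions fallback needed for an arbitrary ParDo. The method names are invented for illustration and this is not code from the runner branch:

// Hypothetical translator overrides: a recognized projection keeps a real
// schema that Spark can push down; an arbitrary ParDo cannot be seen through.
import java.util.List;

import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;

public class KnownTransformOverrides {

  // Known transform (e.g. Beam's Select / a FieldAccessDescriptor): emit a
  // column-level projection so Catalyst can push it down to the source.
  static Dataset<Row> translateProjection(Dataset<Row> input, List<String> fieldNames) {
    Column[] columns = fieldNames.stream().map(functions::col).toArray(Column[]::new);
    return input.select(columns);
  }

  // Fallback for a generic ParDo: a black-box mapPartitions over the rows,
  // which Spark cannot optimize (the identity function stands in for the DoFn).
  static Dataset<Row> translateGenericParDo(Dataset<Row> input) {
    return input.mapPartitions(
        (MapPartitionsFunction<Row, Row>) rows -> rows,
        RowEncoder.apply(input.schema()));
  }
}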
