I wonder if this could tie in with Reuven's recent work. He's basically making it so every type with an "obvious" schema automatically converts to/from Row whenever needed. Sounds like a similar need, superficially.
Kenn

On Fri, Jan 18, 2019, 02:36 Manu Zhang <owenzhang1...@gmail.com> wrote:

> Hi Etienne,
>
> I see your point. I'm a bit worried that every ParDo has to be wrapped in
> a `mapPartition`, which introduces the cost of serde and forgoes the
> benefits of the Dataset API. Maybe Dataset is not the best way to
> integrate Beam with Spark. Just my $0.02.
>
> Manu
>
> On Thu, Jan 17, 2019 at 10:44 PM Etienne Chauchot <echauc...@apache.org>
> wrote:
>
>> Hi Manu,
>> Yes, a JSON schema can make its way to the Spark source with no
>> difficulty, but we still need to store the windowedValue, not only the
>> elements that would comply with this schema. The problem is that Spark
>> will try to match the element (windowedValue) against the schema of the
>> source at any element-wise processing (and downstream it will auto-guess
>> the schema from the content of the dataset; for example, if I extract the
>> timestamp in a ParDo, I get a Long schema in the output dataset). The
>> problem is that windowedValue is complex and has many subclasses. Maybe
>> bytes serialization is still the best way to go, but then we don't
>> leverage schema PCollections.
>> Best
>> Etienne
>>
>> Le jeudi 17 janvier 2019 à 21:52 +0800, Manu Zhang a écrit :
>>
>> Nice try, Etienne! Is it possible to pass in the schema through pipeline
>> options?
>>
>> Manu
>>
>> On Thu, Jan 17, 2019 at 5:25 PM Etienne Chauchot <echauc...@apache.org>
>> wrote:
>>
>> Hi Kenn,
>>
>> Sure, in Spark DataSourceV2, providing a schema is mandatory:
>> - if I set it to null, I obviously get an NPE
>> - if I set it to empty, I get an array-out-of-bounds exception
>> - if I set it to DataTypes.NullType, null is stored as the actual elements
>> => Consequently, I set it to binary.
>>
>> As the Beam reader is responsible for reading both the element and the
>> timestamp, the source outputs a Dataset<WindowedValue>.
>> So, the solution I found, for which I asked your opinion, is to serialize
>> the windowedValue to bytes using the Beam FullWindowedValueCoder in
>> reader.get() and, once the source is done, deserialize the whole dataset
>> using a map to get the windowedValue back and hand it to the transforms
>> downstream.
>>
>> I am aware that this is not optimal because of the bytes-serialization
>> roundtrip, and I wanted your suggestions around that.
>>
>> Thanks
>> Etienne
>>
>> Le mercredi 16 janvier 2019 à 19:04 -0800, Kenneth Knowles a écrit :
>>
>> Cool!
>>
>> I don't quite understand the issue in "bytes serialization to comply to
>> spark dataset schemas to store windowedValues". Can you say a little more?
>>
>> Kenn
>>
>> On Tue, Jan 15, 2019 at 8:54 AM Etienne Chauchot <echauc...@apache.org>
>> wrote:
>>
>> Hi guys,
>> Regarding the new (made-from-scratch) Spark runner POC based on the
>> Dataset API, I was able to make a big step forward: it can now run a
>> first batch pipeline with a source!
>>
>> See
>> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
>>
>> There are no test facilities for now; test mode is enabled and it just
>> prints the output PCollection.
>>
>> I made some workarounds, especially String serialization to pass Beam
>> objects (I was forced to) and also bytes serialization to comply with
>> Spark dataset schemas to store windowedValues.
>>
>> Can you give me your thoughts, especially regarding these last 2 matters?
>>
>> The other parts are not ready for showing yet.
>>
>> Here is the whole branch:
>>
>> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming
>>
>> Thanks,
>>
>> Etienne
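[Editor's note] The coder roundtrip Etienne describes (encode the WindowedValue to a byte[] in reader.get() so it fits the source's binary schema, then decode it back in a map before downstream transforms) can be sketched as a standalone snippet. This is not code from the branch; the class name and the choice of StringUtf8Coder and the global window are illustrative assumptions, but FullWindowedValueCoder and CoderUtils are the Beam utilities named in the thread.

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;

public class WindowedValueRoundtrip {

  // Coder for WindowedValue<String> in the global window. A real source
  // would use the PCollection's element coder and the window coder of its
  // windowing strategy instead of these hard-coded choices.
  private static final FullWindowedValueCoder<String> CODER =
      FullWindowedValueCoder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);

  // What reader.get() would emit: the WindowedValue encoded as a byte[]
  // so it complies with Spark's single binary-column source schema.
  static byte[] encode(WindowedValue<String> value) throws Exception {
    return CoderUtils.encodeToByteArray(CODER, value);
  }

  // What the map over the resulting dataset would do: decode the bytes
  // back into a WindowedValue before handing it to downstream transforms.
  static WindowedValue<String> decode(byte[] bytes) throws Exception {
    return CoderUtils.decodeFromByteArray(CODER, bytes);
  }

  public static void main(String[] args) throws Exception {
    WindowedValue<String> original = WindowedValue.valueInGlobalWindow("hello");
    WindowedValue<String> roundtripped = decode(encode(original));
    System.out.println(roundtripped.getValue()); // prints "hello"
  }
}
```

The roundtrip is lossless (value, timestamp, windows, and pane info all travel through the coder), which is what makes the workaround correct; the cost Etienne flags is the extra serde on every element.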