On Thu, Mar 28, 2019 at 12:24 PM Brian Hulette <[email protected]> wrote:
> > I think splitting to new transforms rather than adding new options to
> > existing IO transforms would be simpler for users. I think this would be
> > a question that could be easier to answer with a PR.
>
> Ok I'll start working on one :)
>
> > How did you measure this? It would be good for us to also have relevant
> > micro benchmarks here.
>
> I ran some one-off benchmarks comparing `pickle.loads(pickle.dumps(batch))`
> and `decode(encode(batch))`, where `decode` and `encode` use the arrow IPC
> formats. I can formalize this and publish the results somewhere. Would it
> be sufficient to just make a personal GitHub repo and push the code/results
> there?

We have a few micro benchmarks here
(https://github.com/apache/beam/tree/master/sdks/python/apache_beam/tools).
Feel free to add them there.
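For concreteness, a minimal sketch of the kind of one-off round-trip
comparison described above, using pyarrow's IPC stream format on a synthetic
record batch (the column names and sizes are just illustrative):

```python
# Illustrative only: compare a pickle round-trip with an Arrow IPC round-trip
# for a single RecordBatch.
import pickle
import timeit

import pyarrow as pa


def make_batch(num_rows=100_000):
    return pa.RecordBatch.from_arrays(
        [pa.array(range(num_rows)),
         pa.array([float(i) for i in range(num_rows)])],
        names=['id', 'value'])


def pickle_roundtrip(batch):
    return pickle.loads(pickle.dumps(batch))


def ipc_roundtrip(batch):
    # Encode with the Arrow IPC stream format, then decode it back.
    sink = pa.BufferOutputStream()
    writer = pa.ipc.new_stream(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()
    return pa.ipc.open_stream(sink.getvalue()).read_next_batch()


batch = make_batch()
print('pickle round-trip:', timeit.timeit(lambda: pickle_roundtrip(batch), number=100))
print('IPC round-trip:   ', timeit.timeit(lambda: ipc_roundtrip(batch), number=100))
```

A formalized version would presumably follow whatever harness the existing
microbenchmarks under apache_beam/tools use.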
> > - Presumably there is a pandas counterpart in Java. Is there? Do you
> > know?
>
> I think there are some dataframe libraries in Java we could look into. I'm
> not aware of anything that has the same popularity and arrow integration as
> pandas though. Within the arrow project there is Gandiva [1], which has
> Java bindings. It generates optimized LLVM code for processing arrow data
> based on an expression tree. I think that could be a valuable tool for SQL.
>
> > - Is it valuable for Beam to invent its own schemas? I'd love for Beam
> > to have identical schema affordances to either protobuf or arrow or avro,
> > with everything layered on that as logical types (including SQL types).
> > What would it look like if Beam schemas were more-or-less Arrow schemas?
>
> As it stands right now there is a very clear mapping from Beam schemas to
> Arrow schemas. Both define similar primitive types, as well as nested types
> like row (beam) -> struct (arrow) and array (beam) -> list (arrow). In
> addition, Arrow schemas have a binary representation and implementations in
> many languages.
>
> I had some offline discussion with Reuven about this, and he pointed out
> that eventually we'd like Beam schemas to have a type for large iterables
> as well, so that even a PCollection<KV<K, Iterable<V>>> can have a schema,
> and that's certainly a concept that wouldn't make sense for Arrow. So I
> think the answer is yes, it is valuable for Beam to have its own schemas -
> that way we can represent Beam-only concepts, but still be able to map to
> other schemas when it makes sense (for example, in the KV<K, Iterable<V>>
> case we could map V's beam schema to an arrow schema and encode it as arrow
> record batches).
>
> Brian
>
> [1] http://arrow.apache.org/blog/2018/12/05/gandiva-donation/
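As a small aside, the row -> struct and array -> list correspondence (and the
binary schema representation) is easy to see with pyarrow directly. The Beam
schema in the comments below is pseudo-notation, not an existing API, and the
field names are made up:

```python
import pyarrow as pa

# Hypothetical Beam row schema, in pseudo-notation:
#   Row(id: INT64,
#       name: STRING,
#       address: Row(city: STRING, zip: STRING),  # nested row
#       scores: ARRAY[DOUBLE])                    # array field
arrow_schema = pa.schema([
    pa.field('id', pa.int64()),
    pa.field('name', pa.string()),
    # Beam row -> Arrow struct
    pa.field('address', pa.struct([
        pa.field('city', pa.string()),
        pa.field('zip', pa.string()),
    ])),
    # Beam array -> Arrow list
    pa.field('scores', pa.list_(pa.float64())),
])

# Arrow schemas also have a binary (IPC) representation:
buf = arrow_schema.serialize()  # -> pyarrow.Buffer
assert pa.ipc.read_schema(buf).equals(arrow_schema)
```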
> On Wed, Mar 27, 2019 at 9:19 PM Kenneth Knowles <[email protected]> wrote:
>
>> Thinking about Arrow + Beam SQL + schemas:
>>
>> - Obviously many SQL operations could be usefully accelerated by arrow /
>> columnar. Especially in the analytical realm this is the new normal. For
>> ETL, perhaps less so.
>>
>> - Beam SQL planner (pipeline construction) is implemented in Java, and so
>> the various DoFns/CombineFns that implement projection, filter, etc., are
>> also in Java.
>>   - Arrow is of course available in Java.
>>   - Presumably there is a pandas counterpart in Java. Is there? Do you
>>   know?
>>   - But perhaps if these building blocks emitted by the planner had
>>   well-defined URNs we could use SIMD+columnar Java or Python
>>   implementations opportunistically to avoid cross-language data channels
>>   (thinking ahead to when cross-language allows Python pipelines to invoke
>>   the construction-time planner implemented in Java).
>>
>> - Is it valuable for Beam to invent its own schemas? I'd love for Beam to
>> have identical schema affordances to either protobuf or arrow or avro,
>> with everything layered on that as logical types (including SQL types).
>> What would it look like if Beam schemas were more-or-less Arrow schemas?
>>
>> - For the event timestamp issue, there are two levels of abstraction at
>> which I could imagine improvements:
>>   - At the model layer (aka portability protos) we could make Beam
>>   columnar-batch aware. That's a huge change and would need a massive
>>   justification IMO, in the form of performance numbers.
>>   - At the SDK layer, some language might make it pretty easy to overload
>>   the "GroupByKey" transform to understand that for elements that are
>>   really batches there are multiple timestamps contained within, so it may
>>   need to window & group differently. The model doesn't need to know in
>>   this case.
>>
>> Kenn
>>
>> On Wed, Mar 27, 2019 at 4:42 PM Ahmet Altay <[email protected]> wrote:
>>
>>> Thank you Brian, this looks promising.
>>>
>>> cc: +Chamikara Jayalath <[email protected]> +Heejong Lee
>>> <[email protected]>
>>>
>>> On Wed, Mar 27, 2019 at 1:22 PM Brian Hulette <[email protected]>
>>> wrote:
>>>
>>>> Hi everyone,
>>>> I've been doing some investigations into how Arrow might fit into Beam
>>>> as a way to ramp up on the project. As I've gone about this I've
>>>> prototyped a couple of additions to the Python SDK. I think these
>>>> additions may be useful for others, so I'm considering cleaning them up
>>>> and submitting PRs, but I wanted to have a discussion here to see if it
>>>> makes sense first.
>>>>
>>>> Note that the approach I'm using for this work right now is very naive.
>>>> I've just built pipelines where individual elements are actually arrow
>>>> record batches (or pandas DataFrames). This is really only acceptable
>>>> for bounded pipelines without windowing, since it's impossible to define
>>>> a single event time for each element. That being said, I think these
>>>> tools could still be useful for people who want to run batch pipelines
>>>> using parquet, arrow, and pandas.
>>>
>>> I agree these will be generally useful.
>>>
>>>> Here's what I've implemented so far:
>>>>
>>>> # An option for io.ReadFromParquet to yield arrow record batches
>>>> instead of individual elements
>>>> Currently the python SDK's parquet reader uses pyarrow.parquet to read
>>>> parquet row groups into arrow record batches, and then splits the
>>>> batches into a single dictionary per row [1]. I've added a flag to
>>>> optionally short-circuit this and just yield the arrow record batches
>>>> directly, making it easier for me to build pipelines that process
>>>> columnar batches. If I were to contribute this change I could also split
>>>> out the record batch <-> dictionary conversions as separate transforms,
>>>> since they could be generally useful as well.
>>>
>>> I think splitting to new transforms rather than adding new options to
>>> existing IO transforms would be simpler for users. I think this would be
>>> a question that could be easier to answer with a PR.
>>>
>>>> # Custom coders for Arrow Tables and Record Batches
>>>> I found that the default coder (pickle/dill?) slowed down my arrow
>>>> pipelines, particularly in the case where a record batch had been sliced
>>>> into smaller record batches (presumably because the entire original
>>>> batch is getting serialized for each slice). I put together some coders
>>>> that encode arrow tables and record batches with arrow's IPC formats,
>>>> which improves performance substantially.
>>>
>>> How did you measure this? It would be good for us to also have relevant
>>> micro benchmarks here.
>>>
>>>> Would it make sense to add these things to the Python SDK? Or would they
>>>> fit better in a separate library of utilities for building pipelines
>>>> with batched data?
>>>
>>> +1 for adding to Beam. Python SDK has a list of utility transforms
>>> (e.g. BatchElements), new additions could also live in that space.
>>>
>>>> Brian
>>>>
>>>> [1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/parquetio.py#L239
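Not Brian's actual implementation, but for anyone curious what an IPC-based
coder might look like, here is a minimal sketch built on pyarrow's stream
format and the Python SDK's Coder interface (the class name is made up):

```python
import apache_beam as beam
import pyarrow as pa


class ArrowRecordBatchCoder(beam.coders.Coder):
    """Sketch of a coder that serializes a pyarrow.RecordBatch using
    Arrow's IPC stream format (schema message followed by the batch)."""

    def encode(self, batch):
        sink = pa.BufferOutputStream()
        writer = pa.ipc.new_stream(sink, batch.schema)
        writer.write_batch(batch)
        writer.close()
        return sink.getvalue().to_pybytes()

    def decode(self, encoded):
        return pa.ipc.open_stream(encoded).read_next_batch()

    def is_deterministic(self):
        # Conservatively report non-deterministic encoding.
        return False
```

It could then be registered for the type with
beam.coders.registry.register_coder(pa.RecordBatch, ArrowRecordBatchCoder).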

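Similarly, the record batch <-> dictionary conversions Brian mentions could
become standalone transforms. A rough sketch, with made-up names and no
handling of edge cases such as nested types or empty batches:

```python
import apache_beam as beam
import pyarrow as pa


class BatchToRows(beam.DoFn):
    """Expands a pyarrow.RecordBatch into one dict per row."""

    def process(self, batch):
        columns = batch.to_pydict()  # {column name: list of values}
        names = list(columns)
        for values in zip(*(columns[name] for name in names)):
            yield dict(zip(names, values))


class RowsToBatch(beam.DoFn):
    """Combines a list of row dicts (e.g. the output of BatchElements)
    into a single RecordBatch."""

    def process(self, rows):
        names = list(rows[0])
        arrays = [pa.array([row[name] for row in rows]) for name in names]
        yield pa.RecordBatch.from_arrays(arrays, names=names)


# Example usage:
#   rows    = batches | beam.ParDo(BatchToRows())
#   batches = rows | beam.BatchElements() | beam.ParDo(RowsToBatch())
```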