Thinking about Arrow + Beam SQL + schemas:

- Obviously many SQL operations could be usefully accelerated by arrow / columnar. Especially in the analytical realm this is the new normal. For ETL, perhaps less so.
- The Beam SQL planner (pipeline construction) is implemented in Java, and so the various DoFns/CombineFns that implement projection, filter, etc. are also in Java.
- Arrow is of course available in Java.
- Presumably there is a pandas counterpart in Java. Is there? Do you know?
- But perhaps if these building blocks emitted by the planner had well-defined URNs, we could use a SIMD+columnar Java or Python implementation opportunistically to avoid cross-language data channels. (Thinking ahead to when cross-language allows Python pipelines to invoke the construction-time planner implemented in Java.)
- Is it valuable for Beam to invent its own schemas? I'd love for Beam to have identical schema affordances to protobuf, arrow, or avro, with everything layered on that as logical types (including SQL types). What would it look like if Beam schemas were more-or-less Arrow schemas?
- For the event timestamp issue, there are two levels of abstraction at which I could imagine improvements:
  - At the model layer (aka the portability protos), we could make Beam columnar-batch aware. That's a huge change and would need a massive justification IMO, in the form of performance numbers.
  - At the SDK layer, some language might make it pretty easy to overload the "GroupByKey" transform to understand that elements that are really batches contain multiple timestamps, so it may need to window & group differently. The model doesn't need to know in this case.

Kenn

On Wed, Mar 27, 2019 at 4:42 PM Ahmet Altay <al...@google.com> wrote:

> Thank you Brian, this looks promising.
>
> cc: +Chamikara Jayalath <chamik...@google.com> +Heejong Lee <heej...@google.com>
>
> On Wed, Mar 27, 2019 at 1:22 PM Brian Hulette <bhule...@google.com> wrote:
>
>> Hi everyone,
>> I've been doing some investigations into how Arrow might fit into Beam as a way to ramp up on the project. As I've gone about this I've prototyped a couple of additions to the Python SDK. I think these additions may be useful for others, so I'm considering cleaning them up and submitting PRs, but I wanted to have a discussion here to see if it makes sense first.
>>
>> Note that the approach I'm using for this work right now is very naive. I've just built pipelines where individual elements are actually arrow record batches (or pandas DataFrames). This is really only acceptable for bounded pipelines without windowing, since it's impossible to define a single event time for each element. That being said, I think these tools could still be useful for people who want to run batch pipelines using parquet, arrow, and pandas.
>
> I agree these will be generally useful.
>
>> Here's what I've implemented so far:
>>
>> # An option for io.ReadFromParquet to yield arrow record batches instead of individual elements
>> Currently the Python SDK's parquet reader uses pyarrow.parquet to read parquet row groups into arrow record batches, and then splits the batches into a single dictionary per row [1]. I've added a flag to optionally short-circuit this and just yield the arrow record batches directly, making it easier for me to build pipelines that process columnar batches. If I were to contribute this change I could also split out the record batch <-> dictionary conversions as separate transforms, since they could be generally useful as well.
>
> I think splitting into new transforms rather than adding new options to existing IO transforms would be simpler for users. I think this would be a question that could be easier to answer with a PR.
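To make the record batch <-> dictionary split Brian describes above concrete, a standalone conversion transform could look roughly like the sketch below. This is not the actual prototype; RecordBatchToDicts is a hypothetical name, and it assumes the upstream source already emits pyarrow.RecordBatch elements.

    import apache_beam as beam
    import pyarrow as pa

    class RecordBatchToDicts(beam.DoFn):
        # Hypothetical transform: expands each pyarrow.RecordBatch into one
        # dict per row, mirroring what parquetio currently does internally.
        def process(self, batch: pa.RecordBatch):
            names = batch.schema.names
            # Pull each column out as a plain Python list, then pivot into rows.
            columns = [batch.column(i).to_pylist() for i in range(batch.num_columns)]
            for row in zip(*columns):
                yield dict(zip(names, row))

    # Usage, assuming a PCollection of pyarrow.RecordBatch elements:
    #   batches | beam.ParDo(RecordBatchToDicts())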
>> # Custom coders for Arrow Tables and Record Batches
>> I found that the default coder (pickle/dill?) slowed down my arrow pipelines, particularly in the case where a record batch had been sliced into smaller record batches (presumably because the entire original batch gets serialized for each slice). I put together some coders that encode arrow tables and record batches with arrow's IPC formats, which improves performance substantially.
>
> How did you measure this? It would be good for us to also have relevant micro benchmarks here.
>
>> Would it make sense to add these things to the Python SDK? Or would they fit better in a separate library of utilities for building pipelines with batched data?
>
> +1 for adding to Beam. The Python SDK has a list of utility transforms (e.g. BatchElements); new additions could also live in that space.
>
>> Brian
>>
>> [1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/parquetio.py#L239
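For reference, a minimal sketch of what a RecordBatch coder built on Arrow's IPC stream format (as Brian describes above) might look like, assuming pyarrow's pa.ipc APIs; this is not the implementation from the prototype:

    import apache_beam as beam
    import pyarrow as pa

    class RecordBatchCoder(beam.coders.Coder):
        # Hypothetical coder: round-trips a single pyarrow.RecordBatch through
        # the Arrow IPC stream format (schema header + one batch).
        def encode(self, batch):
            sink = pa.BufferOutputStream()
            writer = pa.ipc.new_stream(sink, batch.schema)
            writer.write_batch(batch)
            writer.close()
            return sink.getvalue().to_pybytes()

        def decode(self, encoded):
            reader = pa.ipc.open_stream(encoded)
            return reader.read_next_batch()

        def is_deterministic(self):
            # Arrow IPC bytes are not guaranteed to be byte-identical across
            # versions, so don't advertise determinism.
            return False

A coder like this could be registered with beam.coders.registry.register_coder(pa.RecordBatch, RecordBatchCoder) so that it is used instead of the pickle-based fallback for that type.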