On Thu, Mar 28, 2019 at 12:24 PM Brian Hulette <[email protected]> wrote:

> > I think splitting into new transforms rather than adding new options to
> existing IO transforms would be simpler for users. This might be a question
> that is easier to answer with a PR.
> Ok I'll start working on one :)
>
> > How did you measure this? It would be good for us to also have relevant
> micro benchmarks here.
> I ran some one-off benchmarks comparing
> `pickle.loads(pickle.dumps(batch))` and `decode(encode(batch))`, where
> `decode` and `encode` use the arrow IPC formats. I can formalize this and
> publish the results somewhere. Would it be sufficient to just make a
> personal GitHub repo and push the code/results there?
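>
> Roughly, the comparison looks like the following sketch (simplified, not
> the exact benchmark code; the batch contents and sizes here are arbitrary):
>
>   import pickle
>   import timeit
>
>   import pyarrow as pa
>
>   # A small record batch to round-trip (shape chosen just for illustration).
>   batch = pa.RecordBatch.from_arrays(
>       [pa.array(list(range(100000))),
>        pa.array([float(i) for i in range(100000)])],
>       ['id', 'value'])
>
>   def pickle_roundtrip():
>       return pickle.loads(pickle.dumps(batch))
>
>   def ipc_roundtrip():
>       # Encode with the Arrow IPC stream format, then decode it back.
>       sink = pa.BufferOutputStream()
>       writer = pa.RecordBatchStreamWriter(sink, batch.schema)
>       writer.write_batch(batch)
>       writer.close()
>       return pa.ipc.open_stream(sink.getvalue()).read_next_batch()
>
>   print('pickle:', timeit.timeit(pickle_roundtrip, number=100))
>   print('ipc:   ', timeit.timeit(ipc_roundtrip, number=100))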
>

We have a few micro benchmarks here (
https://github.com/apache/beam/tree/master/sdks/python/apache_beam/tools).
Feel free to add yours there.


>
>
> > - Presumably there is a pandas counterpart in Java. Is there? Do you
> know?
> I think there are some dataframe libraries in Java we could look into. I'm
> not aware of anything that has the same popularity and arrow integration as
> pandas though. Within the arrow project there is Gandiva [1], which has
> Java bindings. It generates optimized LLVM code for processing arrow data
> based on an expression tree. I think that could be a valuable tool for SQL.
>
>
> > - Is it valuable for Beam to invent its own schemas? I'd love for Beam
> to have identical schema affordances to either protobuf or arrow or avro,
> with everything layered on that as logical types (including SQL types).
> What would it look like if Beam schemas were more-or-less Arrow schemas?
> As it stands right now, there is a very clear mapping from Beam schemas to
> Arrow schemas. Both define similar primitive types, as well as nested types
> like row (beam) -> struct (arrow) and array (beam) -> list (arrow). In
> addition, Arrow schemas have a binary representation and implementations in
> many languages.
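>
> To make that mapping concrete, here's a sketch of the Arrow schema that
> would correspond to a hypothetical Beam row schema (the field names are
> made up for illustration):
>
>   import pyarrow as pa
>
>   # Hypothetical Beam schema:
>   #   Row(id: INT64, name: STRING,
>   #       events: ARRAY[Row(ts: INT64, tag: STRING)])
>   # Mapping row -> struct and array -> list gives:
>   arrow_schema = pa.schema([
>       pa.field('id', pa.int64()),
>       pa.field('name', pa.string()),
>       pa.field('events', pa.list_(pa.struct([
>           pa.field('ts', pa.int64()),
>           pa.field('tag', pa.string()),
>       ]))),
>   ])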
>
> I had some offline discussion with Reuven about this, and he pointed out
> that eventually we'd like Beam schemas to have a type for large iterables
> as well, so that even a PCollection<KV<K,Iterable<V>>> can have a schema,
> and that's certainly a concept that wouldn't make sense for Arrow. So I
> think the answer is yes, it is valuable for Beam to have its own schemas:
> that way we can represent Beam-only concepts, but still be able to map to
> other schemas when it makes sense (for example, in the KV<K, Iterable<V>>
> case we could map V's beam schema to an arrow schema and encode the values
> as arrow record batches).
>
> Brian
>
> [1] http://arrow.apache.org/blog/2018/12/05/gandiva-donation/
>
> On Wed, Mar 27, 2019 at 9:19 PM Kenneth Knowles <[email protected]> wrote:
>
>> Thinking about Arrow + Beam SQL + schemas:
>>
>>  - Obviously many SQL operations could be usefully accelerated by arrow /
>> columnar. Especially in the analytical realm this is the new normal. For
>> ETL, perhaps less so.
>>
>>  - Beam SQL planner (pipeline construction) is implemented in Java, and
>> so the various DoFns/CombineFns that implement projection, filter, etc, are
>> also in Java.
>>     - Arrow is of course available in Java.
>>     - Presumably there is a pandas counterpart in Java. Is there? Do you
>> know?
>>     - But perhaps if these building blocks emitted by the planner had
>> well-defined URNs, we could opportunistically use SIMD+columnar Java or
>> Python implementations to avoid cross-language data channels. (Thinking
>> ahead to when cross-language allows Python pipelines to invoke the
>> construction-time planner implemented in Java.)
>>
>>  - Is it valuable for Beam to invent its own schemas? I'd love for Beam
>> to have identical schema affordances to either protobuf or arrow or avro,
>> with everything layered on that as logical types (including SQL types).
>> What would it look like if Beam schemas were more-or-less Arrow schemas?
>>
>>  - For the event timestamp issue, there are two levels of abstraction at
>> which I could imagine improvements:
>>     - at the model layer (aka portability protos) we could make Beam
>> columnar batch aware. That's a huge change and would need a massive
>> justification IMO in the form of performance numbers.
>>     - at the SDK layer, some language might make it pretty easy to
>> overload the "GroupByKey" transform to understand that elements that are
>> really batches contain multiple timestamps, so it may need to window &
>> group differently. The model doesn't need to know in this case.
>>
>> Kenn
>>
>> On Wed, Mar 27, 2019 at 4:42 PM Ahmet Altay <[email protected]> wrote:
>>
>>> Thank you Brian, this looks promising.
>>>
>>> cc: +Chamikara Jayalath <[email protected]> +Heejong Lee
>>> <[email protected]>
>>>
>>> On Wed, Mar 27, 2019 at 1:22 PM Brian Hulette <[email protected]>
>>> wrote:
>>>
>>>> Hi everyone,
>>>> I've been doing some investigations into how Arrow might fit into Beam
>>>> as a way to ramp up on the project. As I've gone about this I've prototyped
>>>> a couple of additions to the Python SDK. I think these additions may be
>>>> useful for others so I'm considering cleaning them up and submitting PRs,
>>>> but I wanted to have a discussion here to see if it makes sense first.
>>>>
>>>> Note that the approach I'm using for this work right now is very naive.
>>>> I've just built pipelines where individual elements are actually arrow
>>>> record batches (or pandas DataFrames). This is really only acceptable for
>>>> bounded pipelines without windowing, since it's impossible to define a
>>>> single event time for each element. That being said, I think these tools
>>>> could still be useful for people who want to run batch pipelines using
>>>> parquet, arrow, and pandas.
>>>>
>>>
>>> I agree these will be generally useful.
>>>
>>>
>>>>
>>>> Here's what I've implemented so far:
>>>> # An option for io.ReadFromParquet to yield arrow record batches
>>>> instead of individual elements
>>>> Currently the Python SDK's parquet reader uses pyarrow.parquet to read
>>>> parquet row groups into arrow record batches, and then splits the batches
>>>> into one dictionary per row [1]. I've added a flag to optionally
>>>> short-circuit this and just yield the arrow record batches directly, making
>>>> it easier for me to build pipelines that process columnar batches. If I
>>>> were to contribute this change, I could also split out the record batch <->
>>>> dictionary conversions as separate transforms, since they could be
>>>> generally useful as well.
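>>>>
>>>> For illustration, a pipeline using that option might look something like
>>>> this (the flag name below is just a placeholder, not a final API):
>>>>
>>>>   import apache_beam as beam
>>>>   from apache_beam.io.parquetio import ReadFromParquet
>>>>
>>>>   with beam.Pipeline() as p:
>>>>     _ = (
>>>>         p
>>>>         # 'yield_record_batches' is a placeholder name for the option.
>>>>         | ReadFromParquet('gs://my-bucket/data/*.parquet',
>>>>                           yield_record_batches=True)
>>>>         # Each element is a pyarrow.RecordBatch rather than a dict per row.
>>>>         | beam.Map(lambda batch: batch.num_rows))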
>>>>
>>>
>>> I think splitting into new transforms rather than adding new options to
>>> existing IO transforms would be simpler for users. This might be a
>>> question that is easier to answer with a PR.
>>>
>>>
>>>> # Custom coders for Arrow Tables and Record Batches
>>>> I found that the default coder (pickle/dill?) slowed down my arrow
>>>> pipelines, particularly in the case where a record batch had been sliced
>>>> into smaller record batches (presumably because the entire original batch
>>>> is getting serialized for each slice). I put together some coders that
>>>> encode arrow tables and record batches with arrow's IPC formats, which
>>>> improves performance substantially.
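>>>>
>>>> Roughly, the record batch coder looks something like this (a simplified
>>>> sketch, not the exact code I have):
>>>>
>>>>   import pyarrow as pa
>>>>   from apache_beam.coders import Coder
>>>>
>>>>   class RecordBatchCoder(Coder):
>>>>     """Encodes a pyarrow.RecordBatch with the Arrow IPC stream format."""
>>>>
>>>>     def encode(self, batch):
>>>>       sink = pa.BufferOutputStream()
>>>>       writer = pa.RecordBatchStreamWriter(sink, batch.schema)
>>>>       writer.write_batch(batch)
>>>>       writer.close()
>>>>       return sink.getvalue().to_pybytes()
>>>>
>>>>     def decode(self, encoded):
>>>>       return pa.ipc.open_stream(encoded).read_next_batch()
>>>>
>>>> The coder can then be registered for the type with
>>>> beam.coders.registry.register_coder(pa.RecordBatch, RecordBatchCoder).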
>>>>
>>>
>>> How did you measure this? It would be good for us to also have relevant
>>> micro benchmarks here.
>>>
>>>
>>>>
>>>> Would it make sense to add these things to the Python SDK? Or would
>>>> they fit better in a separate library of utilities for building pipelines
>>>> with batched data?
>>>>
>>>
>>> +1 for adding to Beam. The Python SDK has a set of utility transforms
>>> (e.g. BatchElements); new additions could also live in that space.
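>>>
>>> For reference, a minimal sketch of using BatchElements:
>>>
>>>   import apache_beam as beam
>>>
>>>   with beam.Pipeline() as p:
>>>     _ = (
>>>         p
>>>         | beam.Create(range(1000))
>>>         # Groups elements into lists, tuning the batch size at runtime.
>>>         | beam.BatchElements(min_batch_size=10, max_batch_size=500)
>>>         | beam.Map(len))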
>>>
>>>
>>>>
>>>> Brian
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/parquetio.py#L239
>>>>
>>>
