On Tue, Jan 8, 2019 at 4:32 PM Reuven Lax <re...@google.com> wrote:
>
> I agree with this, but I think it's a significant rethinking of Beam that I 
> didn't want to couple to schemas. In addition to rethinking the API, it might 
> also require rethinking all of our runners.

We're already marshaling (including batching) data over the FnApi, so
it might not be that big of a change. Also, the choice of encoding
over the data channel is already parametrizable via a coder, so it's
easy to make this an optional feature that runners and SDKs can opt
into. I agree that we don't want to couple it to schemas (though
that's where it becomes even more useful).
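
As a toy illustration of the difference (made-up Go, not Beam code; the row
type and byte layout are invented purely for this sketch), the same batch of
rows that gets encoded element-by-element today could instead be laid out
column-by-column by an opt-in batch coder on the data channel:

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// row is a stand-in for an element with a known (schema-like) structure.
type row struct {
    ID   int64
    Cost int64
}

// encodeColumnar writes a batch count followed by each column in turn,
// rather than interleaving fields record by record. Errors elided for brevity.
func encodeColumnar(rows []row) []byte {
    var buf bytes.Buffer
    binary.Write(&buf, binary.BigEndian, int32(len(rows)))
    for _, r := range rows { // column 1: all IDs
        binary.Write(&buf, binary.BigEndian, r.ID)
    }
    for _, r := range rows { // column 2: all costs
        binary.Write(&buf, binary.BigEndian, r.Cost)
    }
    return buf.Bytes()
}

func main() {
    batch := []row{{ID: 1, Cost: 10}, {ID: 2, Cost: 20}}
    fmt.Println(len(encodeColumnar(batch))) // 4 + 2*8 + 2*8 = 36 bytes
}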

> Also while columnar can be a large perf win, I suspect that we currently have 
> lower-hanging fruit to optimize when it comes to performance.

It's probably a bigger win for Python than for Java.

>
> Reuven
>
> On Tue, Jan 8, 2019 at 5:25 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>> On Fri, Jan 4, 2019 at 12:54 AM Reuven Lax <re...@google.com> wrote:
>> >
>> > I looked at Apache Arrow as a potential serialization format for Row 
>> > coders. At the time it didn't seem a perfect fit - Beam's programming 
>> > model is record-at-a-time, and Arrow is optimized for large batches of 
>> > records (while Beam has a concept of "bundles," they are completely 
>> > non-deterministic, and records might be bundled differently on retry). You could use 
>> > Arrow with single-record batches, but I suspect that would end up adding a 
>> > lot of extra overhead. That being said, I think it's still something worth 
>> > investigating further.
>>
>> Though Beam's model is row-oriented, I think it'd make a lot of sense
>> to support column-oriented transfer of data across the data plane
>> (we're already concatenating serialized records lengthwise), with
>> Arrow as a first candidate, and (either as part of the public API or
>> as an implementation detail) columnar processing as well (e.g.
>> projections, maps, filters, and aggregations can often be done more
>> efficiently in a columnar fashion). While this is often a significant
>> win in C++ (and presumably Java), it's essential for doing
>> high-performance computing in Python (e.g. Numpy, SciPy, Pandas,
>> Tensorflow, ... all have batch-oriented APIs and avoid representing
>> records as individual objects, something we'll need to tackle for
>> BeamPython at least).
>>
>> >
>> > Reuven
>> >
>> >
>> >
>> > On Fri, Jan 4, 2019 at 12:34 AM Gleb Kanterov <g...@spotify.com> wrote:
>> >>
>> >> Reuven, it sounds great. I see there is a similar thing to Row coders 
>> >> happening in Apache Arrow, and there is a similarity between Apache Arrow 
>> >> Flight and the data exchange service in portability. How do you see these two 
>> >> things relate to each other in the long term?
>> >>
>> >> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>> >>>
>> >>> The biggest advantage is actually readability and usability. A secondary 
>> >>> advantage is that it means that Go will be able to interact seamlessly 
>> >>> with BeamSQL, which would be a big win for Go.
>> >>>
>> >>> A schema is basically a way of saying that a record has a specific set 
>> >>> of (possibly nested, possibly repeated) fields. So for instance let's 
>> >>> say that the user's type is a struct with fields named user, country, 
>> >>> purchaseCost. This allows us to provide transforms that operate on field 
>> >>> names. Some examples (using the Java API):
>> >>>
>> >>> PCollection users = events.apply(Select.fields("user"));  // Select out 
>> >>> only the user field.
>> >>>
>> >>> PCollection joinedEvents = 
>> >>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two 
>> >>> PCollections by user.
>> >>>
>> >>> // For each country, calculate the total purchase cost as well as the 
>> >>> top 10 purchases.
>> >>> // A new schema is created containing fields total_cost and 
>> >>> top_purchases, and rows are created with the aggregation results.
>> >>> PCollection purchaseStatistics = events.apply(
>> >>>     Group.byFieldNames("country")
>> >>>          .aggregateField("purchaseCost", Sum.ofLongs(), "total_cost")
>> >>>          .aggregateField("purchaseCost", Top.largestLongs(10),
>> >>>              "top_purchases"));
>> >>>
>> >>>
>> >>> This is far more readable than what we have today, and what unlocks this 
>> >>> is that Beam actually knows the structure of the record instead of 
>> >>> assuming records are uncrackable blobs.
>> >>>
>> >>> Note that a coder is basically a special case of a schema that has a 
>> >>> single field.
>> >>>
>> >>> In BeamJava we have a SchemaRegistry which knows how to turn user types 
>> >>> into schemas. We use reflection to analyze many user types (e.g. simple 
>> >>> POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to 
>> >>> determine the schema; however, this is done only when the graph is 
>> >>> initially generated. We do use code generation (in Java we do bytecode 
>> >>> generation) to make this somewhat more efficient. I'm willing to bet 
>> >>> that the code generator you've written for structs could be very easily 
>> >>> modified for schemas instead, so it would not be wasted work if we went 
>> >>> with schemas.
>> >>>
>> >>> One of the things I'm working on now is documenting Beam schemas. They 
>> >>> are already very powerful and useful, but since there is still nothing 
>> >>> in our documentation about them, they are not yet widely used. I expect 
>> >>> to finish draft documentation by the end of January.
>> >>>
>> >>> Reuven
>> >>>
>> >>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <r...@google.com> wrote:
>> >>>>
>> >>>> That's an interesting idea. I must confess I don't rightly know the 
>> >>>> difference between a schema and coder, but here's what I've got with a 
>> >>>> bit of searching through memory and the mailing list. Please let me 
>> >>>> know if I'm off track.
>> >>>>
>> >>>> As near as I can tell, a schema, as far as Beam takes it, is a mechanism 
>> >>>> to define what data is extracted from a given row of data. So in 
>> >>>> principle, there's an opportunity to be more efficient with data with 
>> >>>> many columns that aren't being used, and only extract the data that's 
>> >>>> meaningful to the pipeline.
>> >>>> The trick then is how to apply the schema to a given serialization 
>> >>>> format, which is something I'm missing in my mental model (and then how 
>> >>>> to do it efficiently in Go).
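>> >>>>
>> >>>> To sketch that mental model in Go terms (a throwaway reflection example, 
>> >>>> nothing like what Beam actually does, and certainly not efficient), a 
>> >>>> schema-aware transform could pull out just the named fields and ignore the 
>> >>>> rest:
>> >>>>
>> >>>> package main
>> >>>>
>> >>>> import (
>> >>>>     "fmt"
>> >>>>     "reflect"
>> >>>> )
>> >>>>
>> >>>> type Event struct {
>> >>>>     User         string
>> >>>>     Country      string
>> >>>>     PurchaseCost int64
>> >>>> }
>> >>>>
>> >>>> // project pulls out only the named fields from a struct value.
>> >>>> func project(v interface{}, fields ...string) map[string]interface{} {
>> >>>>     out := make(map[string]interface{})
>> >>>>     rv := reflect.ValueOf(v)
>> >>>>     for _, f := range fields {
>> >>>>         out[f] = rv.FieldByName(f).Interface()
>> >>>>     }
>> >>>>     return out
>> >>>> }
>> >>>>
>> >>>> func main() {
>> >>>>     e := Event{User: "alice", Country: "NO", PurchaseCost: 42}
>> >>>>     fmt.Println(project(e, "User")) // map[User:alice]
>> >>>> }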
>> >>>>
>> >>>> I do know that the Go client package for BigQuery does something like 
>> >>>> that, using field tags. Similarly, the "encoding/json" package in the 
>> >>>> Go Standard Library permits annotating fields and it will read out and 
>> >>>> deserialize the JSON fields and that's it.
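>> >>>>
>> >>>> For example, with "encoding/json" only the fields you declare (optionally 
>> >>>> renamed via tags) get populated, and any other keys in the input are 
>> >>>> simply ignored:
>> >>>>
>> >>>> package main
>> >>>>
>> >>>> import (
>> >>>>     "encoding/json"
>> >>>>     "fmt"
>> >>>> )
>> >>>>
>> >>>> // Only declared fields are filled in; unknown keys like "country" are ignored.
>> >>>> type Purchase struct {
>> >>>>     User string `json:"user"`
>> >>>>     Cost int64  `json:"purchaseCost"`
>> >>>> }
>> >>>>
>> >>>> func main() {
>> >>>>     data := []byte(`{"user":"alice","country":"NO","purchaseCost":42}`)
>> >>>>     var p Purchase
>> >>>>     if err := json.Unmarshal(data, &p); err != nil {
>> >>>>         panic(err)
>> >>>>     }
>> >>>>     fmt.Printf("%+v\n", p) // {User:alice Cost:42}
>> >>>> }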
>> >>>>
>> >>>> A concern I have is that Go (at present) would require pre-compile time 
>> >>>> code generation for schemas to be efficient, and they would still 
>> >>>> mostly boil down to turning []bytes into real structs. Go reflection 
>> >>>> doesn't keep up.
>> >>>> Go has no mechanism I'm aware of to Just In Time compile more efficient 
>> >>>> processing of values.
>> >>>> It's also not 100% clear how schemas would play with protocol buffers 
>> >>>> or similar.
>> >>>> BigQuery has a mechanism of generating a JSON schema from a proto file, 
>> >>>> but that's only the specification half, not the using half.
>> >>>>
>> >>>> As it stands, the code generator I've been building these last months 
>> >>>> could (in principle) statically analyze a user's struct, and then 
>> >>>> generate an efficient dedicated coder for it. It just has nowhere to 
>> >>>> put such coders so that the Go SDK would use them.
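>> >>>>
>> >>>> To give a flavor of what that generated output might resemble (entirely 
>> >>>> made up, not what the generator currently emits), it could produce a 
>> >>>> dedicated encode/decode pair per struct instead of going through JSON or 
>> >>>> reflection:
>> >>>>
>> >>>> package main
>> >>>>
>> >>>> import (
>> >>>>     "bytes"
>> >>>>     "encoding/binary"
>> >>>>     "fmt"
>> >>>> )
>> >>>>
>> >>>> type Purchase struct {
>> >>>>     User string
>> >>>>     Cost int64
>> >>>> }
>> >>>>
>> >>>> // Hypothetical generated coder: length-prefixed string, then a fixed-width
>> >>>> // integer. Error handling elided for brevity.
>> >>>> func encodePurchase(p Purchase) []byte {
>> >>>>     var buf bytes.Buffer
>> >>>>     binary.Write(&buf, binary.BigEndian, int32(len(p.User)))
>> >>>>     buf.WriteString(p.User)
>> >>>>     binary.Write(&buf, binary.BigEndian, p.Cost)
>> >>>>     return buf.Bytes()
>> >>>> }
>> >>>>
>> >>>> func decodePurchase(b []byte) Purchase {
>> >>>>     r := bytes.NewReader(b)
>> >>>>     var n int32
>> >>>>     binary.Read(r, binary.BigEndian, &n)
>> >>>>     user := make([]byte, n)
>> >>>>     r.Read(user)
>> >>>>     var cost int64
>> >>>>     binary.Read(r, binary.BigEndian, &cost)
>> >>>>     return Purchase{User: string(user), Cost: cost}
>> >>>> }
>> >>>>
>> >>>> func main() {
>> >>>>     p := Purchase{User: "alice", Cost: 42}
>> >>>>     fmt.Printf("%+v\n", decodePurchase(encodePurchase(p))) // {User:alice Cost:42}
>> >>>> }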
>> >>>>
>> >>>>
>> >>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>> >>>>>
>> >>>>> I'll make a different suggestion. There's been some chatter that 
>> >>>>> schemas are a better tool than coders, and that in Beam 3.0 we should 
>> >>>>> make schemas the basic semantics instead of coders. Schemas provide 
>> >>>>> everything a coder provides, but also allow for far more readable 
>> >>>>> code. We can't make such a change in Beam Java 2.X for compatibility 
>> >>>>> reasons, but maybe in Go we're better off starting with schemas 
>> >>>>> instead of coders?
>> >>>>>
>> >>>>> Reuven
>> >>>>>
>> >>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <rob...@frantil.com> wrote:
>> >>>>>>
>> >>>>>> One area that the Go SDK currently lacks is the ability for users to 
>> >>>>>> specify their own coders for types.
>> >>>>>>
>> >>>>>> I've written a proposal document, and while I'm confident about the 
>> >>>>>> core, there are certainly some edge cases that require discussion 
>> >>>>>> before getting on with the implementation.
>> >>>>>>
>> >>>>>> At present, the SDK only permits primitive value types (all numeric 
>> >>>>>> types but complex, strings, and []bytes), which are coded with Beam 
>> >>>>>> coders, and structs whose exported fields are of those types, which 
>> >>>>>> are then encoded as JSON. Protocol buffer support is hacked in to 
>> >>>>>> avoid the type analyzer, and is the current workaround for this issue.
>> >>>>>>
>> >>>>>> The high-level proposal is to catch up with Python and Java and have 
>> >>>>>> a coder registry. In addition, arrays and maps should be permitted.
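>> >>>>>>
>> >>>>>> Purely as an illustration of the registry idea (strawman names and 
>> >>>>>> signatures, not the API proposed in the doc), registration and lookup 
>> >>>>>> could look something like:
>> >>>>>>
>> >>>>>> package coderregistry
>> >>>>>>
>> >>>>>> import "reflect"
>> >>>>>>
>> >>>>>> // EncoderFunc and DecoderFunc are strawman signatures for user-supplied coders.
>> >>>>>> type EncoderFunc func(interface{}) ([]byte, error)
>> >>>>>> type DecoderFunc func([]byte) (interface{}, error)
>> >>>>>>
>> >>>>>> var (
>> >>>>>>     encoders = map[reflect.Type]EncoderFunc{}
>> >>>>>>     decoders = map[reflect.Type]DecoderFunc{}
>> >>>>>> )
>> >>>>>>
>> >>>>>> // RegisterCoder associates an encoder/decoder pair with a user type, so
>> >>>>>> // the SDK could look them up by element type at pipeline construction time.
>> >>>>>> func RegisterCoder(t reflect.Type, enc EncoderFunc, dec DecoderFunc) {
>> >>>>>>     encoders[t] = enc
>> >>>>>>     decoders[t] = dec
>> >>>>>> }
>> >>>>>>
>> >>>>>> // LookupCoder returns the registered pair for a type, if any.
>> >>>>>> func LookupCoder(t reflect.Type) (EncoderFunc, DecoderFunc, bool) {
>> >>>>>>     enc, ok := encoders[t]
>> >>>>>>     if !ok {
>> >>>>>>         return nil, nil, false
>> >>>>>>     }
>> >>>>>>     return enc, decoders[t], true
>> >>>>>> }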
>> >>>>>>
>> >>>>>> If you have alternatives, or other suggestions and opinions, I'd love 
>> >>>>>> to hear them! Otherwise my intent is to get a PR ready by the end of 
>> >>>>>> January.
>> >>>>>>
>> >>>>>> Thanks!
>> >>>>>> Robert Burke
>> >>>>
>> >>>>
>> >>>>
>> >>>> --
>> >>>> http://go/where-is-rebo
>> >>
>> >>
>> >>
>> >> --
>> >> Cheers,
>> >> Gleb
