I looked at Apache Arrow as a potential serialization format for Row
coders. At the time it didn't seem like a perfect fit: Beam's programming model
is record-at-a-time, and Arrow is optimized for large batches of records
(Beam does have a concept of "bundles," but bundles are completely
non-deterministic, and records might be bundled differently on retry). You could use
Arrow with single-record batches, but I suspect that would end up adding a
lot of extra overhead. That being said, I think it's still something worth
investigating further.
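
To make the overhead concern concrete, here is a rough sketch (using the Arrow
Java API; treat the details as illustrative rather than a benchmarked
implementation) of what packaging a single record as its own Arrow batch would
involve. Every element pays for an allocator, per-column vectors, and batch
metadata, rather than just its encoded bytes:

// Rough sketch only: one record packaged as its own Arrow batch.
// (Classes come from the org.apache.arrow.memory and org.apache.arrow.vector packages.)
Schema schema = new Schema(Arrays.asList(
    Field.nullable("user", new ArrowType.Utf8())));
try (RootAllocator allocator = new RootAllocator();
     VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
  VarCharVector user = (VarCharVector) root.getVector("user");
  user.setSafe(0, "alice".getBytes(StandardCharsets.UTF_8));
  root.setRowCount(1);
  // ... serialize `root` as a RecordBatch for this single element ...
}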

Reuven



On Fri, Jan 4, 2019 at 12:34 AM Gleb Kanterov <g...@spotify.com> wrote:

> Reuven, it sounds great. I see there is a similar thing to Row coders
> happening in Apache Arrow <https://arrow.apache.org>, and there is a
> similarity between Apache Arrow Flight
> <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
> and the data exchange service in portability. How do you see these two things
> relating to each other in the long term?
>
> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>
>> The biggest advantage is actually readability and usability. A secondary
>> advantage is that it means that Go will be able to interact seamlessly with
>> BeamSQL, which would be a big win for Go.
>>
>> A schema is basically a way of saying that a record has a specific set of
>> (possibly nested, possibly repeated) fields. So for instance let's say that
>> the user's type is a struct with fields named user, country, purchaseCost.
>> This allows us to provide transforms that operate on field names. Some
>> examples (using the Java API):
>>
>> // Select out only the user field.
>> PCollection users = events.apply(Select.fields("user"));
>>
>> // Join two PCollections by user.
>> PCollection joinedEvents = queries.apply(Join.innerJoin(clicks).byFields("user"));
>>
>> // For each country, calculate the total purchase cost as well as the top
>> // 10 purchases.
>> // A new schema is created containing fields total_cost and top_purchases,
>> // and rows are created with the aggregation results.
>> PCollection purchaseStatistics = events.apply(
>>     Group.byFieldNames("country")
>>          .aggregateField("purchaseCost", Sum.ofLongs(), "total_cost")
>>          .aggregateField("purchaseCost", Top.largestLongs(10), "top_purchases"));
>>
>>
>> This is far more readable than what we have today, and what unlocks this
>> is that Beam actually knows the structure of the record instead of assuming
>> records are uncrackable blobs.
>>
>> Note that a coder is basically a special case of a schema that has a
>> single field.
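>>
>> For example (a sketch using the Java Schema API; the field name "value" is
>> just illustrative), a type that today only has an opaque coder could be
>> described as a schema with one byte-array field:
>>
>> Schema singleField = Schema.builder().addByteArrayField("value").build();
>> // Encoding a record of this schema is just encoding that one field, which is
>> // what a coder does today, except the structure is now visible to Beam.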
>>
>> In BeamJava we have a SchemaRegistry which knows how to turn user types
>> into schemas. We use reflection to analyze many user types (e.g. simple
>> POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to
>> determine the schema; however, this is done only when the graph is initially
>> generated. We do use code generation (in Java we do bytecode generation) to
>> make this somewhat more efficient. I'm willing to bet that the code
>> generator you've written for structs could be very easily modified for
>> schemas instead, so it would not be wasted work if we went with schemas.
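>>
>> Concretely, on the Java side this looks roughly like the following (a sketch;
>> the Purchase type is just an example):
>>
>> @DefaultSchema(JavaFieldSchema.class)  // registry infers the schema via reflection
>> public class Purchase {
>>   public String user;
>>   public String country;
>>   public long purchaseCost;
>> }
>> // A PCollection<Purchase> then carries the schema (user, country, purchaseCost),
>> // and generated accessors avoid per-element reflection at execution time.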
>>
>> One of the things I'm working on now is documenting Beam schemas. They
>> are already very powerful and useful, but since there is still nothing in
>> our documentation about them, they are not yet widely used. I expect to
>> finish draft documentation by the end of January.
>>
>> Reuven
>>
>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <r...@google.com> wrote:
>>
>>> That's an interesting idea. I must confess I don't rightly know the
>>> difference between a schema and a coder, but here's what I've got with a bit
>>> of searching through memory and the mailing list. Please let me know if I'm
>>> off track.
>>>
>>> As near as I can tell, a schema, as far as Beam takes it
>>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java>,
>>> is a mechanism to define what data is extracted from a given row of data. So
>>> in principle, there's an opportunity to be more efficient with data with
>>> many columns that aren't being used, and only extract the data that's
>>> meaningful to the pipeline.
>>> The trick then is how to apply the schema to a given serialization
>>> format, which is something I'm missing in my mental model (and then how to
>>> do it efficiently in Go).
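>>>
>>> For what it's worth, on the Java side a schema appears to come with a generic
>>> row encoding (a sketch; field names are illustrative):
>>>
>>> Schema schema = Schema.builder()
>>>     .addStringField("user")
>>>     .addStringField("country")
>>>     .addInt64Field("purchaseCost")
>>>     .build();
>>> Coder<Row> coder = RowCoder.of(schema);  // one coder derived from the schema
>>>
>>> So the wire format would be fixed by the schema rather than by the user's
>>> type, if I'm reading it right.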
>>>
>>> I do know that the Go client package for BigQuery
>>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
>>> something like that, using field tags. Similarly, the "encoding/json"
>>> <https://golang.org/doc/articles/json_and_go.html> package in the Go
>>> Standard Library permits annotating fields and it will read out and
>>> deserialize the JSON fields and that's it.
>>>
>>> A concern I have is that Go (at present) would require pre-compile-time
>>> code generation for schemas to be efficient, and they would still mostly
>>> boil down to turning []bytes into real structs. Go reflection isn't fast
>>> enough to keep up, and Go has no mechanism I'm aware of to Just-In-Time
>>> compile more efficient processing of values.
>>> It's also not 100% clear how schemas would play with protocol buffers
>>> or similar.
>>> BigQuery has a mechanism of generating a JSON schema from a proto file
>>> <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>, but
>>> that's only the specification half, not the using half.
>>>
>>> As it stands, the code generator I've been building these last months
>>> could (in principle) statically analyze a user's struct, and then generate
>>> an efficient dedicated coder for it. It just has nowhere to put that coder
>>> such that the Go SDK would use it.
>>>
>>>
>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I'll make a different suggestion. There's been some chatter that
>>>> schemas are a better tool than coders, and that in Beam 3.0 we should make
>>>> schemas the basic semantics instead of coders. Schemas provide everything a
>>>> coder provides, but also allow for far more readable code. We can't make
>>>> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
>>>> we're better off starting with schemas instead of coders?
>>>>
>>>> Reuven
>>>>
>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <rob...@frantil.com> wrote:
>>>>
>>>>> One area that the Go SDK currently lacks is the ability for users to
>>>>> specify their own coders for types.
>>>>>
>>>>> I've written a proposal document
>>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#>,
>>>>> and while I'm confident about the core, there are certainly some edge cases
>>>>> that require discussion before getting on with the implementation.
>>>>>
>>>>> At present, the SDK only permits primitive value types (all numeric
>>>>> types except complex, plus strings and []bytes), which are encoded with
>>>>> Beam coders, and structs whose exported fields are of those types, which
>>>>> are encoded as JSON. Protocol buffer support is hacked in to avoid the
>>>>> type analyzer, and represents the current workaround for this issue.
>>>>>
>>>>> The high-level proposal is to catch up with Python and Java and have a
>>>>> coder registry. In addition, arrays and maps should be permitted as well.
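>>>>>
>>>>> For reference, the Java version of this is roughly the following (a sketch;
>>>>> MyType and MyTypeCoder are placeholders for a user type and its coder):
>>>>>
>>>>> pipeline.getCoderRegistry()
>>>>>     .registerCoderForClass(MyType.class, MyTypeCoder.of());
>>>>> // After registration, any PCollection<MyType> in this pipeline picks up
>>>>> // MyTypeCoder instead of a fallback coder.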
>>>>>
>>>>> If you have alternatives, or other suggestions and opinions, I'd love
>>>>> to hear them! Otherwise my intent is to get a PR ready by the end of
>>>>> January.
>>>>>
>>>>> Thanks!
>>>>> Robert Burke
>>>>>
>>>>
>>>
>>> --
>>> http://go/where-is-rebo
>>>
>>
>
> --
> Cheers,
> Gleb
>
