Beam Yaml has good support for IOs and mappings, but one key missing
feature for even writing a WordCount is the ability to do Aggregations
[1]. While the traditional Beam primitive is GroupByKey (and
CombineValues), we're eschewing KVs in the notion of more schema'd
data (which has some precedence in our other languages, see the links
below). The key components the user needs to specify are (1) the key
fields on which the grouping will take place, (2) the fields
(expressions?) involved in the aggregation, and (3) what aggregating
fn to use.

A straw-man example could be something like

type: Aggregating
config:
  key: [field1, field2]
  aggregating:
    total_cost:
      fn: sum
      value: cost
    max_cost:
      fn: max
      value: cost

This would basically correspond to the SQL expression

"SELECT field1, field2, sum(cost) as total_cost, max(cost) as max_cost
from table GROUP BY field1, field2"

(though I'm not requiring that we use this as an implementation
strategy). I do not think we need a separate (non aggregating)
Grouping operation, this can be accomplished by having a concat-style
combiner.

There are still some open questions here, notably around how to
specify the aggregation fns themselves. We could of course provide a
number of built-ins (like SQL does). This gets into the question of
how and where to document this complete set, but some basics should
take us pretty far. Many aggregators, however, are parameterized (e.g.
quantiles); where do we put the parameters? We could go with something
like

fn:
  type: ApproximateQuantiles
  config:
    n: 10

but others are even configured by functions themselves (e.g. LargestN
that wants a comparator Fn). Maybe we decide not to support these
(yet?)

One thing I think we should support, however, is referencing custom
CombineFns. We have some precedent for this with our Fns from
MapToFields, where we accept things like inline lambdas and external
references. Again the topic of how to configure them comes up, as
these custom Fns are more likely to be parameterized than Map Fns
(though, to be clear, perhaps it'd be good to allow parameterizatin of
MapFns as well). Maybe we allow

language: python. # like MapToFields (and here it'd be harder to mix
and match per Fn)
fn:
  type: ???
  # should these be nested as config?
  name: fully.qualiied.name
  path: /path/to/defining/file
  args: [...]
  kwargs: {...}

which would invoke the constructor.

I'm also open to other ways of naming/structuring these essential
parameters if it makes things more clear.

- Robert


Java: 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/Group.html
Python: 
https://beam.apache.org/documentation/transforms/python/aggregation/groupby
Typescript: 
https://beam.apache.org/releases/typedoc/current/classes/transforms_group_and_combine.GroupBy.html

[1] One can of course use SqlTransform for this, but I'm leaning
towards offering something more native.

Reply via email to