Great summary overall. A few small things to add, along with a note that
more examples of the intended use for each metric/aggregation may be helpful.

It is worth looking at what existing metric systems provide. Specifically,
two things to consider:

1. Is scoping implicit/automatic, or explicit? In some systems a metric can
be declared as having one or more labels, and reporting it requires
providing values for those labels. This allows user-defined labelling, and
also simplifies scoping to just ensuring the right values are available for
use as labels.

2. There is only one runner that supports logical metrics, and no standard
metrics backend supports them. Perhaps metrics in Beam should focus on the
standardly available set? Logical metrics make the most sense for system
metrics (such as elements in a collection), and those don't need to use the
standard set.
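The "explicit labels" style from point 1 could look roughly like this (a hypothetical API sketch, not Beam's current Metrics API; all names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical labeled-counter API: the metric declares its label names up
// front, and every report must supply one value per declared label.
class LabeledCounter {
  private final String name;
  private final String[] labelNames;
  // Key: comma-joined label values; value: accumulated count for that cell.
  private final Map<String, Long> cells = new HashMap<>();

  LabeledCounter(String name, String... labelNames) {
    this.name = name;
    this.labelNames = labelNames;
  }

  // Reporting requires a value for each declared label; scoping reduces to
  // supplying the right label values at report time.
  void inc(long delta, String... labelValues) {
    if (labelValues.length != labelNames.length) {
      throw new IllegalArgumentException(
          "expected " + labelNames.length + " label values for " + name);
    }
    cells.merge(String.join(",", labelValues), delta, Long::sum);
  }

  long value(String... labelValues) {
    return cells.getOrDefault(String.join(",", labelValues), 0L);
  }
}
```

With something like `new LabeledCounter("rpc_error_count", "transform", "worker")`, a runner only has to ensure the right label values are in scope wherever the metric is reported.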

This would simultaneously simplify the metrics system and bring it more in
line across runners and metric backends.

On Sat, Apr 7, 2018, 11:37 AM Andrea Foegler <> wrote:

> Hi folks,
> A lot of great / interesting use cases have come up on this thread!
> I would like to advocate for this discussion to include scoping and
> aggregation in a way that is consistent across types of metrics: Gauge,
> Cumulative, etc. In my head, Gauge = "instantaneous reading" and Cumulative
> = "incrementally changing".  There might be some implication on meaningful
> aggregations there, but it feels like the specification and handling of scope
> and aggregations could be shared.
> The existing Cumulative Metric implementation may provide a few challenges
> to this, but also a good starting point.  Below I've summarized my
> interpretation of Cumulative Metric behavior and some considerations around
> flexibility.
> Cumulative Metrics
> * Programming model: Implicitly scoped to reporting Transform;
> * Reporting: Implicitly scoped to Transform and Bundle;
> * Monitoring: Explicitly scoped to Transform
> Re: accumulation
> The Bundle scoping at runtime is only strictly required for handling
> "Bundle failed" semantics.  But it also conveniently handles the
> distributed nature of the computation. Regardless of the "logical" nature
> of these metrics, I don't know that external monitoring systems support a
> notion of implicit aggregation across reporting entities.  I believe they
> would require a per-reporting-entity id/scope of some sort.  Bundle id
> works.  A worker identifier would work too.  Then the monitoring system
> would allow aggregations over those per-bundle or per-worker metrics.  But
> the reported metric values must have been received segmented by reporter.
> **Important note: the cardinality of Bundle is likely far too large to
> handle as a scope explicitly.  As Robert mentioned, the same is true for
> "key".  This could be an argument for making certain scopes only available
> to Distributions or at reporting time for internal aggregations.
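The "Bundle failed" semantics above can be modeled with a tiny sketch (illustrative only, not actual runner code): deltas are buffered per bundle attempt and only folded into the committed value when the bundle succeeds, so a retried bundle never double-counts.

```java
// Sketch of bundle-scoped accumulation for a cumulative metric.
class BundleScopedCounter {
  private long committed = 0;  // value from successfully committed bundles
  private long pending = 0;    // deltas from the in-flight bundle attempt

  void inc(long delta) {
    pending += delta;
  }

  void bundleSucceeded() {
    committed += pending;  // fold the attempt's deltas into the total
    pending = 0;
  }

  void bundleFailed() {
    pending = 0;  // discard deltas from the failed attempt; retry starts clean
  }

  long committedValue() {
    return committed;
  }
}
```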
> Re: scoping / identification
> There's an interesting consideration moving between the scoping of the
> programming language and the scoping of the intended metric.  By implicitly
> scoping Cumulative metrics to a ParDo, the scope of the metric and the
> scope of the variable are in sync.  By scoping to anything else, there's an
> implication that 2 different Metric variables parameterized by the same
> name/ID actually refer to the same Metric. So if a user wanted an
> rpc_error_count metric that accumulated RPC errors between the 3 different
> ParDos that make RPC calls, Cumulative Metrics can't currently support
> that.  If we plan to support something like that for Gauge, consistency
> would be nice.
> Aside:  The current usage of Gauge in the SDK looks to be for a Globally
> observable value where the user would like to see the value exported, but
> there's no clear owner.  So every entity reports and the newest value
> wins.  Just mentioning so that that use case is considered.
> Not a proposal, just a high level organization of metric parameters that
> might be useful for discussion. (I'm not sure how to account for local
> aggregation here.  Kind of seems like any arbitrary code block could be
> supported there.)
> Metric Name: <Name>
> Metric Type: Gauge, Cumulative, Distribution...
> Scope: Transform, Global, Worker... (set of?)
> Data Type: Integer, Double, String...
> Aggregation: Sum, Latest, Max...
> Metric ID:  <Name> + <Scope ID>(s)
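The parameter breakdown above could be sketched as a plain descriptor type (names are illustrative only, not a proposal for the actual API):

```java
// Sketch of the metric parameters listed above.
enum MetricType { GAUGE, CUMULATIVE, DISTRIBUTION }
enum Scope { TRANSFORM, GLOBAL, WORKER }
enum Aggregation { SUM, LATEST, MAX }

class MetricSpec {
  final String name;
  final MetricType type;
  final Scope scope;
  final Aggregation aggregation;

  MetricSpec(String name, MetricType type, Scope scope, Aggregation aggregation) {
    this.name = name;
    this.type = type;
    this.scope = scope;
    this.aggregation = aggregation;
  }

  // Metric ID = <Name> + <Scope ID>: two reports with the same ID refer to
  // the same logical metric, e.g. an rpc_error_count shared across ParDos.
  String id(String scopeId) {
    return name + "/" + scope + "/" + scopeId;
  }
}
```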
> Some potentially interesting questions:
> 1. How should the user specify the parameters of the metric?  What
> parameter combos are supported?
> 2. Should the SDK support different Metric variables contributing to the
> same Metric value? If so, how does the SDK communicate that?
> 3. Should runtime-only scopes be exposed to the user?  Useful for
> Distributions?
> 4. What should be exported with the metric - say to an external monitoring
> system?
> Cheers
> Andrea
> On Fri, Apr 6, 2018 at 12:51 PM Robert Bradshaw <>
> wrote:
>> On Fri, Apr 6, 2018 at 12:23 PM Ben Chambers <>
>> wrote:
>>> Generally strong +1 to everything Bill said. I would suggest though that
>>> the per-worker segmentation might be specified using some more general
>>> tagging/labeling API. For instance, all of the following seem like
>>> reasonable uses to support:
>>> 1. Gauge that is tagged with worker to get per-worker segmentation (such
>>> as queue size, memory usage, etc.)
>>> 2. Gauge that is tagged with the "key" being processed. Would be useful
>>> for things like how much data is buffered, where are watermark holds for
>>> each key, etc. If processing is partitioned by key, this is strictly more
>>> granular than per-worker.
>>> 3. Counter tagged with the "key" being processed. Would be useful for
>>> time spent processing each key, etc.
>> Per-key stuff gets really dangerous, as then the counter (control) plane
>> has O(#keys) items to keep track of. That is unless it is paired with some
>> kind of a lossy top/histogram aggregation.
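A lossy top-K aggregation of the kind mentioned here can be sketched in a few lines (a simplified Space-Saving-style counter; illustrative only). Memory stays bounded at `capacity` keys no matter how many distinct keys flow through, at the cost of overestimating counts for keys that displaced an evicted one.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a bounded "top keys" counter: tracks at most `capacity` keys.
class TopKeysCounter {
  private final int capacity;
  private final Map<String, Long> counts = new HashMap<>();

  TopKeysCounter(int capacity) {
    this.capacity = capacity;
  }

  void inc(String key) {
    if (counts.containsKey(key) || counts.size() < capacity) {
      counts.merge(key, 1L, Long::sum);
      return;
    }
    // At capacity: evict the current minimum and take over its count.
    // This overestimates the new key but never undercounts it.
    String minKey = null;
    long minCount = Long.MAX_VALUE;
    for (Map.Entry<String, Long> e : counts.entrySet()) {
      if (e.getValue() < minCount) {
        minCount = e.getValue();
        minKey = e.getKey();
      }
    }
    counts.remove(minKey);
    counts.put(key, minCount + 1);
  }

  Map<String, Long> snapshot() {
    return new HashMap<>(counts);
  }
}
```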
>> However, I think Bill hits the nail on the head that there is an implicit
>> (ill-defined, in the model at least) segmentation desired here, with
>> different aggregations happening within vs. across segments. (Also, FWIW,
>> clobber is not the only aggregation that makes sense at the lowest level.)
>> Default counters use the same aggregation across both levels, giving useful
>> and well-defined semantics regardless of bundling and work distribution
>> (assuming associative aggregation of course), but perhaps the
>> counter/metrics APIs could be augmented to be able to explicitly request
>> the level and differing aggregation with respect to this segmentation.
>>> On Fri, Apr 6, 2018 at 11:39 AM Bill Neubauer <> wrote:
>>>> Thanks for unraveling those themes, Pablo!
>>>> 1. Seems reasonable in light of behaviors metrics backends support.
>>>> 2. Those same backends support histogramming of data, so having integer
>>>> types is very useful.
>>>> 3. I believe that is the case, for the reasons I mentioned earlier,
>>>> Gauges should only clobber values previously reported by the same entity.
>>>> Two workers with the same gauge should not be overwriting each other's
>>>> values, only their own. This implies per-worker segmentation.
>>>> On Fri, Apr 6, 2018 at 11:35 AM Pablo Estrada <>
>>>> wrote:
>>>>> Nobody wants to get rid of Gauges. I see that we have three separate
>>>>> themes being discussed here, and I think it's useful to point them out and
>>>>> address them independently:
>>>>> 1. Whether Gauges should change to hold string values.
>>>>> 2. If Gauges are to support string values, whether Gauges should also
>>>>> continue to have an int API.
>>>>> 3. Whether Beam should support some sort of label / tag / worker-id
>>>>> for aggregation of Gauges (maybe other metrics?)
>>>>> -P.
>>>>> On Fri, Apr 6, 2018 at 11:21 AM Ben Chambers <>
>>>>> wrote:
>>>>>> Gauges are incredibly useful for exposing the current state of the
>>>>>> system. For instance, number of elements in a queue, current memory 
>>>>>> usage,
>>>>>> number of RPCs in flight, etc. As mentioned above, these concepts exist 
>>>>>> in
>>>>>> numerous systems for monitoring distributed environments, including
>>>>>> Stackdriver Monitoring. The key to making them work is the addition of
>>>>>> labels or tags, which as an aside are also useful for *all* metric types,
>>>>>> not just Gauges.
>>>>>> If Beam gets rid of Gauges, how would we go about exporting "current"
>>>>>> values like memory usage, RPCs in flight, etc.?
>>>>>> -- Ben
>>>>>> On Fri, Apr 6, 2018 at 11:13 AM Kenneth Knowles <>
>>>>>> wrote:
>>>>>>> Just naively - the use cases that Gauge addresses seem relevant, and
>>>>>>> the information seems feasible to gather and present. The bit that 
>>>>>>> doesn't
>>>>>>> seem to make sense is aggregating gauges by clobbering each other. So I
>>>>>>> think that's just +1 Ben?
>>>>>>> On Fri, Apr 6, 2018 at 10:26 AM Raghu Angadi <>
>>>>>>> wrote:
>>>>>>>> I am not opposed to removing other data types, though they are an
>>>>>>>> extra convenience for the user.
>>>>>>>> In Scott's example above, if the metric is a counter, what are the
>>>>>>>> guarantees provided? E.g. would it match the global count using GBK? If
>>>>>>>> yes, then gauges (especially per-key gauges) can be very useful too 
>>>>>>>> (e.g.
>>>>>>>> backlog for each Kafka partition/split).
>>>>>>>> On Fri, Apr 6, 2018 at 10:01 AM Robert Bradshaw <
>>>>>>>>> wrote:
>>>>>>>>> A String API makes it clear(er) that the values will not be
>>>>>>>>> aggregated in any way across workers. I don't think retaining both 
>>>>>>>>> APIs
>>>>>>>>> (except for possibly some short migration period) worthwhile. On 
>>>>>>>>> another
>>>>>>>>> note, I still find the distributed gauge API to be a bit odd in 
>>>>>>>>> general.
>>>>>>>>> On Fri, Apr 6, 2018 at 9:46 AM Raghu Angadi <>
>>>>>>>>> wrote:
>>>>>>>>>> I would be in favor of replacing the existing Gauge.set(long) API
>>>>>>>>>>> with the String version and removing the old one. This would be a 
>>>>>>>>>>> breaking
>>>>>>>>>>> change. However this is a relatively new API and is still marked
>>>>>>>>>>> @Experimental. Keeping the old API would retain the potential 
>>>>>>>>>>> confusion.
>>>>>>>>>>> It's better to simplify the API surface: having two APIs makes it 
>>>>>>>>>>> less
>>>>>>>>>>> clear which one users should choose.
>>>>>>>>>> Supporting additional data types sounds good. But the above
>>>>>>>>>> states string API will replace the existing API. I do not see how 
>>>>>>>>>> string
>>>>>>>>>> API makes the semantics more clear.  Semantically both are the same to 
>>>>>>>>>> the
>>>>>>>>>> user.
>>>>>>>>>> On Fri, Apr 6, 2018 at 9:31 AM Pablo Estrada <>
>>>>>>>>>> wrote:
>>>>>>>>>>> Hi Ben : D
>>>>>>>>>>> Sure, that's reasonable. And perhaps I started the discussion in
>>>>>>>>>>> the wrong direction. I'm not questioning the utility of Gauge 
>>>>>>>>>>> metrics.
>>>>>>>>>>> What I'm saying is that Beam only supports integers, but Gauges
>>>>>>>>>>> are aggregated by dropping old values depending on their update 
>>>>>>>>>>> times; so
>>>>>>>>>>> it might be desirable to not restrict the data type to just 
>>>>>>>>>>> integers.
>>>>>>>>>>> -P.
>>>>>>>>>>> On Fri, Apr 6, 2018 at 9:19 AM Ben Chambers <
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> See for instance how gauge metrics are handled in Prometheus,
>>>>>>>>>>>> Datadog and Stackdriver monitoring. Gauges are perfect for use in
>>>>>>>>>>>> distributed systems, they just need to be properly labeled. 
>>>>>>>>>>>> Perhaps we
>>>>>>>>>>>> should apply a default tag or allow users to specify one.
>>>>>>>>>>>> On Fri, Apr 6, 2018, 9:14 AM Ben Chambers <>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Some metrics backend label the value, for instance with the
>>>>>>>>>>>>> worker that sent it. Then the aggregation is latest per label. 
>>>>>>>>>>>>> This makes
>>>>>>>>>>>>> it useful for holding values such as "memory usage" that need to 
>>>>>>>>>>>>> hold
>>>>>>>>>>>>> current value.
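The "latest per label" aggregation described here can be sketched as follows (illustrative only; real backends key on full label sets and report timestamps):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of "latest per label": each label keeps only its most recent report,
// so two workers reporting the same gauge never clobber each other.
class LatestPerLabelGauge {
  // label -> {timestampMs, value}
  private final Map<String, long[]> latest = new HashMap<>();

  void report(String label, long timestampMs, long value) {
    long[] prev = latest.get(label);
    if (prev == null || timestampMs >= prev[0]) {
      latest.put(label, new long[] {timestampMs, value});
    }
  }

  Long value(String label) {
    long[] cell = latest.get(label);
    return cell == null ? null : cell[1];
  }
}
```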
>>>>>>>>>>>>> On Fri, Apr 6, 2018, 9:00 AM Scott Wegner <>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> +1 on the proposal to support a "String" gauge.
>>>>>>>>>>>>>> To expand a bit, the current API doesn't make it clear that
>>>>>>>>>>>>>> the gauge value is based on local state. If a runner chooses to 
>>>>>>>>>>>>>> parallelize
>>>>>>>>>>>>>> a DoFn across many workers, each worker will have its own local 
>>>>>>>>>>>>>> Gauge
>>>>>>>>>>>>>> metric and its updates will overwrite other values. For example, 
>>>>>>>>>>>>>> from the
>>>>>>>>>>>>>> API it looks like you could use a gauge to implement your own 
>>>>>>>>>>>>>> element count
>>>>>>>>>>>>>> metric:
>>>>>>>>>>>>>> long count = 0;
>>>>>>>>>>>>>> @ProcessElement
>>>>>>>>>>>>>> public void processElement(ProcessContext c) {
>>>>>>>>>>>>>>   myGauge.set(++count);
>>>>>>>>>>>>>>   c.output(c.element());
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> This looks correct, but each worker has their own local
>>>>>>>>>>>>>> 'count' field, and gauge metric updates from parallel workers 
>>>>>>>>>>>>>> will
>>>>>>>>>>>>>> overwrite each other rather than get aggregated. So the final 
>>>>>>>>>>>>>> value would
>>>>>>>>>>>>>> be "the number of elements processed on one of the workers". 
>>>>>>>>>>>>>> (The correct
>>>>>>>>>>>>>> implementation uses a Counter metric).
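A toy model of why the two metric types diverge here (not Beam code; purely illustrative): summed counters recover the true total across workers, while an unlabeled "latest wins" gauge keeps only one worker's local count.

```java
import java.util.List;

class AggregationDemo {
  // Counter semantics: sum the per-worker values -> the true element count.
  static long asCounter(List<Long> perWorkerCounts) {
    return perWorkerCounts.stream().mapToLong(Long::longValue).sum();
  }

  // Unlabeled gauge semantics: the newest report clobbers the rest, so the
  // result is just whichever worker happened to report last.
  static long asGauge(List<Long> perWorkerCounts) {
    return perWorkerCounts.get(perWorkerCounts.size() - 1);
  }
}
```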
>>>>>>>>>>>>>> I would be in favor of replacing the existing Gauge.set(long)
>>>>>>>>>>>>>> API with the String version and removing the old one. This would 
>>>>>>>>>>>>>> be a
>>>>>>>>>>>>>> breaking change. However this is a relatively new API and is 
>>>>>>>>>>>>>> still marked
>>>>>>>>>>>>>> @Experimental. Keeping the old API would retain the potential 
>>>>>>>>>>>>>> confusion.
>>>>>>>>>>>>>> It's better to simplify the API surface: having two APIs makes 
>>>>>>>>>>>>>> it less
>>>>>>>>>>>>>> clear which one users should choose.
>>>>>>>>>>>>>> On Fri, Apr 6, 2018 at 8:28 AM Pablo Estrada <
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>> As I was working on adding support for Gauges in Dataflow,
>>>>>>>>>>>>>>> some noted that Gauge is a fairly unusual kind of metric for a 
>>>>>>>>>>>>>>> distributed
>>>>>>>>>>>>>>> environment, since many workers will report different values 
>>>>>>>>>>>>>>> and stomp on
>>>>>>>>>>>>>>> each other's all the time.
>>>>>>>>>>>>>>> We also looked at Flink and Dropwizard Gauge metrics [1][2],
>>>>>>>>>>>>>>> and we found that these use generics, and Flink explicitly 
>>>>>>>>>>>>>>> mentions that a
>>>>>>>>>>>>>>> toString implementation is required[3].
>>>>>>>>>>>>>>> With that in mind, I'm thinking that it might make sense to
>>>>>>>>>>>>>>> 1) expand Gauge to support string values (keep int-based API 
>>>>>>>>>>>>>>> for backwards
>>>>>>>>>>>>>>> compatibility), and migrate it to use string behind the covers.
>>>>>>>>>>>>>>> What does everyone think about this?
>>>>>>>>>>>>>>> Best
>>>>>>>>>>>>>>> -P.
>>>>>>>>>>>>>>> 1 -
>>>>>>>>>>>>>>> 2 -
>>>>>>>>>>>>>>> 3 -
>>>>>>>>>>>>>>> JIRA issue for Gauge metrics -
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Got feedback? go/pabloem-feedback
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Got feedback? http://go/swegner-feedback
>>>>>>>>>>>>> --
>>>>>>>>>> --
