tvalentyn commented on code in PR #31969: URL: https://github.com/apache/beam/pull/31969#discussion_r1695656913
########## sdks/python/apache_beam/runners/portability/portable_runner.py: ########## @@ -437,15 +437,16 @@ def _combine(committed, attempted, filter): ] def query(self, filter=None): - counters, distributions, gauges = [ + counters, distributions, gauges, stringsets = [ Review Comment: nit: consider using either string_set or stringset consistently. ########## sdks/python/apache_beam/metrics/monitoring_infos.py: ########## @@ -67,10 +72,12 @@ common_urns.monitoring_info_types.DISTRIBUTION_INT64_TYPE.urn) LATEST_INT64_TYPE = common_urns.monitoring_info_types.LATEST_INT64_TYPE.urn PROGRESS_TYPE = common_urns.monitoring_info_types.PROGRESS_TYPE.urn +SET_STRING_TYPE = common_urns.monitoring_info_types.SET_STRING_TYPE.urn Review Comment: Should this be STRING_SET_TYPE for consistency? ########## sdks/python/apache_beam/metrics/cells.py: ########## @@ -553,3 +610,22 @@ def combine(self, x, y): def result(self, x): # type: (GaugeData) -> GaugeResult return GaugeResult(x.get_cumulative()) + + +class StringSetAggregator(MetricAggregator): + @staticmethod + def identity_element(): + # type: () -> set + return set() + + def combine(self, x, y): + # type: (set, set) -> set + if len(x) == 0: + return y + elif len(y) == 0: + return x + else: + return set.union(x, y) Review Comment: wondering if it makes sense to use return set.union(x, y) unconditionally. Could there be any concerns about side effects due to Aggregation returning a mutable collection that can be modified elsewhere after aggregation was computed? ########## sdks/python/apache_beam/metrics/cells.py: ########## @@ -268,6 +268,63 @@ def to_runner_api_monitoring_info_impl(self, name, transform_id): ptransform=transform_id) +class StringSetCell(MetricCell): + """For internal use only; no backwards-compatibility guarantees. + + Tracks the current value for a StringSet metric. + + Each cell tracks the state of a metric independently per context per bundle. 
+ Therefore, each metric has a different cell in each bundle, that is later + aggregated. + + This class is thread safe. + """ + def __init__(self, *args): + super().__init__(*args) + self.data = StringSetAggregator.identity_element() + + def add(self, value): + self.update(value) + + def update(self, value): + # type: (str) -> None + if cython.compiled: + # We will hold the GIL throughout the entire _update. + self._update(value) + else: + with self._lock: + self._update(value) + + def _update(self, value): + self.data.add(value) + + def get_cumulative(self): + # type: () -> set + with self._lock: + return set(self.data) + + def combine(self, other): + # type: (StringSetCell) -> StringSetCell + combined = StringSetAggregator().combine(self.data, other.data) + result = StringSetCell() + result.data = combined + return result + + def to_runner_api_monitoring_info_impl(self, name, transform_id): + from apache_beam.metrics import monitoring_infos + # return monitoring_infos.set Review Comment: Is the commented-out `# return monitoring_infos.set` line a leftover? If so, please remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org