tvalentyn commented on code in PR #31969:
URL: https://github.com/apache/beam/pull/31969#discussion_r1695656913


##########
sdks/python/apache_beam/runners/portability/portable_runner.py:
##########
@@ -437,15 +437,16 @@ def _combine(committed, attempted, filter):
     ]
 
   def query(self, filter=None):
-    counters, distributions, gauges = [
+    counters, distributions, gauges, stringsets = [

Review Comment:
   nit: consider using either string_set or stringset consistently.



##########
sdks/python/apache_beam/metrics/monitoring_infos.py:
##########
@@ -67,10 +72,12 @@
     common_urns.monitoring_info_types.DISTRIBUTION_INT64_TYPE.urn)
 LATEST_INT64_TYPE = common_urns.monitoring_info_types.LATEST_INT64_TYPE.urn
 PROGRESS_TYPE = common_urns.monitoring_info_types.PROGRESS_TYPE.urn
+SET_STRING_TYPE = common_urns.monitoring_info_types.SET_STRING_TYPE.urn

Review Comment:
   Should this be STRING_SET_TYPE for consistency?



##########
sdks/python/apache_beam/metrics/cells.py:
##########
@@ -553,3 +610,22 @@ def combine(self, x, y):
   def result(self, x):
     # type: (GaugeData) -> GaugeResult
     return GaugeResult(x.get_cumulative())
+
+
+class StringSetAggregator(MetricAggregator):
+  @staticmethod
+  def identity_element():
+    # type: () -> set
+    return set()
+
+  def combine(self, x, y):
+    # type: (set, set) -> set
+    if len(x) == 0:
+      return y
+    elif len(y) == 0:
+      return x
+    else:
+      return set.union(x, y)

Review Comment:
   Wondering whether it makes sense to use `return set.union(x, y)` unconditionally. Could there be concerns about side effects, since the aggregation may return a mutable collection (one of its inputs) that can be modified elsewhere after the aggregation was computed?
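   A minimal, Beam-independent sketch of the aliasing concern, using plain Python sets. `combine_with_shortcut` mirrors the early-return branches in the diff; `combine_always_union` is the unconditional alternative. Both function names are hypothetical and only illustrate the behavior.

```python
def combine_with_shortcut(x, y):
    # Mirrors the proposed StringSetAggregator.combine: when one input is
    # empty, the other input is returned as-is, so the result aliases it.
    if len(x) == 0:
        return y
    elif len(y) == 0:
        return x
    else:
        return set.union(x, y)


def combine_always_union(x, y):
    # set.union always builds a new set, so the result never aliases x or y.
    return set.union(x, y)


# A cell's data combined with an empty (identity) accumulator.
cell_data = {"a"}
shortcut_result = combine_with_shortcut(set(), cell_data)
union_result = combine_always_union(set(), cell_data)

# The cell keeps being updated after the aggregation was computed.
cell_data.add("b")

print(sorted(shortcut_result))  # ['a', 'b']: the aggregated value changed too
print(sorted(union_result))     # ['a']: unaffected by the later update
```

   The shortcut saves one copy per combine; the unconditional union costs an allocation but makes the aggregated value independent of its inputs. The same aliasing can reach `StringSetCell.combine`, which stores the combined set directly in `result.data`, so the new cell may share its set with `self` when `other.data` is empty.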
   



##########
sdks/python/apache_beam/metrics/cells.py:
##########
@@ -268,6 +268,63 @@ def to_runner_api_monitoring_info_impl(self, name, transform_id):
         ptransform=transform_id)
 
 
+class StringSetCell(MetricCell):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Tracks the current value for a StringSet metric.
+
+  Each cell tracks the state of a metric independently per context per bundle.
+  Therefore, each metric has a different cell in each bundle, that is later
+  aggregated.
+
+  This class is thread safe.
+  """
+  def __init__(self, *args):
+    super().__init__(*args)
+    self.data = StringSetAggregator.identity_element()
+
+  def add(self, value):
+    self.update(value)
+
+  def update(self, value):
+    # type: (str) -> None
+    if cython.compiled:
+      # We will hold the GIL throughout the entire _update.
+      self._update(value)
+    else:
+      with self._lock:
+        self._update(value)
+
+  def _update(self, value):
+    self.data.add(value)
+
+  def get_cumulative(self):
+    # type: () -> set
+    with self._lock:
+      return set(self.data)
+
+  def combine(self, other):
+    # type: (StringSetCell) -> StringSetCell
+    combined = StringSetAggregator().combine(self.data, other.data)
+    result = StringSetCell()
+    result.data = combined
+    return result
+
+  def to_runner_api_monitoring_info_impl(self, name, transform_id):
+    from apache_beam.metrics import monitoring_infos
+    # return monitoring_infos.set

Review Comment:
   leftover?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
