Abacn commented on code in PR #31969:
URL: https://github.com/apache/beam/pull/31969#discussion_r1695685802


##########
sdks/python/apache_beam/metrics/cells.py:
##########
@@ -268,6 +268,63 @@ def to_runner_api_monitoring_info_impl(self, name, transform_id):
         ptransform=transform_id)
 
 
+class StringSetCell(MetricCell):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Tracks the current value for a StringSet metric.
+
+  Each cell tracks the state of a metric independently per context per bundle.
+  Therefore, each metric has a different cell in each bundle, that is later
+  aggregated.
+
+  This class is thread safe.
+  """
+  def __init__(self, *args):
+    super().__init__(*args)
+    self.data = StringSetAggregator.identity_element()
+
+  def add(self, value):
+    self.update(value)
+
+  def update(self, value):
+    # type: (str) -> None
+    if cython.compiled:
+      # We will hold the GIL throughout the entire _update.
+      self._update(value)
+    else:
+      with self._lock:
+        self._update(value)
+
+  def _update(self, value):
+    self.data.add(value)
+
+  def get_cumulative(self):
+    # type: () -> set
+    with self._lock:
+      return set(self.data)
+
+  def combine(self, other):
+    # type: (StringSetCell) -> StringSetCell
+    combined = StringSetAggregator().combine(self.data, other.data)
+    result = StringSetCell()
+    result.data = combined
+    return result
+
+  def to_runner_api_monitoring_info_impl(self, name, transform_id):
+    from apache_beam.metrics import monitoring_infos
+    # return monitoring_infos.set

Review Comment:
   Thanks for catching it; fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to