This is an automated email from the ASF dual-hosted git repository.

anandinguva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bd23b16666 Add counter for MLTransform and data processing transforms (#28927)
2bd23b16666 is described below

commit 2bd23b166660a5a6fdf78f0615d3cd6a101122f2
Author: Anand Inguva <[email protected]>
AuthorDate: Wed Oct 11 16:20:31 2023 +0000

    Add counter for MLTransform and data processing transforms (#28927)
---
 sdks/python/apache_beam/ml/transforms/base.py      | 39 +++++++++++++++++++++-
 sdks/python/apache_beam/ml/transforms/base_test.py | 25 ++++++++++++++
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/ml/transforms/base.py b/sdks/python/apache_beam/ml/transforms/base.py
index a45928f5c8b..49ce6e9ec1e 100644
--- a/sdks/python/apache_beam/ml/transforms/base.py
+++ b/sdks/python/apache_beam/ml/transforms/base.py
@@ -25,6 +25,7 @@ from typing import Sequence
 from typing import TypeVar
 
 import apache_beam as beam
+from apache_beam.metrics.metric import Metrics
 
 __all__ = ['MLTransform', 'ProcessHandler', 'BaseOperation']
 
@@ -88,6 +89,13 @@ class BaseOperation(Generic[OperationInputT, OperationOutputT], abc.ABC):
       transformed_data = {**transformed_data, **artifacts}
     return transformed_data
 
+  def get_counter(self):
+    """
+    Returns the counter name for the operation.
+    """
+    counter_name = self.__class__.__name__
+    return Metrics.counter(MLTransform, f'BeamML_{counter_name}')
+
 
 class ProcessHandler(Generic[ExampleT, MLTransformOutputT], abc.ABC):
   """
@@ -194,6 +202,9 @@ class MLTransform(beam.PTransform[beam.PCollection[ExampleT],
         transforms=transforms)  # type: ignore[arg-type]
 
     self._process_handler = process_handler
+    self.transforms = transforms
+    self._counter = Metrics.counter(
+        MLTransform, f'BeamML_{self.__class__.__name__}')
 
   def expand(
       self, pcoll: beam.PCollection[ExampleT]
@@ -209,8 +220,11 @@ class MLTransform(beam.PTransform[beam.PCollection[ExampleT],
     Args:
       pcoll: A PCollection of ExampleT type.
     Returns:
-      A PCollection of MLTransformOutputT type.
+      A PCollection of MLTransformOutputT type
     """
+    _ = (
+        pcoll.pipeline
+        | "MLTransformMetricsUsage" >> MLTransformMetricsUsage(self))
     return self._process_handler.process_data(pcoll)
 
   def with_transform(self, transform: BaseOperation):
@@ -230,3 +244,26 @@ class MLTransform(beam.PTransform[beam.PCollection[ExampleT],
       raise TypeError(
           'transform must be a subclass of BaseOperation. '
           'Got: %s instead.' % type(transform))
+
+
+class MLTransformMetricsUsage(beam.PTransform):
+  def __init__(self, ml_transform: MLTransform):
+    self._ml_transform = ml_transform
+    self._ml_transform._counter.inc()
+
+  def expand(self, pipeline):
+    def _increment_counters():
+      # increment for MLTransform.
+      self._ml_transform._counter.inc()
+      # increment if data processing transforms are passed.
+      transforms = (
+          self._ml_transform.transforms or
+          self._ml_transform._process_handler.transforms)
+      if transforms:
+        for transform in transforms:
+          transform.get_counter().inc()
+
+    _ = (
+        pipeline
+        | beam.Create([None])
+        | beam.Map(lambda _: _increment_counters()))
diff --git a/sdks/python/apache_beam/ml/transforms/base_test.py b/sdks/python/apache_beam/ml/transforms/base_test.py
index df7a6d26b47..2e447964541 100644
--- a/sdks/python/apache_beam/ml/transforms/base_test.py
+++ b/sdks/python/apache_beam/ml/transforms/base_test.py
@@ -27,6 +27,7 @@ from parameterized import param
 from parameterized import parameterized
 
 import apache_beam as beam
+from apache_beam.metrics.metric import MetricsFilter
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 
@@ -244,6 +245,30 @@ class BaseMLTransformTest(unittest.TestCase):
           equal_to(expected_output_y, equals_fn=np.array_equal),
           label='actual_output_y')
 
+  def test_mltransform_with_counter(self):
+    transforms = [
+        tft.ComputeAndApplyVocabulary(columns=['y']),
+        tft.ScaleTo01(columns=['x'])
+    ]
+    data = [{'x': [1, 2, 3], 'y': ['a', 'b', 'c']}]
+    with beam.Pipeline() as p:
+      _ = (
+          p | beam.Create(data)
+          | base.MLTransform(
+              transforms=transforms,
+              write_artifact_location=self.artifact_location))
+    scale_to_01_counter = MetricsFilter().with_name('BeamML_ScaleTo01')
+    vocab_counter = MetricsFilter().with_name(
+        'BeamML_ComputeAndApplyVocabulary')
+    mltransform_counter = MetricsFilter().with_name('BeamML_MLTransform')
+    result = p.result
+    self.assertEqual(
+        result.metrics().query(scale_to_01_counter)['counters'][0].result, 1)
+    self.assertEqual(
+        result.metrics().query(vocab_counter)['counters'][0].result, 1)
+    self.assertEqual(
+        result.metrics().query(mltransform_counter)['counters'][0].result, 1)
+
 
 if __name__ == '__main__':
   unittest.main()

Reply via email to