Kludex commented on issue #33176:
URL: https://github.com/apache/beam/issues/33176#issuecomment-2516780840

   @liferoad @Abacn I was able to do what I wanted by manually wrapping each 
`PTransform`.
   
   ```py
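   from typing import Any, TypeVar

   import apache_beam as beam
   import logfire

   # Assumed definitions; the full code linked below may declare these differently.
   InputT = TypeVar("InputT")
   OutputT = TypeVar("OutputT")


   class AutoSpanDoFn(beam.DoFn):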
       def __init__(self, transform: beam.PTransform[InputT, OutputT]):
           super().__init__()
           self.transform = transform
   
   
       def process(self, element: Any, *args, **kwargs):
           label = self.transform.label if hasattr(self.transform, "label") else "UnnamedTransform"
           with logfire.span(label):
               yield element
   
   
   
   
   class AutoSpanTransform(beam.PTransform[InputT, OutputT]):
       def __init__(self, transform: beam.PTransform[InputT, OutputT]):
           super().__init__()
           self.transform = transform
   
   
       def expand(self, input_or_inputs: InputT) -> OutputT:
           return input_or_inputs | beam.ParDo(AutoSpanDoFn(self.transform)) | self.transform
   ```
   
   And then, I can use it like this:
   
   ```py
   with logfire.span("main"):
       with Pipeline() as pipeline:
           text = [
               "To be, or not to be: that is the question: ",
               "Whether 'tis nobler in the mind to suffer ",
               "The slings and arrows of outrageous fortune, ",
               "Or to take arms against a sea of troubles, ",
           ]
   
           pipeline = (
               pipeline
               | "Create" >> beam.Create(text)
               | "Split" >> AutoSpanTransform(beam.ParDo(Split()))
               | "Filter" >> AutoSpanTransform(beam.Filter(lambda x: x != "the"))
               | "Print" >> AutoSpanTransform(beam.Map(logfire_print))
           )
   ```
   
   You can see the full code here: 
https://github.com/Kludex/logfire-apache-beam/blob/main/main.py
   
   You can run it with `python main.py`. 
   
   This is what I see in Logfire (the observability platform we are developing):
   
![image](https://github.com/user-attachments/assets/5becac24-72aa-4d63-bfbf-b19cc8def8f2)
   
   Now I need this to happen automatically. I tried patching `Pipeline.apply`, 
like this:
   ```py
   _original_pipeline_apply = Pipeline.apply
   
   
   def patched_pipeline_apply(self, transform, *args, **kwargs):
       if (
           isinstance(transform, beam.PTransform)
           and not isinstance(transform, AutoSpanTransform)
           and not getattr(transform, "_instrumented", False)
       ):
           # AutoSpanTransform has `_instrumented = True` in this code.
           transform = AutoSpanTransform(transform)
       return _original_pipeline_apply(self, transform, *args, **kwargs)
   
   
   Pipeline.apply = patched_pipeline_apply
   ```
   
   But I keep getting a recursion error, which I'm still debugging. I would 
appreciate help with two things:
   1. Would you accept a PR adding some kind of API so that I don't need to do 
this? Maybe a callback on the `Pipeline()`?
   2. For my solution above, do you have any ideas on how to do it more 
cleanly? In case I need a faster solution.

