kennknowles commented on issue #34212:
URL: https://github.com/apache/beam/issues/34212#issuecomment-3338982741

   > I'm having the same issue. I tried all the Beam SDK versions from `v2.57` to
   > `v2.68`, but I still get the same error. As a workaround, I monkey-patched the
   > code like this:
   > 
   > # main.py
   > 
   > # Apply Apache Beam trigger patch before importing beam modules
   > def _apply_beam_trigger_patch():
   >     """Apply monkey patch to fix after_synchronized_processing_time KeyError."""
   >     import apache_beam as beam
   >     from apache_beam.transforms.trigger import TriggerFn, AfterProcessingTime
   >     
   >     original_from_runner_api = TriggerFn.from_runner_api
   >     
   >     @staticmethod
   >     def patched_from_runner_api(proto, context):
   >         """Patched version that handles after_synchronized_processing_time."""
   >         from apache_beam.transforms.trigger import (
   >             AfterAll, AfterAny, AfterEach, AfterWatermark, AfterProcessingTime,
   >             Always, DefaultTrigger, AfterCount, _Never, OrFinally, Repeatedly
   >         )
   >         
   >         trigger_type = proto.WhichOneof('trigger')
   >         
   >         # Handle the special case of after_synchronized_processing_time
   >         if trigger_type == 'after_synchronized_processing_time':
   >             return AfterProcessingTime(delay=0)
   >         
   >         # Handle all other trigger types
   >         trigger_mapping = {
   >             'after_all': AfterAll,
   >             'after_any': AfterAny,
   >             'after_each': AfterEach,
   >             'after_end_of_window': AfterWatermark,
   >             'after_processing_time': AfterProcessingTime,
   >             'always': Always,
   >             'default': DefaultTrigger,
   >             'element_count': AfterCount,
   >             'never': _Never,
   >             'or_finally': OrFinally,
   >             'repeat': Repeatedly,
   >         }
   >         
   >         if trigger_type not in trigger_mapping:
   >             raise ValueError(f"Unknown trigger type: {trigger_type}")
   >         
   >         return trigger_mapping[trigger_type].from_runner_api(proto, context)
   >     
   >     TriggerFn.from_runner_api = patched_from_runner_api
   > 
   > # Apply the patch before importing beam
   > _apply_beam_trigger_patch()
   > 
   > import apache_beam as beam
   > 
   > # ... the pipeline definition and the rest of my main file goes here after patching
   
   Not a bad hack. We haven't released this change with any version of Beam 
because we weren't able to execute it. I believe that in the next release we could 
roll this change forward and it would be supported.
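
The quoted patch saves `original_from_runner_api` but never calls it, and it copies the whole trigger mapping by hand. A possibly less brittle variant wraps the existing dispatcher and delegates to it for every trigger type it does not handle itself. Below is a minimal sketch of that wrap-and-delegate pattern using stand-in classes so it runs without Beam installed; `FakeProto`, `FakeTriggerFn`, and `patch_dispatcher` are hypothetical names for illustration, not Beam APIs, and the returned strings only stand in for real trigger objects.

```python
# Stand-ins for illustration only; these are NOT the real Beam classes.
class FakeProto:
    """Hypothetical stand-in for a trigger proto with a `trigger` oneof."""

    def __init__(self, kind):
        self._kind = kind

    def WhichOneof(self, name):
        return self._kind


class FakeTriggerFn:
    """Hypothetical stand-in for a class with a dict-based static dispatcher."""

    @staticmethod
    def from_runner_api(proto, context):
        known = {'always': 'Always', 'never': '_Never'}
        # Raises KeyError for kinds missing from the mapping.
        return known[proto.WhichOneof('trigger')]


def patch_dispatcher(cls, extra_handlers):
    """Wrap cls.from_runner_api: handle extra kinds, delegate everything else."""
    # Reading a staticmethod through the class yields the plain function.
    original = cls.from_runner_api

    def patched(proto, context):
        handler = extra_handlers.get(proto.WhichOneof('trigger'))
        if handler is not None:
            return handler(proto, context)
        # Fall back to the unpatched dispatcher for all other kinds.
        return original(proto, context)

    cls.from_runner_api = staticmethod(patched)


patch_dispatcher(
    FakeTriggerFn,
    {
        # Placeholder result; the quoted patch returns AfterProcessingTime(delay=0).
        'after_synchronized_processing_time':
            lambda proto, context: 'AfterProcessingTime(delay=0)',
    },
)
```

With this shape, only the missing case is spelled out in the patch, so the existing mapping stays the single source of truth for every other trigger type.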


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
