shunping opened a new pull request, #36188:
URL: https://github.com/apache/beam/pull/36188

   Currently, each stage's state stores only the latest pane info. In some scenarios that is not enough. For example, when multiple bundles are injected and each one calls `computeNextTriggeredPane`, the pane computed later overwrites the earlier one.
   
   
https://github.com/apache/beam/blob/117042c4ca5e99c256b52d80b2b6c43c347a5ac9/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1340
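   The overwrite can be modeled with a small Python sketch. This is not the prism engine code (which is Go). The class names, `inject_bundle`, `pane_for`, and the `compute_next_triggered_pane` stand-in are all hypothetical. Keeping one pane per injected bundle is shown only as one possible way to avoid the overwrite, not necessarily the change made in this PR.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass(frozen=True)
class Pane:
    index: int
    is_first: bool
    timing: str = "EARLY"


def compute_next_triggered_pane(previous: Optional[Pane]) -> Pane:
    """Hypothetical stand-in for computeNextTriggeredPane: advance the pane index."""
    if previous is None:
        return Pane(index=0, is_first=True)
    return Pane(index=previous.index + 1, is_first=False)


@dataclass
class SinglePaneStageState:
    # Only the most recent pane is kept, so a later bundle overwrites an earlier one.
    pane: Optional[Pane] = None

    def inject_bundle(self, bundle_id: str) -> None:
        self.pane = compute_next_triggered_pane(self.pane)

    def pane_for(self, bundle_id: str) -> Optional[Pane]:
        # bundle_id is ignored: every bundle sees whatever pane was stored last.
        return self.pane


@dataclass
class PerBundleStageState:
    # Hypothetical alternative: remember the pane computed for each bundle.
    last_pane: Optional[Pane] = None
    panes: Dict[str, Pane] = field(default_factory=dict)

    def inject_bundle(self, bundle_id: str) -> None:
        self.last_pane = compute_next_triggered_pane(self.last_pane)
        self.panes[bundle_id] = self.last_pane

    def pane_for(self, bundle_id: str) -> Optional[Pane]:
        return self.panes.get(bundle_id)


def run(state):
    # Two bundles are injected before either bundle's output is processed.
    state.inject_bundle("b1")
    state.inject_bundle("b2")
    return [state.pane_for("b1"), state.pane_for("b2")]


buggy = run(SinglePaneStageState())
fixed = run(PerBundleStageState())
print([p.index for p in buggy])  # [1, 1]
print([p.index for p in fixed])  # [0, 1]
```

   With a single stored pane both bundles report index 1, which mirrors the duplicated `index: 1` in the output of the reproduction below.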
   
   
   Here is the code to reproduce:
   ```python
   import logging
   
   import apache_beam as beam
   from apache_beam.transforms.window import GlobalWindows
   from apache_beam.transforms import trigger
   from apache_beam.options.pipeline_options import PipelineOptions
   
   logging.basicConfig(level=logging.INFO)
   
   # prism runner option
   options = PipelineOptions([
       "--environment_type=LOOPBACK",
       #"--runner=PrismRunner",
       "--runner=PortableRunner",
       "--job_endpoint=localhost:8073",
       "--experiments=prism_enable_rtc",
       "--prism_log_level=info",
       "--allow_unsafe_triggers",
   ])
   
   with beam.Pipeline(options=options) as p:
     result = (
         p | beam.Create([1, 2])
         | beam.WindowInto(
             GlobalWindows(),
             trigger=trigger.Repeatedly(trigger.AfterCount(1)),
             accumulation_mode=trigger.AccumulationMode.DISCARDING,
             allowed_lateness=0,
         )
         | beam.GroupBy()
         | beam.LogElements(
             level=logging.WARNING,
             with_timestamp=True,
             with_window=True,
             with_pane_info=True,
             use_epoch_time=True))
   ```
   
   With the current code, the output is:
   ```
   WARNING:root:(BeamSchema_d621aa78_ce21_4a09_bcaf_45d143ced57e(), [2]), timestamp=9223371950454, window(start=-9223372036855, end=9223371950454), pane_info=PaneInfo(first: False, last: False, timing: EARLY, index: 1, nonspeculative_index: -1)
   WARNING:root:(BeamSchema_d621aa78_ce21_4a09_bcaf_45d143ced57e(), [1]), timestamp=9223371950454, window(start=-9223372036855, end=9223371950454), pane_info=PaneInfo(first: False, last: False, timing: EARLY, index: 1, nonspeculative_index: -1)
   ```
   
   Both elements carry identical pane_info (`first: False`, `index: 1`), even though they were emitted in two separate panes.
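   A symptom like this can be caught with a simple consistency check over the panes emitted for one window: the pane indices should be exactly 0..n-1 with no duplicates, and only the pane with index 0 should be marked first. The sketch below assumes the list holds every pane emitted for the window; `Pane` and `panes_are_consistent` are hypothetical helpers built on plain tuples, not part of the Beam API.

```python
from typing import List, NamedTuple


class Pane(NamedTuple):
    # Hypothetical minimal view of the PaneInfo fields printed in the logs.
    is_first: bool
    index: int


def panes_are_consistent(panes: List[Pane]) -> bool:
    """Check all panes of one window: indices 0..n-1 without duplicates,
    and is_first set exactly on the pane whose index is 0."""
    indices = sorted(p.index for p in panes)
    if indices != list(range(len(panes))):
        return False
    return all(p.is_first == (p.index == 0) for p in panes)


# Pane info of the two elements in the current output.
current = [Pane(is_first=False, index=1), Pane(is_first=False, index=1)]
# Pane info of the two elements in the expected output.
expected = [Pane(is_first=False, index=1), Pane(is_first=True, index=0)]

print(panes_are_consistent(current))   # False
print(panes_are_consistent(expected))  # True
```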
   
   The correct output should be:
   ```
   WARNING:root:(BeamSchema_ef4d452e_e897_4dd8_9002_014e04413bc6(), [2]), timestamp=9223371950454, window(start=-9223372036855, end=9223371950454), pane_info=PaneInfo(first: False, last: False, timing: EARLY, index: 1, nonspeculative_index: -1)
   WARNING:root:(BeamSchema_ef4d452e_e897_4dd8_9002_014e04413bc6(), [1]), timestamp=9223371950454, window(start=-9223372036855, end=9223371950454), pane_info=PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
   ```

