shunping opened a new pull request, #36188: URL: https://github.com/apache/beam/pull/36188
Currently, each stage state stores only the latest pane info. In some scenarios, that's not enough. For example, when we have multiple injected bundles and each calls `computeNextTriggeredPane`, the later pane overwrites the previous one.

https://github.com/apache/beam/blob/117042c4ca5e99c256b52d80b2b6c43c347a5ac9/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1340

Here is the code to reproduce:

```
import logging

import apache_beam as beam
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms import trigger
from apache_beam.options.pipeline_options import PipelineOptions

logging.basicConfig(level=logging.INFO)

# prism runner option
options = PipelineOptions([
    "--environment_type=LOOPBACK",
    #"--runner=PrismRunner",
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8073",
    "--experiments=prism_enable_rtc",
    "--prism_log_level=info",
    "--allow_unsafe_triggers",
])

with beam.Pipeline(options=options) as p:
  result = (
      p
      | beam.Create([1, 2])
      | beam.WindowInto(
          GlobalWindows(),
          trigger=trigger.Repeatedly(trigger.AfterCount(1)),
          accumulation_mode=trigger.AccumulationMode.DISCARDING,
          allowed_lateness=0,
      )
      | beam.GroupBy()
      | beam.LogElements(
          level=logging.WARNING,
          with_timestamp=True,
          with_window=True,
          with_pane_info=True,
          use_epoch_time=True))
```

Running on the current code, the result will be

```
WARNING:root:(BeamSchema_d621aa78_ce21_4a09_bcaf_45d143ced57e(), [2]), timestamp=9223371950454, window(start=-9223372036855, end=9223371950454), pane_info=PaneInfo(first: False, last: False, timing: EARLY, index: 1, nonspeculative_index: -1)
WARNING:root:(BeamSchema_d621aa78_ce21_4a09_bcaf_45d143ced57e(), [1]), timestamp=9223371950454, window(start=-9223372036855, end=9223371950454), pane_info=PaneInfo(first: False, last: False, timing: EARLY, index: 1, nonspeculative_index: -1)
```

Both elements carry the same pane_info (`first: False`, `index: 1`), even though they were emitted in two separate panes.
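The overwrite described above can be illustrated with a small toy model. This is only a sketch under stated assumptions: `PaneInfo`, `LatestOnlyState`, `PerBundleState`, `fire`, and `pane_for` are hypothetical names invented for illustration, not Prism's Go types, and the per-bundle layout is just one plausible way to avoid the overwrite, not necessarily what this PR implements.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PaneInfo:
    """Toy stand-in for a pane (hypothetical, not Beam's PaneInfo class)."""
    index: int = -1
    is_first: bool = False


def next_pane(prev: PaneInfo) -> PaneInfo:
    """Advance the pane index, loosely mimicking a trigger firing."""
    index = prev.index + 1
    return PaneInfo(index=index, is_first=(index == 0))


class LatestOnlyState:
    """Keeps only the newest pane per key: the layout that loses panes."""

    def __init__(self):
        self.latest = {}

    def fire(self, key):
        self.latest[key] = next_pane(self.latest.get(key, PaneInfo()))
        return key  # the bundle can only look its pane up by key later

    def pane_for(self, bundle):
        return self.latest[bundle]


class PerBundleState:
    """Also records the pane computed for each bundle (assumed fix layout)."""

    def __init__(self):
        self.latest = {}
        self.by_bundle = {}

    def fire(self, key):
        pane = next_pane(self.latest.get(key, PaneInfo()))
        self.latest[key] = pane
        bundle_id = len(self.by_bundle)  # simple increasing bundle id
        self.by_bundle[bundle_id] = pane
        return bundle_id

    def pane_for(self, bundle):
        return self.by_bundle[bundle]


def run(state):
    # Two bundles are injected before either one is executed.
    bundles = [state.fire("k"), state.fire("k")]
    return [state.pane_for(b).index for b in bundles]


latest_only = run(LatestOnlyState())
per_bundle = run(PerBundleState())
print(latest_only)  # [1, 1]: the second pane overwrote the first
print(per_bundle)   # [0, 1]: each bundle keeps its own pane
```

In the toy model, the first layout reports index 1 for both bundles, matching the duplicated pane_info in the log above, while the second keeps index 0 for the first bundle.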
The correct output should be:

```
WARNING:root:(BeamSchema_ef4d452e_e897_4dd8_9002_014e04413bc6(), [2]), timestamp=9223371950454, window(start=-9223372036855, end=9223371950454), pane_info=PaneInfo(first: False, last: False, timing: EARLY, index: 1, nonspeculative_index: -1)
WARNING:root:(BeamSchema_ef4d452e_e897_4dd8_9002_014e04413bc6(), [1]), timestamp=9223371950454, window(start=-9223372036855, end=9223371950454), pane_info=PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
```
