damccorm opened a new issue, #35934: URL: https://github.com/apache/beam/issues/35934
### What happened? I was trying to repro an issue with slowly updating side inputs, and I found that the following code only logs one element: ``` pipeline = beam.Pipeline(options=options) from apache_beam.transforms import core start_timestamp = time.time() # start timestamp of the periodic impulse main_input_fire_interval = 60 # interval in seconds at which the main input PCollection is emitted. side_input_fire_interval = 60 # interval in seconds at which the side input PCollection is emitted. def add_timestamp(element, timestamp=core.DoFn.TimestampParam): return [(element, timestamp)] side_input = ( pipeline | "SideInputImpulse" >> PeriodicImpulse( start_timestamp=start_timestamp, fire_interval=main_input_fire_interval) | "Window" >> WindowInto( GlobalWindows(), # Define the trigger. Since the Global Window never # closes, we must tell the runner WHEN to emit a result. This trigger # fires repeatedly for every new element that arrives. trigger=Repeatedly(AfterCount(1)), # Define the accumulation mode. DISCARDING tells the # runner to forget old values after a trigger fires. This prevents # state from growing indefinitely and ensures Latest.Globally() only # considers the most recent element. accumulation_mode=AccumulationMode.DISCARDING, ) | 'Add timestamp' >> core.ParDo(add_timestamp) ) side_input | "LocalLatest" >> Latest.Globally() | "show latest" >> beam.Map(logging.info) ``` However, if I change `Latest.Globally()` to `Latest.Globally().without_defaults()` it repeatedly yields a stream of elements as expected based on the trigger. I observed this on Dataflow (this pattern isn't supported on Prism, and I haven't validated it on other runners). As best I can tell, it is getting hung up on side inputs. ### Issue Priority Priority: 2 (default / most bugs should be filed as P2) ### Issue Components - [x] Component: Python SDK - [ ] Component: Java SDK - [ ] Component: Go SDK - [ ] Component: Typescript SDK - [ ] Component: IO connector - [ ] Component: Beam YAML - [ ] Component: Beam examples - [ ] Component: Beam playground - [ ] Component: Beam katas - [ ] Component: Website - [ ] Component: Infrastructure - [ ] Component: Spark Runner - [ ] Component: Flink Runner - [ ] Component: Samza Runner - [ ] Component: Twister2 Runner - [ ] Component: Hazelcast Jet Runner - [x] Component: Google Cloud Dataflow Runner -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org