damccorm opened a new issue, #35934:
URL: https://github.com/apache/beam/issues/35934

   ### What happened?
   
   I was trying to reproduce an issue with slowly updating side inputs, and I found that the following code logs only one element:
   
   ```
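   import logging
   import time
   
   import apache_beam as beam
   from apache_beam import WindowInto
   from apache_beam.transforms import core
   from apache_beam.transforms.combiners import Latest
   from apache_beam.transforms.periodicsequence import PeriodicImpulse
   from apache_beam.transforms.trigger import AccumulationMode, AfterCount, Repeatedly
   from apache_beam.transforms.window import GlobalWindows
   
   # `options` is the PipelineOptions for the job; its definition is not shown in this report.
   pipeline = beam.Pipeline(options=options)
   
   start_timestamp = time.time()  # start timestamp of the periodic impulse
   main_input_fire_interval = 60  # interval in seconds at which the main input PCollection is emitted
   side_input_fire_interval = 60  # interval in seconds at which the side input PCollection is emitted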
   
   def add_timestamp(element, timestamp=core.DoFn.TimestampParam):
     return [(element, timestamp)]
   
   side_input = (
         pipeline
         | "SideInputImpulse" >> PeriodicImpulse(
             start_timestamp=start_timestamp,
             fire_interval=side_input_fire_interval)
         | "Window" >> WindowInto(
             GlobalWindows(),
             # Define the trigger. Since the global window never closes, we
             # must tell the runner WHEN to emit a result. This trigger
             # fires repeatedly for every new element that arrives.
             trigger=Repeatedly(AfterCount(1)),
             # Define the accumulation mode. DISCARDING tells the runner to
             # forget old values after a trigger fires. This prevents state
             # from growing indefinitely and ensures Latest.Globally() only
             # considers the most recent element.
             accumulation_mode=AccumulationMode.DISCARDING,
             )
         | 'Add timestamp' >> core.ParDo(add_timestamp)
   )
   
   side_input | "LocalLatest" >> Latest.Globally() | "show latest" >> beam.Map(logging.info)
   ```
   
   However, if I change `Latest.Globally()` to `Latest.Globally().without_defaults()`, the pipeline repeatedly yields a stream of elements, as expected given the trigger. I observed this on Dataflow (this pattern isn't supported on Prism, and I haven't validated it on other runners). As best I can tell, the version with defaults is getting hung up on side inputs.
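
For reference, here is a rough plain-Python model of what I expect the windowing settings in the repro to produce. It is not Beam code and does not reproduce the bug; `fire_panes`, `latest`, and the sample `(element, timestamp)` pairs are hypothetical names and values made up for illustration. It only sketches the intended semantics: `Repeatedly(AfterCount(1))` fires one pane per element, and DISCARDING means each pane holds only the elements that arrived since the last firing.

```python
def fire_panes(elements, count=1, accumulate=False):
    """Model a repeated count-based trigger in a single global window.

    Hypothetical helper for illustration only; it is not part of Beam.
    Returns a list of panes, one per trigger firing.
    """
    panes = []
    buffered = []  # elements seen since the last firing
    emitted = []   # elements already emitted (used only when accumulating)
    for element in elements:
        buffered.append(element)
        if len(buffered) >= count:
            if accumulate:
                # ACCUMULATING: each pane repeats everything seen so far.
                emitted = emitted + buffered
                panes.append(list(emitted))
            else:
                # DISCARDING: each pane holds only the new elements.
                panes.append(list(buffered))
            buffered = []
    return panes


def latest(pane):
    """Pick the (element, timestamp) pair with the greatest timestamp."""
    return max(pane, key=lambda pair: pair[1])


# Made-up (element, timestamp) pairs standing in for periodic impulses.
impulses = [("a", 0), ("b", 60), ("c", 120)]

discarding = fire_panes(impulses)
accumulating = fire_panes(impulses, accumulate=True)

# One "latest" value per firing, which is the stream I expected to see logged.
latest_per_pane = [latest(pane) for pane in discarding]
```

Under this model every impulse produces its own pane, so a per-pane "latest" step should log one value per firing rather than a single element.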
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [x] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [x] Component: Google Cloud Dataflow Runner

