kamilwu commented on pull request #11856:
URL: https://github.com/apache/beam/pull/11856#issuecomment-676510970


   @lukecwik  @tysonjh 
   
   I've recently resumed work on side input tests that use windows. Here's the part of my pipeline I ended up with:
   
   ```python
   import apache_beam as beam
   from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource
   from apache_beam.transforms import window

   side_input = (
       self.pipeline
       | 'Main input: create' >> beam.Create(range(self.windows))
       | 'Main input: assign timestamps' >> beam.ParDo(AssignTimestamps())
       | 'Main input: apply windows' >> beam.WindowInto(window.FixedWindows(1))
       # Replace every element with the same synthetic source spec.
       | beam.Map(
           lambda _: {
               'key_size': 100,
               'value_size': 900,
               'num_records': 10000,
               'initial_splitting_num_bundles': 0,
               'initial_splitting_desired_bundle_size': 0,
               'sleep_per_input_record_sec': 0,
               'initial_splitting': 'const',
           })
       | 'Read from synthetic source' >> beam.ParDo(SyntheticSDFAsSource()))
   ```
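For a sense of scale, here is a back-of-the-envelope sketch of how much data this pipeline reads. It assumes that `key_size` and `value_size` are byte counts and that `self.windows` is `1000`; since `beam.Map` emits one spec dict per main-input element, `SyntheticSDFAsSource` should run once per element.

```python
# Hypothetical back-of-the-envelope estimate of the synthetic read volume.
# Assumes key_size and value_size are measured in bytes.
spec = {'key_size': 100, 'value_size': 900, 'num_records': 10000}
num_windows = 1000  # assumed value of self.windows (one spec dict per element)

# Each spec dict produces num_records records of key_size + value_size bytes.
bytes_per_window = spec['num_records'] * (spec['key_size'] + spec['value_size'])
total_bytes = num_windows * bytes_per_window

print(bytes_per_window)  # 10000000, roughly 10 MB per window
print(total_bytes)       # 10000000000, roughly 10 GB in total
```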
   
   `self.windows` is `1000`, and `AssignTimestamps` is a very simple DoFn that assigns timestamps to elements:
   
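   ```python
   import apache_beam as beam
   from apache_beam.transforms import window

   class AssignTimestamps(beam.DoFn):
     def __init__(self):
       # Keep a reference to the `window` module on the DoFn so that it is
       # serialized together with it; this avoids having to use save_main_session.
       self.window = window

     def process(self, element):
       # Use the integer element itself as its event timestamp (in seconds).
       yield self.window.TimestampedValue(element, element)
   ```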
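To make the windowing concrete: because `AssignTimestamps` uses each integer element as its own timestamp, `FixedWindows(1)` should place every element of `range(1000)` into its own one-second window. The sketch below is a plain-Python illustration, not Beam code. `fixed_window` is a hypothetical helper that assumes the default window offset of 0 and applies the usual fixed-window rule `start = t - (t - offset) % size`.

```python
# Hypothetical helper illustrating fixed-window assignment (not Beam code):
# a timestamp t falls into the half-open window [start, start + size).
def fixed_window(timestamp, size, offset=0):
    start = timestamp - (timestamp - offset) % size
    return (start, start + size)


# Mirror the pipeline: elements 0..999, each timestamped with its own value,
# windowed into one-second fixed windows.
num_windows = 1000  # assumed value of self.windows
windows = {fixed_window(t, size=1) for t in range(num_windows)}

print(len(windows))          # 1000 distinct windows, one per element
print(fixed_window(0, 1))    # (0, 1)
print(fixed_window(999, 1))  # (999, 1000)
```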
   
   This doesn't work on Dataflow, although it works perfectly fine on the Direct runner. Here's the error message:
   ```
   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/restriction_trackers.py", line 91, in __init__
       assert isinstance(offset_range, OffsetRange)
   RuntimeError: AssertionError [while running 'Read from synthetic source']
   ```
   
   I did some debugging and discovered that `offset_range` is `None` when that assertion fails.
   
   Does SDF properly support windowed side inputs on Dataflow? If not, do you know another way to write tests for side inputs involving windows? Let me know if you need more information. Thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

