tvalentyn commented on issue #20528:
URL: https://github.com/apache/beam/issues/20528#issuecomment-1289118236

   > This re-duplicates the windows for sliding windows WindowFns. (It will do 
the wrong thing for sessions as well.) 
   
   Does not seem to repro with sessions.  Passing test:
   
   ```
     def test_combining_with_session_windows_and_fanout(self):
       import logging
       class ListFn(beam.CombineFn):
         def create_accumulator(self):
           return []
   
         def add_input(self, mutable_accumulator, element):
           return mutable_accumulator + [element]
   
         def merge_accumulators(self, accumulators):
           res = []
           for accu in accumulators:
             res = res + accu
           return res
   
         def extract_output(self, accumulator):
           return accumulator
   
       options = PipelineOptions()
       options.view_as(StandardOptions).streaming = True
       with TestPipeline(options=options) as p:
         def has_expected_values(actual):
           from hamcrest.core import assert_that as hamcrest_assert
           from hamcrest.library.collection import contains_exactly
           from hamcrest.library.collection import only_contains
   
           hamcrest_assert(ordered, contains_exactly([0, 1, 2, 3], [5, 6, 7, 
8]))
          
         result = (
                 p
                 | beam.Create([
           window.TimestampedValue(0, Timestamp(seconds=0)),
           window.TimestampedValue(1, Timestamp(seconds=1)),
           window.TimestampedValue(2, Timestamp(seconds=2)),
           window.TimestampedValue(3, Timestamp(seconds=3)),
   
           window.TimestampedValue(5, Timestamp(seconds=5)),
           window.TimestampedValue(6, Timestamp(seconds=6)),
           window.TimestampedValue(7, Timestamp(seconds=7)),
           window.TimestampedValue(8, Timestamp(seconds=8))])
                 | beam.WindowInto(window.Sessions(2))
                 | 
beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
         )
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to