tvalentyn commented on issue #20528:
URL: https://github.com/apache/beam/issues/20528#issuecomment-1289118236
> This re-duplicates the windows for sliding windows WindowFns. (It will do
the wrong thing for sessions as well.)
Does not seem to repro with sessions. Passing test:
```
def test_combining_with_session_windows_and_fanout(self):
import logging
class ListFn(beam.CombineFn):
def create_accumulator(self):
return []
def add_input(self, mutable_accumulator, element):
return mutable_accumulator + [element]
def merge_accumulators(self, accumulators):
res = []
for accu in accumulators:
res = res + accu
return res
def extract_output(self, accumulator):
return accumulator
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
def has_expected_values(actual):
from hamcrest.core import assert_that as hamcrest_assert
from hamcrest.library.collection import contains_exactly
from hamcrest.library.collection import only_contains
hamcrest_assert(ordered, contains_exactly([0, 1, 2, 3], [5, 6, 7,
8]))
result = (
p
| beam.Create([
window.TimestampedValue(0, Timestamp(seconds=0)),
window.TimestampedValue(1, Timestamp(seconds=1)),
window.TimestampedValue(2, Timestamp(seconds=2)),
window.TimestampedValue(3, Timestamp(seconds=3)),
window.TimestampedValue(5, Timestamp(seconds=5)),
window.TimestampedValue(6, Timestamp(seconds=6)),
window.TimestampedValue(7, Timestamp(seconds=7)),
window.TimestampedValue(8, Timestamp(seconds=8))])
| beam.WindowInto(window.Sessions(2))
|
beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]