olokshyn commented on issue #27636:
URL: https://github.com/apache/beam/issues/27636#issuecomment-1650096826
@tvalentyn that solved it, thank you!
So that was the change:
```python
# The change that resolved the issue: build the pipeline options with
# streaming mode on and unsafe triggers explicitly allowed.
options = PipelineOptions(streaming=True, allow_unsafe_triggers=True)
```
Pasting the working test for future reference:
```python
from datetime import datetime, timedelta, timezone
from typing import Iterable
import pytest
import apache_beam as beam
from apache_beam.transforms import trigger
from apache_beam.transforms.window import Sessions
from apache_beam.pipeline_test import TestPipeline
from apache_beam.pipeline import PipelineOptions
from apache_beam.testing.test_stream import (
    ElementEvent,
    TestStream,
    TimestampedValue,
    WatermarkEvent,
)
from apache_beam.testing.util import assert_that, equal_to
# Reference instant (2023-07-07 10:30:00 UTC) that every event timestamp in
# the parametrized test data is expressed relative to.
base_datetime = datetime(2023, 7, 7, 10, 30, tzinfo=timezone.utc)
@pytest.mark.parametrize(
    "events, expected",
    [
        (
            # Scripted stream for key "user1": after the initial watermark,
            # each element is followed by a watermark set 5 seconds past
            # that element's timestamp.
            [
                WatermarkEvent(0),
                # First element, at the reference instant.
                ElementEvent(
                    [TimestampedValue(("user1", 1), base_datetime.timestamp())]
                ),
                WatermarkEvent(base_datetime.timestamp() + 5),
                # Second element, ten minutes after the first.
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 2),
                            (base_datetime + timedelta(minutes=10)).timestamp(),
                        )
                    ]
                ),
                WatermarkEvent(
                    (base_datetime + timedelta(minutes=10)).timestamp() + 5
                ),
                # Third element, one day and twenty minutes after the first,
                # i.e. more than one day after the second element.
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 5),
                            (
                                base_datetime + timedelta(days=1, minutes=20)
                            ).timestamp(),
                        )
                    ]
                ),
                WatermarkEvent(
                    (base_datetime + timedelta(days=1, minutes=20)).timestamp()
                    + 5
                ),
            ],
            # Every (key, combined values) result the pipeline must emit.
            # NOTE(review): the repeated entries presumably correspond to an
            # early firing per element plus a later firing per session with
            # the accumulated contents -- confirm against Beam's trigger
            # semantics.
            [
                ("user1", [1]),
                ("user1", [1, 2]),
                ("user1", [1, 2]),
                ("user1", [5]),
                ("user1", [5]),
            ],
        ),
    ],
)
def test_after_count_trigger(events, expected):
    """Check the combined output of a session-windowed streaming pipeline.

    The scripted ``events`` are replayed through a ``TestStream``, windowed
    into sessions with a one-day gap using
    ``AfterWatermark(early=AfterCount(1))`` in accumulating mode, and then
    combined per key with ``CustomCombine``. The resulting collection is
    printed and asserted to equal ``expected``.

    Args:
        events: TestStream events (watermarks and timestamped elements).
        expected: The (key, list of values) results the pipeline must emit.
    """
    options = PipelineOptions(streaming=True, allow_unsafe_triggers=True)
    # Session gap passed to Sessions(): one day, as a whole number of seconds.
    session_gap_seconds = int(timedelta(days=1).total_seconds())

    with TestPipeline(options=options) as pipeline:
        test_stream = pipeline | TestStream(events=events, output_tags={None})
        windowed = test_stream | "Window" >> beam.WindowInto(
            Sessions(session_gap_seconds),
            trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
            accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING,
        )
        pcoll = windowed | "Combine" >> beam.CombinePerKey(CustomCombine())
        # Print each emitted result so it is visible in the test output.
        pcoll | beam.Map(print)
        assert_that(pcoll, equal_to(expected))
class CustomCombine(beam.CombineFn):
    """CombineFn whose accumulator is a plain list of the inputs it received."""

    def create_accumulator(self) -> list:
        """Return a new, empty accumulator list."""
        return list()

    def add_input(self, accumulator: list, input) -> list:
        """Return a new list holding the accumulator's items followed by ``input``.

        The accumulator passed in is not modified.
        """
        appended = [input]
        return accumulator + appended

    def merge_accumulators(self, accumulators: Iterable[list]) -> list:
        """Concatenate all accumulators, in iteration order, into one list."""
        merged = []
        for partial in accumulators:
            merged.extend(partial)
        return merged

    def extract_output(self, accumulator):
        """Return the accumulated list unchanged as the final output."""
        return accumulator
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]