olokshyn opened a new issue, #27636:
URL: https://github.com/apache/beam/issues/27636

   ### What happened?
   
   Python SDK version: 2.48.0
   
   The `AfterCount` trigger cannot fire a `Sessions` window early based on the number of elements in it. The window only fires when the watermark advances past its end and closes it.
   
   Related issue: https://github.com/apache/beam/issues/20813. It was supposedly fixed, but the problem still reproduces for me.
   
   The code snippet provided below is a test that:
   1. Defines a one-day-long `Sessions` window.
   2. Defines the `trigger.AfterCount(1)` trigger, which is supposed to fire the window early on every element.
   3. Combines the results of the window using a custom combiner that simply collects all results in a single list.
   4. Uses three elements: two fall within a single window, and the third sits in its own window.
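
To make the windowing in step 4 concrete, the session-merge behavior can be sketched in plain Python (a simplified stand-in for Beam's `Sessions` merging, not Beam code; `merge_sessions` is a hypothetical helper):

```python
from datetime import datetime, timedelta, timezone

GAP = timedelta(days=1)  # the session gap used in the test
base = datetime(2023, 7, 7, 10, 30, tzinfo=timezone.utc)
timestamps = [
    base,                                  # element 1
    base + timedelta(minutes=10),          # element 2
    base + timedelta(days=1, minutes=20),  # element 5
]

def merge_sessions(timestamps, gap):
    """Greedy session merge: each element opens a [t, t + gap) proto-window;
    a proto-window overlapping the previous session extends it."""
    sessions = []
    for t in sorted(timestamps):
        if sessions and t < sessions[-1][1]:
            sessions[-1][1] = max(sessions[-1][1], t + gap)
        else:
            sessions.append([t, t + gap])
    return sessions

print(len(merge_sessions(timestamps, GAP)))  # → 2: elements 1 and 2 share a session
```

This is why the watermark-driven result below merges the first two elements: their proto-windows overlap within the one-day gap, while the third element starts a new session.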
   
   Note that using the commented-out `trigger=trigger.AfterEach(trigger.AfterCount(1), trigger.AfterWatermark()),` line instead yields the same result.
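
For reference, in Beam's trigger model `AfterCount(1)` describes a single early firing; `trigger.Repeatedly` (a real combinator in `apache_beam.transforms.trigger`) re-arms its subtrigger after each firing. The fragment below is a hypothetical variant of the windowing under test, shown only as a configuration sketch — not a claim that it changes the behavior reported in this issue:

```python
# Hypothetical variant (configuration fragment, not verified against this bug):
beam.WindowInto(
    Sessions(int(timedelta(days=1).total_seconds())),
    trigger=trigger.Repeatedly(trigger.AfterCount(1)),  # re-arms after each firing
    accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
)
```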
   
   Expected: all three elements are processed separately because the window fires after every element:
   ```python
   [
       ("user1", [1]),
       ("user1", [2]),
       ("user1", [5]),
   ]
   ```
   
   Actual: the first two elements are processed together because they fall into the same window, which fires only when the watermark closes it:
   ```python
   [
       ("user1", [1, 2]),
       ("user1", [5]),
   ]
   ```
   
   The test that reproduces the issue:
   ```python
   from datetime import datetime, timedelta, timezone
   from typing import Iterable
   
   import pytest
   import apache_beam as beam
   from apache_beam.transforms import trigger
   from apache_beam.transforms.window import Sessions
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.testing.test_stream import ElementEvent, TestStream, TimestampedValue, WatermarkEvent
   from apache_beam.testing.util import assert_that, equal_to
   
   
   base_datetime = datetime(2023, 7, 7, 10, 30, 0, 0, timezone.utc)
   
   
   @pytest.mark.parametrize(
       "events, expected",
       [
           (
               [
                   WatermarkEvent(0),
                   ElementEvent(
                       [
                           TimestampedValue(
                               ("user1", 1),
                               base_datetime.timestamp(),
                           ),
                       ]
                   ),
                   WatermarkEvent(base_datetime.timestamp() + 5),
                   ElementEvent(
                       [
                           TimestampedValue(
                               ("user1", 2),
                               (base_datetime + timedelta(minutes=10)).timestamp(),
                           ),
                       ]
                   ),
                   WatermarkEvent((base_datetime + timedelta(minutes=10)).timestamp() + 5),
                   ElementEvent(
                       [
                           TimestampedValue(
                               ("user1", 5),
                               (base_datetime + timedelta(days=1, minutes=20)).timestamp(),
                           ),
                       ]
                   ),
                   WatermarkEvent((base_datetime + timedelta(days=1, minutes=20)).timestamp() + 5),
               ],
               [
                   ("user1", [1]),
                   ("user1", [2]),
                   ("user1", [5]),
               ],
           ),
       ],
   )
   def test_after_count_trigger(events, expected):
       with TestPipeline(options=PipelineOptions(allow_unsafe_triggers=True)) as p:
           test_stream = p | TestStream(events=events, output_tags={None})
   
           pcoll = (
               test_stream
               | "Window"
               >> beam.WindowInto(
                   Sessions(int(timedelta(days=1).total_seconds())),
                   # trigger=trigger.AfterEach(trigger.AfterCount(1), trigger.AfterWatermark()),
                   trigger=trigger.AfterCount(1),
                   accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
               )
               | "Combine" >> beam.CombinePerKey(CustomCombine())
           )
           pcoll | beam.Map(print)
   
           assert_that(pcoll, equal_to(expected))
   
   
   class CustomCombine(beam.CombineFn):
       def create_accumulator(self) -> list:
           return []
   
       def add_input(self, accumulator: list, input) -> list:
           return accumulator + [input]
   
       def merge_accumulators(self, accumulators: Iterable[list]) -> list:
           res = [x for acc in accumulators for x in acc]
           return res
   
       def extract_output(self, accumulator):
           return accumulator
   
   ```
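
The combiner's accumulator protocol can be exercised outside Beam with a plain class of the same shape (a stand-in that drops the `beam.CombineFn` base, which here only supplies the interface):

```python
class CustomCombine:
    """Same methods as the CombineFn above, minus the Beam base class."""
    def create_accumulator(self) -> list:
        return []

    def add_input(self, accumulator: list, input) -> list:
        return accumulator + [input]

    def merge_accumulators(self, accumulators) -> list:
        return [x for acc in accumulators for x in acc]

    def extract_output(self, accumulator) -> list:
        return accumulator

c = CustomCombine()
acc = c.create_accumulator()
for value in (1, 2):
    acc = c.add_input(acc, value)
merged = c.merge_accumulators([acc, [5]])
print(c.extract_output(merged))  # → [1, 2, 5]
```

Since the combiner just concatenates, the pane contents in the output directly reflect which elements each firing grouped together.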
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

