olokshyn commented on issue #27636:
URL: https://github.com/apache/beam/issues/27636#issuecomment-1649822630
Summary:
1. The Dataflow runner works correctly.
2. The DirectRunner works correctly.
3. The same trigger doesn't work with `TestStream` and `TestPipeline` on
DirectRunner.
The code used to test both runners is linked below.
The intention is to fire a `Sessions` window early on each single new
element, AND to fire this window when it is closed.
I used the following window definition, which, I believe, is more correct
than the one I used for the tests before:
```python
>> beam.WindowInto(
Sessions(WINDOW_LENGTH_SEC),
trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
)
```
Both the Dataflow runner and the DirectRunner behave as expected.
The code snippet:
```python
"""
A Python Beam pipeline that applies a sessions window to the Pub/Sub messages
with an early trigger that fires after every single element, accumulating
the results.
"""
from datetime import timedelta
from typing import Iterable
import json
from multiprocessing import Process
import time
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import trigger
from apache_beam.transforms.window import Sessions
from google.cloud import pubsub_v1
# Configure the root logger's output format: timestamp with milliseconds,
# a left-aligned 8-character level name, then the message.
logging.basicConfig(
format="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
def get_logger(name: str) -> logging.Logger:
    """Return the logger called ``name`` with its level set to INFO.

    Args:
        name: Name passed to ``logging.getLogger``.

    Returns:
        The logger registered under ``name``; its level is (re)set to
        ``logging.INFO`` on every call.
    """
    log = logging.getLogger(name)
    log.setLevel(logging.INFO)
    return log
# Logger used by the main process (pipeline and resource setup).
m_log = get_logger("main")

PROJECT = "MYPROJECT"

# Fully-qualified Pub/Sub resource paths. The producer topic feeds the Beam
# pipeline; the pipeline writes its results to the consumer topic.
PRODUCER_TOPIC = pubsub_v1.PublisherClient.topic_path(
    PROJECT, "beam-test-producer-topic"
)
CONSUMER_TOPIC = pubsub_v1.PublisherClient.topic_path(
    PROJECT, "beam-test-consumer-topic"
)
PRODUCER_SUBSCRIPTION = pubsub_v1.SubscriberClient.subscription_path(
    PROJECT, "beam-test-topic-producer-subscription"
)
CONSUMER_SUBSCRIPTION = pubsub_v1.SubscriberClient.subscription_path(
    PROJECT, "beam-test-topic-consumer-subscription"
)

# Session gap of the window under test, in seconds.
WINDOW_LENGTH_SEC = timedelta(minutes=3).total_seconds()
# Delay before the producer publishes its first message.
WARMUP_TIME_SEC = timedelta(seconds=5).total_seconds()
# Pause after each published message and after each resource creation.
COOLDOWN_TIME_SEC = timedelta(seconds=5).total_seconds()
def log_element(element, prefix):
    """Log ``element`` at INFO level and pass it through unchanged.

    Intended for use with ``beam.Map`` as a tap on a PCollection.

    Args:
        element: The pipeline element to log.
        prefix: Label written in front of the element in the log line.

    Returns:
        ``element``, unmodified.
    """
    m_log.info(f"{prefix}: {element}")
    return element
def run_pipeline():
    """Build and run the streaming pipeline under test.

    The pipeline reads JSON messages from ``PRODUCER_SUBSCRIPTION``, keys
    them by ``user_id``, applies a ``Sessions`` window with gap
    ``WINDOW_LENGTH_SEC`` and an ``AfterWatermark`` trigger that fires
    early after every single element (accumulating), combines the values
    per key with ``CustomCombine`` and publishes the JSON-encoded result
    to ``CONSUMER_TOPIC``.
    """
    options = PipelineOptions(
        # runner="DataflowRunner",
        runner="DirectRunner",
        streaming=True,
        project=PROJECT,
        region="europe-west1",
        temp_location=f"gs://{PROJECT}-dataflow-temp",
        allow_unsafe_triggers=True,
        save_main_session=True,
    )
    # Leaving the ``with`` block runs the pipeline.
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Read from PubSub"
            >> beam.io.ReadFromPubSub(
                subscription=PRODUCER_SUBSCRIPTION
            ).with_output_types(bytes)
            | "Decode" >> beam.Map(lambda x: json.loads(x.decode("utf-8")))
            | "Extract" >> beam.Map(lambda x: (x["user_id"], x["value"]))
            | "Log elements Before Window"
            >> beam.Map(log_element, prefix="Before Window")
            | "Window into"
            >> beam.WindowInto(
                Sessions(WINDOW_LENGTH_SEC),
                trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
            )
            | "Combine" >> beam.CombinePerKey(CustomCombine())
            | "Log elements After Combine"
            >> beam.Map(log_element, prefix="After Combine")
            | "Encode"
            >> beam.Map(
                lambda x: json.dumps(
                    {"user_id": x[0], "values": x[1]}
                ).encode("utf-8")
            )
            | "Write to PubSub" >> beam.io.WriteToPubSub(CONSUMER_TOPIC)
        )
class CustomCombine(beam.CombineFn):
    """CombineFn that collects every input value of a key into a list."""

    def create_accumulator(self) -> list:
        """Return a fresh, empty accumulator."""
        return []

    # ``input`` shadows the builtin; the name is kept so the method
    # signature stays exactly as originally published.
    def add_input(self, accumulator: list, input) -> list:
        """Return a new list made of ``accumulator`` followed by ``input``."""
        return accumulator + [input]

    def merge_accumulators(self, accumulators: Iterable[list]) -> list:
        """Concatenate all partial accumulators into a single flat list."""
        res = [x for acc in accumulators for x in acc]
        return res

    def extract_output(self, accumulator):
        """Return the accumulated list itself as the combined value."""
        return accumulator
def _publish_batch(publisher, log, label):
    """Publish five ``user1`` messages tagged ``label``, pausing after each.

    Each message value is ``f"{label}-{i}"`` for ``i`` in ``0..4``. The
    publish result (the message id) is logged, then the worker sleeps for
    ``COOLDOWN_TIME_SEC`` so the messages arrive spaced apart.
    """
    for i in range(5):
        payload = {"user_id": "user1", "value": f"{label}-{i}"}
        data = json.dumps(payload).encode("utf-8")
        future = publisher.publish(PRODUCER_TOPIC, data=data)
        log.info(future.result())
        time.sleep(COOLDOWN_TIME_SEC)


def producer_worker():
    """Publish two batches of test messages separated by a long pause.

    After an initial warm-up sleep the worker publishes five ``first-*``
    messages, sleeps for ``WINDOW_LENGTH_SEC + 10`` seconds (longer than
    the session gap used by the pipeline), then publishes five
    ``second-*`` messages.
    """
    publisher = pubsub_v1.PublisherClient()
    log = get_logger("producer")
    log.info(f"Producer sleeps for {WARMUP_TIME_SEC}")
    time.sleep(WARMUP_TIME_SEC)

    log.info("Producing first 5 messages")
    _publish_batch(publisher, log, "first")

    # Stay silent for longer than the session gap before the second batch.
    sleep_time = WINDOW_LENGTH_SEC + 10
    log.info(f"Sleeping for {sleep_time} seconds")
    time.sleep(sleep_time)

    log.info("Producing next 5 messages")
    _publish_batch(publisher, log, "second")
def consumer_callback(message):
    """Log the JSON payload of a received Pub/Sub message and ack it.

    Args:
        message: The received message; its ``data`` attribute is decoded
            as UTF-8 JSON and ``ack()`` is called once it has been logged.
    """
    log = get_logger("consumer")
    data = json.loads(message.data.decode("utf-8"))
    log.info(f"!!! MESSAGE !!!: {data}")
    message.ack()
def consumer_worker():
    """Listen on ``CONSUMER_SUBSCRIPTION`` until the process is stopped.

    Messages are handled by ``consumer_callback``. The function itself
    only keeps the process alive with a sleep loop and never returns
    normally.
    """
    log = get_logger("consumer")
    subscriber = pubsub_v1.SubscriberClient()
    log.info(f"Consumer is listening to {CONSUMER_SUBSCRIPTION}")
    # Keep the future returned by subscribe() so the pull can be stopped
    # on shutdown instead of being abandoned.
    streaming_pull = subscriber.subscribe(
        CONSUMER_SUBSCRIPTION, callback=consumer_callback
    )
    try:
        while True:
            time.sleep(1)
    finally:
        log.warning("Consumer is shutting down")
        streaming_pull.cancel()
def _ensure_topic(publisher, topic_path):
    """Create the topic at ``topic_path`` unless ``get_topic`` succeeds."""
    try:
        publisher.get_topic(topic=topic_path)
    except Exception:
        # NOTE(review): any failure of get_topic (not only "not found") is
        # treated as "topic missing"; consider narrowing the exception.
        publisher.create_topic(name=topic_path)
        m_log.info(f"Topic {topic_path} created")
        # Pause after creating the resource before moving on.
        time.sleep(COOLDOWN_TIME_SEC)
    else:
        m_log.info(f"Topic {topic_path} already exists")


def _ensure_subscription(subscriber, subscription_path, topic_path):
    """Create ``subscription_path`` on ``topic_path`` unless it is found."""
    try:
        subscriber.get_subscription(subscription=subscription_path)
    except Exception:
        # NOTE(review): any failure of get_subscription is treated as
        # "subscription missing"; consider narrowing the exception.
        subscriber.create_subscription(
            name=subscription_path, topic=topic_path
        )
        m_log.info(f"Subscription {subscription_path} created")
        # Pause after creating the resource before moving on.
        time.sleep(COOLDOWN_TIME_SEC)
    else:
        m_log.info(f"Subscription {subscription_path} already exists")


def create_pubsub_resources():
    """Make sure the topics and subscriptions used by the test exist.

    Checks, in order, the producer topic, the consumer topic, the producer
    subscription and the consumer subscription. Each one is created when
    looking it up fails, followed by a ``COOLDOWN_TIME_SEC`` pause.
    """
    publisher = pubsub_v1.PublisherClient()
    _ensure_topic(publisher, PRODUCER_TOPIC)
    _ensure_topic(publisher, CONSUMER_TOPIC)

    subscriber = pubsub_v1.SubscriberClient()
    _ensure_subscription(subscriber, PRODUCER_SUBSCRIPTION, PRODUCER_TOPIC)
    _ensure_subscription(subscriber, CONSUMER_SUBSCRIPTION, CONSUMER_TOPIC)
def main():
    """Set up Pub/Sub, start the consumer and producer, run the pipeline.

    The consumer and the producer each run in their own process while the
    pipeline runs in the main process.
    """
    create_pubsub_resources()

    # Start the consumer first so it is already listening when the
    # pipeline begins to emit results.
    consumer = Process(target=consumer_worker)
    consumer.start()
    producer = Process(target=producer_worker)
    producer.start()

    run_pipeline()

    producer.join()
    # consumer_worker loops forever, so this join only returns once the
    # consumer process has been stopped from outside.
    consumer.join()


if __name__ == "__main__":
    main()
```
This is the output when running with the DirectRunner:
```
2023-07-25 13:49:28.401 INFO Topic
projects/MYPROJECT/topics/beam-test-producer-topic already exists
2023-07-25 13:49:28.590 INFO Topic
projects/MYPROJECT/topics/beam-test-consumer-topic already exists
2023-07-25 13:49:33.943 INFO Subscription
projects/MYPROJECT/subscriptions/beam-test-topic-producer-subscription created
2023-07-25 13:49:44.958 INFO Subscription
projects/MYPROJECT/subscriptions/beam-test-topic-consumer-subscription created
0.00s - Debugger warning: It seems that frozen modules are being used, which
may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1
to disable this validation.
0.00s - Debugger warning: It seems that frozen modules are being used, which
may
0.00s - Debugger warning: It seems that frozen modules are being used, which
may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1
to disable this validation.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1
to disable this validation.
2023-07-25 13:49:51.583 INFO Producer sleeps for 5.0
2023-07-25 13:49:51.583 INFO Consumer is listening to
projects/MYPROJECT/subscriptions/beam-test-topic-consumer-subscription
2023-07-25 13:49:56.585 INFO Producing first 5 messages
2023-07-25 13:49:56.842 INFO 7901136752441307
2023-07-25 13:49:57.035 INFO Before Window: ('user1', 'first-0')
2023-07-25 13:49:57.084 INFO After Combine: ('user1', ['first-0'])
2023-07-25 13:49:58.425 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['first-0']}
2023-07-25 13:50:01.915 INFO 7901152274226985
2023-07-25 13:50:02.102 INFO Before Window: ('user1', 'first-1')
2023-07-25 13:50:02.148 INFO After Combine: ('user1', ['first-0',
'first-1'])
2023-07-25 13:50:03.544 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['first-0', 'first-1']}
2023-07-25 13:50:06.986 INFO 7901136932871865
2023-07-25 13:50:07.172 INFO Before Window: ('user1', 'first-2')
2023-07-25 13:50:07.219 INFO After Combine: ('user1', ['first-0',
'first-1', 'first-2'])
2023-07-25 13:50:08.299 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['first-0', 'first-1', 'first-2']}
2023-07-25 13:50:12.069 INFO 8701650771078307
2023-07-25 13:50:12.246 INFO Before Window: ('user1', 'first-3')
2023-07-25 13:50:12.293 INFO After Combine: ('user1', ['first-0',
'first-3', 'first-1', 'first-2'])
2023-07-25 13:50:12.565 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['first-0', 'first-3', 'first-1', 'first-2']}
2023-07-25 13:50:17.143 INFO 7901136738393628
2023-07-25 13:50:17.338 INFO Before Window: ('user1', 'first-4')
2023-07-25 13:50:17.384 INFO After Combine: ('user1', ['first-0',
'first-3', 'first-1', 'first-4', 'first-2'])
2023-07-25 13:50:18.635 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['first-0', 'first-3', 'first-1', 'first-4', 'first-2']}
2023-07-25 13:50:22.149 INFO Sleeping for 190.0 seconds
2023-07-25 13:53:29.395 INFO After Combine: ('user1', ['first-0',
'first-3', 'first-1', 'first-4', 'first-2'])
2023-07-25 13:53:29.682 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['first-0', 'first-3', 'first-1', 'first-4', 'first-2']}
2023-07-25 13:53:32.154 INFO Producing next 5 messages
2023-07-25 13:53:32.233 INFO 7901152440676901
2023-07-25 13:53:32.424 INFO Before Window: ('user1', 'second-0')
2023-07-25 13:53:32.470 INFO After Combine: ('user1', ['second-0'])
2023-07-25 13:53:32.732 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['second-0']}
2023-07-25 13:53:37.312 INFO 8701672411108061
2023-07-25 13:53:37.501 INFO Before Window: ('user1', 'second-1')
2023-07-25 13:53:37.549 INFO After Combine: ('user1', ['second-0',
'second-1'])
2023-07-25 13:53:37.819 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['second-0', 'second-1']}
2023-07-25 13:53:42.392 INFO 7901136872402657
2023-07-25 13:53:42.591 INFO Before Window: ('user1', 'second-2')
2023-07-25 13:53:42.637 INFO After Combine: ('user1', ['second-0',
'second-1', 'second-2'])
2023-07-25 13:53:42.916 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['second-0', 'second-1', 'second-2']}
2023-07-25 13:53:47.468 INFO 7901136997195278
2023-07-25 13:53:47.658 INFO Before Window: ('user1', 'second-3')
2023-07-25 13:53:47.705 INFO After Combine: ('user1', ['second-0',
'second-3', 'second-1', 'second-2'])
2023-07-25 13:53:47.978 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['second-0', 'second-3', 'second-1', 'second-2']}
2023-07-25 13:53:52.549 INFO 7901152531086753
2023-07-25 13:53:52.755 INFO Before Window: ('user1', 'second-4')
2023-07-25 13:53:52.800 INFO After Combine: ('user1', ['second-0',
'second-3', 'second-1', 'second-4', 'second-2'])
2023-07-25 13:53:53.066 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['second-0', 'second-3', 'second-1', 'second-4', 'second-2']}
2023-07-25 13:57:07.557 INFO After Combine: ('user1', ['second-0',
'second-3', 'second-1', 'second-4', 'second-2'])
2023-07-25 13:57:07.852 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['second-0', 'second-3', 'second-1', 'second-4', 'second-2']}
```
This is the output when running with the Dataflow runner:
```
2023-07-25 13:37:30.021 INFO Topic
projects/MYPROJECT/topics/beam-test-producer-topic already exists
2023-07-25 13:37:30.211 INFO Topic
projects/MYPROJECT/topics/beam-test-consumer-topic already exists
2023-07-25 13:37:36.752 INFO Subscription
projects/MYPROJECT/subscriptions/beam-test-topic-producer-subscription created
2023-07-25 13:37:48.169 INFO Subscription
projects/MYPROJECT/subscriptions/beam-test-topic-consumer-subscription created
0.00s - Debugger warning: It seems that frozen modules are being used, which
may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1
to disable this validation.
0.00s - Debugger warning: It seems that frozen modules are being used, which
may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1
to disable this validation.
0.00s - Debugger warning: It seems that frozen modules are being used, which
may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1
to disable this validation.
0.00s - Debugger warning: It seems that frozen modules are being used, which
may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1
to disable this validation.
2023-07-25 13:37:54.684 INFO Producing first 5 messages
2023-07-25 13:37:54.684 INFO Consumer is listening to
projects/MYPROJECT/subscriptions/beam-test-topic-consumer-subscription
2023-07-25 13:37:54.954 INFO 8701580110692426
0.00s - Debugger warning: It seems that frozen modules are being used, which
may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1
to disable this validation.
2023-07-25 13:38:00.019 INFO 8701563637160080
2023-07-25 13:38:05.089 INFO 7901135680165217
2023-07-25 13:38:10.191 INFO 7901136060418932
2023-07-25 13:38:15.277 INFO 7901135838881582
2023-07-25 13:38:20.284 INFO Sleeping for 190.0 seconds
2023-07-25 13:41:30.287 INFO Producing next 5 messages
2023-07-25 13:41:30.419 INFO 7901136188227831
2023-07-25 13:41:35.494 INFO 7901135937988428
2023-07-25 13:41:40.588 INFO 7901136623003572
2023-07-25 13:41:45.681 INFO 7901136240116298
2023-07-25 13:41:50.128 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['second-2']}
2023-07-25 13:41:50.188 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['second-2', 'second-3']}
2023-07-25 13:41:50.189 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['first-0']}
2023-07-25 13:41:50.192 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['second-2', 'second-3', 'second-1']}
2023-07-25 13:41:50.242 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['first-0', 'first-2']}
2023-07-25 13:41:50.295 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['second-2', 'second-3', 'second-1', 'second-0']}
2023-07-25 13:41:50.343 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['first-0', 'first-2', 'first-1']}
2023-07-25 13:41:50.383 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['first-0', 'first-2', 'first-1', 'first-4']}
2023-07-25 13:41:50.385 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['first-0', 'first-2', 'first-1', 'first-4', 'first-3']}
2023-07-25 13:41:50.742 INFO 7901135828414233
2023-07-25 13:41:52.975 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['second-2', 'second-3', 'second-1', 'second-0', 'second-4']}
2023-07-25 13:42:00.946 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['first-0', 'first-2', 'first-1', 'first-4', 'first-3']}
2023-07-25 13:45:27.291 INFO !!! MESSAGE !!!: {'user_id': 'user1',
'values': ['second-2', 'second-3', 'second-1', 'second-0', 'second-4']}
```
As you can see, both runners fire the `Sessions` window early on each new
element, accumulating the results, and fire once again when the window is
closed.
I tried the same trigger with the test case I mentioned in the bug report,
and it didn't work:
```python
from datetime import datetime, timedelta, timezone
from typing import Iterable
import pytest
import apache_beam as beam
from apache_beam.transforms import trigger
from apache_beam.transforms.window import Sessions
from apache_beam.pipeline_test import TestPipeline
from apache_beam.pipeline import PipelineOptions
from apache_beam.testing.test_stream import (
    ElementEvent,
    TestStream,
    TimestampedValue,
    WatermarkEvent,
)
from apache_beam.testing.util import assert_that, equal_to
# Event-time origin shared by all test elements.
base_datetime = datetime(2023, 7, 7, 10, 30, 0, 0, timezone.utc)


@pytest.mark.parametrize(
    "events, expected",
    [
        (
            [
                WatermarkEvent(0),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 1),
                            base_datetime.timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent(base_datetime.timestamp() + 5),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 2),
                            (base_datetime + timedelta(minutes=10)).timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent(
                    (base_datetime + timedelta(minutes=10)).timestamp() + 5
                ),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 5),
                            (
                                base_datetime + timedelta(days=1, minutes=20)
                            ).timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent(
                    (base_datetime + timedelta(days=1, minutes=20)).timestamp()
                    + 5
                ),
            ],
            # One early pane per element plus one pane when each session
            # window closes.
            [
                ("user1", [1]),  # when 1 arrives
                ("user1", [1, 2]),  # when 2 arrives
                ("user1", [1, 2]),  # when the window is closed
                ("user1", [5]),  # when 5 arrives
                ("user1", [5]),  # when the window is closed
            ],
        ),
    ],
)
def test_after_count_trigger(events, expected):
    """Check early-per-element and on-close firing of a session window.

    ``events`` are replayed through a ``TestStream`` into a one-day
    ``Sessions`` window with ``AfterWatermark(early=AfterCount(1))`` in
    accumulating mode; the per-key combined output must equal
    ``expected``.
    """
    with TestPipeline(
        options=PipelineOptions(allow_unsafe_triggers=True)
    ) as p:
        test_stream = p | TestStream(events=events, output_tags={None})
        pcoll = (
            test_stream
            | "Window"
            >> beam.WindowInto(
                Sessions(int(timedelta(days=1).total_seconds())),
                trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
                accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING,
            )
            | "Combine" >> beam.CombinePerKey(CustomCombine())
        )
        # Print every emitted pane to make the actual firings visible.
        pcoll | beam.Map(print)
        assert_that(pcoll, equal_to(expected))
class CustomCombine(beam.CombineFn):
    """CombineFn that collects every input value of a key into a list."""

    def create_accumulator(self) -> list:
        """Return a fresh, empty accumulator."""
        return []

    # ``input`` shadows the builtin; the name is kept so the method
    # signature stays exactly as originally published.
    def add_input(self, accumulator: list, input) -> list:
        """Return a new list made of ``accumulator`` followed by ``input``."""
        return accumulator + [input]

    def merge_accumulators(self, accumulators: Iterable[list]) -> list:
        """Concatenate all partial accumulators into a single flat list."""
        res = [x for acc in accumulators for x in acc]
        return res

    def extract_output(self, accumulator):
        """Return the accumulated list itself as the combined value."""
        return accumulator
```
Instead of the expected result:
```python
[
("user1", [1]), # when 1 arrives
("user1", [1, 2]), # when 2 arrives
("user1", [1, 2]), # when the window is closed
("user1", [5]), # when 5 arrives
("user1", [5]), # when the window is closed
],
```
where the window fires on each new element (preserving the previous
elements) and fires once more when the window is closed,
it produces this result:
```python
[
("user1", [1, 2]),
("user1", [1, 2]),
("user1", [5]),
("user1", [5]),
],
```
As you can see, it never fires on the very first element alone.
Could you please point me to the part of the code that might be causing this
bug?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]