Oscar Rodriguez created BEAM-10382:
--------------------------------------
Summary: The Python SDK does not handle session windows properly.
Key: BEAM-10382
URL: https://issues.apache.org/jira/browse/BEAM-10382
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Affects Versions: 2.22.0, 2.20.0
Environment: Direct Runner and Dataflow
Reporter: Oscar Rodriguez
I tried this with Apache Beam 2.20 and 2.22.
1) What I want to achieve:
I have a pipeline that reads from Google Pub/Sub. The messages contain user
and product information. In the end, I need to analyse the data so that, for
each user's session, I know how many products of each type there are.
2) What I did:
The first thing I do in my pipeline is a "Group by Key", using the user as the
key and "beam.WindowInto(beam.window.Sessions(15))" as the windowing. Then, as
I need to aggregate over products for each user/session, I do another "Group by
Key", this time with the product as the key.
3) What I expect to happen:
With the first "Group by Key", the pipeline creates a different window for each
user/session combination. So, for the second "Group by Key", I expect that it
does not mix elements that come from different windows.
4) What actually happens:
If the messages are at least 1 second apart from each other, the pipeline works
as I expect. However, if I publish all the messages at the same time, all the
sessions and users get mixed together.
Here https://github.com/Oscar-Rod/apache-beam-testing you have a complete
working example.
To publish the messages I do the following:
{code:python}
def generate_message(user, products):
    return json.dumps({"user": user, "products": products}).encode("utf-8")

messages = [
    generate_message("user_1", [{"id": "prod_1", "quantity": 1}]),
    generate_message("user_1", [{"id": "prod_1", "quantity": 1}]),
    generate_message("user_1", [{"id": "prod_1", "quantity": 1}]),
    generate_message("user_2", [{"id": "prod_1", "quantity": 2}]),
    generate_message("user_2", [{"id": "prod_1", "quantity": 2}]),
    generate_message("user_2", [{"id": "prod_1", "quantity": 2}]),
    generate_message("user_3", [{"id": "prod_1", "quantity": 3}]),
    generate_message("user_3", [{"id": "prod_1", "quantity": 3}]),
    generate_message("user_3", [{"id": "prod_1", "quantity": 3}]),
]

for message in messages:
    publisher.publish(topic_path, data=message)
    # time.sleep(1)
{code}
This publishes 9 messages. As the sessions are configured with a 15-second gap,
it should create one session per user. In the end, user 1 should have 3
"prod_1", user 2 should have 6 "prod_1", and user 3 should have 9 "prod_1".
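The expected totals can be checked with a quick calculation (plain Python, mirroring the messages above: each user publishes 3 messages, and user_N's messages each carry quantity N):

```python
# Each user publishes 3 messages; user_N's messages carry quantity N,
# so the expected per-user total of "prod_1" is 3 * N.
expected = {f"user_{n}": 3 * n for n in (1, 2, 3)}
print(expected)  # {'user_1': 3, 'user_2': 6, 'user_3': 9}
```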
The first step in the pipeline is reading from Pub/Sub:
{code:python}
messages = (
    pipeline
    | "read messages" >> beam.io.ReadFromPubSub(topic=options.input_topic)
    | "parse to messages" >> beam.ParDo(ParseMessage())
)
{code}
It parses the messages into the following classes:
{code:python}
class Product(BaseModel):
    id: str
    quantity: str

class Message(BaseModel):
    user: str
    products: List[Product]
    timestamp: datetime
{code}
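ParseMessage itself is not shown above; the full DoFn is in the linked repo. The decoding it performs amounts to something like this stdlib-only sketch (parse_payload is a hypothetical helper name; the real DoFn additionally builds the pydantic Message and fills in the timestamp):

```python
import json

def parse_payload(data: bytes) -> dict:
    # Decode a Pub/Sub message body produced by generate_message above.
    payload = json.loads(data.decode("utf-8"))
    return {"user": payload["user"], "products": payload["products"]}

parsed = parse_payload(b'{"user": "user_1", "products": [{"id": "prod_1", "quantity": 1}]}')
# parsed == {'user': 'user_1', 'products': [{'id': 'prod_1', 'quantity': 1}]}
```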
Then, I apply the sessions and the Group By Key:
{code:python}
sessions = (
    messages
    | "window" >> beam.WindowInto(beam.window.Sessions(15))
    | "add key" >> beam.Map(lambda element: (element.user, element.products))
    | "group by user" >> beam.GroupByKey()
)
{code}
After this, I am getting the following elements:
{code:python}
('user_1', [[Product(id='prod_1', quantity='1')], [Product(id='prod_1', quantity='1')], [Product(id='prod_1', quantity='1')]])
('user_2', [[Product(id='prod_1', quantity='2')], [Product(id='prod_1', quantity='2')], [Product(id='prod_1', quantity='2')]])
('user_3', [[Product(id='prod_1', quantity='3')], [Product(id='prod_1', quantity='3')], [Product(id='prod_1', quantity='3')]])
{code}
To aggregate for each product, I need the product as a key, so I modified the
previous step to flatten the elements:
{code:python}
def flat_function(key, elements):
    for element in elements:
        yield (key, element)

sessions = (
    messages
    | "window" >> beam.WindowInto(beam.window.Sessions(15))
    | "add key" >> beam.Map(lambda element: (element.user, element.products))
    | "group by user" >> beam.GroupByKey()
    | "first flatten" >> beam.FlatMapTuple(flat_function)
    | "second flatten" >> beam.FlatMapTuple(flat_function)
)
{code}
And I am getting the following:
{code:python}
('user_1', Product(id='prod_1', quantity='1'))
('user_1', Product(id='prod_1', quantity='1'))
('user_1', Product(id='prod_1', quantity='1'))
('user_2', Product(id='prod_1', quantity='2'))
('user_2', Product(id='prod_1', quantity='2'))
('user_2', Product(id='prod_1', quantity='2'))
('user_3', Product(id='prod_1', quantity='3'))
('user_3', Product(id='prod_1', quantity='3'))
('user_3', Product(id='prod_1', quantity='3'))
{code}
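The double flatten can be verified outside Beam: applied twice, flat_function unrolls the list-of-lists that "group by user" produces (each message's products field is itself a list, hence the nesting):

```python
def flat_function(key, elements):
    for element in elements:
        yield (key, element)

# After "group by user", user_1's value is a list of three single-product lists.
grouped = ("user_1", [["p1"], ["p1"], ["p1"]])

once = list(flat_function(*grouped))                        # three (key, list) pairs
twice = [pair for p in once for pair in flat_function(*p)]  # three (key, product) pairs
print(twice)  # [('user_1', 'p1'), ('user_1', 'p1'), ('user_1', 'p1')]
```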
Now, the last step:
{code:python}
products = (
    sessions
    | "add new key" >> beam.Map(lambda session: (session[1].id, (session[1], session[0])))
    | "group by product" >> beam.GroupByKey()
)
{code}
And here is where the issue happens. If the messages are published at least 1
second apart, this is what I get:
{code:python}
('prod_1', [(Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1')])
('prod_1', [(Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2')])
('prod_1', [(Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3')])
{code}
The result is what I expect: 3 elements, one per user session. Looking at the
"quantity" we can confirm that the result is correct: all elements with
"quantity=3" are in the same output element, as they come from the same
user/session, and the same applies to the elements with "quantity=2" and
"quantity=1".
However, if I publish the messages all at the same time, this is what I get:
{code:python}
('prod_1', [(Product(id='prod_1', quantity='1'), 'user_1'),
            (Product(id='prod_1', quantity='1'), 'user_1'),
            (Product(id='prod_1', quantity='1'), 'user_1'),
            (Product(id='prod_1', quantity='2'), 'user_2'),
            (Product(id='prod_1', quantity='2'), 'user_2'),
            (Product(id='prod_1', quantity='2'), 'user_2'),
            (Product(id='prod_1', quantity='3'), 'user_3'),
            (Product(id='prod_1', quantity='3'), 'user_3'),
            (Product(id='prod_1', quantity='3'), 'user_3')])
{code}
Only 1 element, with all the messages in it. So clearly, when the timestamps of
the messages are too close together, Apache Beam does not put them in different
sessions. The fact that the behaviour of the pipeline changes when the
timestamps of the messages change makes me think that this is a bug in Apache
Beam. What do you think? Is it possible? Does anyone have an explanation for
why this happens? Could this somehow be expected behaviour?
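For reference, my understanding of Sessions(15) is that each element starts a window [t, t + 15), and overlapping windows for the same key merge during the GroupByKey. Modelling that merging in plain Python (an illustration only, not Beam code) shows why identical timestamps are special: every element gets exactly the same window, so all per-user sessions end up as the same interval, while messages 1 second apart give each user a different merged interval:

```python
# Toy model of per-key session merging: each element gets [t, t + GAP);
# overlapping windows for the same key merge into one session.
GAP = 15

def merge_sessions(timestamps):
    windows = sorted((t, t + GAP) for t in timestamps)
    merged = [windows[0]]
    for start, end in windows[1:]:
        if start < merged[-1][1]:  # overlap -> extend the current session
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged

# All messages published at t=0: every user's session is the identical window.
print(merge_sessions([0, 0, 0]))  # [(0, 15)]
# Published 1 second apart: user_1 at 0,1,2 and user_2 at 3,4,5 get different windows.
print(merge_sessions([0, 1, 2]))  # [(0, 17)]
print(merge_sessions([3, 4, 5]))  # [(3, 20)]
```

Whether the second GroupByKey should then combine elements whose windows coincide is exactly the question above.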
--
This message was sent by Atlassian Jira
(v8.3.4#803005)