TheNeuralBit commented on issue #29365:
URL: https://github.com/apache/beam/issues/29365#issuecomment-1847646822
I spent a little time looking at this. I found it's possible to minimally
reproduce this with dask only, configured to use a `LocalCluster`.
With this script:
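```py
from distributed import Client, LocalCluster
from dask import bag as db
from pprint import pprint


def run():
    # Creating a Client registers it as the default scheduler,
    # so the bag below is computed on the LocalCluster's workers.
    cluster = LocalCluster()
    client = Client(cluster)

    b = db.from_sequence([('a', 1),
                          ('a', 2),
                          ('b', 3),
                          ('b', 4)])
    # Group records by their string key.
    grouped = b.groupby(lambda s: s[0])

    # Recompute the same grouping several times to expose nondeterminism.
    for i in range(10):
        print(i)
        pprint(list(grouped))


if __name__ == '__main__':
    run()
```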
I get this output:
```
0
[('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1), ('a', 2)])]
1
[('a', [('a', 2)]), ('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
2
[('a', [('a', 2)]), ('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
3
[('b', [('b', 4)]), ('a', [('a', 2)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
4
[('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1), ('a', 2)])]
5
[('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1), ('a', 2)])]
6
[('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1), ('a', 2)])]
7
[('a', [('a', 2)]), ('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
8
[('b', [('b', 4)]), ('a', [('a', 2)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
9
[('b', [('b', 4)]), ('a', [('a', 2)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
```
This matches the behavior I see in a simple Beam groupby test: the same key can
appear in more than one group, and the grouping changes from run to run. I'm not
sure how to make sense of this. Some possible explanations:
- We're misunderstanding the semantics of `groupby`, and it's not actually
doing a shuffle.
- Encoding Python strings for the dask shuffle is non-deterministic somehow.
Actually, it looks like it must be the latter. Switching to integer keys (`0`
and `1` instead of `'a'` and `'b'`) gives the same, correct grouping on every
run, as you observed in the Beam test:
```
❯ python test.py
0
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
1
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
2
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
3
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
4
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
5
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
6
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
7
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
8
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
9
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
```
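The string-versus-integer difference is consistent with Python's per-process hash randomization: since Python 3.3, `hash()` of a `str` is salted with a seed chosen at interpreter start unless `PYTHONHASHSEED` is set, whereas `hash()` of a small `int` is the integer itself. That dask's shuffle partitions records with the built-in `hash()` is an assumption here, not something this comment confirms. The sketch below runs `hash()` in fresh interpreters started with two fixed seeds; `hash_in_fresh_process` is a hypothetical helper name.

```python
import os
import subprocess
import sys


def hash_in_fresh_process(expr: str, seed: str) -> int:
    """Evaluate hash(<expr>) in a new interpreter with PYTHONHASHSEED=seed.

    Hypothetical helper: each call stands in for a separate worker process.
    """
    env = dict(os.environ, PYTHONHASHSEED=seed)
    result = subprocess.run(
        [sys.executable, "-c", f"print(hash({expr}))"],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return int(result.stdout.strip())


if __name__ == "__main__":
    seeds = ("1", "2")
    # String keys: the hash depends on the process's seed.
    str_hashes = {hash_in_fresh_process("'a'", s) for s in seeds}
    # Integer keys: hash(0) is 0 in every process.
    int_hashes = {hash_in_fresh_process("0", s) for s in seeds}
    print("str hash differs across processes:", len(str_hashes) > 1)
    print("int hash differs across processes:", len(int_hashes) > 1)
```

If two workers disagree on `hash('a')`, they can route records with the same key to different output partitions, which would look like the split groups in the first output.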
We should probably raise an issue with dask.
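For such a report, a dependency-free model of the suspected mechanism may help. It rests on assumptions, not on dask's actual implementation: each input record is treated as its own input partition handled by a worker with a distinct hash seed, and a keyed BLAKE2b digest stands in for Python's seeded string hash. `seeded_hash`, `crc32_hash`, `simulated_groupby`, and `has_split_keys` are hypothetical names. CRC32 over the key's UTF-8 bytes is shown as one process-independent alternative.

```python
import hashlib
import zlib
from collections import defaultdict


def seeded_hash(key: str, seed: int) -> int:
    # Model of hash(key) in a process started with hash seed `seed`:
    # a keyed BLAKE2b digest, so different seeds give unrelated values.
    digest = hashlib.blake2b(
        key.encode("utf-8"), key=seed.to_bytes(8, "little"), digest_size=8
    ).digest()
    return int.from_bytes(digest, "little")


def crc32_hash(key: str, seed: int) -> int:
    # Process-independent alternative: ignores the seed entirely.
    return zlib.crc32(key.encode("utf-8"))


def simulated_groupby(input_partitions, hash_fn, npartitions, seeds):
    """Hash-partition each input partition using its worker's seed, then group."""
    outputs = [[] for _ in range(npartitions)]
    for records, seed in zip(input_partitions, seeds):
        for record in records:
            outputs[hash_fn(record[0], seed) % npartitions].append(record)
    # Each output partition groups only the records it received.
    grouped = []
    for bucket in outputs:
        groups = defaultdict(list)
        for record in bucket:
            groups[record[0]].append(record)
        grouped.extend(groups.items())
    return grouped


def has_split_keys(grouped) -> bool:
    # True if some key appears in more than one group.
    keys = [key for key, _ in grouped]
    return len(keys) != len(set(keys))


if __name__ == "__main__":
    inputs = [[("a", 1)], [("a", 2)], [("b", 3)], [("b", 4)]]
    for trial in range(3):
        seeds = [4 * trial + i for i in range(4)]
        print("seeded:", simulated_groupby(inputs, seeded_hash, 2, seeds))
        print("crc32: ", simulated_groupby(inputs, crc32_hash, 2, seeds))
```

Under this model, the seeded variant can emit the same key in several groups, as in the first output above, while the CRC32 variant always yields one group per key.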