TheNeuralBit commented on issue #29365:
URL: https://github.com/apache/beam/issues/29365#issuecomment-1847646822

   I spent a little time looking at this and found that it reproduces minimally with dask alone, without Beam, when dask is configured to use a `LocalCluster`.
   
   With this script:
   ```py
   from distributed import Client, LocalCluster
   from dask import bag as db
   from pprint import pprint
   
   
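   def run():
     # Start a local cluster; creating the Client makes it the default
     # scheduler for the computations below.
     cluster = LocalCluster()
     client = Client(cluster)
   
     b = db.from_sequence([('a', 1),
                           ('a', 2),
                           ('b', 3),
                           ('b', 4)])
   
     # Group the pairs by their string key.
     grouped = b.groupby(lambda s: s[0])
   
     # Compute the same grouping 10 times and print each result.
     for i in range(10):
       print(i)
       pprint(list(grouped))
   
   
   if __name__ == '__main__':
     run()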
   ```
   
   I get this output:
   ```
   0
   [('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1), ('a', 2)])]
   1
   [('a', [('a', 2)]), ('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
   2
   [('a', [('a', 2)]), ('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
   3
   [('b', [('b', 4)]), ('a', [('a', 2)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
   4
   [('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1), ('a', 2)])]
   5
   [('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1), ('a', 2)])]
   6
   [('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1), ('a', 2)])]
   7
   [('a', [('a', 2)]), ('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
   8
   [('b', [('b', 4)]), ('a', [('a', 2)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
   9
   [('b', [('b', 4)]), ('a', [('a', 2)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
   ```
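
For reference, a correct groupby should yield each key exactly once. A small check along these lines flags the keys that were split across groups; `split_keys` is a hypothetical helper for illustration, not part of dask or Beam, and the sample data is copied from iterations 0 and 1 above:

```python
from collections import Counter


def split_keys(groups):
    """Return the keys that appear in more than one group, sorted."""
    counts = Counter(key for key, _ in groups)
    return sorted(key for key, n in counts.items() if n > 1)


# Iterations 0 and 1 from the output above.
run0 = [('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1), ('a', 2)])]
run1 = [('a', [('a', 2)]), ('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1)])]

print(split_keys(run0))  # ['b']
print(split_keys(run1))  # ['a', 'b']
```

An empty list means every key ended up in a single group.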
   
   This is the same behavior I see in a simple Beam groupby test: the same key can show up in more than one group, and the grouping changes from one iteration to the next. I'm not sure how to make sense of this. Some possible explanations:
   - We're misunderstanding the semantics of groupby, and it's not actually doing a shuffle.
   - Encoding Python strings for the dask shuffle is somehow non-deterministic.
   
   Actually, it looks like it must be the latter. Switching to integer keys works as expected (as you observed in the Beam test):
   ```
   ❯ python test.py
   0
   [(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
   1
   [(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
   2
   [(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
   3
   [(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
   4
   [(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
   5
   [(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
   6
   [(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
   7
   [(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
   8
   [(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
   9
   [(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
   ```
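
One plausible explanation, which is an assumption here and has not been checked against dask's source: CPython randomizes `str` hashes per interpreter process unless `PYTHONHASHSEED` is fixed, while `int` hashes are stable. If the shuffle assigns keys to partitions by `hash(key)` inside separate worker processes, string keys could be routed differently depending on which worker handles them. The sketch below only demonstrates the hashing difference between processes, not dask's behavior; `child_hash` is a hypothetical helper:

```python
import os
import subprocess
import sys


def child_hash(expr, seed):
    """Evaluate hash(<expr>) in a fresh interpreter with PYTHONHASHSEED=seed."""
    env = dict(os.environ, PYTHONHASHSEED=seed)
    out = subprocess.run(
        [sys.executable, "-c", f"print(hash({expr}))"],
        env=env, capture_output=True, text=True, check=True,
    )
    return int(out.stdout)


if __name__ == "__main__":
    # "random" requests per-process hash randomization (the default when the
    # variable is unset); "0" disables randomization.
    for seed in ("random", "random", "0", "0"):
        print(seed, child_hash("'a'", seed), child_hash("0", seed))
```

With `random`, the string hash should differ between the two runs while the integer hash stays the same; with a fixed seed, both are stable. If this turns out to be the cause, it would be worth mentioning in the dask issue.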
   
   We should probably raise an issue with dask.

