[
https://issues.apache.org/jira/browse/BEAM-12552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490480#comment-17490480
]
Robert Bradshaw edited comment on BEAM-12552 at 2/10/22, 8:02 PM:
------------------------------------------------------------------
More recent SDKs reject this with
{{ValueError: MapCoder[StrUtf8Coder, MapCoder[StrUtf8Coder, VarIntCoder]]
cannot be made deterministic for 'Group By Key'.}}
or
{{TypeError: Unable to deterministically encode '{'key': {'image_id': 1,
'context_id': 2}}' of type '<class 'dict'>', please provide a type hint for the
input of 'Group By Key' [while running 'Create Dataset/Map(decode)']}}
if we weren't able to infer the types.
was (Author: robertwb):
More recent SDKs reject this with
```
ValueError: MapCoder[StrUtf8Coder, MapCoder[StrUtf8Coder, VarIntCoder]] cannot
be made deterministic for 'Group By Key'.
```
> GroupBy and GroupByKey doesn't group composite keys
> ---------------------------------------------------
>
> Key: BEAM-12552
> URL: https://issues.apache.org/jira/browse/BEAM-12552
> Project: Beam
> Issue Type: Bug
> Components: beam-model, sdk-py-core
> Affects Versions: 2.24.0
> Environment: 1. MacOS BigSur 11.4, Python 3.9
> 2. Google DataFlow, Python 3.9
> Reporter: Praneeth Peiris
> Priority: P3
> Labels: newbie, starter
>
> *Background*
> Even though GroupByKey and GroupBy work for simple key fields, it doesn't
> work for composite keys where the key is another dictionary.
>
> *Example:*
> {code:java}
> import apache_beam as beam
> def run(consumer_args, pipeline_args):
> with beam.Pipeline() as pipeline:
> pipeline | 'Create Dataset' >> beam.Create([
> ({'key': {'image_id': 1, 'context_id': 2}}, 1),
> ({'key': {'image_id': 2, 'context_id': 2}}, 2),
> ({'key': {'image_id': 3, 'context_id': 2}}, 3),
> ({'key': {'image_id': 4, 'context_id': 2}}, 4),
> ({'key': {'image_id': 5, 'context_id': 2}}, 5),
> ({'key': {'context_id': 2, 'image_id': 1}}, 6),
> ({'key': {'context_id': 2, 'image_id': 2}}, 7),
> ({'key': {'context_id': 2, 'image_id': 3}}, 8),
> ({'key': {'context_id': 2, 'image_id': 4}}, 9),
> ({'key': {'context_id': 2, 'image_id': 5}}, 10),
> ]) | 'Group By Key' >> beam.GroupByKey() \
> | 'Print Stuff' >> beam.Map(print)
> pipeline.run()
> {code}
> Note there are records with the same keys (semantically) but in a different
> order.
> The same issue occurs for {{GroupBy}} when we use a mapper function.
> {code:java}
> | 'Group By Key' >> beam.GroupByKey(lambda record: record[0]['key'])
> {code}
>
> *Expected output:*
> {code:java}
> ({'key': {'image_id': 1, 'context_id': 2}}, [1, 6])
> ({'key': {'image_id': 2, 'context_id': 2}}, [2, 7])
> ({'key': {'image_id': 3, 'context_id': 2}}, [3, 8])
> ({'key': {'image_id': 4, 'context_id': 2}}, [4, 9])
> ({'key': {'image_id': 5, 'context_id': 2}}, [5, 10]){code}
>
> *Actual output:*
> {code:java}
> ({'key': {'image_id': 1, 'context_id': 2}}, [1])
> ({'key': {'image_id': 2, 'context_id': 2}}, [2])
> ({'key': {'image_id': 3, 'context_id': 2}}, [3])
> ({'key': {'image_id': 4, 'context_id': 2}}, [4])
> ({'key': {'image_id': 5, 'context_id': 2}}, [5])
> ({'key': {'context_id': 2, 'image_id': 1}}, [6])
> ({'key': {'context_id': 2, 'image_id': 2}}, [7])
> ({'key': {'context_id': 2, 'image_id': 3}}, [8])
> ({'key': {'context_id': 2, 'image_id': 4}}, [9])
> ({'key': {'context_id': 2, 'image_id': 5}}, [10])
> {code}
> *Workaround:*
> {code:java}
> def _generate_key(key_object):
> # Generate an Ordered String Key only for the GroupBy
> fields = list(key_object.keys())
> fields.sort()
> return ",".join([f"{field}:{key_object[field]}" for field in fields])
> ...
> ...| "Group" >> beam.GroupBy(lambda record: _generate_key(record['key']))
> ...
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)