This is an automated email from the ASF dual-hosted git repository. altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 46276b5 Change GroupIntoBatches to group for real new 8f86cec Merge pull request #12129 from aaltay/gib 46276b5 is described below commit 46276b5191bd21359c05e03b692dbcb40e688e02 Author: Ahmet Altay <al...@google.com> AuthorDate: Mon Jun 29 19:58:12 2020 -0700 Change GroupIntoBatches to group for real --- CHANGES.md | 4 +++- .../snippets/transforms/aggregation/groupintobatches_test.py | 12 ++++++------ sdks/python/apache_beam/transforms/util.py | 9 +++++++-- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a8aceeb..7586734 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -77,7 +77,9 @@ `{"foo": "bar", "baz": null}`, whereas an implicit null like `{"foo": "bar"}` would raise an exception. Now both JSON strings will yield the same result by default. This behavior can be overridden with `RowJson.RowJsonDeserializer#withNullBehavior`. - +* Fixed a bug in `GroupIntoBatches` experimental transform in Python to actually group batches by key. + This changes the output type for this transform ([BEAM-6696](https://issues.apache.org/jira/browse/BEAM-6696)). +* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). ## Deprecations diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupintobatches_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupintobatches_test.py index 374401d..76c94da 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupintobatches_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupintobatches_test.py @@ -31,14 +31,14 @@ from . 
import groupintobatches def check_batches_with_keys(actual): expected = '''[START batches_with_keys] -[('spring', '🍓'), ('spring', '🥕'), ('spring', '🍆')] -[('summer', '🥕'), ('summer', '🍅'), ('summer', '🌽')] -[('spring', '🍅')] -[('fall', '🥕'), ('fall', '🍅')] -[('winter', '🍆')] +('spring', ['🍓', '🥕', '🍆']) +('summer', ['🥕', '🍅', '🌽']) +('spring', ['🍅']) +('fall', ['🥕', '🍅']) +('winter', ['🍆']) [END batches_with_keys]'''.splitlines()[1:-1] assert_matches_stdout( - actual, expected, lambda batch: (batch[0][0], len(batch))) + actual, expected, lambda batch: (batch[0], len(batch[1]))) @mock.patch('apache_beam.Pipeline', TestPipeline) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index afb4192..2f68d08 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -741,6 +741,7 @@ def WithKeys(pcoll, k): @experimental() @typehints.with_input_types(Tuple[K, V]) +@typehints.with_output_types(Tuple[K, Iterable[V]]) class GroupIntoBatches(PTransform): """PTransform that batches the input into desired batch size. Elements are buffered until they are equal to batch size provided in the argument at which @@ -786,7 +787,9 @@ def _pardo_group_into_batches(batch_size, input_coder): count = count_state.read() if count >= batch_size: batch = [element for element in element_state.read()] - yield batch + key, _ = batch[0] + batch_values = [v for (k, v) in batch] + yield (key, batch_values) element_state.clear() count_state.clear() @@ -797,7 +800,9 @@ def _pardo_group_into_batches(batch_size, input_coder): count_state=DoFn.StateParam(COUNT_STATE)): batch = [element for element in element_state.read()] if batch: - yield batch + key, _ = batch[0] + batch_values = [v for (k, v) in batch] + yield (key, batch_values) element_state.clear() count_state.clear()