This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9f582e564fe Add key param support for helper Combiners (#25895)
9f582e564fe is described below
commit 9f582e564fededdd4506157a21bef66ada303529
Author: harrisonlimh <[email protected]>
AuthorDate: Wed Mar 29 14:08:31 2023 -0700
Add key param support for helper Combiners (#25895)
Co-authored-by: tvalentyn <[email protected]>
---
CHANGES.md | 1 +
sdks/python/apache_beam/transforms/combiners.py | 20 ++++++++++----------
sdks/python/apache_beam/transforms/combiners_test.py | 14 ++++++++++++++
3 files changed, 25 insertions(+), 10 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 0f54d233ad6..5e6c99f4adb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -77,6 +77,7 @@
* If a main session fails to load, the pipeline will now fail at worker
startup. ([#25401](https://github.com/apache/beam/issues/25401)).
* Python pipeline options will now ignore unparsed command line flags prefixed
with a single dash. ([#25943](https://github.com/apache/beam/issues/25943)).
+* The SmallestPerKey combiner now requires keyword-only arguments for specifying optional parameters, such as `key` and `reverse`. ([#25888](https://github.com/apache/beam/issues/25888)).
## Deprecations
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 7afa4e46e75..0e5f01196e6 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -293,33 +293,33 @@ class Top(object):
@staticmethod
@ptransform.ptransform_fn
- def Largest(pcoll, n, has_defaults=True):
+ def Largest(pcoll, n, has_defaults=True, key=None):
"""Obtain a list of the greatest N elements in a PCollection."""
if has_defaults:
- return pcoll | Top.Of(n)
+ return pcoll | Top.Of(n, key)
else:
- return pcoll | Top.Of(n).without_defaults()
+ return pcoll | Top.Of(n, key).without_defaults()
@staticmethod
@ptransform.ptransform_fn
- def Smallest(pcoll, n, has_defaults=True):
+ def Smallest(pcoll, n, has_defaults=True, key=None):
"""Obtain a list of the least N elements in a PCollection."""
if has_defaults:
- return pcoll | Top.Of(n, reverse=True)
+ return pcoll | Top.Of(n, key, reverse=True)
else:
- return pcoll | Top.Of(n, reverse=True).without_defaults()
+ return pcoll | Top.Of(n, key, reverse=True).without_defaults()
@staticmethod
@ptransform.ptransform_fn
- def LargestPerKey(pcoll, n):
+ def LargestPerKey(pcoll, n, key=None):
"""Identifies the N greatest elements associated with each key."""
- return pcoll | Top.PerKey(n)
+ return pcoll | Top.PerKey(n, key)
@staticmethod
@ptransform.ptransform_fn
- def SmallestPerKey(pcoll, n, reverse=True):
+ def SmallestPerKey(pcoll, n, *, key=None, reverse=None):
"""Identifies the N least elements associated with each key."""
- return pcoll | Top.PerKey(n, reverse=True)
+ return pcoll | Top.PerKey(n, key, reverse=True)
@with_input_types(T)
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 34fb463d00a..385b3332e0c 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -191,6 +191,20 @@ class CombineTest(unittest.TestCase):
| combine.Top.Of(3, key=len, reverse=True),
[['c', 'aa', 'bbb']])
+ self.assertEqual(['xc', 'zb', 'yd', 'wa']
+ | combine.Top.Largest(3, key=lambda x: x[-1]),
+ [['yd', 'xc', 'zb']])
+ self.assertEqual(['xc', 'zb', 'yd', 'wa']
+ | combine.Top.Smallest(3, key=lambda x: x[-1]),
+ [['wa', 'zb', 'xc']])
+
+ self.assertEqual([('a', x) for x in [1, 2, 3, 4, 1, 1]]
+ | combine.Top.LargestPerKey(3, key=lambda x: -x),
+ [('a', [1, 1, 1])])
+ self.assertEqual([('a', x) for x in [1, 2, 3, 4, 1, 1]]
+ | combine.Top.SmallestPerKey(3, key=lambda x: -x),
+ [('a', [4, 3, 2])])
+
def test_sharded_top_combine_fn(self):
def test_combine_fn(combine_fn, shards, expected):
accumulators = [