This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 72714c2 [BEAM-2939] Add split_and_size method for SDFs.
new 0869790 Merge pull request #8235 [BEAM-2939] Add split_and_size method for SDFs.
72714c2 is described below
commit 72714c2a4bbb399eb03bf038bc628df0ad3b4a02
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri Apr 5 13:25:52 2019 +0200
[BEAM-2939] Add split_and_size method for SDFs.
---
sdks/python/apache_beam/runners/worker/bundle_processor.py | 6 +++---
sdks/python/apache_beam/transforms/core.py | 14 +++++++++-----
2 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 1a3667d..583f1bb 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -960,9 +960,9 @@ def create(*args):
def process(self, element_restriction, *args, **kwargs):
element, restriction = element_restriction
- for part in self.restriction_provider.split(element, restriction):
- yield ((element, part),
- self.restriction_provider.restriction_size(element, part))
+ for part, size in self.restriction_provider.split_and_size(
+ element, restriction):
+ yield ((element, part), size)
return _create_sdf_operation(SplitAndSizeRestrictions, *args)
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 4639f7c..0ee672f 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -196,15 +196,16 @@ class RestrictionProvider(object):
an instance of ``RestrictionProvider``.
The provided ``RestrictionProvider`` instance must provide suitable overrides
- for the following methods.
+ for the following methods:
* create_tracker()
* initial_restriction()
Optionally, ``RestrictionProvider`` may override default implementations of
- following methods.
+ following methods:
* restriction_coder()
* restriction_size()
* split()
+ * split_and_size()
** Pausing and resuming processing of an element **
@@ -254,9 +255,6 @@ class RestrictionProvider(object):
reading input element for each of the returned restrictions should be the
same as the total set of elements produced by reading the input element for
the input restriction.
-
- TODO(chamikara): give suitable hints for performing splitting, for example
- number of parts or size in bytes.
"""
yield restriction
@@ -279,6 +277,12 @@ class RestrictionProvider(object):
"""
return self.create_tracker(restriction).default_size()
+ def split_and_size(self, element, restriction):
+ """Like split, but also does sizing, returning (restriction, size) pairs.
+ """
+ for part in self.split(element, restriction):
+ yield part, self.restriction_size(element, part)
+
def get_function_arguments(obj, func):
"""Return the function arguments based on the name provided. If they have