Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 41bf6f8b4 -> b2728cf13
Assert transform without side inputs

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1c795c10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1c795c10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1c795c10

Branch: refs/heads/python-sdk
Commit: 1c795c10f0017766462cced1db1295c50bf2e32f
Parents: 41bf6f8
Author: Vikas Kedigehalli <vika...@google.com>
Authored: Tue Dec 27 11:09:26 2016 -0800
Committer: Vikas Kedigehalli <vika...@google.com>
Committed: Tue Dec 27 11:09:26 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/util.py | 24 +++++++++++++-----------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c795c10/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 9815996..ad63a02 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -20,10 +20,12 @@
 
 from __future__ import absolute_import
 
-from apache_beam.pvalue import AsList
-from apache_beam.transforms import core
 from apache_beam.transforms import window
-from apache_beam.transforms.core import CombinePerKey, Create, Flatten, GroupByKey, Map
+from apache_beam.transforms.core import CombinePerKey
+from apache_beam.transforms.core import Flatten
+from apache_beam.transforms.core import GroupByKey
+from apache_beam.transforms.core import Map
+from apache_beam.transforms.core import WindowInto
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.ptransform import ptransform_fn
 
@@ -217,17 +219,17 @@ def assert_that(actual, matcher, label='assert_that'):
     Ignored.
   """
 
-  def match(_, actual):
-    matcher(actual)
-
   class AssertThat(PTransform):
-    def expand(self, pipeline):
-      return pipeline | 'singleton' >> Create([None]) | Map(
-          match,
-          AsList(actual | core.WindowInto(window.GlobalWindows())))
+    def expand(self, pcoll):
+      return (pcoll
+              | WindowInto(window.GlobalWindows())
+              | "ToVoidKey" >> Map(lambda v: (None, v))
+              | "Group" >> GroupByKey()
+              | "UnKey" >> Map(lambda (k, v): v)
+              | "Match" >> Map(matcher))
 
     def default_label(self):
       return label
 
-  actual.pipeline | AssertThat()  # pylint: disable=expression-not-assigned
+  actual | AssertThat()  # pylint: disable=expression-not-assigned