Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 41bf6f8b4 -> b2728cf13


Assert transform without side inputs


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1c795c10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1c795c10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1c795c10

Branch: refs/heads/python-sdk
Commit: 1c795c10f0017766462cced1db1295c50bf2e32f
Parents: 41bf6f8
Author: Vikas Kedigehalli <vika...@google.com>
Authored: Tue Dec 27 11:09:26 2016 -0800
Committer: Vikas Kedigehalli <vika...@google.com>
Committed: Tue Dec 27 11:09:26 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/util.py | 24 +++++++++++++-----------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c795c10/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 9815996..ad63a02 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -20,10 +20,12 @@
 
 from __future__ import absolute_import
 
-from apache_beam.pvalue import AsList
-from apache_beam.transforms import core
 from apache_beam.transforms import window
-from apache_beam.transforms.core import CombinePerKey, Create, Flatten, GroupByKey, Map
+from apache_beam.transforms.core import CombinePerKey
+from apache_beam.transforms.core import Flatten
+from apache_beam.transforms.core import GroupByKey
+from apache_beam.transforms.core import Map
+from apache_beam.transforms.core import WindowInto
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.ptransform import ptransform_fn
 
@@ -217,17 +219,17 @@ def assert_that(actual, matcher, label='assert_that'):
     Ignored.
   """
 
-  def match(_, actual):
-    matcher(actual)
-
   class AssertThat(PTransform):
 
-    def expand(self, pipeline):
-      return pipeline | 'singleton' >> Create([None]) | Map(
-          match,
-          AsList(actual | core.WindowInto(window.GlobalWindows())))
+    def expand(self, pcoll):
+      return (pcoll
+              | WindowInto(window.GlobalWindows())
+              | "ToVoidKey" >> Map(lambda v: (None, v))
+              | "Group" >> GroupByKey()
+              | "UnKey" >> Map(lambda (k, v): v)
+              | "Match" >> Map(matcher))
 
     def default_label(self):
       return label
 
-  actual.pipeline | AssertThat()  # pylint: disable=expression-not-assigned
+  actual | AssertThat()  # pylint: disable=expression-not-assigned

Reply via email to