Remove pipeline.apply(pvalue, callable)
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e68eb05e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e68eb05e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e68eb05e Branch: refs/heads/python-sdk Commit: e68eb05e5ffa366f83478807bbac3af83ea8cda5 Parents: 5daab7f Author: Robert Bradshaw <rober...@google.com> Authored: Mon Jul 18 10:23:24 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Mon Jul 18 17:46:16 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/dataflow_test.py | 5 +++-- sdks/python/apache_beam/pipeline.py | 22 +++----------------- sdks/python/apache_beam/pipeline_test.py | 18 ---------------- .../python/apache_beam/transforms/ptransform.py | 2 ++ 4 files changed, 8 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/dataflow_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py index c4933af..9bbb5ff 100644 --- a/sdks/python/apache_beam/dataflow_test.py +++ b/sdks/python/apache_beam/dataflow_test.py @@ -23,6 +23,7 @@ import logging import re import unittest +import apache_beam as beam from apache_beam.pipeline import Pipeline from apache_beam.pvalue import AsDict from apache_beam.pvalue import AsIter as AllOf @@ -51,8 +52,8 @@ class DataflowTest(unittest.TestCase): # TODO(silviuc): Figure out a nice way to specify labels for stages so that # internal steps get prepended with surorunding stage names. - @staticmethod - def Count(pcoll): # pylint: disable=invalid-name + @beam.ptransform_fn + def Count(pcoll): # pylint: disable=invalid-name, no-self-argument """A Count transform: v, ... 
=> (v, n), ...""" return (pcoll | Map('AddCount', lambda x: (x, 1)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 012d4d9..bc1feb2 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -47,7 +47,6 @@ import logging import os import shutil import tempfile -import types from apache_beam import pvalue from apache_beam import typehints @@ -187,20 +186,17 @@ class Pipeline(object): """Applies a custom transform using the pvalueish specified. Args: - transform: the PTranform (or callable) to apply. + transform: the PTransform to apply. pvalueish: the input for the PTransform (typically a PCollection). Raises: TypeError: if the transform object extracted from the argument list is - not a callable type or a descendant from PTransform. + not a PTransform. RuntimeError: if the transform object was already applied to this pipeline and needs to be cloned in order to apply again. """ if not isinstance(transform, ptransform.PTransform): - if isinstance(transform, (type, types.ClassType)): - raise TypeError("%s is not a PTransform instance, did you mean %s()?"
- % (transform, transform.__name__)) - transform = _CallableWrapperPTransform(transform) + raise TypeError("Expected a PTransform object, got %s" % transform) full_label = format_full_label(self._current_transform(), transform) if full_label in self.applied_labels: @@ -286,18 +282,6 @@ class Pipeline(object): return pvalueish_result -class _CallableWrapperPTransform(ptransform.PTransform): - - def __init__(self, callee): - assert callable(callee) - super(_CallableWrapperPTransform, self).__init__( - label=getattr(callee, '__name__', 'Callable')) - self._callee = callee - - def apply(self, *args, **kwargs): - return self._callee(*args, **kwargs) - - class PipelineVisitor(object): """Visitor pattern class used to traverse a DAG of transforms. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 8598737..04cd2ee 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -32,7 +32,6 @@ from apache_beam.transforms import Create from apache_beam.transforms import FlatMap from apache_beam.transforms import Flatten from apache_beam.transforms import Map -from apache_beam.transforms import GroupByKey from apache_beam.transforms import PTransform from apache_beam.transforms import Read from apache_beam.transforms.util import assert_that, equal_to @@ -172,23 +171,6 @@ class PipelineTest(unittest.TestCase): assert_that(result2, equal_to([5, 6, 7]), label='r2') pipeline.run() - def test_apply_custom_callable(self): - pipeline = Pipeline(self.runner_name) - pcoll = pipeline | Create('pcoll', [1, 2, 3]) - result = pcoll | PipelineTest.custom_callable - assert_that(result, equal_to([2, 3, 4])) - pipeline.run() - - def test_apply_custom_callable_error(self): - pipeline = Pipeline(self.runner_name) - pcoll = 
pipeline | Create('pcoll', [1, 2, 3]) - with self.assertRaises(TypeError) as cm: - pcoll | GroupByKey # Note the missing ()'s - self.assertEqual( - cm.exception.message, - "<class 'apache_beam.transforms.core.GroupByKey'> is not " - "a PTransform instance, did you mean GroupByKey()?") - def test_transform_no_super_init(self): class AddSuffix(PTransform): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 04dd9b3..1457bec 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -601,6 +601,8 @@ class CallablePTransform(PTransform): # is called (and __call__ invoked) we will have all the information # needed to initialize the super class. self.fn = fn + self._args = () + self._kwargs = {} def __call__(self, *args, **kwargs): if args and args[0] is None: