This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 838d030  [BEAM-6730] Refactor generate_sequence Python wrapper
     new edf37e1  Merge pull request #8249: [BEAM-6730] Refactor generate_sequence Python wrapper
838d030 is described below

commit 838d0300b3a2634e2cac4cd98afd5285ac9df3ed
Author: Maximilian Michels <[email protected]>
AuthorDate: Wed Mar 13 15:11:32 2019 +0100

    [BEAM-6730] Refactor generate_sequence Python wrapper
    
    This uses `PTransform#expand` instead of extending `ExternalTransform`.
---
 .../apache_beam/io/external/generate_sequence.py   | 49 ++++++++++++++--------
 1 file changed, 32 insertions(+), 17 deletions(-)

diff --git a/sdks/python/apache_beam/io/external/generate_sequence.py b/sdks/python/apache_beam/io/external/generate_sequence.py
index f1b6a37..4ad6b46 100644
--- a/sdks/python/apache_beam/io/external/generate_sequence.py
+++ b/sdks/python/apache_beam/io/external/generate_sequence.py
@@ -15,45 +15,60 @@
 # limitations under the License.
 #
 
-"""
-A PTransform that provides a bounded or unbounded stream of integers.
-"""
 from __future__ import absolute_import
 
 from apache_beam import ExternalTransform
+from apache_beam import pvalue
 from apache_beam.coders import VarIntCoder
 from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
 from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
+from apache_beam.transforms import ptransform
 
 
-class GenerateSequence(ExternalTransform):
+class GenerateSequence(ptransform.PTransform):
+  """
+    A PTransform that provides a bounded or unbounded stream of integers.
+  """
 
   def __init__(self, start, stop=None,
                elements_per_period=None, max_read_time=None,
                expansion_service=None):
+    super(GenerateSequence, self).__init__()
+    self._urn = 'beam:external:java:generate_sequence:v1'
+    self.start = start
+    self.stop = stop
+    self.elements_per_period = elements_per_period
+    self.max_read_time = max_read_time
+    self.expansion_service = expansion_service
+
+  def expand(self, pbegin):
+    if not isinstance(pbegin, pvalue.PBegin):
+      raise Exception("GenerateSequence must be a root transform")
+
     coder = VarIntCoder()
     coder_urn = ['beam:coder:varint:v1']
     args = {
         'start':
-            ConfigValue(
-                coder_urn=coder_urn,
-                payload=coder.encode(start))
+        ConfigValue(
+            coder_urn=coder_urn,
+            payload=coder.encode(self.start))
     }
-    if stop:
+    if self.stop:
       args['stop'] = ConfigValue(
           coder_urn=coder_urn,
-          payload=coder.encode(stop))
-    if elements_per_period:
+          payload=coder.encode(self.stop))
+    if self.elements_per_period:
       args['elements_per_period'] = ConfigValue(
           coder_urn=coder_urn,
-          payload=coder.encode(elements_per_period))
-    if max_read_time:
+          payload=coder.encode(self.elements_per_period))
+    if self.max_read_time:
       args['max_read_time'] = ConfigValue(
           coder_urn=coder_urn,
-          payload=coder.encode(max_read_time))
+          payload=coder.encode(self.max_read_time))
 
     payload = ExternalConfigurationPayload(configuration=args)
-    super(GenerateSequence, self).__init__(
-        'beam:external:java:generate_sequence:v1',
-        payload.SerializeToString(),
-        expansion_service)
+    return pbegin.apply(
+        ExternalTransform(
+            self._urn,
+            payload.SerializeToString(),
+            self.expansion_service))

Reply via email to