This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 838d030 [BEAM-6730] Refactor generate_sequence Python wrapper
new edf37e1 Merge pull request #8249: [BEAM-6730] Refactor generate_sequence Python wrapper
838d030 is described below
commit 838d0300b3a2634e2cac4cd98afd5285ac9df3ed
Author: Maximilian Michels <[email protected]>
AuthorDate: Wed Mar 13 15:11:32 2019 +0100
[BEAM-6730] Refactor generate_sequence Python wrapper
This uses `PTransform#expand` instead of extending `ExternalTransform`.
---
.../apache_beam/io/external/generate_sequence.py | 49 ++++++++++++++--------
1 file changed, 32 insertions(+), 17 deletions(-)
diff --git a/sdks/python/apache_beam/io/external/generate_sequence.py b/sdks/python/apache_beam/io/external/generate_sequence.py
index f1b6a37..4ad6b46 100644
--- a/sdks/python/apache_beam/io/external/generate_sequence.py
+++ b/sdks/python/apache_beam/io/external/generate_sequence.py
@@ -15,45 +15,60 @@
# limitations under the License.
#
-"""
-A PTransform that provides a bounded or unbounded stream of integers.
-"""
from __future__ import absolute_import
from apache_beam import ExternalTransform
+from apache_beam import pvalue
from apache_beam.coders import VarIntCoder
from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
from apache_beam.portability.api.external_transforms_pb2 import
ExternalConfigurationPayload
+from apache_beam.transforms import ptransform
-class GenerateSequence(ExternalTransform):
+class GenerateSequence(ptransform.PTransform):
+ """
+ A PTransform that provides a bounded or unbounded stream of integers.
+ """
def __init__(self, start, stop=None,
elements_per_period=None, max_read_time=None,
expansion_service=None):
+ super(GenerateSequence, self).__init__()
+ self._urn = 'beam:external:java:generate_sequence:v1'
+ self.start = start
+ self.stop = stop
+ self.elements_per_period = elements_per_period
+ self.max_read_time = max_read_time
+ self.expansion_service = expansion_service
+
+ def expand(self, pbegin):
+ if not isinstance(pbegin, pvalue.PBegin):
+ raise Exception("GenerateSequence must be a root transform")
+
coder = VarIntCoder()
coder_urn = ['beam:coder:varint:v1']
args = {
'start':
- ConfigValue(
- coder_urn=coder_urn,
- payload=coder.encode(start))
+ ConfigValue(
+ coder_urn=coder_urn,
+ payload=coder.encode(self.start))
}
- if stop:
+ if self.stop:
args['stop'] = ConfigValue(
coder_urn=coder_urn,
- payload=coder.encode(stop))
- if elements_per_period:
+ payload=coder.encode(self.stop))
+ if self.elements_per_period:
args['elements_per_period'] = ConfigValue(
coder_urn=coder_urn,
- payload=coder.encode(elements_per_period))
- if max_read_time:
+ payload=coder.encode(self.elements_per_period))
+ if self.max_read_time:
args['max_read_time'] = ConfigValue(
coder_urn=coder_urn,
- payload=coder.encode(max_read_time))
+ payload=coder.encode(self.max_read_time))
payload = ExternalConfigurationPayload(configuration=args)
- super(GenerateSequence, self).__init__(
- 'beam:external:java:generate_sequence:v1',
- payload.SerializeToString(),
- expansion_service)
+ return pbegin.apply(
+ ExternalTransform(
+ self._urn,
+ payload.SerializeToString(),
+ self.expansion_service))