This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 767b1a8 [BEAM-7021] added ToString transform on PythonSDK
new 7943d5f Merge pull request #8698 from mszb/BEAM-7021
767b1a8 is described below
commit 767b1a85ccde2c6d0416cefe156f921ff70d3597
Author: Shoaib <[email protected]>
AuthorDate: Thu May 30 15:46:28 2019 +0500
[BEAM-7021] added ToString transform on PythonSDK
---
sdks/python/apache_beam/transforms/util.py | 58 +++++++++++++++++++++++++
sdks/python/apache_beam/transforms/util_test.py | 33 ++++++++++++++
2 files changed, 91 insertions(+)
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 43d3cf9..3876773 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -61,6 +61,7 @@ __all__ = [
'KvSwap',
'RemoveDuplicates',
'Reshuffle',
+ 'ToString',
'Values',
'WithKeys'
]
@@ -666,3 +667,60 @@ def WithKeys(pcoll, k):
if callable(k):
return pcoll | Map(lambda v: (k(v), v))
return pcoll | Map(lambda v: (k, v))
+
+
class ToString(object):
  """Namespace of PTransforms that convert PCollection elements to strings.

  * ``ToString.Element()`` maps every element ``x`` to ``str(x)``.
  * ``ToString.Kvs(delimiter)`` maps every key/value element ``x`` to
    ``"{}{}{}".format(x[0], delimiter, x[1])``.
  * ``ToString.Iterables(delimiter)`` maps every iterable element ``x`` to the
    ``str()`` of each of its items joined by ``delimiter`` (no trailing
    delimiter).

  Where a delimiter is used it defaults to ``","`` when omitted or ``None``.
  """

  class Kvs(PTransform):
    """Transforms each key/value element into ``key + delimiter + value``."""

    def __init__(self, delimiter=None, **kwargs):
      """
      Args:
        delimiter: Separator placed between key and value. ``None`` (the
          default) means ``","``; any other value, including the empty
          string, is used as given.
        **kwargs: Accepted for backward compatibility; ignored.
      """
      super(ToString.Kvs, self).__init__()
      # Compare with None rather than relying on truthiness; otherwise an
      # explicit empty delimiter would silently be replaced by ",".
      self.delimiter = "," if delimiter is None else delimiter

    def expand(self, pcoll):
      # Close over a plain local instead of ``self`` so that serializing the
      # mapped function does not also require serializing this transform.
      delimiter = self.delimiter
      to_string = (
          Map(lambda x: "{}{}{}".format(x[0], delimiter, x[1]))
          .with_input_types(typehints.KV[typehints.Any, typehints.Any])
          .with_output_types(str))
      return pcoll | '%s:KeyVaueToString' % self.label >> to_string

  class Element(PTransform):
    """Transforms each element of the PCollection to ``str(element)``."""

    def __init__(self, delimiter=None, **kwargs):
      """
      Args:
        delimiter: Stored on the instance for signature parity with ``Kvs``
          and ``Iterables``, but NOT used by this transform.
        **kwargs: Accepted for backward compatibility; ignored.
      """
      super(ToString.Element, self).__init__()
      self.delimiter = "," if delimiter is None else delimiter

    def expand(self, pcoll):
      to_string = (
          Map(lambda x: str(x))
          .with_input_types(T)
          .with_output_types(str))
      return pcoll | '%s:ElementToString' % self.label >> to_string

  class Iterables(PTransform):
    """Transforms each iterable element into its items' strings joined by a
    delimiter. There is no trailing delimiter.
    """

    def __init__(self, delimiter=None, **kwargs):
      """
      Args:
        delimiter: Separator placed between consecutive items. ``None`` (the
          default) means ``","``; any other value, including the empty
          string, is used as given.
        **kwargs: Accepted for backward compatibility; ignored.
      """
      super(ToString.Iterables, self).__init__()
      # Compare with None rather than relying on truthiness; otherwise an
      # explicit empty delimiter would silently be replaced by ",".
      self.delimiter = "," if delimiter is None else delimiter

    def expand(self, pcoll):
      # Close over a plain local instead of ``self`` so that serializing the
      # mapped function does not also require serializing this transform.
      delimiter = self.delimiter
      to_string = (
          Map(lambda x: delimiter.join(str(item) for item in x))
          .with_input_types(typehints.Iterable[typehints.Any])
          .with_output_types(str))
      return pcoll | '%s:IterablesToString' % self.label >> to_string
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 68a3c03..419cf7d 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -432,6 +432,39 @@ class WithKeysTest(unittest.TestCase):
assert_that(with_keys, equal_to([(1, 1), (4, 2), (9, 3)]))
class ToStringTest(unittest.TestCase):
  """Pipeline-level tests for the ``util.ToString`` transforms."""

  def test_tostring_elements(self):
    with TestPipeline() as p:
      result = p | beam.Create([1, 1, 2, 3]) | util.ToString.Element()
      assert_that(result, equal_to(["1", "1", "2", "3"]))

  def test_tostring_iterables(self):
    with TestPipeline() as p:
      data = [("one", "two", "three"), ("four", "five", "six")]
      result = p | beam.Create(data) | util.ToString.Iterables()
      assert_that(result, equal_to(["one,two,three", "four,five,six"]))

  def test_tostring_iterables_with_delimiter(self):
    with TestPipeline() as p:
      data = [("one", "two", "three"), ("four", "five", "six")]
      result = p | beam.Create(data) | util.ToString.Iterables("\t")
      assert_that(result, equal_to(["one\ttwo\tthree", "four\tfive\tsix"]))

  def test_tostring_iterables_of_non_strings(self):
    # Items are converted with str() before joining.
    with TestPipeline() as p:
      result = p | beam.Create([[1, 2, 3], [4, 5]]) | util.ToString.Iterables()
      assert_that(result, equal_to(["1,2,3", "4,5"]))

  def test_tostring_kvs(self):
    with TestPipeline() as p:
      result = p | beam.Create([("one", 1), ("two", 2)]) | util.ToString.Kvs()
      assert_that(result, equal_to(["one,1", "two,2"]))

  def test_tostring_kvs_with_delimiter(self):
    with TestPipeline() as p:
      result = (p
                | beam.Create([("one", 1), ("two", 2)])
                | util.ToString.Kvs("\t"))
      assert_that(result, equal_to(["one\t1", "two\t2"]))
+
+
if __name__ == '__main__':
  # Surface INFO-level pipeline logs while the test suite runs.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()