This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 767b1a8  [BEAM-7021] added ToString transform on PythonSDK
     new 7943d5f  Merge pull request #8698 from mszb/BEAM-7021
767b1a8 is described below

commit 767b1a85ccde2c6d0416cefe156f921ff70d3597
Author: Shoaib <[email protected]>
AuthorDate: Thu May 30 15:46:28 2019 +0500

    [BEAM-7021] added ToString transform on PythonSDK
---
 sdks/python/apache_beam/transforms/util.py      | 58 +++++++++++++++++++++++++
 sdks/python/apache_beam/transforms/util_test.py | 33 ++++++++++++++
 2 files changed, 91 insertions(+)

diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 43d3cf9..3876773 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -61,6 +61,7 @@ __all__ = [
     'KvSwap',
     'RemoveDuplicates',
     'Reshuffle',
+    'ToString',
     'Values',
     'WithKeys'
     ]
@@ -666,3 +667,60 @@ def WithKeys(pcoll, k):
   if callable(k):
     return pcoll | Map(lambda v: (k(v), v))
   return pcoll | Map(lambda v: (k, v))
+
+
+class ToString(object):
+  """
+  PTransform for converting a PCollection element, KV or PCollection Iterable
+  to string. Each nested transform forwards extra keyword arguments (e.g.
+  label) to PTransform.__init__.
+  """
+
+  class Kvs(PTransform):
+    """
+    Transforms each (key, value) element to the string key + delimiter + value.
+    The delimiter defaults to "," only when None; "" is honored as-is.
+    """
+
+    def __init__(self, delimiter=None, **kwargs):
+      super(ToString.Kvs, self).__init__(**kwargs)
+      self.delimiter = "," if delimiter is None else delimiter
+
+    def expand(self, pcoll):
+      delimiter = self.delimiter  # capture the str, not self, in the lambda
+      return pcoll | 'KeyValueToString' >> (Map(
+          lambda kv: "{}{}{}".format(kv[0], delimiter, kv[1]))
+          .with_input_types(typehints.KV[typehints.Any, typehints.Any])
+          .with_output_types(str))
+
+  class Element(PTransform):
+    """
+    Transforms each element of the PCollection to a string via str().
+    """
+
+    def __init__(self, delimiter=None, **kwargs):
+      # delimiter is accepted for API symmetry with Kvs/Iterables but unused.
+      super(ToString.Element, self).__init__(**kwargs)
+      self.delimiter = "," if delimiter is None else delimiter
+
+    def expand(self, pcoll):
+      return pcoll | 'ElementToString' >> (
+          Map(str).with_input_types(T).with_output_types(str))
+
+  class Iterables(PTransform):
+    """
+    Transforms each iterable element to a string of its items, each converted
+    with str() and joined by the delimiter. There is no trailing delimiter.
+    The delimiter defaults to "," only when None; "" is honored as-is.
+    """
+
+    def __init__(self, delimiter=None, **kwargs):
+      super(ToString.Iterables, self).__init__(**kwargs)
+      self.delimiter = "," if delimiter is None else delimiter
+
+    def expand(self, pcoll):
+      delimiter = self.delimiter  # capture the str, not self, in the lambda
+      return pcoll | 'IterablesToString' >> (Map(
+          lambda xs: delimiter.join(str(x) for x in xs))
+          .with_input_types(typehints.Iterable[typehints.Any])
+          .with_output_types(str))
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 68a3c03..419cf7d 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -432,6 +432,39 @@ class WithKeysTest(unittest.TestCase):
     assert_that(with_keys, equal_to([(1, 1), (4, 2), (9, 3)]))
 
 
+class ToStringTest(unittest.TestCase):
+
+  def test_tostring_elements(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create([1, 1, 2, 3]) | util.ToString.Element())
+      assert_that(result, equal_to(["1", "1", "2", "3"]))
+
+  def test_tostring_iterables(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create([("one", "two", "three"),
+                                 ("four", "five", "six")])
+                | util.ToString.Iterables())
+      assert_that(result, equal_to(["one,two,three", "four,five,six"]))
+
+  def test_tostring_iterables_with_delimiter(self):
+    with TestPipeline() as p:
+      data = [("one", "two", "three"), ("four", "five", "six")]
+      result = (p | beam.Create(data) | util.ToString.Iterables("\t"))
+      assert_that(result, equal_to(["one\ttwo\tthree", "four\tfive\tsix"]))
+
+  def test_tostring_kvs(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create([("one", 1), ("two", 2)]) |
+                util.ToString.Kvs())
+      assert_that(result, equal_to(["one,1", "two,2"]))
+
+  def test_tostring_kvs_delimiter(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create([("one", 1), ("two", 2)]) |
+                util.ToString.Kvs("\t"))
+      assert_that(result, equal_to(["one\t1", "two\t2"]))
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()

Reply via email to