This is an automated email from the ASF dual-hosted git repository.

anandinguva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ffc7821511d Add BagOfWords to tft data transforms (#28188)
ffc7821511d is described below

commit ffc7821511db89c87373acbc28aa3c5b08a9c87a
Author: Anand Inguva <[email protected]>
AuthorDate: Tue Aug 29 15:20:19 2023 +0000

    Add BagOfWords to tft data transforms (#28188)
    
    * add bag of words
    
    * add Ngrams default test
    
    * fix test
---
 sdks/python/apache_beam/ml/transforms/tft.py      |  59 ++++++++-
 sdks/python/apache_beam/ml/transforms/tft_test.py | 142 ++++++++++++++++++++++
 2 files changed, 199 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/ml/transforms/tft.py 
b/sdks/python/apache_beam/ml/transforms/tft.py
index e3260d604d4..b59c1715b7d 100644
--- a/sdks/python/apache_beam/ml/transforms/tft.py
+++ b/sdks/python/apache_beam/ml/transforms/tft.py
@@ -523,8 +523,8 @@ class NGrams(TFTOperation):
       columns: List[str],
       split_string_by_delimiter: Optional[str] = None,
       *,
-      ngram_range: Tuple[int, int],
-      ngrams_separator: str,
+      ngram_range: Tuple[int, int] = (1, 1),
+      ngrams_separator: Optional[str] = None,
       name: Optional[str] = None):
     """
     An n-gram is a contiguous sequence of n items from a given sample of text
@@ -547,6 +547,10 @@ class NGrams(TFTOperation):
     self.name = name
     self.split_string_by_delimiter = split_string_by_delimiter
 
+    if ngram_range != (1, 1) and not ngrams_separator:
+      raise ValueError(
+          'ngrams_separator must be specified when ngram_range is not (1, 1)')
+
   def apply_transform(
       self, data: common_types.TensorType,
       output_column_name: str) -> Dict[str, common_types.TensorType]:
@@ -555,3 +559,54 @@ class NGrams(TFTOperation):
           data, self.split_string_by_delimiter)
     output = tft.ngrams(data, self.ngram_range, self.ngrams_separator)
     return {output_column_name: output}
+
+
+@register_input_dtype(str)
+class BagOfWords(TFTOperation):
+  def __init__(
+      self,
+      columns: List[str],
+      split_string_by_delimiter: Optional[str] = None,
+      *,
+      ngram_range: Tuple[int, int] = (1, 1),
+      ngrams_separator: Optional[str] = None,
+      name: Optional[str] = None):
+    """
+    Bag of words contains the unique words present in the input text.
+    This operation applies a bag of words transformation to specified
+    columns of incoming data. Also, the transformation accepts a Tuple of
+    integers specifying the range of n-gram sizes. The transformation
+    splits the input data into a set of consecutive n-grams if ngram_range
+    is specified. The n-grams are then converted to a bag of words.
+    Also, you can specify a seperator string that will be inserted between
+    each ngram.
+
+    Args:
+      columns: A list of column names to apply the transformation on.
+      split_string_by_delimiter: (Optional) A string that specifies the
+        delimiter to split the input strings before computing ngrams.
+      ngram_range: A tuple of integers(inclusive) specifying the range of
+        n-gram sizes.
+      seperator: A string that will be inserted between each ngram.
+      name: A name for the operation (optional).
+
+    Note that original order of the input may not be preserved.
+    """
+
+    self.columns = columns
+    self.ngram_range = ngram_range
+    self.ngrams_separator = ngrams_separator
+    self.name = name
+    self.split_string_by_delimiter = split_string_by_delimiter
+
+    if ngram_range != (1, 1) and not ngrams_separator:
+      raise ValueError(
+          'ngrams_separator must be specified when ngram_range is not (1, 1)')
+
+  def apply_transform(self, data: tf.SparseTensor, output_col_name: str):
+    if self.split_string_by_delimiter:
+      data = self._split_string_with_delimiter(
+          data, self.split_string_by_delimiter)
+    output = tft.bag_of_words(
+        data, self.ngram_range, self.ngrams_separator, self.name)
+    return {output_col_name: output}
diff --git a/sdks/python/apache_beam/ml/transforms/tft_test.py 
b/sdks/python/apache_beam/ml/transforms/tft_test.py
index eff366cf330..9be41895701 100644
--- a/sdks/python/apache_beam/ml/transforms/tft_test.py
+++ b/sdks/python/apache_beam/ml/transforms/tft_test.py
@@ -499,6 +499,26 @@ class NGramsTest(unittest.TestCase):
   def tearDown(self):
     shutil.rmtree(self.artifact_location)
 
+  def test_ngrams_on_list_separated_words_default_args(self):
+    data = [{
+        'x': ['I', 'like', 'pie'],
+    }, {
+        'x': ['yum', 'yum', 'pie']
+    }]
+    with beam.Pipeline() as p:
+      result = (
+          p
+          | "Create" >> beam.Create(data)
+          | "MLTransform" >> base.MLTransform(
+              write_artifact_location=self.artifact_location,
+              transforms=[tft.NGrams(columns=['x'])]))
+      result = result | beam.Map(lambda x: x.x)
+      expected_data = [
+          np.array([b'I', b'like', b'pie'], dtype=object),
+          np.array([b'yum', b'yum', b'pie'], dtype=object)
+      ]
+      assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
+
   def test_ngrams_on_list_separated_words(self):
     data = [{
         'x': ['I', 'like', 'pie'],
@@ -589,5 +609,127 @@ class NGramsTest(unittest.TestCase):
       assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
 
 
+class BagOfWordsTest(unittest.TestCase):
+  def setUp(self) -> None:
+    self.artifact_location = tempfile.mkdtemp()
+
+  def tearDown(self):
+    shutil.rmtree(self.artifact_location)
+
+  def test_bag_of_words_on_list_seperated_words_default_ngrams(self):
+    data = [{
+        'x': ['I', 'like', 'pie', 'pie', 'pie'],
+    }, {
+        'x': ['yum', 'yum', 'pie']
+    }]
+
+    with beam.Pipeline() as p:
+      result = (
+          p
+          | "Create" >> beam.Create(data)
+          | "MLTransform" >> base.MLTransform(
+              write_artifact_location=self.artifact_location,
+              transforms=[tft.BagOfWords(columns=['x'])]))
+      result = result | beam.Map(lambda x: x.x)
+
+      expected_data = [
+          np.array([b'I', b'like', b'pie'], dtype=object),
+          np.array([b'yum', b'pie'], dtype=object)
+      ]
+      assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
+
+  def test_bag_of_words_on_list_seperated_words_custom_ngrams(self):
+    data = [{
+        'x': ['I', 'like', 'pie', 'I', 'like', 'pie'],
+    }, {
+        'x': ['yum', 'yum', 'pie']
+    }]
+    with beam.Pipeline() as p:
+      result = (
+          p
+          | "Create" >> beam.Create(data)
+          | "MLTransform" >> base.MLTransform(
+              write_artifact_location=self.artifact_location,
+              transforms=[
+                  tft.BagOfWords(
+                      columns=['x'], ngram_range=(1, 3), ngrams_separator=' ')
+              ]))
+      result = result | beam.Map(lambda x: x.x)
+
+      expected_data = [[
+          b'I',
+          b'I like',
+          b'I like pie',
+          b'like',
+          b'like pie',
+          b'like pie I',
+          b'pie',
+          b'pie I',
+          b'pie I like'
+      ], [b'yum', b'yum yum', b'yum yum pie', b'yum pie', b'pie']]
+      assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
+
+  def test_bag_of_words_on_numpy_data(self):
+    data = [{
+        'x': np.array(['I', 'like', 'pie', 'I', 'like', 'pie'], dtype=object),
+    }, {
+        'x': np.array(['yum', 'yum', 'pie'], dtype=object)
+    }]
+    with beam.Pipeline() as p:
+      result = (
+          p
+          | "Create" >> beam.Create(data)
+          | "MLTransform" >> base.MLTransform(
+              write_artifact_location=self.artifact_location,
+              transforms=[
+                  tft.BagOfWords(
+                      columns=['x'], ngram_range=(1, 3), ngrams_separator=' ')
+              ]))
+      result = result | beam.Map(lambda x: x.x)
+
+      expected_data = [[
+          b'I',
+          b'I like',
+          b'I like pie',
+          b'like',
+          b'like pie',
+          b'like pie I',
+          b'pie',
+          b'pie I',
+          b'pie I like'
+      ], [b'yum', b'yum yum', b'yum yum pie', b'yum pie', b'pie']]
+      assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
+
+  def test_bag_of_words_on_by_splitting_input_text(self):
+    data = [{'x': 'I like pie I like pie'}, {'x': 'yum yum pie'}]
+    with beam.Pipeline() as p:
+      result = (
+          p
+          | "Create" >> beam.Create(data)
+          | "MLTransform" >> base.MLTransform(
+              write_artifact_location=self.artifact_location,
+              transforms=[
+                  tft.BagOfWords(
+                      columns=['x'],
+                      split_string_by_delimiter=' ',
+                      ngram_range=(1, 3),
+                      ngrams_separator=' ')
+              ]))
+      result = result | beam.Map(lambda x: x.x)
+
+      expected_data = [[
+          b'I',
+          b'I like',
+          b'I like pie',
+          b'like',
+          b'like pie',
+          b'like pie I',
+          b'pie',
+          b'pie I',
+          b'pie I like'
+      ], [b'yum', b'yum yum', b'yum yum pie', b'yum pie', b'pie']]
+      assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
+
+
 if __name__ == '__main__':
   unittest.main()

Reply via email to