This is an automated email from the ASF dual-hosted git repository.
anandinguva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ffc7821511d Add BagOfWords to tft data transforms (#28188)
ffc7821511d is described below
commit ffc7821511db89c87373acbc28aa3c5b08a9c87a
Author: Anand Inguva <[email protected]>
AuthorDate: Tue Aug 29 15:20:19 2023 +0000
Add BagOfWords to tft data transforms (#28188)
* add bag of words
* add Ngrams default test
* fix test
---
sdks/python/apache_beam/ml/transforms/tft.py | 59 ++++++++-
sdks/python/apache_beam/ml/transforms/tft_test.py | 142 ++++++++++++++++++++++
2 files changed, 199 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/ml/transforms/tft.py
b/sdks/python/apache_beam/ml/transforms/tft.py
index e3260d604d4..b59c1715b7d 100644
--- a/sdks/python/apache_beam/ml/transforms/tft.py
+++ b/sdks/python/apache_beam/ml/transforms/tft.py
@@ -523,8 +523,8 @@ class NGrams(TFTOperation):
columns: List[str],
split_string_by_delimiter: Optional[str] = None,
*,
- ngram_range: Tuple[int, int],
- ngrams_separator: str,
+ ngram_range: Tuple[int, int] = (1, 1),
+ ngrams_separator: Optional[str] = None,
name: Optional[str] = None):
"""
An n-gram is a contiguous sequence of n items from a given sample of text
@@ -547,6 +547,10 @@ class NGrams(TFTOperation):
self.name = name
self.split_string_by_delimiter = split_string_by_delimiter
+ if ngram_range != (1, 1) and not ngrams_separator:
+ raise ValueError(
+ 'ngrams_separator must be specified when ngram_range is not (1, 1)')
+
def apply_transform(
self, data: common_types.TensorType,
output_column_name: str) -> Dict[str, common_types.TensorType]:
@@ -555,3 +559,54 @@ class NGrams(TFTOperation):
data, self.split_string_by_delimiter)
output = tft.ngrams(data, self.ngram_range, self.ngrams_separator)
return {output_column_name: output}
+
+
+@register_input_dtype(str)
+class BagOfWords(TFTOperation):
+ def __init__(
+ self,
+ columns: List[str],
+ split_string_by_delimiter: Optional[str] = None,
+ *,
+ ngram_range: Tuple[int, int] = (1, 1),
+ ngrams_separator: Optional[str] = None,
+ name: Optional[str] = None):
+ """
+ Bag of words contains the unique words present in the input text.
+ This operation applies a bag of words transformation to specified
+ columns of incoming data. Also, the transformation accepts a Tuple of
+ integers specifying the range of n-gram sizes. The transformation
+ splits the input data into a set of consecutive n-grams if ngram_range
+ is specified. The n-grams are then converted to a bag of words.
+ Also, you can specify a separator string that will be inserted between
+ the tokens of each n-gram.
+
+ Args:
+ columns: A list of column names to apply the transformation on.
+ split_string_by_delimiter: (Optional) A string that specifies the
+ delimiter to split the input strings before computing ngrams.
+ ngram_range: A tuple of integers (inclusive) specifying the range of
+ n-gram sizes.
+ ngrams_separator: A string inserted between the tokens of each n-gram.
+ name: A name for the operation (optional).
+
+ Note that the original order of the input may not be preserved.
+ """
+
+ self.columns = columns
+ self.ngram_range = ngram_range
+ self.ngrams_separator = ngrams_separator
+ self.name = name
+ self.split_string_by_delimiter = split_string_by_delimiter
+
+ if ngram_range != (1, 1) and not ngrams_separator:
+ raise ValueError(
+ 'ngrams_separator must be specified when ngram_range is not (1, 1)')
+
+ def apply_transform(self, data: tf.SparseTensor, output_col_name: str):
+ if self.split_string_by_delimiter:
+ data = self._split_string_with_delimiter(
+ data, self.split_string_by_delimiter)
+ output = tft.bag_of_words(
+ data, self.ngram_range, self.ngrams_separator, self.name)
+ return {output_col_name: output}
diff --git a/sdks/python/apache_beam/ml/transforms/tft_test.py
b/sdks/python/apache_beam/ml/transforms/tft_test.py
index eff366cf330..9be41895701 100644
--- a/sdks/python/apache_beam/ml/transforms/tft_test.py
+++ b/sdks/python/apache_beam/ml/transforms/tft_test.py
@@ -499,6 +499,26 @@ class NGramsTest(unittest.TestCase):
def tearDown(self):
shutil.rmtree(self.artifact_location)
+ def test_ngrams_on_list_separated_words_default_args(self):
+ data = [{
+ 'x': ['I', 'like', 'pie'],
+ }, {
+ 'x': ['yum', 'yum', 'pie']
+ }]
+ with beam.Pipeline() as p:
+ result = (
+ p
+ | "Create" >> beam.Create(data)
+ | "MLTransform" >> base.MLTransform(
+ write_artifact_location=self.artifact_location,
+ transforms=[tft.NGrams(columns=['x'])]))
+ result = result | beam.Map(lambda x: x.x)
+ expected_data = [
+ np.array([b'I', b'like', b'pie'], dtype=object),
+ np.array([b'yum', b'yum', b'pie'], dtype=object)
+ ]
+ assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
+
def test_ngrams_on_list_separated_words(self):
data = [{
'x': ['I', 'like', 'pie'],
@@ -589,5 +609,127 @@ class NGramsTest(unittest.TestCase):
assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
+class BagOfWordsTest(unittest.TestCase):
+ def setUp(self) -> None:
+ self.artifact_location = tempfile.mkdtemp()
+
+ def tearDown(self):
+ shutil.rmtree(self.artifact_location)
+
+ def test_bag_of_words_on_list_seperated_words_default_ngrams(self):
+ data = [{
+ 'x': ['I', 'like', 'pie', 'pie', 'pie'],
+ }, {
+ 'x': ['yum', 'yum', 'pie']
+ }]
+
+ with beam.Pipeline() as p:
+ result = (
+ p
+ | "Create" >> beam.Create(data)
+ | "MLTransform" >> base.MLTransform(
+ write_artifact_location=self.artifact_location,
+ transforms=[tft.BagOfWords(columns=['x'])]))
+ result = result | beam.Map(lambda x: x.x)
+
+ expected_data = [
+ np.array([b'I', b'like', b'pie'], dtype=object),
+ np.array([b'yum', b'pie'], dtype=object)
+ ]
+ assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
+
+ def test_bag_of_words_on_list_seperated_words_custom_ngrams(self):
+ data = [{
+ 'x': ['I', 'like', 'pie', 'I', 'like', 'pie'],
+ }, {
+ 'x': ['yum', 'yum', 'pie']
+ }]
+ with beam.Pipeline() as p:
+ result = (
+ p
+ | "Create" >> beam.Create(data)
+ | "MLTransform" >> base.MLTransform(
+ write_artifact_location=self.artifact_location,
+ transforms=[
+ tft.BagOfWords(
+ columns=['x'], ngram_range=(1, 3), ngrams_separator=' ')
+ ]))
+ result = result | beam.Map(lambda x: x.x)
+
+ expected_data = [[
+ b'I',
+ b'I like',
+ b'I like pie',
+ b'like',
+ b'like pie',
+ b'like pie I',
+ b'pie',
+ b'pie I',
+ b'pie I like'
+ ], [b'yum', b'yum yum', b'yum yum pie', b'yum pie', b'pie']]
+ assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
+
+ def test_bag_of_words_on_numpy_data(self):
+ data = [{
+ 'x': np.array(['I', 'like', 'pie', 'I', 'like', 'pie'], dtype=object),
+ }, {
+ 'x': np.array(['yum', 'yum', 'pie'], dtype=object)
+ }]
+ with beam.Pipeline() as p:
+ result = (
+ p
+ | "Create" >> beam.Create(data)
+ | "MLTransform" >> base.MLTransform(
+ write_artifact_location=self.artifact_location,
+ transforms=[
+ tft.BagOfWords(
+ columns=['x'], ngram_range=(1, 3), ngrams_separator=' ')
+ ]))
+ result = result | beam.Map(lambda x: x.x)
+
+ expected_data = [[
+ b'I',
+ b'I like',
+ b'I like pie',
+ b'like',
+ b'like pie',
+ b'like pie I',
+ b'pie',
+ b'pie I',
+ b'pie I like'
+ ], [b'yum', b'yum yum', b'yum yum pie', b'yum pie', b'pie']]
+ assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
+
+ def test_bag_of_words_on_by_splitting_input_text(self):
+ data = [{'x': 'I like pie I like pie'}, {'x': 'yum yum pie'}]
+ with beam.Pipeline() as p:
+ result = (
+ p
+ | "Create" >> beam.Create(data)
+ | "MLTransform" >> base.MLTransform(
+ write_artifact_location=self.artifact_location,
+ transforms=[
+ tft.BagOfWords(
+ columns=['x'],
+ split_string_by_delimiter=' ',
+ ngram_range=(1, 3),
+ ngrams_separator=' ')
+ ]))
+ result = result | beam.Map(lambda x: x.x)
+
+ expected_data = [[
+ b'I',
+ b'I like',
+ b'I like pie',
+ b'like',
+ b'like pie',
+ b'like pie I',
+ b'pie',
+ b'pie I',
+ b'pie I like'
+ ], [b'yum', b'yum yum', b'yum yum pie', b'yum pie', b'pie']]
+ assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
+
+
if __name__ == '__main__':
unittest.main()