AnandInguva commented on code in PR #26795:
URL: https://github.com/apache/beam/pull/26795#discussion_r1238985850


##########
sdks/python/apache_beam/ml/transforms/tft_transforms.py:
##########
@@ -0,0 +1,416 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module defines a set of data processing transforms that can be used
+to perform common data transformations on a dataset. These transforms are
+implemented using the TensorFlow Transform (TFT) library. The transforms
+in this module are intended to be used in conjunction with the
+beam.ml.MLTransform class, which provides a convenient interface for
+applying a sequence of data processing transforms to a dataset with the
+help of the ProcessHandler class.
+
+See the documentation for beam.ml.MLTransform for more details.
+
+Since the transforms in this module are implemented using TFT, they
+should be wrapped inside a TFTProcessHandler object before being passed
+to the beam.ml.MLTransform class. The ProcessHandler will let MLTransform
+know which type of input is expected and infers the relevant schema required
+for the TFT library.
+
+Note: The data processing transforms defined in this module don't
+perform the transformation immediately. Instead, each transform returns a
+configured operation object, which encapsulates the details of the
+transformation. The actual computation takes place later in the Apache Beam
+pipeline, after all transformations are set up and the pipeline is run.
+"""
+
+import logging
+from typing import Any
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Optional
+from typing import Union
+
+from apache_beam.ml.transforms.base import BaseOperation
+import tensorflow as tf
+import tensorflow_transform as tft
+from tensorflow_transform import analyzers
+from tensorflow_transform import common_types
+from tensorflow_transform import tf_utils
+
+__all__ = [
+    'ComputeAndApplyVocabulary',
+    'Scale_To_ZScore',
+    'Scale_To_0_1',
+    'ApplyBuckets',
+    'Bucketize'
+]
+
+
+class TFTOperation(BaseOperation):
+  def __init__(self, columns: List[str], **kwargs):
+    """
+    Base Opertation class for all the TFT operations.
+    """
+    self.columns = columns
+    self._kwargs = kwargs
+
+    if not columns:
+      raise RuntimeError(
+          "Columns are not specified. Please specify the column for the "
+          " op %s" % self)
+
+  def validate_args(self):
+    raise NotImplementedError
+
+  def get_artifacts(self, data: common_types.TensorType,
+                    col_name) -> Optional[Dict[str, tf.Tensor]]:
+    return None
+
+
+class ComputeAndApplyVocabulary(TFTOperation):
+  def __init__(
+      self,
+      columns: List[str],
+      *,
+      default_value: Any = -1,
+      top_k: Optional[int] = None,
+      frequency_threshold: Optional[int] = None,
+      num_oov_buckets: int = 0,
+      vocab_filename: Optional[str] = None,
+      name: Optional[str] = None,
+      **kwargs):
+    """
+    This function computes the vocabulary for the given columns of incoming
+    data. The transformation converts the input values to indices of the
+    vocabulary.
+
+    Args:
+      columns: List of column names to apply the transformation.
+      default_value: (Optional) The value to use for out-of-vocabulary values.
+      top_k: (Optional) The number of most frequent tokens to keep.
+      frequency_threshold: (Optional) Limit the generated vocabulary only to
+        elements whose absolute frequency is >= to the supplied threshold.
+        If set to None, the full vocabulary is generated.
+      num_oov_buckets:  Any lookup of an out-of-vocabulary token will return a
+        bucket ID based on its hash if `num_oov_buckets` is greater than zero.
+        Otherwise it is assigned the `default_value`.
+      vocab_filename: The file name for the vocabulary file. If None,
+        a name based on the scope name in the context of this graph will
+        be used as the file name. If not None, should be unique within
+        a given preprocessing function.
+        NOTE in order to make your pipelines resilient to implementation
+        details please set `vocab_filename` when you are using
+        the vocab_filename on a downstream component.
+    """
+    super().__init__(columns, **kwargs)
+    self._default_value = default_value
+    self._top_k = top_k
+    self._frequency_threshold = frequency_threshold
+    self._num_oov_buckets = num_oov_buckets
+    self._vocab_filename = vocab_filename
+    self._name = name
+
+  def apply(self, data: common_types.TensorType,
+            output_column_name: str) -> Dict[str, common_types.TensorType]:
+    # TODO: Pending outputting artifact.
+    return {
+        output_column_name: tft.compute_and_apply_vocabulary(
+            x=data, **self._kwargs)
+    }
+
+  def __str__(self):
+    return "compute_and_apply_vocabulary"
+
+
+class Scale_To_ZScore(TFTOperation):
+  def __init__(
+      self,
+      columns: List[str],
+      *,
+      elementwise: bool = False,
+      name: Optional[str] = None,
+      **kwargs):
+    """
+    This function performs a scaling transformation on the specified columns of
+    the incoming data. It processes the input data such that it's normalized
+    to have a mean of 0 and a variance of 1. The transformation achieves this
+    by subtracting the mean from the input data and then dividing it by the
+    square root of the variance.
+
+    Args:
+      columns: A list of column names to apply the transformation on.
+      elementwise: If True, the transformation is applied elementwise.
+        Otherwise, the transformation is applied on the entire column.
+      name: A name for the operation (optional).
+
+    scale_to_z_score also outputs additional artifacts. The artifacts are
+    mean, which is the mean value in the column, and var, which is the
+    variance in the column. The artifacts are stored in the column
+    named with the suffix <original_col_name>_mean and <original_col_name>_var
+    respectively.
+    """
+    super().__init__(columns, **kwargs)
+    self.elementwise = elementwise
+    self.name = name
+
+  def apply(self, data: common_types.TensorType,
+            output_column_name: str) -> Dict[str, common_types.TensorType]:
+    artifacts = self.get_artifacts(data, output_column_name)
+    output = {output_column_name: tft.scale_to_z_score(x=data, **self._kwargs)}
+    output_dict = {output_column_name: output}
+    if artifacts is not None:
+      output_dict.update(artifacts)
+    return output_dict
+
+  def get_artifacts(self, data: common_types.TensorType,
+                    col_name: str) -> Dict[str, tf.Tensor]:
+    mean_var = tft.analyzers._mean_and_var(data)
+    shape = [tf.shape(data)[0], 1]
+    return {
+        col_name + '_mean': tf.broadcast_to(mean_var[0], shape),
+        col_name + '_var': tf.broadcast_to(mean_var[1], shape),
+    }
+
+  def __str__(self):
+    return "scale_to_z_score"
+
+
+class Scale_To_0_1(TFTOperation):
+  def __init__(
+      self,
+      columns: List[str],
+      elementwise: bool = False,
+      name: Optional[str] = None,
+      **kwargs):
+    """
+    This function applies a scaling transformation on the given columns
+    of incoming data. The transformation scales the input values to the
+    range [0, 1] using the minimum and maximum values in the column, i.e.
+    (x - min(x)) / (max(x) - min(x)).
+
+    Args:
+      columns: A list of column names to apply the transformation on.
+      elementwise: If True, the transformation is applied elementwise.

Review Comment:
   From TFT documentation on tft.scale_to_0_1
   
   If true, scale each element of the tensor independently.
   
   elementwise: IIUC, if true, it calculates scale_to_0_1 on the entire
   dataset; if false, it calculates it on individual tensors.
   
   I am just passing it through in case any user is interested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to