AnandInguva commented on code in PR #30146:
URL: https://github.com/apache/beam/pull/30146#discussion_r1486198651


##########
sdks/python/apache_beam/ml/transforms/handlers.py:
##########
@@ -83,20 +85,48 @@
 tft_process_handler_output_type = typing.Union[beam.Row, Dict[str, np.ndarray]]
 
 
+class DataCoder:
+  def __init__(
+      self,
+      exclude_columns,
+      coder=coders.registry.get_coder(Any),
+  ):
+    """
+    Uses PickleCoder to encode/decode the dictonaries.
+    Args:
+      exclude_columns: list of columns to exclude from the encoding.
+    """
+    self.coder = coder
+    self.exclude_columns = exclude_columns
+
+  def encode(self, element):
+    data_to_encode = element.copy()
+    for key in self.exclude_columns:
+      if key in data_to_encode:
+        del data_to_encode[key]
+    element[_TEMP_KEY] = self.coder.encode(data_to_encode)
+    return element
+
+  def decode(self, element):
+    clone = copy.copy(element)
+    clone.update(self.coder.decode(clone[_TEMP_KEY].item()))

Review Comment:
   The type of `clone[_TEMP_KEY]` is a numpy array, and `.item()` returns the
underlying element of that array, which is what gets passed to `self.coder.decode`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to