tvalentyn commented on code in PR #30146:
URL: https://github.com/apache/beam/pull/30146#discussion_r1473426188


##########
sdks/python/apache_beam/ml/transforms/handlers.py:
##########
@@ -83,20 +84,53 @@
 tft_process_handler_output_type = typing.Union[beam.Row, Dict[str, np.ndarray]]
 
 
+# Alternative: use a single class for both encoding and decoding and
+# use beam.Map() instead of DoFns?
+class _EncodeDict(beam.DoFn):
+  """
+  Encode a dictionary into bytes and pass it along with the original element
+  using a temporary key.
+
+  Internal use only. No backward compatibility guarantees.
+  """
+  def __init__(self, exclude_columns=None):
+    self._exclude_columns = exclude_columns
+
+  def process(self, element: Dict[str, Any]):
+    data_to_encode = element.copy()
+    for key in self._exclude_columns:
+      if key in data_to_encode:
+        del data_to_encode[key]
+
+    bytes = pickler.dumps(data_to_encode)
+    element[_TEMP_KEY] = bytes
+    yield element
+
+
+class _DecodeDict(beam.DoFn):
+  """
+  Used to decode the dictionary from bytes (encoded using _EncodeDict).
+  Internal use only. No backward compatibility guarantees.
+  """
+  def process(self, element):
+    element.update(pickler.loads(element[_TEMP_KEY].item()))
+    del element[_TEMP_KEY]

Review Comment:
   Rationale: see the "Immutability" section of the Beam programming guide:
https://beam.apache.org/documentation/programming-guide/#immutability



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to