tvalentyn commented on code in PR #30146:
URL: https://github.com/apache/beam/pull/30146#discussion_r1473554003


##########
sdks/python/apache_beam/ml/transforms/handlers.py:
##########
@@ -83,20 +84,53 @@
 tft_process_handler_output_type = typing.Union[beam.Row, Dict[str, np.ndarray]]
 
 
+# alternatie: Use a single class for both encoding and decoding and
+# use beam.Map() instead of DoFns?
+class _EncodeDict(beam.DoFn):
+  """
+  Encode a dictionary into bytes and pass it along with the original element
+  using a temporary key.
+
+  Internal use only. No backward compatibility guarantees.
+  """
+  def __init__(self, exclude_columns=None):
+    self._exclude_columns = exclude_columns
+
+  def process(self, element: Dict[str, Any]):
+    data_to_encode = element.copy()
+    for key in self._exclude_columns:
+      if key in data_to_encode:
+        del data_to_encode[key]
+
+    bytes = pickler.dumps(data_to_encode)

Review Comment:
   > For the unused elements, we won't know what the schema of the elements.
   
   According to MLTransform docs, elements end up being Rows:
   
   > To define a data processing transformation by using MLTransform, create 
instances of data processing transforms with columns as input parameters. The 
data in the specified columns is transformed and outputted to the beam.Row 
object.
   
   do we infer the types later then?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to