tvalentyn commented on code in PR #30146:
URL: https://github.com/apache/beam/pull/30146#discussion_r1473396317


##########
sdks/python/apache_beam/ml/transforms/handlers.py:
##########
@@ -447,22 +408,21 @@ def expand(
       raw_data_metadata = metadata_io.read_metadata(
           os.path.join(self.artifact_location, RAW_DATA_METADATA_DIR))
 
-    keyed_raw_data = (raw_data | beam.ParDo(_ComputeAndAttachUniqueID()))
+    keyed_raw_data = (raw_data)  #  | beam.ParDo(_ComputeAndAttachUniqueID()))
 
     feature_set = [feature.name for feature in raw_data_metadata.schema.feature]
-    keyed_columns_not_in_schema = (
-        keyed_raw_data
-        | beam.ParDo(_GetMissingColumns(feature_set)))
 
     # To maintain consistency by outputting numpy array all the time,
     # whether a scalar value or list or np array is passed as input,
     #  we will convert scalar values to list values and TFT will output
     # numpy array all the time.
+    raw_data_list = (
+        keyed_raw_data
+        | beam.ParDo(_EncodeDict(exclude_columns=feature_set)))

Review Comment:
   Consider:
   
   ```
   | beam.ParDo(_EncodeUnusedColumns(used_columns=feature_set)))
   ```
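   
   For concreteness, a sketch of how the renamed DoFn might read (illustrative only; the `_TEMP_KEY` placeholder and the pickling behavior mirror the PR's `_EncodeDict`, the names are the suggestion above):
   
   ```
   from typing import Any, Dict
   
   import apache_beam as beam
   from apache_beam.internal import pickler
   
   _TEMP_KEY = 'temp_key'  # placeholder name; the PR defines the actual key
   
   
   class _EncodeUnusedColumns(beam.DoFn):
     """Pickles the columns TFT does not consume and attaches the bytes
     to the element under a temporary key."""
     def __init__(self, used_columns=None):
       self._used_columns = used_columns or []
   
     def process(self, element: Dict[str, Any]):
       # Everything outside used_columns is carried through as a pickled blob.
       unused = {
           k: v for k, v in element.items() if k not in self._used_columns
       }
       out = dict(element)
       out[_TEMP_KEY] = pickler.dumps(unused)
       yield out
   ```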



##########
sdks/python/apache_beam/ml/transforms/handlers.py:
##########
@@ -83,20 +84,53 @@
 tft_process_handler_output_type = typing.Union[beam.Row, Dict[str, np.ndarray]]
 
 
+# alternative: Use a single class for both encoding and decoding and
+# use beam.Map() instead of DoFns?
+class _EncodeDict(beam.DoFn):
+  """
+  Encode a dictionary into bytes and pass it along with the original element
+  using a temporary key.
+
+  Internal use only. No backward compatibility guarantees.
+  """
+  def __init__(self, exclude_columns=None):
+    self._exclude_columns = exclude_columns
+
+  def process(self, element: Dict[str, Any]):
+    data_to_encode = element.copy()
+    for key in self._exclude_columns:
+      if key in data_to_encode:
+        del data_to_encode[key]
+
+    bytes = pickler.dumps(data_to_encode)
+    element[_TEMP_KEY] = bytes
+    yield element
+
+
+class _DecodeDict(beam.DoFn):
+  """
+  Used to decode the dictionary from bytes (encoded using _EncodeDict).
+  Internal use only. No backward compatibility guarantees.
+  """
+  def process(self, element):
+    element.update(pickler.loads(element[_TEMP_KEY].item()))
+    del element[_TEMP_KEY]

Review Comment:
   Let's not mutate elements; emit a copy instead.
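   
   For example, a minimal non-mutating sketch (assuming the PR's `pickler` import and `_TEMP_KEY` constant; the `.item()` call mirrors the numpy scalar TFT hands back):
   
   ```
   from typing import Any, Dict
   
   import apache_beam as beam
   from apache_beam.internal import pickler
   
   _TEMP_KEY = 'temp_key'  # placeholder name; the PR defines the actual key
   
   
   class _DecodeDict(beam.DoFn):
     """Decodes the dictionary from bytes (encoded by _EncodeDict)."""
     def process(self, element: Dict[str, Any]):
       # Copy first so the input element, which may be shared across fused
       # stages, is never mutated in place.
       result = dict(element)
       result.update(pickler.loads(result.pop(_TEMP_KEY).item()))
       yield result
   ```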



##########
sdks/python/apache_beam/ml/transforms/handlers.py:
##########
@@ -83,20 +84,53 @@
 tft_process_handler_output_type = typing.Union[beam.Row, Dict[str, np.ndarray]]
 
 
+# alternative: Use a single class for both encoding and decoding and
+# use beam.Map() instead of DoFns?
+class _EncodeDict(beam.DoFn):
+  """
+  Encode a dictionary into bytes and pass it along with the original element
+  using a temporary key.
+
+  Internal use only. No backward compatibility guarantees.
+  """
+  def __init__(self, exclude_columns=None):
+    self._exclude_columns = exclude_columns
+
+  def process(self, element: Dict[str, Any]):
+    data_to_encode = element.copy()
+    for key in self._exclude_columns:
+      if key in data_to_encode:
+        del data_to_encode[key]
+
+    bytes = pickler.dumps(data_to_encode)

Review Comment:
   It might be more efficient to use a type-aware cythonized coder. The performance difference can be tested in a microbenchmark.
   
   We might be able to convert elements to a Beam Row and use RowCoder. Something similar is done in:
   
https://github.com/apache/beam/blob/a221f98a5f46be985afcb98a65fcec3b46b81f92/sdks/python/apache_beam/transforms/external.py#L127-L132
   
   Can we expect the schema of the elements in the dict to be the same? Do we have type information for the elements, or would we need to infer it?
   cc: @robertwb who might have a better idea
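   
   A rough microbenchmark sketch of that comparison (field names and types are made up; as noted above, RowCoder needs the schema up front, either from type hints or inferred from a sample element):
   
   ```
   import timeit
   
   import apache_beam as beam
   from apache_beam.coders.row_coder import RowCoder
   from apache_beam.internal import pickler
   from apache_beam.typehints.schemas import named_fields_to_schema
   
   payload = {'id': 42, 'text': 'passthrough column', 'score': 0.5}
   
   # Hypothetical schema matching the payload above.
   schema = named_fields_to_schema([('id', int), ('text', str),
                                    ('score', float)])
   row_coder = RowCoder(schema)
   row = beam.Row(**payload)
   
   # Round-trip each representation and time it.
   for name, fn in [
       ('pickler', lambda: pickler.loads(pickler.dumps(payload))),
       ('RowCoder', lambda: row_coder.decode(row_coder.encode(row))),
   ]:
     print(name, timeit.timeit(fn, number=10000))
   ```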



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
