tvalentyn commented on code in PR #30146:
URL: https://github.com/apache/beam/pull/30146#discussion_r1483706176
##########
sdks/python/apache_beam/ml/transforms/handlers.py:
##########
@@ -83,20 +85,48 @@
tft_process_handler_output_type = typing.Union[beam.Row, Dict[str, np.ndarray]]
+class DataCoder:
+ def __init__(
+ self,
+ exclude_columns,
+ coder=coders.registry.get_coder(Any),
+ ):
+ """
+ Uses PickleCoder to encode/decode the dictonaries.
+ Args:
+ exclude_columns: list of columns to exclude from the encoding.
+ """
+ self.coder = coder
+ self.exclude_columns = exclude_columns
+
+ def encode(self, element):
+ data_to_encode = element.copy()
+ for key in self.exclude_columns:
+ if key in data_to_encode:
+ del data_to_encode[key]
+ element[_TEMP_KEY] = self.coder.encode(data_to_encode)
Review Comment:
Let's not mutate the element here either; return a clone instead.
##########
sdks/python/apache_beam/ml/transforms/handlers.py:
##########
@@ -83,20 +85,48 @@
tft_process_handler_output_type = typing.Union[beam.Row, Dict[str, np.ndarray]]
+class DataCoder:
+ def __init__(
+ self,
+ exclude_columns,
+ coder=coders.registry.get_coder(Any),
+ ):
+ """
+ Uses PickleCoder to encode/decode the dictonaries.
+ Args:
+ exclude_columns: list of columns to exclude from the encoding.
+ """
+ self.coder = coder
+ self.exclude_columns = exclude_columns
+
+ def encode(self, element):
+ data_to_encode = element.copy()
+ for key in self.exclude_columns:
+ if key in data_to_encode:
+ del data_to_encode[key]
+ element[_TEMP_KEY] = self.coder.encode(data_to_encode)
+ return element
+
+ def decode(self, element):
+ clone = copy.copy(element)
+ clone.update(self.coder.decode(clone[_TEMP_KEY].item()))
Review Comment:
What is the function of `.item()` here? What is the type of
`clone[_TEMP_KEY]`? Given that we call `.item()` here, will the
elements in `clone` have a consistent type after decoding?
##########
sdks/python/apache_beam/ml/transforms/handlers.py:
##########
@@ -83,20 +85,48 @@
tft_process_handler_output_type = typing.Union[beam.Row, Dict[str, np.ndarray]]
+class DataCoder:
Review Comment:
Rename this to `_DataCoder`, since it is internal to this module.
##########
sdks/python/apache_beam/ml/transforms/handlers.py:
##########
@@ -83,20 +85,48 @@
tft_process_handler_output_type = typing.Union[beam.Row, Dict[str, np.ndarray]]
+class DataCoder:
+ def __init__(
+ self,
+ exclude_columns,
+ coder=coders.registry.get_coder(Any),
+ ):
+ """
+ Uses PickleCoder to encode/decode the dictonaries.
Review Comment:
```suggestion
Encodes/decodes items of a dictionary into a single element.
```
We use FastPrimitivesCoder, so you could say something generic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]