tvalentyn commented on code in PR #29542:
URL: https://github.com/apache/beam/pull/29542#discussion_r1414462326
##########
sdks/python/apache_beam/ml/transforms/handlers.py:
##########
@@ -121,76 +123,74 @@ def expand(
return pcoll | beam.Map(lambda x: x._asdict())
-class ComputeAndAttachHashKey(beam.DoFn):
+class _ComputeAndAttachUniqueID(beam.DoFn):
"""
- Computes and attaches a hash key to the element.
- Only for internal use. No backwards compatibility guarantees.
+ Computes and attaches a unique id to each element in the PCollection.
"""
def process(self, element):
- hash_object = hashlib.sha256()
- for _, value in element.items():
- # handle the case where value is a list or numpy array
- if isinstance(value, (list, np.ndarray)):
- hash_object.update(str(list(value)).encode())
- else: # assume value is a primitive that can be turned into str
- hash_object.update(str(value).encode())
- yield (hash_object.hexdigest(), element)
+ # UUID1 includes machine-specific bits and has a counter. As long as not too
+ # many are generated at the same time, they should be unique.
+ # UUID4 generation should be unique in practice as long as underlying random
+ # number generation is not compromised.
+ # A combination of both should avoid the anecdotal pitfalls where
+ # replacing one with the other has helped some users.
+ # UUID collision will result in data loss, but we can detect that and fail.
+
+ # TODO(https://github.com/apache/beam/issues/29593): Evaluate MLTransform
+ # implementation without CoGBK.
+ unique_key = uuid.uuid1().bytes + uuid.uuid4().bytes
+ yield (unique_key, element)
-class GetMissingColumnsPColl(beam.DoFn):
+class _GetMissingColumns(beam.DoFn):
"""
Returns data containing only the columns that are not
present in the schema. This is needed since TFT only outputs
columns that are transformed by any of the data processing transforms.
-
- Only for internal use. No backwards compatibility guarantees.
"""
def __init__(self, existing_columns):
self.existing_columns = existing_columns
def process(self, element):
- new_dict = {}
- hash_key, element = element
- for key, value in element.items():
- if key not in self.existing_columns:
- new_dict[key] = value
- yield (hash_key, new_dict)
+ id, row_dict = element
+ new_dict = {
+ k:v for k, v in row_dict.items() if k not in self.existing_columns
Review Comment:
```suggestion
k: v
for k, v in row_dict.items() if k not in self.existing_columns
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]