claudevdm commented on code in PR #36271:
URL: https://github.com/apache/beam/pull/36271#discussion_r2583559735
##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -704,8 +709,24 @@ def copy(self):
copy.by_urn.update(self.by_urn)
copy.by_logical_type.update(self.by_logical_type)
copy.by_language_type.update(self.by_language_type)
+ copy._custom_urns.update(self._custom_urns)
return copy
+ def copy_custom(self):
+ copy = LogicalTypeRegistry()
+ for urn in self._custom_urns:
+ logical_type = self.by_urn[urn]
+ copy.by_urn[urn] = logical_type
+ copy.by_logical_type[logical_type] = urn
+ copy.by_language_type[logical_type.language_type()] = logical_type
+ copy._custom_urns.add(urn)
Review Comment:
Should this method end with `return copy`? The hunk above builds `copy` but no return statement is visible.
##########
sdks/python/apache_beam/internal/cloudpickle_pickler.py:
##########
@@ -252,12 +252,35 @@ def _lock_reducer(obj):
def dump_session(file_path):
- # It is possible to dump session with cloudpickle. However, since references
- # are saved it should not be necessary. See https://s.apache.org/beam-picklers
- pass
+ # Since References are saved (https://s.apache.org/beam-picklers), we only
+ # dump supported Beam Registries (currently only logical type registry)
+ from apache_beam.typehints import schemas
+ from apache_beam.coders import typecoders
+
+ with _pickle_lock, open(file_path, 'wb') as file:
+ coder_reg = typecoders.registry.get_custom_type_coder_tuples()
+ logical_type_reg = schemas.LogicalType._known_logical_types.copy_custom()
+
+ pickler = cloudpickle.CloudPickler(file)
+ # TODO(https://github.com/apache/beam/issues/18500) add file system registry
+ # once implemented
+ pickler.dump({"coder": coder_reg, "logical_type": logical_type_reg})
def load_session(file_path):
- # It is possible to load_session with cloudpickle. However, since references
- # are saved it should not be necessary. See https://s.apache.org/beam-picklers
- pass
+ from apache_beam.typehints import schemas
+ from apache_beam.coders import typecoders
+
+ with _pickle_lock, open(file_path, 'rb') as file:
+ registries = cloudpickle.load(file)
+ if type(registries) != dict:
+ raise ValueError(
+ "Faled loading session: expected dict, get {}", type(registries))
Review Comment:
Replace "get" with "got" in this error message? ("Faled" also looks like a typo for "Failed".)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]