This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c750f8a2ecd Utilizes filepath_interceptor parameter and sets it to
get_normalized_path() function. (#36738)
c750f8a2ecd is described below
commit c750f8a2ecd38e0273e3018157f87e14cfc7a8aa
Author: Praneet Nadella <[email protected]>
AuthorDate: Fri Nov 7 19:37:23 2025 -0500
Utilizes filepath_interceptor parameter and sets it to get_normalized_path()
function. (#36738)
* change to use path normalization via filepath_interceptor in the CloudPickle configs
* pylint and pyformat fixes; replaced the wrong test cases that were originally pushed
* pyformatter / pylint fixes
* clean up unused part of cloudpickle_pickler_test
---
.../apache_beam/internal/cloudpickle_pickler.py | 7 ++++---
.../internal/cloudpickle_pickler_test.py | 24 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py
b/sdks/python/apache_beam/internal/cloudpickle_pickler.py
index 53cd7aace86..199294f1731 100644
--- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py
+++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py
@@ -37,13 +37,14 @@ import zlib
from apache_beam.internal import code_object_pickler
from apache_beam.internal.cloudpickle import cloudpickle
+from apache_beam.internal.code_object_pickler import get_normalized_path
DEFAULT_CONFIG = cloudpickle.CloudPickleConfig(
- skip_reset_dynamic_type_state=True)
-NO_DYNAMIC_CLASS_TRACKING_CONFIG = cloudpickle.CloudPickleConfig(
- id_generator=None, skip_reset_dynamic_type_state=True)
+ skip_reset_dynamic_type_state=True,
+ filepath_interceptor=get_normalized_path)
STABLE_CODE_IDENTIFIER_CONFIG = cloudpickle.CloudPickleConfig(
skip_reset_dynamic_type_state=True,
+ filepath_interceptor=get_normalized_path,
get_code_object_params=cloudpickle.GetCodeObjectParams(
get_code_object_identifier=code_object_pickler.
get_code_object_identifier,
diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
index b63ebd6c710..4a51c56c24b 100644
--- a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
+++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
@@ -19,11 +19,15 @@
# pytype: skip-file
+import os
import threading
import types
import unittest
+from unittest import mock
from apache_beam.coders import proto2_coder_test_messages_pb2
+from apache_beam.internal import cloudpickle_pickler as beam_cloudpickle
+from apache_beam.internal import code_object_pickler
from apache_beam.internal import module_test
from apache_beam.internal.cloudpickle_pickler import dumps
from apache_beam.internal.cloudpickle_pickler import loads
@@ -220,6 +224,26 @@ self.assertEqual(DataClass(datum='abc'),
loads(dumps(DataClass(datum='abc'))))
'Ignoring unsupported option: enable_best_effort_determinism',
'\n'.join(l.output))
+ @mock.patch.object(
+ beam_cloudpickle.DEFAULT_CONFIG, 'filepath_interceptor', autospec=True)
+ def test_default_config_interceptor(self, mock_filepath_interceptor):
+ """Tests config.filepath_interceptor is called for CodeType pickling."""
+ mock_filepath_interceptor.side_effect = (
+ code_object_pickler.get_normalized_path)
+
+ def sample_func():
+ return "Beam"
+
+ code_obj = sample_func.__code__
+ original_filename = os.path.abspath(code_obj.co_filename)
+ pickled_code = beam_cloudpickle.dumps(code_obj)
+ unpickled_code = beam_cloudpickle.loads(pickled_code)
+
+ mock_filepath_interceptor.assert_called()
+
+ unpickled_filename = os.path.abspath(unpickled_code.co_filename)
+ self.assertEqual(unpickled_filename, original_filename)
+
if __name__ == '__main__':
unittest.main()