This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c750f8a2ecd Utilizes file_interceptor parameter and sets it to get_normalized_path() function. (#36738)
c750f8a2ecd is described below

commit c750f8a2ecd38e0273e3018157f87e14cfc7a8aa
Author: Praneet Nadella <[email protected]>
AuthorDate: Fri Nov 7 19:37:23 2025 -0500

    Utilizes file_interceptor parameter and sets it to get_normalized_path() function. (#36738)
    
    * changes to use path normalization with filepath_interceptor in CONFIG
    
    * pylint and pyformat fixes; corrected the wrong test cases that were originally pushed
    
    * pyformatter / pylint fixes
    
    * clean up unused part of cloudpickle_pickler_test
---
 .../apache_beam/internal/cloudpickle_pickler.py    |  7 ++++---
 .../internal/cloudpickle_pickler_test.py           | 24 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py
index 53cd7aace86..199294f1731 100644
--- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py
+++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py
@@ -37,13 +37,14 @@ import zlib
 
 from apache_beam.internal import code_object_pickler
 from apache_beam.internal.cloudpickle import cloudpickle
+from apache_beam.internal.code_object_pickler import get_normalized_path
 
 DEFAULT_CONFIG = cloudpickle.CloudPickleConfig(
-    skip_reset_dynamic_type_state=True)
-NO_DYNAMIC_CLASS_TRACKING_CONFIG = cloudpickle.CloudPickleConfig(
-    id_generator=None, skip_reset_dynamic_type_state=True)
+    skip_reset_dynamic_type_state=True,
+    filepath_interceptor=get_normalized_path)
 STABLE_CODE_IDENTIFIER_CONFIG = cloudpickle.CloudPickleConfig(
     skip_reset_dynamic_type_state=True,
+    filepath_interceptor=get_normalized_path,
     get_code_object_params=cloudpickle.GetCodeObjectParams(
         get_code_object_identifier=code_object_pickler.
         get_code_object_identifier,
diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
index b63ebd6c710..4a51c56c24b 100644
--- a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
+++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
@@ -19,11 +19,15 @@
 
 # pytype: skip-file
 
+import os
 import threading
 import types
 import unittest
+from unittest import mock
 
 from apache_beam.coders import proto2_coder_test_messages_pb2
+from apache_beam.internal import cloudpickle_pickler as beam_cloudpickle
+from apache_beam.internal import code_object_pickler
 from apache_beam.internal import module_test
 from apache_beam.internal.cloudpickle_pickler import dumps
 from apache_beam.internal.cloudpickle_pickler import loads
@@ -220,6 +224,26 @@ self.assertEqual(DataClass(datum='abc'), loads(dumps(DataClass(datum='abc'))))
           'Ignoring unsupported option: enable_best_effort_determinism',
           '\n'.join(l.output))
 
+  @mock.patch.object(
+      beam_cloudpickle.DEFAULT_CONFIG, 'filepath_interceptor', autospec=True)
+  def test_default_config_interceptor(self, mock_filepath_interceptor):
+    """Tests config.filepath_interceptor is called for CodeType pickling."""
+    mock_filepath_interceptor.side_effect = (
+        code_object_pickler.get_normalized_path)
+
+    def sample_func():
+      return "Beam"
+
+    code_obj = sample_func.__code__
+    original_filename = os.path.abspath(code_obj.co_filename)
+    pickled_code = beam_cloudpickle.dumps(code_obj)
+    unpickled_code = beam_cloudpickle.loads(pickled_code)
+
+    mock_filepath_interceptor.assert_called()
+
+    unpickled_filename = os.path.abspath(unpickled_code.co_filename)
+    self.assertEqual(unpickled_filename, original_filename)
+
 
 if __name__ == '__main__':
   unittest.main()

Reply via email to