jd185367 opened a new issue, #32448:
URL: https://github.com/apache/beam/issues/32448

   ### What happened?
   
   A unit test using `TestPipeline` in the Python SDK had different behavior 
when running on Linux vs Windows machines. If I assign a PCollection to a 
variable (say, `result`) and then modify that data later after it's been 
partitioned (but do *not* modify `result` directly), on Linux machines, the 
original `result` data remains unchanged. On Windows machines, though, the 
`result` data is altered.
   
   This led to a few hours of "works on my machine" debugging confusion (where 
all the tests were passing on my laptop but failing on my CI server).
   
   My environment details:
   
   - Python: 3.10.5 (Windows) / 3.10.14 (Linux)
   - `beam`: 2.59.0
   - Linux Machine: Ubuntu 20.04.4 LTS
   - Windows Machine: Microsoft Windows 11 Enterprise, Version 10.0.22631 Build 
22631
   
   Example code to reproduce:
   
   ```python
   from dataclasses import dataclass
   import apache_beam as beam
   from apache_beam.testing.test_pipeline import TestPipeline
   from apache_beam.testing.util import assert_that, equal_to
   
   @dataclass
   class SomeObject:
       id: str
       type: int
   
       @classmethod
       def modify_and_return(cls, obj: "SomeObject") -> "SomeObject":
           obj.id = "modified"
           return obj
   
   class ModifyByReference(beam.PTransform):
       def expand(self, data: beam.PCollection[SomeObject]) -> 
beam.PCollection[SomeObject]:
           result = data
           type1, _ = result | "Split by type" >> beam.Partition(lambda x, _: 
x.type, 2)
           type1 | beam.Map(SomeObject.modify_and_return)
           return result
   
   def test_results_are_unmodified():
       test_data = [SomeObject("test_id", type=0), SomeObject("test_id", 
type=1)]
   
       with TestPipeline() as pipeline:
           result_ids = (
               pipeline
               | beam.Create(test_data)
               | ModifyByReference()
               | beam.Map(lambda x: x.id)
           )
   
           # this passes on Linux, but fails on Windows
           assert_that(result_ids, equal_to(["test_id", "test_id"]))
   ```
   
   Steps to reproduce:
   
   1. Install Python 3.10 (unsure how relevant this version is)
   2. Run `pip install apache-beam==2.59.0 pytest==8.3.3`
   3. Copy the example code to a file (e.g. `test.py`)
   4. Run `pytest test.py`
   5. Repeat steps 1-4 on Windows and Linux OS, respectively
   
   ### Linux Output
   
   ```
   ======================================== test session starts 
=========================================
   platform linux -- Python 3.10.14, pytest-8.3.3, pluggy-1.5.0
   rootdir: /<redacted>/reproduce-beam-bug
   collected 1 item
   
   test.py .                                                                    
                  [100%]
   
   ========================================== warnings summary 
==========================================
   .venv/lib/python3.10/site-packages/apache_beam/testing/test_pipeline.py:36
     
/<redacted>/reproduce-beam-bug/.venv/lib/python3.10/site-packages/apache_beam/testing/test_pipeline.py:36:
 PytestCollectionWarning: cannot collect test class 'TestPipeline' because it 
has a __init__ constructor (from: test.py)
       class TestPipeline(Pipeline):
   
   -- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
   ==================================== 1 passed, 1 warning in 5.50s 
====================================
   ```
   
   ### Windows Output
   
   ```
   ======================================== test session starts 
========================================
   platform win32 -- Python 3.10.5, pytest-8.3.3, pluggy-1.5.0
   rootdir: <redacted>\reproduce-bug
   collected 1 item
   
   test.py F                                                                    
                  [100%]
   
   ============================================= FAILURES 
==============================================
   ____________________________________ test_results_are_unmodified 
____________________________________
   
   >   ???
   
   apache_beam\\runners\\common.py:1495:
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _
   apache_beam\\runners\\common.py:913: in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
       ???
   apache_beam\\runners\\common.py:1055: in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
       ???
   .venv\lib\site-packages\apache_beam\transforms\core.py:2063: in <lambda>
       wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _
   
   actual = ['modified', 'test_id']
   equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 
0x00000170DB1681F0>
   
       def _equal(actual, equals_fn=equals_fn):
         expected_list = list(expected)
   
         # Try to compare actual and expected by sorting. This fails with a
         # TypeError in Python 3 if different types are present in the same
         # collection. It can also raise false negatives for types that don't 
have
         # a deterministic sort order, like pyarrow Tables as of 0.14.1
         if not equals_fn:
           equals_fn = lambda e, a: e == a
           try:
             sorted_expected = sorted(expected)
             sorted_actual = sorted(actual)
             if sorted_expected == sorted_actual:
               return
           except TypeError:
             pass
         # Slower method, used in two cases:
         # 1) If sorted expected != actual, use this method to verify the 
inequality.
         #    This ensures we don't raise any false negatives for types that 
don't
         #    have a deterministic sort order.
         # 2) As a fallback if we encounter a TypeError in python 3. this method
         #    works on collections that have different types.
         unexpected = []
         for element in actual:
           found = False
           for i, v in enumerate(expected_list):
             if equals_fn(v, element):
               found = True
               expected_list.pop(i)
               break
           if not found:
             unexpected.append(element)
         if unexpected or expected_list:
           msg = 'Failed assert: %r == %r' % (expected, actual)
           if unexpected:
             msg = msg + ', unexpected elements %r' % unexpected
           if expected_list:
             msg = msg + ', missing elements %r' % expected_list
   >       raise BeamAssertException(msg)
   E       apache_beam.testing.util.BeamAssertException: Failed assert: 
['test_id', 'test_id'] == ['modified', 'test_id'], unexpected elements 
['modified'], missing elements ['test_id']
   
   .venv\lib\site-packages\apache_beam\testing\util.py:192: BeamAssertException
   
   During handling of the above exception, another exception occurred:
   
       def test_results_are_unmodified():
           test_data = [SomeObject("test_id", type=0), SomeObject("test_id", 
type=1)]
   
   >       with TestPipeline() as pipeline:
   
   test.py:26:
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _
   .venv\lib\site-packages\apache_beam\pipeline.py:620: in __exit__
       self.result = self.run()
   .venv\lib\site-packages\apache_beam\testing\test_pipeline.py:115: in run
       result = super().run(
   .venv\lib\site-packages\apache_beam\pipeline.py:570: in run
       self._options).run(False)
   .venv\lib\site-packages\apache_beam\pipeline.py:594: in run
       return self.runner.run_pipeline(self, self._options)
   .venv\lib\site-packages\apache_beam\runners\direct\direct_runner.py:128: in 
run_pipeline
       return runner.run_pipeline(pipeline, options)
   
.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py:195:
 in run_pipeline
       self._latest_run_result = self.run_via_runner_api(
   
.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py:221:
 in run_via_runner_api
       return self.run_stages(stage_context, stages)
   
.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py:468:
 in run_stages
       bundle_results = self._execute_bundle(
   
.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py:793:
 in _execute_bundle
       self._run_bundle(
   
.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py:1032:
 in _run_bundle
       result, splits = bundle_manager.process_bundle(
   
.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py:1358:
 in process_bundle
       result_future = 
self._worker_handler.control_conn.push(process_bundle_req)
   
.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py:384:
 in push
       response = self.worker.do_instruction(request)
   .venv\lib\site-packages\apache_beam\runners\worker\sdk_worker.py:656: in 
do_instruction
       return getattr(self, request_type)(
   .venv\lib\site-packages\apache_beam\runners\worker\sdk_worker.py:694: in 
process_bundle
       bundle_processor.process_bundle(instruction_id))
   .venv\lib\site-packages\apache_beam\runners\worker\bundle_processor.py:1119: 
in process_bundle
       input_op_by_transform_id[element.transform_id].process_encoded(
   .venv\lib\site-packages\apache_beam\runners\worker\bundle_processor.py:237: 
in process_encoded
       self.output(decoded_value)
   apache_beam\\runners\\worker\\operations.py:567: in 
apache_beam.runners.worker.operations.Operation.output
       ???
   apache_beam\\runners\\worker\\operations.py:569: in 
apache_beam.runners.worker.operations.Operation.output
       ???
   apache_beam\\runners\\worker\\operations.py:260: in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
       ???
   apache_beam\\runners\\worker\\operations.py:263: in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
       ???
   apache_beam\\runners\\worker\\operations.py:950: in 
apache_beam.runners.worker.operations.DoOperation.process
       ???
   apache_beam\\runners\\worker\\operations.py:951: in 
apache_beam.runners.worker.operations.DoOperation.process
       ???
   apache_beam\\runners\\common.py:1497: in 
apache_beam.runners.common.DoFnRunner.process
       ???
   apache_beam\\runners\\common.py:1586: in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
       ???
   apache_beam\\runners\\common.py:1495: in 
apache_beam.runners.common.DoFnRunner.process
       ???
   apache_beam\\runners\\common.py:687: in 
apache_beam.runners.common.SimpleInvoker.invoke_process
       ???
   apache_beam\\runners\\common.py:1681: in 
apache_beam.runners.common._OutputHandler.handle_process_outputs
       ???
   apache_beam\\runners\\common.py:1794: in 
apache_beam.runners.common._OutputHandler._write_value_to_tag
       ???
   apache_beam\\runners\\worker\\operations.py:263: in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
       ???
   apache_beam\\runners\\worker\\operations.py:950: in 
apache_beam.runners.worker.operations.DoOperation.process
       ???
   apache_beam\\runners\\worker\\operations.py:951: in 
apache_beam.runners.worker.operations.DoOperation.process
       ???
   apache_beam\\runners\\common.py:1497: in 
apache_beam.runners.common.DoFnRunner.process
       ???
   apache_beam\\runners\\common.py:1586: in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
       ???
   apache_beam\\runners\\common.py:1495: in 
apache_beam.runners.common.DoFnRunner.process
       ???
   apache_beam\\runners\\common.py:687: in 
apache_beam.runners.common.SimpleInvoker.invoke_process
       ???
   apache_beam\\runners\\common.py:1681: in 
apache_beam.runners.common._OutputHandler.handle_process_outputs
       ???
   apache_beam\\runners\\common.py:1794: in 
apache_beam.runners.common._OutputHandler._write_value_to_tag
       ???
   apache_beam\\runners\\worker\\operations.py:263: in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
       ???
   apache_beam\\runners\\worker\\operations.py:950: in 
apache_beam.runners.worker.operations.DoOperation.process
       ???
   apache_beam\\runners\\worker\\operations.py:951: in 
apache_beam.runners.worker.operations.DoOperation.process
       ???
   apache_beam\\runners\\common.py:1497: in 
apache_beam.runners.common.DoFnRunner.process
       ???
   apache_beam\\runners\\common.py:1586: in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
       ???
   apache_beam\\runners\\common.py:1495: in 
apache_beam.runners.common.DoFnRunner.process
       ???
   apache_beam\\runners\\common.py:687: in 
apache_beam.runners.common.SimpleInvoker.invoke_process
       ???
   apache_beam\\runners\\common.py:1681: in 
apache_beam.runners.common._OutputHandler.handle_process_outputs
       ???
   apache_beam\\runners\\common.py:1794: in 
apache_beam.runners.common._OutputHandler._write_value_to_tag
       ???
   apache_beam\\runners\\worker\\operations.py:263: in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
       ???
   apache_beam\\runners\\worker\\operations.py:950: in 
apache_beam.runners.worker.operations.DoOperation.process
       ???
   apache_beam\\runners\\worker\\operations.py:951: in 
apache_beam.runners.worker.operations.DoOperation.process
       ???
   apache_beam\\runners\\common.py:1497: in 
apache_beam.runners.common.DoFnRunner.process
       ???
   apache_beam\\runners\\common.py:1607: in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
       ???
   apache_beam\\runners\\common.py:1495: in 
apache_beam.runners.common.DoFnRunner.process
       ???
   apache_beam\\runners\\common.py:913: in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
       ???
   apache_beam\\runners\\common.py:1055: in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
       ???
   .venv\lib\site-packages\apache_beam\transforms\core.py:2063: in <lambda>
       wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _
   
   actual = ['modified', 'test_id']
   equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 
0x00000170DB1681F0>
   
       def _equal(actual, equals_fn=equals_fn):
         expected_list = list(expected)
   
         # Try to compare actual and expected by sorting. This fails with a
         # TypeError in Python 3 if different types are present in the same
         # collection. It can also raise false negatives for types that don't 
have
         # a deterministic sort order, like pyarrow Tables as of 0.14.1
         if not equals_fn:
           equals_fn = lambda e, a: e == a
           try:
             sorted_expected = sorted(expected)
             sorted_actual = sorted(actual)
             if sorted_expected == sorted_actual:
               return
           except TypeError:
             pass
         # Slower method, used in two cases:
         # 1) If sorted expected != actual, use this method to verify the 
inequality.
         #    This ensures we don't raise any false negatives for types that 
don't
         #    have a deterministic sort order.
         # 2) As a fallback if we encounter a TypeError in python 3. this method
         #    works on collections that have different types.
         unexpected = []
         for element in actual:
           found = False
           for i, v in enumerate(expected_list):
             if equals_fn(v, element):
               found = True
               expected_list.pop(i)
               break
           if not found:
             unexpected.append(element)
         if unexpected or expected_list:
           msg = 'Failed assert: %r == %r' % (expected, actual)
           if unexpected:
             msg = msg + ', unexpected elements %r' % unexpected
           if expected_list:
             msg = msg + ', missing elements %r' % expected_list
   >       raise BeamAssertException(msg)
   E       apache_beam.testing.util.BeamAssertException: Failed assert: 
['test_id', 'test_id'] == ['modified', 'test_id'], unexpected elements 
['modified'], missing elements ['test_id'] [while running 'assert_that/Match']
   
   .venv\lib\site-packages\apache_beam\testing\util.py:192: BeamAssertException
   ----------------------------------------- Captured log call 
-----------------------------------------
   ERROR    apache_beam.runners.common:bundle_processor.py:237 Failed assert: 
['test_id', 'test_id'] == ['modified', 'test_id'], unexpected elements 
['modified'], missing elements ['test_id'] [while running 'assert_that/Match']
   Traceback (most recent call last):
     File "apache_beam\\runners\\common.py", line 1495, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam\\runners\\common.py", line 913, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
     File "apache_beam\\runners\\common.py", line 1055, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
     File 
"<redacted>\reproduce-bug\.venv\lib\site-packages\apache_beam\transforms\core.py",
 line 2063, in <lambda>
       wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
     File 
"<redacted>\reproduce-bug\.venv\lib\site-packages\apache_beam\testing\util.py", 
line 192, in _equal
       raise BeamAssertException(msg)
   apache_beam.testing.util.BeamAssertException: Failed assert: ['test_id', 
'test_id'] == ['modified', 'test_id'], unexpected elements ['modified'], 
missing elements ['test_id']
   ========================================= warnings summary 
==========================================
   .venv\lib\site-packages\apache_beam\testing\test_pipeline.py:36
     
<redacted>\reproduce-bug\.venv\lib\site-packages\apache_beam\testing\test_pipeline.py:36:
 PytestCollectionWarning: cannot collect test class 'TestPipeline' because it 
has a __init__ constructor (from: test.py)
       class TestPipeline(Pipeline):
   
   -- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
   ====================================== short test summary info 
======================================
   FAILED test.py::test_results_are_unmodified - 
apache_beam.testing.util.BeamAssertException: Failed assert: ['test_id', 
'test_id'] == ['modified...
   =================================== 1 failed, 1 warning in 2.48s 
====================================
   ```
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to