jd185367 opened a new issue, #32448:
URL: https://github.com/apache/beam/issues/32448
### What happened?
A unit test using `TestPipeline` in the Python SDK behaves differently on Linux and Windows machines. If I assign a PCollection to a variable (say, `result`), partition it, and then modify the partitioned elements (but do *not* modify `result` directly), the original `result` data remains unchanged on Linux. On Windows, however, the `result` data is altered.

This led to a few hours of "works on my machine" debugging confusion, since all the tests passed on my laptop but failed on my CI server.
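A minimal plain-Python illustration of the aliasing involved: when two names refer to the same object, an in-place mutation through one is visible through the other. Returning a modified copy (here with `dataclasses.replace`) avoids that, and is in line with the Beam programming guide's advice that user code should not modify its input elements. This is a sketch, not Beam code; `Item`, `modify_in_place`, and `modify_copy` are hypothetical names.

```python
from dataclasses import dataclass, replace


@dataclass
class Item:
    # Hypothetical stand-in for SomeObject from the reproducer.
    id: str
    type: int


def modify_in_place(obj: Item) -> Item:
    # Mutates the caller's object, like SomeObject.modify_and_return in the reproducer.
    obj.id = "modified"
    return obj


def modify_copy(obj: Item) -> Item:
    # Builds a new Item; the input object is left untouched.
    return replace(obj, id="modified")


original = Item("test_id", 0)
alias = original  # a second reference to the same object
modify_in_place(alias)
print(original.id)  # modified

original = Item("test_id", 0)
copied = modify_copy(original)
print(original.id, copied.id)  # test_id modified
```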
My environment details:
- Python: 3.10.5 (Windows) / 3.10.14 (Linux)
- `beam`: 2.59.0
- Linux Machine: Ubuntu 20.04.4 LTS
- Windows Machine: Microsoft Windows 11 Enterprise, Version 10.0.22631 Build 22631
Example code to reproduce:
```python
from dataclasses import dataclass

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


@dataclass
class SomeObject:
    id: str
    type: int

    @classmethod
    def modify_and_return(cls, obj: "SomeObject") -> "SomeObject":
        obj.id = "modified"
        return obj


class ModifyByReference(beam.PTransform):
    def expand(self, data: beam.PCollection[SomeObject]) -> beam.PCollection[SomeObject]:
        result = data
        type1, _ = result | "Split by type" >> beam.Partition(lambda x, _: x.type, 2)
        type1 | beam.Map(SomeObject.modify_and_return)
        return result


def test_results_are_unmodified():
    test_data = [SomeObject("test_id", type=0), SomeObject("test_id", type=1)]
    with TestPipeline() as pipeline:
        result_ids = (
            pipeline
            | beam.Create(test_data)
            | ModifyByReference()
            | beam.Map(lambda x: x.id)
        )
        # this passes on Linux, but fails on Windows
        assert_that(result_ids, equal_to(["test_id", "test_id"]))
```
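In the reproducer, `result` and `data` are the same PCollection, so each element has two consumers: the `Partition` branch and the `beam.Map(lambda x: x.id)` applied after `ModifyByReference()`. Only the element with `type=0` is routed to the first partition (bound to `type1`) and reaches `modify_and_return`, which matches the single `'modified'` in the Windows output. One possible explanation, not confirmed here, is that the runner hands the same Python object to both consumers without copying it, so the `id` that gets read depends on which consumer runs first. The plain-Python sketch below models only that hypothesis; `fan_out` and `read_id` are hypothetical helpers, not Beam APIs.

```python
from dataclasses import dataclass


@dataclass
class SomeObject:
    id: str
    type: int


def modify_and_return(obj: SomeObject) -> SomeObject:
    # Same in-place mutation as SomeObject.modify_and_return in the reproducer.
    obj.id = "modified"
    return obj


def read_id(obj: SomeObject) -> str:
    # Hypothetical stand-in for beam.Map(lambda x: x.id).
    return obj.id


def fan_out(element, consumers):
    # Hypothetical model of a stage that hands the *same* object to each
    # downstream consumer in order, without copying it.
    return [consume(element) for consume in consumers]


# Reader runs first: it sees the original id.
print(fan_out(SomeObject("test_id", 0), [read_id, modify_and_return])[0])  # test_id
# Mutating branch runs first: the reader sees the change.
print(fan_out(SomeObject("test_id", 0), [modify_and_return, read_id])[1])  # modified
```

Under this assumption, the result is order-dependent rather than platform-dependent as such; the sketch says nothing about why the order would differ between Linux and Windows.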
Steps to reproduce:
1. Install Python 3.10 (I'm not sure how relevant the exact version is)
2. Run `pip install apache-beam==2.59.0 pytest==8.3.3`
3. Copy the example code to a file (e.g. `test.py`)
4. Run `pytest test.py`
5. Repeat steps 1-4 on both Windows and Linux
### Linux Output
```
======================================== test session starts =========================================
platform linux -- Python 3.10.14, pytest-8.3.3, pluggy-1.5.0
rootdir: /<redacted>/reproduce-beam-bug
collected 1 item

test.py .                                                                                      [100%]

========================================== warnings summary ==========================================
.venv/lib/python3.10/site-packages/apache_beam/testing/test_pipeline.py:36
  /<redacted>/reproduce-beam-bug/.venv/lib/python3.10/site-packages/apache_beam/testing/test_pipeline.py:36: PytestCollectionWarning: cannot collect test class 'TestPipeline' because it has a __init__ constructor (from: test.py)
    class TestPipeline(Pipeline):

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
==================================== 1 passed, 1 warning in 5.50s ====================================
```
### Windows Output
```
======================================== test session starts ========================================
platform win32 -- Python 3.10.5, pytest-8.3.3, pluggy-1.5.0
rootdir: <redacted>\reproduce-bug
collected 1 item

test.py F                                                                                      [100%]

============================================= FAILURES ==============================================
____________________________________ test_results_are_unmodified ____________________________________
>   ???
apache_beam\\runners\\common.py:1495:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam\\runners\\common.py:913: in apache_beam.runners.common.PerWindowInvoker.invoke_process
    ???
apache_beam\\runners\\common.py:1055: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    ???
.venv\lib\site-packages\apache_beam\transforms\core.py:2063: in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = ['modified', 'test_id']
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x00000170DB1681F0>
    def _equal(actual, equals_fn=equals_fn):
      expected_list = list(expected)
      # Try to compare actual and expected by sorting. This fails with a
      # TypeError in Python 3 if different types are present in the same
      # collection. It can also raise false negatives for types that don't have
      # a deterministic sort order, like pyarrow Tables as of 0.14.1
      if not equals_fn:
        equals_fn = lambda e, a: e == a
        try:
          sorted_expected = sorted(expected)
          sorted_actual = sorted(actual)
          if sorted_expected == sorted_actual:
            return
        except TypeError:
          pass
      # Slower method, used in two cases:
      # 1) If sorted expected != actual, use this method to verify the inequality.
      # This ensures we don't raise any false negatives for types that don't
      # have a deterministic sort order.
      # 2) As a fallback if we encounter a TypeError in python 3. this method
      # works on collections that have different types.
      unexpected = []
      for element in actual:
        found = False
        for i, v in enumerate(expected_list):
          if equals_fn(v, element):
            found = True
            expected_list.pop(i)
            break
        if not found:
          unexpected.append(element)
      if unexpected or expected_list:
        msg = 'Failed assert: %r == %r' % (expected, actual)
        if unexpected:
          msg = msg + ', unexpected elements %r' % unexpected
        if expected_list:
          msg = msg + ', missing elements %r' % expected_list
>       raise BeamAssertException(msg)
E       apache_beam.testing.util.BeamAssertException: Failed assert: ['test_id', 'test_id'] == ['modified', 'test_id'], unexpected elements ['modified'], missing elements ['test_id']

.venv\lib\site-packages\apache_beam\testing\util.py:192: BeamAssertException

During handling of the above exception, another exception occurred:

    def test_results_are_unmodified():
        test_data = [SomeObject("test_id", type=0), SomeObject("test_id", type=1)]
>       with TestPipeline() as pipeline:

test.py:26:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv\lib\site-packages\apache_beam\pipeline.py:620: in __exit__
    self.result = self.run()
.venv\lib\site-packages\apache_beam\testing\test_pipeline.py:115: in run
    result = super().run(
.venv\lib\site-packages\apache_beam\pipeline.py:570: in run
    self._options).run(False)
.venv\lib\site-packages\apache_beam\pipeline.py:594: in run
    return self.runner.run_pipeline(self, self._options)
.venv\lib\site-packages\apache_beam\runners\direct\direct_runner.py:128: in run_pipeline
    return runner.run_pipeline(pipeline, options)
.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py:195: in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py:221: in run_via_runner_api
    return self.run_stages(stage_context, stages)
.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py:468: in run_stages
    bundle_results = self._execute_bundle(
.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py:793: in _execute_bundle
    self._run_bundle(
.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py:1032: in _run_bundle
    result, splits = bundle_manager.process_bundle(
.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py:1358: in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py:384: in push
    response = self.worker.do_instruction(request)
.venv\lib\site-packages\apache_beam\runners\worker\sdk_worker.py:656: in do_instruction
    return getattr(self, request_type)(
.venv\lib\site-packages\apache_beam\runners\worker\sdk_worker.py:694: in process_bundle
    bundle_processor.process_bundle(instruction_id))
.venv\lib\site-packages\apache_beam\runners\worker\bundle_processor.py:1119: in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
.venv\lib\site-packages\apache_beam\runners\worker\bundle_processor.py:237: in process_encoded
    self.output(decoded_value)
apache_beam\\runners\\worker\\operations.py:567: in apache_beam.runners.worker.operations.Operation.output
    ???
apache_beam\\runners\\worker\\operations.py:569: in apache_beam.runners.worker.operations.Operation.output
    ???
apache_beam\\runners\\worker\\operations.py:260: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
    ???
apache_beam\\runners\\worker\\operations.py:263: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
    ???
apache_beam\\runners\\worker\\operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam\\runners\\worker\\operations.py:951: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam\\runners\\common.py:1497: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam\\runners\\common.py:1586: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam\\runners\\common.py:1495: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam\\runners\\common.py:687: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam\\runners\\common.py:1681: in apache_beam.runners.common._OutputHandler.handle_process_outputs
    ???
apache_beam\\runners\\common.py:1794: in apache_beam.runners.common._OutputHandler._write_value_to_tag
    ???
apache_beam\\runners\\worker\\operations.py:263: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
    ???
apache_beam\\runners\\worker\\operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam\\runners\\worker\\operations.py:951: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam\\runners\\common.py:1497: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam\\runners\\common.py:1586: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam\\runners\\common.py:1495: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam\\runners\\common.py:687: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam\\runners\\common.py:1681: in apache_beam.runners.common._OutputHandler.handle_process_outputs
    ???
apache_beam\\runners\\common.py:1794: in apache_beam.runners.common._OutputHandler._write_value_to_tag
    ???
apache_beam\\runners\\worker\\operations.py:263: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
    ???
apache_beam\\runners\\worker\\operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam\\runners\\worker\\operations.py:951: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam\\runners\\common.py:1497: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam\\runners\\common.py:1586: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam\\runners\\common.py:1495: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam\\runners\\common.py:687: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam\\runners\\common.py:1681: in apache_beam.runners.common._OutputHandler.handle_process_outputs
    ???
apache_beam\\runners\\common.py:1794: in apache_beam.runners.common._OutputHandler._write_value_to_tag
    ???
apache_beam\\runners\\worker\\operations.py:263: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
    ???
apache_beam\\runners\\worker\\operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam\\runners\\worker\\operations.py:951: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam\\runners\\common.py:1497: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam\\runners\\common.py:1607: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam\\runners\\common.py:1495: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam\\runners\\common.py:913: in apache_beam.runners.common.PerWindowInvoker.invoke_process
    ???
apache_beam\\runners\\common.py:1055: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    ???
.venv\lib\site-packages\apache_beam\transforms\core.py:2063: in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = ['modified', 'test_id']
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x00000170DB1681F0>
    def _equal(actual, equals_fn=equals_fn):
      expected_list = list(expected)
      # Try to compare actual and expected by sorting. This fails with a
      # TypeError in Python 3 if different types are present in the same
      # collection. It can also raise false negatives for types that don't have
      # a deterministic sort order, like pyarrow Tables as of 0.14.1
      if not equals_fn:
        equals_fn = lambda e, a: e == a
        try:
          sorted_expected = sorted(expected)
          sorted_actual = sorted(actual)
          if sorted_expected == sorted_actual:
            return
        except TypeError:
          pass
      # Slower method, used in two cases:
      # 1) If sorted expected != actual, use this method to verify the inequality.
      # This ensures we don't raise any false negatives for types that don't
      # have a deterministic sort order.
      # 2) As a fallback if we encounter a TypeError in python 3. this method
      # works on collections that have different types.
      unexpected = []
      for element in actual:
        found = False
        for i, v in enumerate(expected_list):
          if equals_fn(v, element):
            found = True
            expected_list.pop(i)
            break
        if not found:
          unexpected.append(element)
      if unexpected or expected_list:
        msg = 'Failed assert: %r == %r' % (expected, actual)
        if unexpected:
          msg = msg + ', unexpected elements %r' % unexpected
        if expected_list:
          msg = msg + ', missing elements %r' % expected_list
>       raise BeamAssertException(msg)
E       apache_beam.testing.util.BeamAssertException: Failed assert: ['test_id', 'test_id'] == ['modified', 'test_id'], unexpected elements ['modified'], missing elements ['test_id'] [while running 'assert_that/Match']

.venv\lib\site-packages\apache_beam\testing\util.py:192: BeamAssertException
----------------------------------------- Captured log call -----------------------------------------
ERROR    apache_beam.runners.common:bundle_processor.py:237 Failed assert: ['test_id', 'test_id'] == ['modified', 'test_id'], unexpected elements ['modified'], missing elements ['test_id'] [while running 'assert_that/Match']
Traceback (most recent call last):
  File "apache_beam\\runners\\common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\\runners\\common.py", line 913, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam\\runners\\common.py", line 1055, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "<redacted>\reproduce-bug\.venv\lib\site-packages\apache_beam\transforms\core.py", line 2063, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "<redacted>\reproduce-bug\.venv\lib\site-packages\apache_beam\testing\util.py", line 192, in _equal
    raise BeamAssertException(msg)
apache_beam.testing.util.BeamAssertException: Failed assert: ['test_id', 'test_id'] == ['modified', 'test_id'], unexpected elements ['modified'], missing elements ['test_id']
========================================= warnings summary ==========================================
.venv\lib\site-packages\apache_beam\testing\test_pipeline.py:36
  <redacted>\reproduce-bug\.venv\lib\site-packages\apache_beam\testing\test_pipeline.py:36: PytestCollectionWarning: cannot collect test class 'TestPipeline' because it has a __init__ constructor (from: test.py)
    class TestPipeline(Pipeline):

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
====================================== short test summary info ======================================
FAILED test.py::test_results_are_unmodified - apache_beam.testing.util.BeamAssertException: Failed assert: ['test_id', 'test_id'] == ['modified...
=================================== 1 failed, 1 warning in 2.48s ====================================
```
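The "unexpected elements" / "missing elements" wording in the Windows failure comes from the matching loop in `equal_to`'s `_equal` function shown in the traceback. As an illustration only, here is a simplified plain-Python version of that loop (it omits the sorted fast path and the custom `equals_fn` hook); `diff_multisets` is a hypothetical name, not a Beam API. Applied to the expected and actual IDs from this run, it yields the same two lists.

```python
def diff_multisets(expected, actual):
    # Simplified version of the loop in apache_beam.testing.util.equal_to:
    # each actual element consumes one equal expected element, and whatever
    # is left over on either side is reported.
    expected_list = list(expected)
    unexpected = []
    for element in actual:
        for i, v in enumerate(expected_list):
            if v == element:
                expected_list.pop(i)
                break
        else:
            # No remaining expected element matched this one.
            unexpected.append(element)
    return unexpected, expected_list


unexpected, missing = diff_multisets(["test_id", "test_id"], ["modified", "test_id"])
print(unexpected, missing)  # ['modified'] ['test_id']
```

In other words, exactly one of the two expected `"test_id"` values came back as `"modified"`, so the assertion reports one element on each side.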
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner