SamHjelmfelt opened a new issue, #25315:
URL: https://github.com/apache/beam/issues/25315
API: python
apache-beam version: 2.44.0 (installed via pip)
Python version: 3.10.9
The first step of my pipeline collects all files ready for processing using
fileio.MatchAll(). It is possible that no files will be available, in which
case fileio.MatchAll() will output zero records. When zero records are output,
the expected behavior is that no processing is done, but if
direct_running_mode is set to any value, an exception is thrown by
GroupByKey().
Here is sample code to reproduce the error and examples for all 8 scenarios:
Without direct_running_mode:
```
with apache_beam.Pipeline() as p:
p | apache_beam.Create([('test', 'test')]) | apache_beam.Filter(lambda
x: True) | apache_beam.GroupByKey() | apache_beam.Map(print)
#output: ('test', ['test'])
with apache_beam.Pipeline() as p:
p | apache_beam.Create([('test', 'test')]) | apache_beam.Filter(lambda
x: False) | apache_beam.GroupByKey() | apache_beam.Map(print)
#No output. Execution succeeds
```
With direct running mode:
```
with
apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_processing"))
as p:
p | apache_beam.Create([('test_key', 'test_value')]) |
apache_beam.Filter(lambda x: True) | apache_beam.GroupByKey() |
apache_beam.Map(print)
#output: ('test_key', ['test_value'])
with
apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="in_memory"))
as p:
p | apache_beam.Create([('test_key', 'test_value')]) |
apache_beam.Filter(lambda x: True) | apache_beam.GroupByKey() |
apache_beam.Map(print)
#output: ('test_key', ['test_value'])
with
apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_threading"))
as p:
p | apache_beam.Create([('test_key', 'test_value')]) |
apache_beam.Filter(lambda x: True) | apache_beam.GroupByKey() |
apache_beam.Map(print)
#output: ('test_key', ['test_value'])
with
apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_processing"))
as p:
p | apache_beam.Create([('test_key', 'test_value')]) |
apache_beam.Filter(lambda x: False) | apache_beam.GroupByKey() |
apache_beam.Map(print)
#Exception (similar to below)
with
apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="in_memory"))
as p:
p | apache_beam.Create([('test_key', 'test_value')]) |
apache_beam.Filter(lambda x: False) | apache_beam.GroupByKey() |
apache_beam.Map(print)
#Exception (similar to below)
with
apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_threading"))
as p:
p | apache_beam.Create([('test_key', 'test_value')]) |
apache_beam.Filter(lambda x: False) | apache_beam.GroupByKey() |
apache_beam.Map(print)
#Exception
Traceback (most recent call last):
File
"/opt/homebrew/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py",
line 196, in _run_module_as_main
return _run_code(code, main_globals, None,
File
"/opt/homebrew/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py",
line 86, in _run_code
exec(code, run_globals)
File
"/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/__main__.py",
line 39, in <module>
cli.main()
File
"/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/../debugpy/server/cli.py",
line 430, in main
run()
File
"/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/../debugpy/server/cli.py",
line 284, in run_file
runpy.run_path(target, run_name="__main__")
File
"/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py",
line 321, in run_path
return _run_module_code(code, init_globals, run_name,
File
"/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py",
line 135, in _run_module_code
_run_code(code, mod_globals, init_globals,
File
"/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py",
line 124, in _run_code
exec(code, run_globals)
File "main.py", line 6, in <module>
with
apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_threading"))
as p:
File "/opt/homebrew/lib/python3.10/site-packages/apache_beam/pipeline.py",
line 600, in __exit__
self.result = self.run()
File "/opt/homebrew/lib/python3.10/site-packages/apache_beam/pipeline.py",
line 577, in run
return self.runner.run_pipeline(self, self._options)
File
"/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py",
line 131, in run_pipeline
return runner.run_pipeline(pipeline, options)
File
"/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 201, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File
"/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 222, in run_via_runner_api
return self.run_stages(stage_context, stages)
File
"/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 453, in run_stages
bundle_results = self._execute_bundle(
File
"/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 781, in _execute_bundle
self._run_bundle(
File
"/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 1010, in _run_bundle
result, splits = bundle_manager.process_bundle(
File
"/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 1430, in process_bundle
for ix, part in enumerate(input.partition(self._num_workers)):
AttributeError: 'NoneType' object has no attribute 'partition'
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]