SamHjelmfelt opened a new issue, #25315:
URL: https://github.com/apache/beam/issues/25315

   API: python
   apache-beam version: 2.44.0 (installed via pip)
   Python version: 3.10.9
   
   The first step of my pipeline collects all files ready for processing using 
fileio.MatchAll(). It is possible that no files will be available, and 
file.MatchAll() will output zero records. If zero records are output, the 
expected behavior is that no processing will be done, but if 
direct_running_mode is set to any value, an exception is thrown by 
GroupByKey(). 
   
   Here is sample code to reproduce the error and examples for all 8 scenarios:
   
   Without direct_running_mode:
   ```
   with apache_beam.Pipeline() as p:
       p | apache_beam.Create([('test', 'test')]) | apache_beam.Filter(lambda 
x: True) | apache_beam.GroupByKey() | apache_beam.Map(print)
   #output: ('test_key', ['test_value'])
   
   with apache_beam.Pipeline() as p:
       p | apache_beam.Create([('test', 'test')]) | apache_beam.Filter(lambda 
x: False) | apache_beam.GroupByKey() | apache_beam.Map(print)
   #No output. Execution succeeds
   ```
   
   With direct running mode:
   ```
   with 
apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_processing"))
 as p:
       p | apache_beam.Create([('test_key', 'test_value')]) | 
apache_beam.Filter(lambda x: True) | apache_beam.GroupByKey() | 
apache_beam.Map(print)
   #output: ('test_key', ['test_value'])
   
   with 
apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="in_memory")) 
as p:
       p | apache_beam.Create([('test_key', 'test_value')]) | 
apache_beam.Filter(lambda x: True) | apache_beam.GroupByKey() | 
apache_beam.Map(print)
   #output: ('test_key', ['test_value'])
   
   with 
apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_threading"))
 as p:
       p | apache_beam.Create([('test_key', 'test_value')]) | 
apache_beam.Filter(lambda x: True) | apache_beam.GroupByKey() | 
apache_beam.Map(print)
   #output: ('test_key', ['test_value'])
   
   
   with 
apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_processing"))
 as p:
       p | apache_beam.Create([('test_key', 'test_value')]) | 
apache_beam.Filter(lambda x: False) | apache_beam.GroupByKey() | 
apache_beam.Map(print)
   #Exception (similar to below)
   
   with 
apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="in_memory")) 
as p:
       p | apache_beam.Create([('test_key', 'test_value')]) | 
apache_beam.Filter(lambda x: False) | apache_beam.GroupByKey() | 
apache_beam.Map(print)
   #Exception (similar to below)
   
   with 
apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_threading"))
 as p:
       p | apache_beam.Create([('test_key', 'test_value')]) | 
apache_beam.Filter(lambda x: False) | apache_beam.GroupByKey() | 
apache_beam.Map(print)
   #Exception 
   Traceback (most recent call last):
     File 
"/opt/homebrew/Cellar/[email protected]/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py",
 line 196, in _run_module_as_main
       return _run_code(code, main_globals, None,
     File 
"/opt/homebrew/Cellar/[email protected]/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py",
 line 86, in _run_code
       exec(code, run_globals)
     File 
"/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/__main__.py",
 line 39, in <module>
       cli.main()
     File 
"/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/../debugpy/server/cli.py",
 line 430, in main
       run()
     File 
"/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/../debugpy/server/cli.py",
 line 284, in run_file
       runpy.run_path(target, run_name="__main__")
     File 
"/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py",
 line 321, in run_path
       return _run_module_code(code, init_globals, run_name,
     File 
"/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py",
 line 135, in _run_module_code
       _run_code(code, mod_globals, init_globals,
     File 
"/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py",
 line 124, in _run_code
       exec(code, run_globals)
     File "main.py", line 6, in <module>
       with 
apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_threading"))
 as p:
     File "/opt/homebrew/lib/python3.10/site-packages/apache_beam/pipeline.py", 
line 600, in __exit__
       self.result = self.run()
     File "/opt/homebrew/lib/python3.10/site-packages/apache_beam/pipeline.py", 
line 577, in run
       return self.runner.run_pipeline(self, self._options)
     File 
"/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py",
 line 131, in run_pipeline
       return runner.run_pipeline(pipeline, options)
     File 
"/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 201, in run_pipeline
       self._latest_run_result = self.run_via_runner_api(
     File 
"/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 222, in run_via_runner_api
       return self.run_stages(stage_context, stages)
     File 
"/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 453, in run_stages
       bundle_results = self._execute_bundle(
     File 
"/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 781, in _execute_bundle
       self._run_bundle(
     File 
"/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 1010, in _run_bundle
       result, splits = bundle_manager.process_bundle(
     File 
"/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 1430, in process_bundle
       for ix, part in enumerate(input.partition(self._num_workers)):
   AttributeError: 'NoneType' object has no attribute 'partition'
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to