ConverJens commented on pull request #13723:
URL: https://github.com/apache/beam/pull/13723#issuecomment-770759368
@dandy10 For reference, this is how the beam pipeline is initiated in TFX:
```
def _make_beam_pipeline(self) -> beam_Pipeline:  # pytype: disable=invalid-annotation
  """Makes beam pipeline.

  Builds `PipelineOptions` from `self._beam_pipeline_args`. When those options
  name a runner, the pipeline is created directly from the raw args.
  Otherwise the pipeline is created with an explicit `FnApiRunner`, after
  warning about a `direct_running_mode` / `direct_num_workers` combination
  that would not give the requested parallelism.

  Returns:
    A `beam.Pipeline` configured from `self._beam_pipeline_args`.

  Raises:
    Exception: If the module-level `beam` name is falsy, i.e. Apache Beam is
      not available.
  """
  if not beam:
    raise Exception(
        'Apache Beam must be installed to use this functionality.')

  # Imported inside the function so that these modules are only touched after
  # the availability check above has passed.
  # pylint: disable=g-import-not-at-top
  from apache_beam.options.pipeline_options import DirectOptions
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.pipeline_options import StandardOptions
  from apache_beam.runners.portability import fn_api_runner
  # pylint: enable=g-import-not-at-top

  pipeline_options = PipelineOptions(self._beam_pipeline_args)

  # A runner named in the args takes precedence: hand the raw args to Beam.
  if pipeline_options.view_as(StandardOptions).runner:
    return beam.Pipeline(argv=self._beam_pipeline_args)

  # TODO(b/159468583): move this warning to Beam.
  direct_running_mode = pipeline_options.view_as(
      DirectOptions).direct_running_mode
  direct_num_workers = pipeline_options.view_as(
      DirectOptions).direct_num_workers
  if direct_running_mode == 'in_memory' and direct_num_workers != 1:
    absl.logging.warning(
        'If direct_num_workers is not equal to 1, direct_running_mode should '
        'be `multi_processing` or `multi_threading` instead of `in_memory` '
        'in order for it to have the desired worker parallelism effect.')

  # No runner was specified: fall back to an explicit FnApiRunner instance.
  return beam.Pipeline(
      options=pipeline_options, runner=fn_api_runner.FnApiRunner())
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]