[ 
https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16896575#comment-16896575
 ] 

Hannah Jiang edited comment on BEAM-3645 at 1/10/20 8:15 PM:
-------------------------------------------------------------

*{color:#ff0000}Update on 01/10/2019{color}*

Use --direct_running_mode to choose how the direct runner executes workers. 
It accepts one of ['in_memory', 'multi_threading', 'multi_processing']. 
The default mode is in_memory.

*in_memory*: multithreading mode; the runner and workers communicate in 
memory (not through gRPC).
 *multi_threading*: multithreading mode; the runner and workers communicate 
through gRPC.
 *multi_processing*: multiprocessing mode; the runner and workers communicate 
through gRPC.
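
The three modes above can be summarized in a small lookup table. This is only an illustrative sketch: `ModeTraits`, `DIRECT_RUNNING_MODES`, and `describe_mode` are hypothetical names made up for this example and are not part of Beam.

```python
from typing import NamedTuple


class ModeTraits(NamedTuple):
    """Hypothetical record describing one direct_running_mode value."""
    concurrency: str  # "threads" or "processes"
    uses_grpc: bool   # whether runner and workers talk over gRPC


# Values taken from the mode descriptions in this comment.
DIRECT_RUNNING_MODES = {
    "in_memory": ModeTraits("threads", False),
    "multi_threading": ModeTraits("threads", True),
    "multi_processing": ModeTraits("processes", True),
}
DEFAULT_MODE = "in_memory"


def describe_mode(mode: str = DEFAULT_MODE) -> str:
    """Return a one-line description of a mode, rejecting unknown names."""
    if mode not in DIRECT_RUNNING_MODES:
        raise ValueError(
            "unknown mode %r, expected one of %s"
            % (mode, sorted(DIRECT_RUNNING_MODES)))
    traits = DIRECT_RUNNING_MODES[mode]
    transport = "gRPC" if traits.uses_grpc else "in-memory calls"
    return "%s: %s, communicating via %s" % (
        mode, traits.concurrency, transport)


print(describe_mode())
print(describe_mode("multi_processing"))
```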

Here is how to set the direct_running_mode.
 *Option 1*: set it with PipelineOptions().
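{code:python}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.portability import fn_api_runner

pipeline_options = PipelineOptions(
    direct_num_workers=2, direct_running_mode='multi_threading')
p = beam.Pipeline(
    runner=fn_api_runner.FnApiRunner(),
    options=pipeline_options)
{code}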

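*Option 2*: pass it on the command line.
{code:python}
# Shell: pass the options as command-line flags.
# python xxx --direct_num_workers 2 --direct_running_mode multi_threading

# Python: forward the flags the script does not parse itself to PipelineOptions.
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
p = beam.Pipeline(
    runner=fn_api_runner.FnApiRunner(),
    options=pipeline_options)
{code}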
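
As a rough illustration of the parse_known_args step, the sketch below uses only the standard library to show how the script's own flags are separated from the flags that would be handed on to the pipeline options. The `--input` flag, its default, and the `split_args` helper are assumptions made for this example.

```python
import argparse


def split_args(argv):
    """Split argv into script-specific arguments and leftover pipeline flags."""
    parser = argparse.ArgumentParser()
    # Hypothetical script-specific flag; a real script defines its own.
    parser.add_argument("--input", default="input.txt")
    # Flags the parser does not recognize are returned untouched, in order.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


known_args, pipeline_args = split_args([
    "--input", "data.txt",
    "--direct_num_workers", "2",
    "--direct_running_mode", "multi_threading",
])
print(known_args.input)
print(pipeline_args)
```

The leftover list is what would typically be passed on, for example as PipelineOptions(pipeline_args).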
 

{color:#ff0000}*Update on 30/06/2018.*{color}

The direct runner can now process map tasks across multiple workers. Depending 
on the configured environment, these workers run in multithreading or 
multiprocessing mode.

_*This is supported since Beam 2.15.*_

*Run with multiprocessing mode*:
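{code:python}
import sys
import apache_beam as beam
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import fn_api_runner

# Use the subprocess environment: each worker runs in its own process.
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.SUBPROCESS_SDK,
      payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
              sys.executable.encode('ascii'))))
{code}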
 

*Run with multithreading mode:*
{code:python}
# Use the embedded gRPC environment: workers run as threads.
# The payload is the number of threads per worker.
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.EMBEDDED_PYTHON_GRPC,
      payload=b'1')))
{code}
 

The *--direct_num_workers* option controls parallelism. The default value is 
1. 
{code:python}
# An example of passing it from the CLI (shell command):
# python wordcount.py --input xx --output xx --direct_num_workers 2

# An example of setting it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# An example of adding it to existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2
{code}



> Support multi-process execution on the FnApiRunner
> --------------------------------------------------
>
>                 Key: BEAM-3645
>                 URL: https://issues.apache.org/jira/browse/BEAM-3645
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Charles Chen
>            Assignee: Hannah Jiang
>            Priority: Major
>             Fix For: 2.15.0
>
>          Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.


