[ https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16896575#comment-16896575 ]
Hannah Jiang edited comment on BEAM-3645 at 8/2/19 6:24 PM:
------------------------------------------------------------
The Direct runner can now process map tasks across multiple workers. Depending on the environment the runner is configured with, these workers run in either multithreading or multiprocessing mode.

*How to run in multiprocessing mode:*
{code:python}
import sys

import apache_beam as beam
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import fn_api_runner

# Using the subprocess runner: each worker is a separate Python process that
# runs the SDK worker entry point with the current interpreter.
# Assumes pipeline_options is an existing PipelineOptions instance.
p = beam.Pipeline(
    options=pipeline_options,
    runner=fn_api_runner.FnApiRunner(
        default_environment=beam_runner_api_pb2.Environment(
            urn=python_urns.SUBPROCESS_SDK,
            payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
            % sys.executable.encode('ascii'))))
{code}

*How to run in multithreading mode:*
{code:python}
import apache_beam as beam

# Using the in-memory embedded runner; no explicit runner is passed.
p = beam.Pipeline(options=pipeline_options)
{code}
{code:python}
import apache_beam as beam
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import fn_api_runner

# Using the embedded gRPC runner.
p = beam.Pipeline(
    options=pipeline_options,
    runner=fn_api_runner.FnApiRunner(
        default_environment=beam_runner_api_pb2.Environment(
            urn=python_urns.EMBEDDED_PYTHON_GRPC,
            payload=b'1')))  # payload is the number of threads per worker
{code}

The *--direct_num_workers* option controls the number of workers. The default value is 1.
{code:bash}
# Pass it from the command line.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}
{code:python}
from apache_beam.options.pipeline_options import DirectOptions, PipelineOptions

# Set it when constructing the pipeline options directly.
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# Add it to existing pipeline options.
pipeline_options = PipelineOptions()  # or any existing PipelineOptions instance
pipeline_options.view_as(DirectOptions).direct_num_workers = 2
{code}

> Support multi-process execution on the FnApiRunner
> --------------------------------------------------
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Affects Versions: 2.2.0, 2.3.0
> Reporter: Charles Chen
> Assignee: Hannah Jiang
> Priority: Major
> Fix For: 2.15.0
>
> Time Spent: 35h 20m
> Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance
> gain over the previous DirectRunner. We can do even better in multi-core
> environments by supporting multi-process execution in the FnApiRunner, to
> scale past Python GIL limitations.
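In standard CPython builds, CPU-bound pure-Python code cannot run in parallel across threads because of the GIL, which is why the multiprocessing mode matters for multi-core machines. The following is a stdlib-only analogy, not Beam's implementation: a hypothetical `map_task` is applied to several partitions by N workers that are either threads or processes (via `concurrent.futures`), loosely mirroring *--direct_num_workers* and the two modes described in the comment. The function names and the workload are illustrative assumptions.

```python
# Stdlib-only analogy, NOT Beam's implementation: split a CPU-bound
# "map task" across N workers that are either threads or processes.
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def map_task(partition):
    """Hypothetical CPU-bound map task: sum of squares over one partition."""
    return sum(i * i for i in partition)


def run(partitions, num_workers, mode):
    """Apply map_task to every partition using num_workers workers.

    mode is "threads" (workers share one interpreter and its GIL) or
    "processes" (each worker is a separate interpreter with its own GIL).
    """
    executors = {"threads": ThreadPoolExecutor, "processes": ProcessPoolExecutor}
    with executors[mode](max_workers=num_workers) as executor:
        # executor.map returns results in the same order as the partitions.
        return list(executor.map(map_task, partitions))


if __name__ == "__main__":
    # Illustrative workload: 4 partitions of 1,000,000 integers each.
    partitions = [range(k * 1_000_000, (k + 1) * 1_000_000) for k in range(4)]
    for mode in ("threads", "processes"):
        start = time.perf_counter()
        results = run(partitions, num_workers=2, mode=mode)
        elapsed = time.perf_counter() - start
        print(f"{mode}: {elapsed:.2f}s, total={sum(results)}")
```

Both modes compute the same totals; on a multi-core machine the process-based run is typically faster for this CPU-bound work, while the thread-based run is serialized by the GIL. Actual timings depend on the machine.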
--
This message was sent by Atlassian JIRA (v7.6.14#76016)