[ https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16896575#comment-16896575 ]

Hannah Jiang edited comment on BEAM-3645 at 7/31/19 4:33 PM:
-------------------------------------------------------------

The direct runner can now process map tasks across multiple workers. Depending on
the running environment, these workers run either as multiple threads or as
multiple processes.

To run map tasks in multiple processes, use the subprocess environment. Workers
in any other environment run as multiple threads.
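{code:python}
import sys
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import fn_api_runner

pipeline_options = PipelineOptions(['--direct_num_workers', '2'])
# Create a pipeline in the subprocess environment: each SDK worker is
# launched as a separate Python process running sdk_worker_main.
p = beam.Pipeline(options=pipeline_options,
  runner=fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
      urn=python_urns.SUBPROCESS_SDK,
      payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
              sys.executable.encode('ascii'))))
{code}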
 

The *--direct_num_workers* option sets the number of threads or processes used
for each map task. The default value is 1.
{code:bash}
# An example of passing --direct_num_workers to a job.
python wordcount.py --input xx --output xx --direct_num_workers 2
{code}
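As a rough, stdlib-only illustration of the option's semantics, the sketch below parses a *--direct_num_workers* flag with the documented default of 1 and maps a function over a list using that many worker threads. It is hypothetical: parse_num_workers and run_map_task are made-up names, argparse stands in for Beam's own pipeline-option parsing, and nothing here calls the FnApiRunner.

{code:python}
# Hypothetical sketch: argparse stands in for Beam's option parsing, and a
# thread pool stands in for the runner's workers. Not Beam code.
import argparse
from concurrent.futures import ThreadPoolExecutor


def parse_num_workers(argv):
    """Read --direct_num_workers (default 1); other flags are ignored."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--direct_num_workers', type=int, default=1)
    known, _unknown = parser.parse_known_args(argv)
    return known.direct_num_workers


def run_map_task(fn, elements, num_workers):
    """Apply fn to every element using num_workers threads."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return list(pool.map(fn, elements))


if __name__ == '__main__':
    argv = ['--input', 'xx', '--output', 'xx', '--direct_num_workers', '2']
    num_workers = parse_num_workers(argv)
    print(num_workers)  # 2
    print(run_map_task(str.upper, ['a', 'b', 'c'], num_workers))  # ['A', 'B', 'C']
{code}

Omitting the flag leaves one worker, which matches the documented default.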


was (Author: hannahjiang):
_This summary is pending review to make sure it's 100% correct._

Previously, shuffle data (also called a buffer in the Fn API) was processed by a
single worker. Now the shuffle data, except for WindowGroupingBuffer, is
partitioned into N chunks and each chunk is processed by its own worker, which
improves processing performance. Depending on the running environment, the
partitioned shuffle data is processed by multiple threads or by multiple processes.
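The partitioning idea can be sketched with the standard library alone. This is a hypothetical illustration, not FnApiRunner code: it assumes a simple round-robin split (the runner's actual partitioning scheme may differ), uses threads as the workers, and does not model WindowGroupingBuffer, which is not partitioned.

{code:python}
# Hypothetical sketch, not FnApiRunner code: split a shuffle buffer into
# N chunks and let N worker threads process the chunks concurrently.
from concurrent.futures import ThreadPoolExecutor


def partition(elements, num_partitions):
    """Split a buffer into num_partitions chunks, round-robin (assumed scheme)."""
    chunks = [[] for _ in range(num_partitions)]
    for index, element in enumerate(elements):
        chunks[index % num_partitions].append(element)
    return chunks


def process_chunk(chunk):
    """Stand-in for a worker running one stage over a single chunk."""
    return [element * 10 for element in chunk]


def process_buffer(elements, num_partitions=1):
    """Partition the buffer, then give each chunk to its own worker."""
    chunks = partition(elements, num_partitions)
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        # Outputs come back grouped by chunk, so input order is not kept in general.
        return [item for result in pool.map(process_chunk, chunks)
                for item in result]


if __name__ == '__main__':
    print(partition([1, 2, 3, 4, 5], 2))  # [[1, 3, 5], [2, 4]]
    print(process_buffer([1, 2, 3, 4, 5], num_partitions=2))  # [10, 30, 50, 20, 40]
{code}

With num_partitions=1 the whole buffer goes to a single worker, which corresponds to the previous behaviour.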

The Fn API provides the following running environments:

1. EmbeddedWorkerHandler
2. EmbeddedGrpcWorkerHandler
3. SubprocessSdkWorkerHandler
4. DockerSdkWorkerHandler
5. ExternalWorkerHandler

Workers created by SubprocessSdkWorkerHandler run in separate processes, so to
process shuffle data with multiple processes, use this environment. Workers
created by the other worker handlers run within a single process.
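A minimal sketch of this rule, assuming Python's concurrent.futures pools as stand-ins for SDK workers: the subprocess handler gets a process pool and every other handler gets a thread pool. make_worker_pool and MULTI_PROCESS_HANDLERS are hypothetical names; the handler names from the list above appear only as strings and the real handler classes are not used.

{code:python}
# Hypothetical sketch: choose processes for the subprocess environment and
# threads for every other worker handler. Not FnApiRunner code.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

MULTI_PROCESS_HANDLERS = {'SubprocessSdkWorkerHandler'}


def make_worker_pool(handler_name, num_workers=1):
    """Return an executor whose workers are processes or threads."""
    if handler_name in MULTI_PROCESS_HANDLERS:
        # Separate processes each have their own interpreter and GIL.
        return ProcessPoolExecutor(max_workers=num_workers)
    # Other handlers keep all workers inside a single process.
    return ThreadPoolExecutor(max_workers=num_workers)


if __name__ == '__main__':
    for handler in ('EmbeddedWorkerHandler', 'SubprocessSdkWorkerHandler'):
        with make_worker_pool(handler, num_workers=2) as pool:
            # Prints ThreadPoolExecutor, then ProcessPoolExecutor.
            print(handler, type(pool).__name__)
{code}

Using processes rather than threads is what lets CPU-bound work scale past the Python GIL, which is the motivation given in the issue description.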

The *--direct_num_workers* option sets the number of shuffle partitions. The
default value is 1.

> Support multi-process execution on the FnApiRunner
> --------------------------------------------------
>
>                 Key: BEAM-3645
>                 URL: https://issues.apache.org/jira/browse/BEAM-3645
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Charles Chen
>            Assignee: Hannah Jiang
>            Priority: Major
>             Fix For: 2.15.0
>
>          Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
