alexjolig opened a new issue, #29275:
URL: https://github.com/apache/beam/issues/29275
I have a `DoFn` in my pipeline, running on GCP Dataflow, that is supposed to
process each product in parallel.
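```python
import logging
from datetime import datetime

from apache_beam import DoFn, GroupBy, ParDo, Pipeline, io

logger = logging.getLogger(__name__)


class Step1(DoFn):
    def process(self, element):
        # Get a list of products
        product_list = ...  # retrieval logic not shown in the original
        for idx, item in enumerate(product_list):
            # Emit one (product, index) pair per product
            yield item, idx


class Step2(DoFn):
    def process(self, element):
        # Get index and product
        product, index = element
        logger.info(f"::: Processing product number {index} STARTED at {datetime.now()}:::::")
        # Do some process ....
        logger.info(f"::: FINISHED product number {index} at {datetime.now()}:::::")


# pipeline_options is defined elsewhere (not shown)
with Pipeline(options=pipeline_options) as pipeline:
    results = (
        pipeline
        | "Read from PubSub" >> io.ReadFromPubSub()  # topic/subscription not shown
        | "Product list" >> ParDo(Step1())
        | "Process Product" >> ParDo(Step2())
        | "Group data" >> GroupBy()
        # ...
    )
```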
So `Step2` is supposed to run once per product, in parallel. But what I
actually get in the logs is:
```
::: Processing product number 0 STARTED at <some_time> :::::
::: FINISHED product number 0 at <some_time>:::::
::: Processing product number 1 STARTED at <some_time> :::::
::: FINISHED product number 1 at <some_time>:::::
::: Processing product number 2 STARTED at <some_time> :::::
::: FINISHED product number 2 at <some_time>:::::
::: Processing product number 3 STARTED at <some_time> :::::
::: FINISHED product number 3 at <some_time>:::::
...
```
This shows that instead of running in parallel, `Step2` runs sequentially:
each product finishes before the next one starts, which takes a long time for
a large number of products.
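The sequential-versus-parallel reading of these logs can be made concrete by pairing each product's STARTED and FINISHED timestamps and checking whether any two intervals overlap; if none do, the products were processed one after another. A minimal sketch, assuming `<some_time>` is the default `str(datetime.now())` format (e.g. `2023-11-03 10:15:42.123456`). The helpers `parse_intervals` and `any_overlap` are hypothetical, not part of Beam or Dataflow.

```python
import re
from datetime import datetime

# Patterns for the two log lines emitted by Step2. Assumes the timestamp is the
# default str(datetime.now()) format, optionally followed by a space.
START = re.compile(r"Processing product number (\d+) STARTED at (.+?)\s*:+$")
FINISH = re.compile(r"FINISHED product number (\d+) at (.+?)\s*:+$")


def parse_intervals(lines):
    """Return {product_index: (start, finish)} built from Step2's log lines."""
    starts, finishes = {}, {}
    for line in lines:
        line = line.strip()
        if m := START.search(line):
            starts[int(m.group(1))] = datetime.fromisoformat(m.group(2))
        elif m := FINISH.search(line):
            finishes[int(m.group(1))] = datetime.fromisoformat(m.group(2))
    # Keep only products for which both lines were found
    return {i: (starts[i], finishes[i]) for i in starts if i in finishes}


def any_overlap(intervals):
    """True if at least two products were being processed at the same time."""
    spans = sorted(intervals.values())
    # Sorted by start time, some pair overlaps iff some interval starts
    # before its predecessor has finished.
    return any(nxt[0] < cur[1] for cur, nxt in zip(spans, spans[1:]))
```

For logs shaped like the ones above, where every STARTED line follows the previous FINISHED line, `any_overlap` returns `False`.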
As the [Apache Beam documentation][1] suggests, I tried the following options in
`PipelineOptions` and double-checked that they were actually set on the job in
GCP, but the result was the same:
* `direct_num_workers=0`
* `direct_running_mode='multi_threading'`
* `direct_running_mode='multi_processing'`
Also, as mentioned in the question title, the runner is Google Cloud's
`DataflowRunner`.
Is there something I'm missing here? Aren't `ParDo` functions supposed to run
in parallel?
[1]: https://beam.apache.org/documentation/runners/direct/#execution-mode
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.