[
https://issues.apache.org/jira/browse/BEAM-13379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479758#comment-17479758
]
Alistair Muldal commented on BEAM-13379:
----------------------------------------
bq. This is on the direct runner?
Yes
bq. It is not really expected for Beam users to do their own multithreading,
but to have the runner scale the number of threads up and down.
I agree that's usually the sensible thing to do performance-wise, but sometimes
additional threads are needed for other reasons (in our case we are making
calls into a third-party interface that would deadlock unless made from
different threads).
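A possible workaround in the meantime is to have the subthread return a delta and increment the counter from the thread that called {{process}}. Below is a standard-library-only sketch of that idea. It rests on an assumption I have not verified against the Beam source: that counter updates are attributed through per-thread state which pool threads do not inherit. {{ToyCounter}}, {{_context}}, {{call_third_party}} and {{process_elements}} are hypothetical stand-ins, not Beam APIs.

```python
import threading
from concurrent import futures

# Hypothetical stand-in for a per-thread metrics context; not Beam's real API.
_context = threading.local()


class ToyCounter:
    """Counter that only records when the calling thread has an active context."""

    def __init__(self, name):
        self.name = name

    def inc(self, n=1):
        totals = getattr(_context, 'totals', None)
        if totals is None:
            return  # No context on this thread: the update is silently dropped.
        totals[self.name] = totals.get(self.name, 0) + n


def call_third_party():
    """Placeholder for the call that must run on a separate thread."""
    return 1  # Units of work done, reported back to the caller.


def process_elements(num_elements):
    _context.totals = {}  # Only the calling thread gets a context.
    dropped = ToyCounter('incremented_in_sub_thread')
    relayed = ToyCounter('relayed_to_main_thread')
    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        for _ in range(num_elements):
            # Incrementing inside the worker thread has no effect here.
            executor.submit(dropped.inc).result()
            # Workaround: return the delta and increment on the calling thread.
            delta = executor.submit(call_third_party).result()
            relayed.inc(delta)
    totals = dict(_context.totals)
    del _context.totals
    return totals


if __name__ == '__main__':
    print(process_elements(100))
```

In this toy model only the relayed counter ends up with a value, which mirrors what the repro in the issue description shows. Whether the same pattern is acceptable in a real DoFn depends on how the runner attributes metrics, so treat it as a stopgap, not a fix.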
> Incrementing a counter from a Python subthread doesn't seem to do anything
> --------------------------------------------------------------------------
>
> Key: BEAM-13379
> URL: https://issues.apache.org/jira/browse/BEAM-13379
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Affects Versions: 2.34.0
> Environment: Debian 5.10.46-5rodete1 (2021-09-28) x86_64 GNU/Linux
> Reporter: Alistair Muldal
> Priority: P2
>
> For example:
> {code:python}
> from concurrent import futures
> import threading
>
> from absl import app
> from absl import logging
> import apache_beam as beam
>
> NAMESPACE = 'METRICS_THREADS_REPRO'
> main_thread_counter = beam.metrics.Metrics.counter(
>     NAMESPACE, 'main_thread_counter')
> sub_thread_counter = beam.metrics.Metrics.counter(
>     NAMESPACE, 'sub_thread_counter')
>
> def increment_counter(counter):
>   counter.inc()
>   logging.info('Incremented counter %s from thread %s',
>                counter.metric_name, threading.current_thread().name)
>
> class IncrementCountersFn(beam.DoFn):
>   def setup(self):
>     self.executor = futures.ThreadPoolExecutor()
>
>   def process(self, idx):
>     increment_counter(main_thread_counter)
>     self.executor.submit(increment_counter, sub_thread_counter).result()
>     logging.info('Processed %i', idx)
>
> def main(argv):
>   if len(argv) > 1:
>     raise app.UsageError('Too many command-line arguments.')
>   p = beam.Pipeline()
>   _ = (
>       p
>       | 'Create' >> beam.Create(range(100))
>       | 'Process' >> beam.ParDo(IncrementCountersFn()))
>   result = p.run()
>   result.wait_until_finish()
>   filter_by_namespace = beam.metrics.MetricsFilter().with_namespace(NAMESPACE)
>   filtered_metrics = result.metrics().query(filter_by_namespace)
>   logging.info('Pipeline finished, metrics logged: %s', filtered_metrics)
>
> if __name__ == '__main__':
>   app.run(main)
> {code}
> Only {{main_thread_counter}} is incremented, not {{sub_thread_counter}}:
> {noformat}
> I1203 18:38:56.394423 140078103397056 pipeline.py:48] Pipeline finished,
> metrics logged: {'counters': [MetricResult(key=MetricKey(step=Process,
> metric=MetricName(namespace=METRICS_THREADS_REPRO,
> name=main_thread_counter), labels={}), committed=100, attempted=100)],
> 'distributions': [], 'gauges': []}
> {noformat}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)