[ https://issues.apache.org/jira/browse/BEAM-13379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alistair Muldal updated BEAM-13379:
-----------------------------------
Description:
For example:
{code:python}
from concurrent import futures
import threading
from absl import app
from absl import logging
import apache_beam as beam
NAMESPACE = 'METRICS_THREADS_REPRO'
main_thread_counter = beam.metrics.Metrics.counter(
NAMESPACE, 'main_thread_counter')
sub_thread_counter = beam.metrics.Metrics.counter(
NAMESPACE, 'sub_thread_counter')
def increment_counter(counter):
counter.inc()
logging.info('Incremented counter %s from thread %s',
counter.metric_name, threading.current_thread().name)
class IncrementCountersFn(beam.DoFn):
def setup(self):
self.executor = futures.ThreadPoolExecutor()
def process(self, idx):
increment_counter(main_thread_counter)
self.executor.submit(increment_counter, sub_thread_counter).result()
logging.info('Processed %i', idx)
def main(argv):
if len(argv) > 1:
raise app.UsageError('Too many command-line arguments.')
p = beam.Pipeline()
_ = (
p
| 'Create' >> beam.Create(range(100))
| 'Process' >> beam.ParDo(IncrementCountersFn()))
result = p.run()
result.wait_until_finish()
filter_by_namespace = beam.metrics.MetricsFilter().with_namespace(NAMESPACE)
filtered_metrics = result.metrics().query(filter_by_namespace)
logging.info('Pipeline finished, metrics logged: %s', filtered_metrics)
if __name__ == '__main__':
app.run(main)
{code}
Only {{main_thread_counter}} is incremented, not {{sub_thread_counter}}:
{noformat}
I1203 18:38:56.394423 140078103397056 pipeline.py:48] Pipeline finished,
metrics logged: {'counters': [MetricResult(key=MetricKey(step=Process,
metric=MetricName(namespace=METRICS_THREADS_REPRO, name=main_thread_counter),
labels={}), committed=100, attempted=100)], 'distributions': [],
'gauges': []}
{noformat}
was:
For example:
{code:python}
from concurrent import futures
import threading
from absl import app
from absl import logging
import apache_beam as beam
NAMESPACE = 'METRICS_THREADS_REPRO'
main_thread_counter = beam.metrics.Metrics.counter(
NAMESPACE, 'main_thread_counter')
sub_thread_counter = beam.metrics.Metrics.counter(
NAMESPACE, 'sub_thread_counter')
def increment_counter(counter):
counter.inc()
logging.info('Incremented counter %s from thread %s',
counter.metric_name, threading.current_thread().name)
class IncrementCountersFn(beam.DoFn):
def setup(self):
self.executor = futures.ThreadPoolExecutor()
def process(self, idx):
increment_counter(main_thread_counter)
self.executor.submit(increment_counter, sub_thread_counter).result()
logging.info('Processed %i', idx)
def main(argv):
if len(argv) > 1:
raise app.UsageError('Too many command-line arguments.')
p = beam.Pipeline()
_ = (
p
| 'Create' >> beam.Create(range(100))
| 'Process' >> beam.ParDo(IncrementCountersFn()))
result = p.run()
result.wait_until_finish()
filter_by_namespace = beam.metrics.MetricsFilter().with_namespace(NAMESPACE)
filtered_metrics = result.metrics().query(filter_by_namespace)
logging.info('Pipeline finished, metrics logged: %s', filtered_metrics)
if __name__ == '__main__':
app.run(main)
{code}
Only {{main_thread_counter}} is incremented, not {{sub_thread_counter}}:
{noformat}
I1203 18:38:56.394423 140078103397056 pipeline.py:48] Pipeline finished,
metrics logged: {'counters': [MetricResult(key=MetricKey(step=Process,
metric=MetricName(namespace=METRICS_THREADS_REPRO, name=main_thread_counter),
labels={}), committed=100, attempted=100)], 'distributions': [],
'gauges': []}
{noformat}
> Incrementing a counter from a Python subthread doesn't seem to do anything
> --------------------------------------------------------------------------
>
> Key: BEAM-13379
> URL: https://issues.apache.org/jira/browse/BEAM-13379
> Project: Beam
> Issue Type: Bug
> Components: beam-model
> Affects Versions: 2.34.0
> Environment: Debian 5.10.46-5rodete1 (2021-09-28) x86_64 GNU/Linux
> Reporter: Alistair Muldal
> Priority: P2
>
> For example:
> {code:python}
> from concurrent import futures
> import threading
> from absl import app
> from absl import logging
> import apache_beam as beam
> NAMESPACE = 'METRICS_THREADS_REPRO'
> main_thread_counter = beam.metrics.Metrics.counter(
> NAMESPACE, 'main_thread_counter')
> sub_thread_counter = beam.metrics.Metrics.counter(
> NAMESPACE, 'sub_thread_counter')
> def increment_counter(counter):
> counter.inc()
> logging.info('Incremented counter %s from thread %s',
> counter.metric_name, threading.current_thread().name)
> class IncrementCountersFn(beam.DoFn):
> def setup(self):
> self.executor = futures.ThreadPoolExecutor()
> def process(self, idx):
> increment_counter(main_thread_counter)
> self.executor.submit(increment_counter, sub_thread_counter).result()
> logging.info('Processed %i', idx)
> def main(argv):
> if len(argv) > 1:
> raise app.UsageError('Too many command-line arguments.')
> p = beam.Pipeline()
> _ = (
> p
> | 'Create' >> beam.Create(range(100))
> | 'Process' >> beam.ParDo(IncrementCountersFn()))
> result = p.run()
> result.wait_until_finish()
> filter_by_namespace = beam.metrics.MetricsFilter().with_namespace(NAMESPACE)
> filtered_metrics = result.metrics().query(filter_by_namespace)
> logging.info('Pipeline finished, metrics logged: %s', filtered_metrics)
> if __name__ == '__main__':
> app.run(main)
> {code}
> Only {{main_thread_counter}} is incremented, not {{sub_thread_counter}}:
> {noformat}
> I1203 18:38:56.394423 140078103397056 pipeline.py:48] Pipeline finished,
> metrics logged: {'counters': [MetricResult(key=MetricKey(step=Process,
> metric=MetricName(namespace=METRICS_THREADS_REPRO, name=main_thread_counter),
> labels={}), committed=100, attempted=100)], 'distributions': [],
> 'gauges': []}
> {noformat}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)