tvalentyn commented on code in PR #36737:
URL: https://github.com/apache/beam/pull/36737#discussion_r2524596345
##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -444,12 +445,19 @@ def __init__(
self.metrics_container = MetricsContainer(self.name_context.metrics_name())
self.state_sampler = state_sampler
- self.scoped_start_state = self.state_sampler.scoped_state(
- self.name_context, 'start', metrics_container=self.metrics_container)
- self.scoped_process_state = self.state_sampler.scoped_state(
- self.name_context, 'process', metrics_container=self.metrics_container)
- self.scoped_finish_state = self.state_sampler.scoped_state(
- self.name_context, 'finish', metrics_container=self.metrics_container)
+ if self.state_sampler:
Review Comment:
> While I was debugging CI failures, I noticed that the state_sampler is
None when operations are run in a non-streaming context. The autocomplete_test
was an example of this.
I am not seeing these errors — am I missing something?
I do, however, see the following failures:
```
2025-11-13T00:34:53.8931580Z =================================== FAILURES
===================================
2025-11-13T00:34:53.8932180Z ___________
StateSamplerTest.test_process_timers_metric_is_recorded ____________
2025-11-13T00:34:53.8933040Z [gw3] darwin -- Python 3.11.9
/Users/runner/work/beam/beam/sdks/python/target/.tox/py311-macos/bin/python
2025-11-13T00:34:53.8933480Z
2025-11-13T00:34:53.8933840Z self =
<apache_beam.runners.worker.statesampler_test.StateSamplerTest
testMethod=test_process_timers_metric_is_recorded>
2025-11-13T00:34:53.8934320Z
2025-11-13T00:34:53.8934560Z @retry(reraise=True,
stop=stop_after_attempt(3))
2025-11-13T00:34:53.8934960Z def
test_process_timers_metric_is_recorded(self):
2025-11-13T00:34:53.8935330Z """
2025-11-13T00:34:53.8936770Z Tests that the 'process-timers-msecs'
metric is correctly recorded
2025-11-13T00:34:53.8937260Z when a state sampler is active.
2025-11-13T00:34:53.8937700Z """
2025-11-13T00:34:53.8938000Z # Set up a real state sampler and counter
factory.
2025-11-13T00:34:53.8938440Z counter_factory = CounterFactory()
2025-11-13T00:34:53.8938970Z sampler = statesampler.StateSampler(
2025-11-13T00:34:53.8939420Z 'test_stage', counter_factory,
sampling_period_ms=1)
2025-11-13T00:34:53.8939770Z
2025-11-13T00:34:53.8941420Z state_duration_ms = 100
2025-11-13T00:34:53.8942440Z margin_of_error = 0.25
2025-11-13T00:34:53.8943340Z
2025-11-13T00:34:53.8944230Z # Run a workload inside the
'process-timers' scoped state.
2025-11-13T00:34:53.8945180Z sampler.start()
2025-11-13T00:34:53.8946500Z with sampler.scoped_state('test_step',
'process-timers'):
2025-11-13T00:34:53.8947520Z time.sleep(state_duration_ms / 1000.0)
2025-11-13T00:34:53.8948220Z sampler.stop()
2025-11-13T00:34:53.8949110Z sampler.commit_counters()
2025-11-13T00:34:53.8949860Z
2025-11-13T00:34:53.8950580Z if not statesampler.FAST_SAMPLER:
2025-11-13T00:34:53.8950920Z return
2025-11-13T00:34:53.8951840Z
2025-11-13T00:34:53.8952350Z # Verify that the counter was created
with the correct name and value.
2025-11-13T00:34:53.8952790Z expected_counter_name = CounterName(
2025-11-13T00:34:53.8953250Z 'process-timers-msecs',
step_name='test_step', stage_name='test_stage')
2025-11-13T00:34:53.8953720Z
2025-11-13T00:34:53.8954090Z # Find the specific counter we are
looking for.
2025-11-13T00:34:53.8954450Z found_counter = None
2025-11-13T00:34:53.8954800Z for counter in
counter_factory.get_counters():
2025-11-13T00:34:53.8955480Z if counter.name ==
expected_counter_name:
2025-11-13T00:34:53.8955830Z found_counter = counter
2025-11-13T00:34:53.8956180Z break
2025-11-13T00:34:53.8956550Z
2025-11-13T00:34:53.8956860Z self.assertIsNotNone(
2025-11-13T00:34:53.8957230Z found_counter,
2025-11-13T00:34:53.8957580Z f"The expected counter
'{expected_counter_name}' was not created.")
2025-11-13T00:34:53.8957970Z
2025-11-13T00:34:53.8959130Z # Check that its value is approximately
correct.
2025-11-13T00:34:53.8959570Z actual_value = found_counter.value()
2025-11-13T00:34:53.8959980Z expected_value = state_duration_ms
2025-11-13T00:34:53.8960370Z self.assertGreater(
2025-11-13T00:34:53.8960660Z actual_value,
2025-11-13T00:34:53.8960990Z expected_value * (1.0 -
margin_of_error),
2025-11-13T00:34:53.8961450Z "The timer metric was lower than
expected.")
2025-11-13T00:34:53.8961800Z > self.assertLess(
2025-11-13T00:34:53.8962150Z actual_value,
2025-11-13T00:34:53.8962460Z expected_value * (1.0 +
margin_of_error),
2025-11-13T00:34:53.8962870Z "The timer metric was higher than
expected.")
2025-11-13T00:34:53.8963770Z E AssertionError: 189 not less than 125.0 :
The timer metric was higher than expected.
2025-11-13T00:34:53.8964090Z
2025-11-13T00:34:53.8964380Z
apache_beam/runners/worker/statesampler_test.py:216: AssertionError
2025-11-13T00:34:53.8964950Z _____________________
StateSamplerTest.test_timer_sampler ______________________
2025-11-13T00:34:53.8965520Z [gw3] darwin -- Python 3.11.9
/Users/runner/work/beam/beam/sdks/python/target/.tox/py311-macos/bin/python
2025-11-13T00:34:53.8965930Z
2025-11-13T00:34:53.8966240Z self =
<apache_beam.runners.worker.statesampler_test.StateSamplerTest
testMethod=test_timer_sampler>
2025-11-13T00:34:53.8966710Z
2025-11-13T00:34:53.8966980Z @retry(reraise=True,
stop=stop_after_attempt(3))
2025-11-13T00:34:53.8967370Z def test_timer_sampler(self):
2025-11-13T00:34:53.8967730Z # Set up state sampler.
2025-11-13T00:34:53.8968130Z counter_factory = CounterFactory()
2025-11-13T00:34:53.8968520Z sampler = statesampler.StateSampler(
2025-11-13T00:34:53.8969930Z 'timer', counter_factory,
sampling_period_ms=1)
2025-11-13T00:34:53.8970590Z
2025-11-13T00:34:53.8970840Z # Duration of the timer processing.
2025-11-13T00:34:53.9067190Z state_duration_ms = 100
2025-11-13T00:34:53.9067910Z margin_of_error = 0.25
2025-11-13T00:34:53.9068290Z
2025-11-13T00:34:53.9074160Z sampler.start()
2025-11-13T00:34:53.9143540Z with sampler.scoped_state('step1',
'process-timers'):
2025-11-13T00:34:53.9146170Z time.sleep(state_duration_ms / 1000)
2025-11-13T00:34:53.9146930Z sampler.stop()
2025-11-13T00:34:53.9147550Z sampler.commit_counters()
2025-11-13T00:34:53.9149250Z
2025-11-13T00:34:53.9150470Z if not statesampler.FAST_SAMPLER:
2025-11-13T00:34:53.9151410Z # The slow sampler does not implement
sampling, so we won't test it.
2025-11-13T00:34:53.9153370Z return
2025-11-13T00:34:53.9153980Z
2025-11-13T00:34:53.9155420Z # Test that sampled state timings are
close to their expected values.
2025-11-13T00:34:53.9156630Z c = CounterName(
2025-11-13T00:34:53.9158070Z 'process-timers-msecs',
step_name='step1', stage_name='timer')
2025-11-13T00:34:53.9160900Z expected_counter_values = {
2025-11-13T00:34:53.9161540Z c: state_duration_ms,
2025-11-13T00:34:53.9161870Z }
2025-11-13T00:34:53.9198870Z for counter in
counter_factory.get_counters():
2025-11-13T00:34:53.9199820Z self.assertIn(counter.name,
expected_counter_values)
2025-11-13T00:34:53.9201380Z expected_value =
expected_counter_values[counter.name]
2025-11-13T00:34:53.9201880Z actual_value = counter.value()
2025-11-13T00:34:53.9202310Z deviation = float(abs(actual_value -
expected_value)) / expected_value
2025-11-13T00:34:53.9203330Z _LOGGER.info('Sampling deviation from
expectation: %f', deviation)
2025-11-13T00:34:53.9203910Z self.assertGreater(actual_value,
expected_value * (1.0 - margin_of_error))
2025-11-13T00:34:53.9204430Z > self.assertLess(actual_value,
expected_value * (1.0 + margin_of_error))
2025-11-13T00:34:53.9204860Z E AssertionError: 240 not less than 125.0
2025-11-13T00:34:53.9205140Z
2025-11-13T00:34:53.9205430Z
apache_beam/runners/worker/statesampler_test.py:168: AssertionError
2025-11-13T00:34:53.9205890Z ------------------------------ Captured log
call -------------------------------
2025-11-13T00:34:53.9206560Z INFO
apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling
deviation from expectation: 1.340000
2025-11-13T00:34:53.9207610Z INFO
apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling
deviation from expectation: 1.480000
2025-11-13T00:34:53.9208450Z INFO
apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling
deviation from expectation: 1.40000
```
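This suggests that `test_process_timers_metric_is_recorded` (and the
similar `test_timer_sampler`) is flaky. On the macOS runner, the sampled
`process-timers-msecs` values (189 ms and 240 ms) far exceed the 25%
margin around the expected 100 ms. PTAL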
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]