tvalentyn commented on code in PR #36737:
URL: https://github.com/apache/beam/pull/36737#discussion_r2524596345


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -444,12 +445,19 @@ def __init__(
     self.metrics_container = MetricsContainer(self.name_context.metrics_name())
 
     self.state_sampler = state_sampler
-    self.scoped_start_state = self.state_sampler.scoped_state(
-        self.name_context, 'start', metrics_container=self.metrics_container)
-    self.scoped_process_state = self.state_sampler.scoped_state(
-        self.name_context, 'process', metrics_container=self.metrics_container)
-    self.scoped_finish_state = self.state_sampler.scoped_state(
-        self.name_context, 'finish', metrics_container=self.metrics_container)
+    if self.state_sampler:

Review Comment:
   > While I was debugging CI failures, I noticed that the state_sampler is 
None when operations are run in a non-streaming context. The autocomplete_test 
was an example of this.
   
   I am not seeing these errors - am I missing something?
   
   I do notice the following errors:
   
   ```
   2025-11-13T00:34:53.8931580Z =================================== FAILURES 
===================================
   2025-11-13T00:34:53.8932180Z ___________ 
StateSamplerTest.test_process_timers_metric_is_recorded ____________
   2025-11-13T00:34:53.8933040Z [gw3] darwin -- Python 3.11.9 
/Users/runner/work/beam/beam/sdks/python/target/.tox/py311-macos/bin/python
   2025-11-13T00:34:53.8933480Z 
   2025-11-13T00:34:53.8933840Z self = 
<apache_beam.runners.worker.statesampler_test.StateSamplerTest 
testMethod=test_process_timers_metric_is_recorded>
   2025-11-13T00:34:53.8934320Z 
   2025-11-13T00:34:53.8934560Z     @retry(reraise=True, 
stop=stop_after_attempt(3))
   2025-11-13T00:34:53.8934960Z     def 
test_process_timers_metric_is_recorded(self):
   2025-11-13T00:34:53.8935330Z       """
   2025-11-13T00:34:53.8936770Z       Tests that the 'process-timers-msecs' 
metric is correctly recorded
   2025-11-13T00:34:53.8937260Z       when a state sampler is active.
   2025-11-13T00:34:53.8937700Z       """
   2025-11-13T00:34:53.8938000Z       # Set up a real state sampler and counter 
factory.
   2025-11-13T00:34:53.8938440Z       counter_factory = CounterFactory()
   2025-11-13T00:34:53.8938970Z       sampler = statesampler.StateSampler(
   2025-11-13T00:34:53.8939420Z           'test_stage', counter_factory, 
sampling_period_ms=1)
   2025-11-13T00:34:53.8939770Z     
   2025-11-13T00:34:53.8941420Z       state_duration_ms = 100
   2025-11-13T00:34:53.8942440Z       margin_of_error = 0.25
   2025-11-13T00:34:53.8943340Z     
   2025-11-13T00:34:53.8944230Z       # Run a workload inside the 
'process-timers' scoped state.
   2025-11-13T00:34:53.8945180Z       sampler.start()
   2025-11-13T00:34:53.8946500Z       with sampler.scoped_state('test_step', 
'process-timers'):
   2025-11-13T00:34:53.8947520Z         time.sleep(state_duration_ms / 1000.0)
   2025-11-13T00:34:53.8948220Z       sampler.stop()
   2025-11-13T00:34:53.8949110Z       sampler.commit_counters()
   2025-11-13T00:34:53.8949860Z     
   2025-11-13T00:34:53.8950580Z       if not statesampler.FAST_SAMPLER:
   2025-11-13T00:34:53.8950920Z         return
   2025-11-13T00:34:53.8951840Z     
   2025-11-13T00:34:53.8952350Z       # Verify that the counter was created 
with the correct name and value.
   2025-11-13T00:34:53.8952790Z       expected_counter_name = CounterName(
   2025-11-13T00:34:53.8953250Z           'process-timers-msecs', 
step_name='test_step', stage_name='test_stage')
   2025-11-13T00:34:53.8953720Z     
   2025-11-13T00:34:53.8954090Z       # Find the specific counter we are 
looking for.
   2025-11-13T00:34:53.8954450Z       found_counter = None
   2025-11-13T00:34:53.8954800Z       for counter in 
counter_factory.get_counters():
   2025-11-13T00:34:53.8955480Z         if counter.name == 
expected_counter_name:
   2025-11-13T00:34:53.8955830Z           found_counter = counter
   2025-11-13T00:34:53.8956180Z           break
   2025-11-13T00:34:53.8956550Z     
   2025-11-13T00:34:53.8956860Z       self.assertIsNotNone(
   2025-11-13T00:34:53.8957230Z           found_counter,
   2025-11-13T00:34:53.8957580Z           f"The expected counter 
'{expected_counter_name}' was not created.")
   2025-11-13T00:34:53.8957970Z     
   2025-11-13T00:34:53.8959130Z       # Check that its value is approximately 
correct.
   2025-11-13T00:34:53.8959570Z       actual_value = found_counter.value()
   2025-11-13T00:34:53.8959980Z       expected_value = state_duration_ms
   2025-11-13T00:34:53.8960370Z       self.assertGreater(
   2025-11-13T00:34:53.8960660Z           actual_value,
   2025-11-13T00:34:53.8960990Z           expected_value * (1.0 - 
margin_of_error),
   2025-11-13T00:34:53.8961450Z           "The timer metric was lower than 
expected.")
   2025-11-13T00:34:53.8961800Z >     self.assertLess(
   2025-11-13T00:34:53.8962150Z           actual_value,
   2025-11-13T00:34:53.8962460Z           expected_value * (1.0 + 
margin_of_error),
   2025-11-13T00:34:53.8962870Z           "The timer metric was higher than 
expected.")
   2025-11-13T00:34:53.8963770Z E     AssertionError: 189 not less than 125.0 : 
The timer metric was higher than expected.
   2025-11-13T00:34:53.8964090Z 
   2025-11-13T00:34:53.8964380Z 
apache_beam/runners/worker/statesampler_test.py:216: AssertionError
   2025-11-13T00:34:53.8964950Z _____________________ 
StateSamplerTest.test_timer_sampler ______________________
   2025-11-13T00:34:53.8965520Z [gw3] darwin -- Python 3.11.9 
/Users/runner/work/beam/beam/sdks/python/target/.tox/py311-macos/bin/python
   2025-11-13T00:34:53.8965930Z 
   2025-11-13T00:34:53.8966240Z self = 
<apache_beam.runners.worker.statesampler_test.StateSamplerTest 
testMethod=test_timer_sampler>
   2025-11-13T00:34:53.8966710Z 
   2025-11-13T00:34:53.8966980Z     @retry(reraise=True, 
stop=stop_after_attempt(3))
   2025-11-13T00:34:53.8967370Z     def test_timer_sampler(self):
   2025-11-13T00:34:53.8967730Z       # Set up state sampler.
   2025-11-13T00:34:53.8968130Z       counter_factory = CounterFactory()
   2025-11-13T00:34:53.8968520Z       sampler = statesampler.StateSampler(
   2025-11-13T00:34:53.8969930Z           'timer', counter_factory, 
sampling_period_ms=1)
   2025-11-13T00:34:53.8970590Z     
   2025-11-13T00:34:53.8970840Z       # Duration of the timer processing.
   2025-11-13T00:34:53.9067190Z       state_duration_ms = 100
   2025-11-13T00:34:53.9067910Z       margin_of_error = 0.25
   2025-11-13T00:34:53.9068290Z     
   2025-11-13T00:34:53.9074160Z       sampler.start()
   2025-11-13T00:34:53.9143540Z       with sampler.scoped_state('step1', 
'process-timers'):
   2025-11-13T00:34:53.9146170Z         time.sleep(state_duration_ms / 1000)
   2025-11-13T00:34:53.9146930Z       sampler.stop()
   2025-11-13T00:34:53.9147550Z       sampler.commit_counters()
   2025-11-13T00:34:53.9149250Z     
   2025-11-13T00:34:53.9150470Z       if not statesampler.FAST_SAMPLER:
   2025-11-13T00:34:53.9151410Z         # The slow sampler does not implement 
sampling, so we won't test it.
   2025-11-13T00:34:53.9153370Z         return
   2025-11-13T00:34:53.9153980Z     
   2025-11-13T00:34:53.9155420Z       # Test that sampled state timings are 
close to their expected values.
   2025-11-13T00:34:53.9156630Z       c = CounterName(
   2025-11-13T00:34:53.9158070Z           'process-timers-msecs', 
step_name='step1', stage_name='timer')
   2025-11-13T00:34:53.9160900Z       expected_counter_values = {
   2025-11-13T00:34:53.9161540Z           c: state_duration_ms,
   2025-11-13T00:34:53.9161870Z       }
   2025-11-13T00:34:53.9198870Z       for counter in 
counter_factory.get_counters():
   2025-11-13T00:34:53.9199820Z         self.assertIn(counter.name, 
expected_counter_values)
   2025-11-13T00:34:53.9201380Z         expected_value = 
expected_counter_values[counter.name]
   2025-11-13T00:34:53.9201880Z         actual_value = counter.value()
   2025-11-13T00:34:53.9202310Z         deviation = float(abs(actual_value - 
expected_value)) / expected_value
   2025-11-13T00:34:53.9203330Z         _LOGGER.info('Sampling deviation from 
expectation: %f', deviation)
   2025-11-13T00:34:53.9203910Z         self.assertGreater(actual_value, 
expected_value * (1.0 - margin_of_error))
   2025-11-13T00:34:53.9204430Z >       self.assertLess(actual_value, 
expected_value * (1.0 + margin_of_error))
   2025-11-13T00:34:53.9204860Z E       AssertionError: 240 not less than 125.0
   2025-11-13T00:34:53.9205140Z 
   2025-11-13T00:34:53.9205430Z 
apache_beam/runners/worker/statesampler_test.py:168: AssertionError
   2025-11-13T00:34:53.9205890Z ------------------------------ Captured log 
call -------------------------------
   2025-11-13T00:34:53.9206560Z INFO     
apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling 
deviation from expectation: 1.340000
   2025-11-13T00:34:53.9207610Z INFO     
apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling 
deviation from expectation: 1.480000
   2025-11-13T00:34:53.9208450Z INFO     
apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling 
deviation from expectation: 1.400000
   ```
   
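   This suggests that both `test_process_timers_metric_is_recorded` and 
`test_timer_sampler` are flaky: on the macOS runner the sampled durations 
(189 ms and 240 ms for a 100 ms sleep) exceed the 25% margin of error. PTAL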



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to