[ 
https://issues.apache.org/jira/browse/BEAM-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated BEAM-10848:
--------------------------------------
    Status: Open  (was: Triage Needed)

> Gauge metrics error when setting timers
> ---------------------------------------
>
>                 Key: BEAM-10848
>                 URL: https://issues.apache.org/jira/browse/BEAM-10848
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-harness
>            Reporter: Maximilian Michels
>            Priority: P2
>
> Gauges are affected by setting timers, which leads to {{None}} values:
> {noformat}
> ERROR:apache_beam.runners.worker.sdk_worker:Error processing instruction 147. 
> Original traceback is
> Traceback (most recent call last):
>   File 
> "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 253, in _execute
>     response = task()
>   File 
> "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File 
> "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 480, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File 
> "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 516, in process_bundle
>     monitoring_infos = bundle_processor.monitoring_infos()
>   File 
> "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 1107, in monitoring_infos
>     op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
>   File "apache_beam/runners/worker/operations.py", line 340, in 
> apache_beam.runners.worker.operations.Operation.monitoring_infos
>   File "apache_beam/runners/worker/operations.py", line 347, in 
> apache_beam.runners.worker.operations.Operation.monitoring_infos
>   File "apache_beam/runners/worker/operations.py", line 386, in 
> apache_beam.runners.worker.operations.Operation.user_monitoring_infos
>   File "apache_beam/metrics/execution.py", line 261, in 
> apache_beam.metrics.execution.MetricsContainer.to_runner_api_monitoring_infos
>   File "apache_beam/metrics/cells.py", line 222, in 
> apache_beam.metrics.cells.GaugeCell.to_runner_api_monitoring_info
>   File 
> "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py",
>  line 222, in int64_user_gauge
>     payload = _encode_gauge(coder, timestamp, value)
>   File 
> "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py",
>  line 397, in _encode_gauge
>     coder.get_impl().encode_to_stream(value, stream, True)
>   File "apache_beam/coders/coder_impl.py", line 690, in 
> apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 692, in 
> apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
> TypeError: an integer is required
> {noformat}
> The transform has the following structure; it fails with the error above when 
> the lines following each {{TODO}} are uncommented:
> {code:python}
> class StatefulOperation(beam.DoFn):
>   def __init__(self, state_size_per_key_bytes, use_processing_timer=False):
>     self.state_size_per_key_bytes = state_size_per_key_bytes
>     self.str_coder = StrUtf8Coder().get_impl()
>     self.bytes_gauge = Metrics.gauge('synthetic', 'state_bytes')
>     self.elements_gauge = Metrics.gauge('synthetic', 'state_elements')
>     self.use_processing_timer = use_processing_timer
>   state_spec = userstate.BagStateSpec('state', StrUtf8Coder())
>   state_spec2 = userstate.CombiningValueStateSpec('state_size_bytes', 
> combine_fn=sum)
>   state_spec3 = userstate.CombiningValueStateSpec('num_state_entries', 
> combine_fn=sum)
>   event_timer_spec = userstate.TimerSpec('event_timer', 
> beam.TimeDomain.WATERMARK)
>   processing_timer_spec = userstate.TimerSpec('proc_timer', 
> beam.TimeDomain.REAL_TIME)
>   def process(self,
>               element,
>               timestamp=beam.DoFn.TimestampParam,
>               state=beam.DoFn.StateParam(state_spec),
>               state_num_bytes=beam.DoFn.StateParam(state_spec2),
>               state_num_entries=beam.DoFn.StateParam(state_spec3),
>               event_timer=beam.DoFn.TimerParam(event_timer_spec),
>               processing_timer=beam.DoFn.TimerParam(processing_timer_spec)):
>     # Append stringified element to state until the threshold has been reached
>     # The cleanup timer will then clean up and the process will repeat.
>     if state_num_bytes.read() <= self.state_size_per_key_bytes:
>       state_element = str(element)
>       state.add(state_element)
>       bytes_added = len(self.str_coder.encode_nested(state_element))
>       state_num_bytes.add(bytes_added)
>       state_num_entries.add(1)
>       timer = processing_timer if self.use_processing_timer else event_timer
>       # Set a timer which will clear the state if it grows too large
>       timer.set(timestamp.micros // 1000000 + 5)
>     # Metrics
>     # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
>     #self.bytes_gauge.set(state_num_bytes.read())
>     #self.elements_gauge.set(state_num_entries.read())
>     yield element
>   @userstate.on_timer(event_timer_spec)
>   def on_event_timer(self,
>                      key=beam.DoFn.KeyParam,
>                      state=beam.DoFn.StateParam(state_spec),
>                      state_num_bytes=beam.DoFn.StateParam(state_spec2),
>                      state_num_entries=beam.DoFn.StateParam(state_spec3)):
>     return self.timer_callback(state, state_num_bytes, state_num_entries)
>   @userstate.on_timer(processing_timer_spec)
>   def on_processing_timer(self,
>                           state=beam.DoFn.StateParam(state_spec),
>                           state_num_bytes=beam.DoFn.StateParam(state_spec2),
>                           
> state_num_entries=beam.DoFn.StateParam(state_spec3)):
>     return self.timer_callback(state, state_num_bytes, state_num_entries)
>   def timer_callback(self, state, state_num_bytes, state_num_entries):
>     count = 0
>     for _ in state.read():
>       count += 1
>     state_count = state_num_entries.read()
>     if count != state_count:
>       raise Exception("Actual number of entries (%s) did not match expected (%s)" % (count, state_count))
>     # Reset state bags
>     state.clear()
>     state_num_bytes.clear()
>     state_num_entries.clear()
>     # Reset metrics
>     # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
>     #self.bytes_gauge.set(0)
>     #self.elements_gauge.set(0)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to