[ https://issues.apache.org/jira/browse/BEAM-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17226845#comment-17226845 ]
Beam JIRA Bot commented on BEAM-10848:
--------------------------------------
This issue is assigned but has not received an update in 30 days so it has been
labeled "stale-assigned". If you are still working on the issue, please give an
update and remove the label. If you are no longer working on the issue, please
unassign so someone else may work on it. In 7 days the issue will be
automatically unassigned.
> Gauge metrics error when setting timers
> ---------------------------------------
>
> Key: BEAM-10848
> URL: https://issues.apache.org/jira/browse/BEAM-10848
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Reporter: Maximilian Michels
> Assignee: maghamravikiran
> Priority: P2
> Labels: stale-assigned
> Time Spent: 3h 40m
> Remaining Estimate: 0h
>
> Gauges are affected by setting timers, leading to {{None}} values:
> {noformat}
> ERROR:apache_beam.runners.worker.sdk_worker:Error processing instruction 147.
> Original traceback is
> Traceback (most recent call last):
> File
> "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 253, in _execute
> response = task()
> File
> "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 310, in <lambda>
> lambda: self.create_worker().do_instruction(request), request)
> File
> "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 480, in do_instruction
> getattr(request, request_type), request.instruction_id)
> File
> "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 516, in process_bundle
> monitoring_infos = bundle_processor.monitoring_infos()
> File
> "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1107, in monitoring_infos
> op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
> File "apache_beam/runners/worker/operations.py", line 340, in
> apache_beam.runners.worker.operations.Operation.monitoring_infos
> File "apache_beam/runners/worker/operations.py", line 347, in
> apache_beam.runners.worker.operations.Operation.monitoring_infos
> File "apache_beam/runners/worker/operations.py", line 386, in
> apache_beam.runners.worker.operations.Operation.user_monitoring_infos
> File "apache_beam/metrics/execution.py", line 261, in
> apache_beam.metrics.execution.MetricsContainer.to_runner_api_monitoring_infos
> File "apache_beam/metrics/cells.py", line 222, in
> apache_beam.metrics.cells.GaugeCell.to_runner_api_monitoring_info
> File
> "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py",
> line 222, in int64_user_gauge
> payload = _encode_gauge(coder, timestamp, value)
> File
> "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py",
> line 397, in _encode_gauge
> coder.get_impl().encode_to_stream(value, stream, True)
> File "apache_beam/coders/coder_impl.py", line 690, in
> apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
> File "apache_beam/coders/coder_impl.py", line 692, in
> apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
> TypeError: an integer is required
> {noformat}
> The transform has the following structure and fails with the error above when
> the lines following the {{TODO}} comments are uncommented:
> {code:python}
> class StatefulOperation(beam.DoFn):
> def __init__(self, state_size_per_key_bytes, use_processing_timer=False):
> self.state_size_per_key_bytes = state_size_per_key_bytes
> self.str_coder = StrUtf8Coder().get_impl()
> self.bytes_gauge = Metrics.gauge('synthetic', 'state_bytes')
> self.elements_gauge = Metrics.gauge('synthetic', 'state_elements')
> self.use_processing_timer = use_processing_timer
> state_spec = userstate.BagStateSpec('state', StrUtf8Coder())
> state_spec2 = userstate.CombiningValueStateSpec('state_size_bytes',
> combine_fn=sum)
> state_spec3 = userstate.CombiningValueStateSpec('num_state_entries',
> combine_fn=sum)
> event_timer_spec = userstate.TimerSpec('event_timer',
> beam.TimeDomain.WATERMARK)
> processing_timer_spec = userstate.TimerSpec('proc_timer',
> beam.TimeDomain.REAL_TIME)
> def process(self,
> element,
> timestamp=beam.DoFn.TimestampParam,
> state=beam.DoFn.StateParam(state_spec),
> state_num_bytes=beam.DoFn.StateParam(state_spec2),
> state_num_entries=beam.DoFn.StateParam(state_spec3),
> event_timer=beam.DoFn.TimerParam(event_timer_spec),
> processing_timer=beam.DoFn.TimerParam(processing_timer_spec)):
> # Append stringified element to state until the threshold has been reached
> # The cleanup timer will then clean up and the process will repeat.
> if state_num_bytes.read() <= self.state_size_per_key_bytes:
> state_element = str(element)
> state.add(state_element)
> bytes_added = len(self.str_coder.encode_nested(state_element))
> state_num_bytes.add(bytes_added)
> state_num_entries.add(1)
> timer = processing_timer if self.use_processing_timer else event_timer
> # Set a timer which will clear the state if it grows too large
> timer.set(timestamp.micros // 1000000 + 5)
> # Metrics
> # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
> #self.bytes_gauge.set(state_num_bytes.read())
> #self.elements_gauge.set(state_num_entries.read())
> yield element
> @userstate.on_timer(event_timer_spec)
> def on_event_timer(self,
> key=beam.DoFn.KeyParam,
> state=beam.DoFn.StateParam(state_spec),
> state_num_bytes=beam.DoFn.StateParam(state_spec2),
> state_num_entries=beam.DoFn.StateParam(state_spec3)):
> return self.timer_callback(state, state_num_bytes, state_num_entries)
> @userstate.on_timer(processing_timer_spec)
> def on_processing_timer(self,
> state=beam.DoFn.StateParam(state_spec),
> state_num_bytes=beam.DoFn.StateParam(state_spec2),
>
> state_num_entries=beam.DoFn.StateParam(state_spec3)):
> return self.timer_callback(state, state_num_bytes, state_num_entries)
> def timer_callback(self, state, state_num_bytes, state_num_entries):
> count = 0
> for _ in state.read():
> count += 1
> state_count = state_num_entries.read()
> if count != state_count:
> raise Exception("Actual number of entries (%s) did not match expected
> (%s)" % (count, state_count))
> # Reset state bags
> state.clear()
> state_num_bytes.clear()
> state_num_entries.clear()
> # Reset metrics
> # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
> #self.bytes_gauge.set(0)
> #self.elements_gauge.set(0)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)