Hi,

I'm trying to set a timer from a timer callback in the Python SDK:

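import apache_beam as beam
from apache_beam.transforms import userstate


class MyFn(beam.DoFn):
  timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)

  def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
    self.key = element[0]
    # Setting the timer from process() works.
    timer.set(0)

  @userstate.on_timer(timer_spec)
  def process_timer(self, timer=beam.DoFn.TimerParam(timer_spec)):
    # Setting the timer again from its own callback triggers the error below.
    timer.set(0)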

This yields the following Python stack trace:

INFO:apache_beam.utils.subprocess_server:Caused by:
java.lang.RuntimeException: Error received from SDK harness for
instruction 4: Traceback (most recent call last):
INFO:apache_beam.utils.subprocess_server: File
"apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
INFO:apache_beam.utils.subprocess_server: response = task()
INFO:apache_beam.utils.subprocess_server: File
"apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
INFO:apache_beam.utils.subprocess_server: lambda:
self.create_worker().do_instruction(request), request)
INFO:apache_beam.utils.subprocess_server: File
"apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
INFO:apache_beam.utils.subprocess_server: getattr(request,
request_type), request.instruction_id)
INFO:apache_beam.utils.subprocess_server: File
"apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
INFO:apache_beam.utils.subprocess_server:
bundle_processor.process_bundle(instruction_id))
INFO:apache_beam.utils.subprocess_server: File
"apache_beam/runners/worker/bundle_processor.py", line 910, in
process_bundle
INFO:apache_beam.utils.subprocess_server: element.timer_family_id,
timer_data)
INFO:apache_beam.utils.subprocess_server: File
"apache_beam/runners/worker/operations.py", line 688, in process_timer
INFO:apache_beam.utils.subprocess_server: timer_data.fire_timestamp)
INFO:apache_beam.utils.subprocess_server: File
"apache_beam/runners/common.py", line 990, in process_user_timer
INFO:apache_beam.utils.subprocess_server: self._reraise_augmented(exn)
INFO:apache_beam.utils.subprocess_server: File
"apache_beam/runners/common.py", line 1043, in _reraise_augmented
INFO:apache_beam.utils.subprocess_server: raise_with_traceback(new_exn)
INFO:apache_beam.utils.subprocess_server: File
"apache_beam/runners/common.py", line 988, in process_user_timer
INFO:apache_beam.utils.subprocess_server:
self.do_fn_invoker.invoke_user_timer(timer_spec, key, window, timestamp)
INFO:apache_beam.utils.subprocess_server: File
"apache_beam/runners/common.py", line 517, in invoke_user_timer
INFO:apache_beam.utils.subprocess_server: self.user_state_context, key,
window, timestamp))
INFO:apache_beam.utils.subprocess_server: File
"apache_beam/runners/common.py", line 1093, in process_outputs
INFO:apache_beam.utils.subprocess_server: for result in results:
INFO:apache_beam.utils.subprocess_server: File
"/Users/max/Dev/beam/sdks/python/apache_beam/testing/load_tests/pardo_test.py",
line 185, in process_timer
INFO:apache_beam.utils.subprocess_server: timer.set(0)
INFO:apache_beam.utils.subprocess_server: File
"apache_beam/runners/worker/bundle_processor.py", line 589, in set
INFO:apache_beam.utils.subprocess_server:
self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
INFO:apache_beam.utils.subprocess_server: File
"apache_beam/coders/coder_impl.py", line 651, in encode_to_stream
INFO:apache_beam.utils.subprocess_server: value.hold_timestamp, out, True)
INFO:apache_beam.utils.subprocess_server: File
"apache_beam/coders/coder_impl.py", line 608, in encode_to_stream
INFO:apache_beam.utils.subprocess_server: millis = value.micros // 1000
INFO:apache_beam.utils.subprocess_server:AttributeError: 'NoneType'
object has no attribute 'micros' [while running 'GenerateLoad']
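
Going only by the last frames of the trace, the timer coder seems to receive a timer whose hold_timestamp is None and then reads .micros from it. The sketch below models just that step without using Beam. FakeTimestamp, FakeTimer, encode_millis and encode_timer are hypothetical stand-ins, not Beam classes, and why the hold timestamp is missing in the first place is not something the sketch explains.

```python
# Hypothetical stand-ins: these are NOT Beam's classes, only a model of the
# failing path seen in the last frames of the trace.
class FakeTimestamp:
    def __init__(self, micros):
        self.micros = micros


class FakeTimer:
    def __init__(self, fire_timestamp, hold_timestamp):
        self.fire_timestamp = fire_timestamp
        self.hold_timestamp = hold_timestamp


def encode_millis(value):
    # Mirrors the line shown in the trace: millis = value.micros // 1000
    return value.micros // 1000


def encode_timer(timer):
    # Encodes both timestamps; a missing hold timestamp fails in encode_millis.
    return (encode_millis(timer.fire_timestamp),
            encode_millis(timer.hold_timestamp))


# A timer with both timestamps present encodes without error.
ok = encode_timer(FakeTimer(FakeTimestamp(5000), FakeTimestamp(7000)))

# A timer without a hold timestamp raises the same kind of AttributeError.
try:
    encode_timer(FakeTimer(FakeTimestamp(0), None))
    error = None
except AttributeError as exn:
    error = str(exn)
```

If this reading is right, either the callback path needs to populate a hold timestamp before encoding, or the coder needs to tolerate a missing one.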

Looking at the code base, I'm not sure we have tests for timer output
timestamps. Am I missing something?

-Max
