I'll take a look. Thanks Max!

On Wed, Apr 22, 2020 at 8:20 AM Maximilian Michels <[email protected]> wrote:
> Attempting to fix this here, if somebody could have a look:
> https://github.com/apache/beam/pull/11492
>
> On 22.04.20 17:10, Maximilian Michels wrote:
> > Hi,
> >
> > I'm trying to set a timer from a timer callback in the Python SDK:
> >
> > class MyFn(beam.DoFn):
> >   timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
> >
> >   def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
> >     self.key = element[0]
> >     timer.set(0)
> >
> >   @userstate.on_timer(timer_spec)
> >   def process_timer(self, timer=beam.DoFn.TimerParam(timer_spec)):
> >     timer.set(0)
> >
> > This yields the following Python stack trace:
> >
> > INFO:apache_beam.utils.subprocess_server:Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 4: Traceback (most recent call last):
> > INFO:apache_beam.utils.subprocess_server:  File "apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
> > INFO:apache_beam.utils.subprocess_server:    response = task()
> > INFO:apache_beam.utils.subprocess_server:  File "apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
> > INFO:apache_beam.utils.subprocess_server:    lambda: self.create_worker().do_instruction(request), request)
> > INFO:apache_beam.utils.subprocess_server:  File "apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
> > INFO:apache_beam.utils.subprocess_server:    getattr(request, request_type), request.instruction_id)
> > INFO:apache_beam.utils.subprocess_server:  File "apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
> > INFO:apache_beam.utils.subprocess_server:    bundle_processor.process_bundle(instruction_id))
> > INFO:apache_beam.utils.subprocess_server:  File "apache_beam/runners/worker/bundle_processor.py", line 910, in process_bundle
> > INFO:apache_beam.utils.subprocess_server:    element.timer_family_id, timer_data)
> > INFO:apache_beam.utils.subprocess_server:  File "apache_beam/runners/worker/operations.py", line 688, in process_timer
> > INFO:apache_beam.utils.subprocess_server:    timer_data.fire_timestamp)
> > INFO:apache_beam.utils.subprocess_server:  File "apache_beam/runners/common.py", line 990, in process_user_timer
> > INFO:apache_beam.utils.subprocess_server:    self._reraise_augmented(exn)
> > INFO:apache_beam.utils.subprocess_server:  File "apache_beam/runners/common.py", line 1043, in _reraise_augmented
> > INFO:apache_beam.utils.subprocess_server:    raise_with_traceback(new_exn)
> > INFO:apache_beam.utils.subprocess_server:  File "apache_beam/runners/common.py", line 988, in process_user_timer
> > INFO:apache_beam.utils.subprocess_server:    self.do_fn_invoker.invoke_user_timer(timer_spec, key, window, timestamp)
> > INFO:apache_beam.utils.subprocess_server:  File "apache_beam/runners/common.py", line 517, in invoke_user_timer
> > INFO:apache_beam.utils.subprocess_server:    self.user_state_context, key, window, timestamp))
> > INFO:apache_beam.utils.subprocess_server:  File "apache_beam/runners/common.py", line 1093, in process_outputs
> > INFO:apache_beam.utils.subprocess_server:    for result in results:
> > INFO:apache_beam.utils.subprocess_server:  File "/Users/max/Dev/beam/sdks/python/apache_beam/testing/load_tests/pardo_test.py", line 185, in process_timer
> > INFO:apache_beam.utils.subprocess_server:    timer.set(0)
> > INFO:apache_beam.utils.subprocess_server:  File "apache_beam/runners/worker/bundle_processor.py", line 589, in set
> > INFO:apache_beam.utils.subprocess_server:    self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
> > INFO:apache_beam.utils.subprocess_server:  File "apache_beam/coders/coder_impl.py", line 651, in encode_to_stream
> > INFO:apache_beam.utils.subprocess_server:    value.hold_timestamp, out, True)
> > INFO:apache_beam.utils.subprocess_server:  File "apache_beam/coders/coder_impl.py", line 608, in encode_to_stream
> > INFO:apache_beam.utils.subprocess_server:    millis = value.micros // 1000
> > INFO:apache_beam.utils.subprocess_server:AttributeError: 'NoneType' object has no attribute 'micros' [while running 'GenerateLoad']
> >
> > Looking at the code base, I'm not sure we have tests for timer output timestamps. Am I missing something?
> >
> > -Max
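
For anyone following along, the last frames of the trace suggest the timer reached the coder with hold_timestamp set to None, and the timestamp encoder then dereferenced .micros on it. Below is a minimal sketch of that failure mode in plain Python, without Beam. Timestamp, TimerData, encode_millis, encode_timer and make_timer are all hypothetical stand-ins, not Beam APIs, and the fallback in make_timer is only one possible guard; it is an assumption and not necessarily what the PR above does.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Timestamp:
    """Hypothetical stand-in for a timestamp; only `micros` matters here."""
    micros: int


@dataclass(frozen=True)
class TimerData:
    """Hypothetical stand-in for the timer record handed to the coder."""
    fire_timestamp: Timestamp
    hold_timestamp: Optional[Timestamp]


def encode_millis(value: Timestamp) -> int:
    # Same shape as the failing line in the trace: `.micros` is read
    # unconditionally, so a None value raises AttributeError.
    return value.micros // 1000


def encode_timer(timer: TimerData) -> Tuple[int, int]:
    # Encodes both timestamps; the second call fails if no hold was set.
    return (encode_millis(timer.fire_timestamp),
            encode_millis(timer.hold_timestamp))


def make_timer(fire_micros: int,
               hold: Optional[Timestamp] = None) -> TimerData:
    # Assumed guard (illustration only): fall back to the fire timestamp
    # when the caller did not supply a hold timestamp.
    fire = Timestamp(fire_micros)
    return TimerData(fire, hold if hold is not None else fire)
```

Building the record directly with hold_timestamp=None and passing it to encode_timer raises the same kind of AttributeError as in the trace, while a record built through make_timer encodes without error. A test that sets a timer from inside a timer callback would exercise exactly this path.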
