Maximilian Michels created BEAM-9132:
----------------------------------------

             Summary: State request handler is removed prematurely when closing ActiveBundle
                 Key: BEAM-9132
                 URL: https://issues.apache.org/jira/browse/BEAM-9132
             Project: Beam
          Issue Type: Bug
          Components: java-fn-execution
            Reporter: Maximilian Michels
            Assignee: Maximilian Michels


We have observed the following error in a state-intensive application:

{noformat}
Error processing instruction 107. Original traceback is
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "redacted.py", line 56, in process
    recent_events_map = load_recent_events_map(recent_events_state)
  File "redacted.py", line 128, in _load_recent_events_map
    items_in_recent_events_bag = list(recent_events_state.read())
  File "apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
    for elem in self.first:
  File "apache_beam/runners/worker/bundle_processor.py", line 214, in __iter__
    self._state_key, self._coder_impl, is_cached=self._is_cached)
  File "apache_beam/runners/worker/sdk_worker.py", line 692, in blocking_get
    self._materialize_iter(state_key, coder))
  File "apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
    self._underlying.get_raw(state_key, continuation_token)
  File "apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
    continuation_token=continuation_token)))
  File "apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: Unknown process bundle instruction id '107'
{noformat}

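Note that the error originates on the Runner side: the SDK Harness only re-raises the error message that the Runner returned in response to its state request. The cause appears to be {{ActiveBundle}} deregistering the state request handler too early, while the SDK Harness may still be processing the bundle and issuing state requests.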
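The suspected ordering problem can be illustrated with a minimal, self-contained Python simulation. It does not use Beam's real classes: {{StateHandlerRegistry}}, {{close_bundle}}, and the dict-shaped responses are hypothetical stand-ins for the Runner's per-instruction state request handling in {{ActiveBundle}}. A {{threading.Event}} forces the SDK Harness's late state read to happen after the early deregistration, so the race is reproduced deterministically.

```python
import threading


class StateHandlerRegistry:
    """Hypothetical runner-side map from instruction id to a state request handler."""

    def __init__(self):
        self._handlers = {}
        self._lock = threading.Lock()

    def register(self, instruction_id, handler):
        with self._lock:
            self._handlers[instruction_id] = handler

    def deregister(self, instruction_id):
        with self._lock:
            self._handlers.pop(instruction_id, None)

    def handle(self, instruction_id, state_key):
        with self._lock:
            handler = self._handlers.get(instruction_id)
        if handler is None:
            # Same error text that the SDK Harness re-raised in the traceback above.
            return {"error": f"Unknown process bundle instruction id '{instruction_id}'"}
        return {"data": handler(state_key)}


def close_bundle(registry, instruction_id, deregister_before_harness_finishes):
    """Simulate closing a bundle whose (hypothetical) SDK Harness issues one late state read."""
    registry.register(instruction_id, lambda key: f"value-for-{key}")
    allow_read = threading.Event()
    response = {}

    def sdk_harness():
        allow_read.wait()  # the harness is still processing and reads state late
        response["value"] = registry.handle(instruction_id, "recent_events")

    harness = threading.Thread(target=sdk_harness)
    harness.start()
    if deregister_before_harness_finishes:
        # Suspected buggy ordering: the handler is removed while the harness is still running.
        registry.deregister(instruction_id)
        allow_read.set()
        harness.join()
    else:
        # Safer ordering: wait for the harness to finish the bundle, then deregister.
        allow_read.set()
        harness.join()
        registry.deregister(instruction_id)
    return response["value"]
```

With the early ordering, {{close_bundle(StateHandlerRegistry(), "107", True)}} returns the same "Unknown process bundle instruction id '107'" error; with {{False}}, the late read succeeds and returns {{{'data': 'value-for-recent_events'}}}. The sketch only shows why deregistration should wait until the bundle has completed; it is not the actual fix in the Beam codebase.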



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
