tvalentyn opened a new issue, #24178:
URL: https://github.com/apache/beam/issues/24178

   ### What happened?
   
   Saw in https://github.com/apache/beam/pull/24106.
   
   @damccorm I haven't seen this error before. Could you please check whether 
it is a new error by trying to reproduce it over several runs?
   
   Note that this test uses slow coders (as per stacktrace), so to repro we may 
need to install apache-beam from sources in an environment without cython.
   
   If we can't reproduce it and the cause isn't obvious, we can wait and see.
   
   Thanks
   
   
   ```
   ______________ FnApiRunnerTestWithDisabledCaching.test_reshuffle 
_______________
   [gw2] darwin -- Python 3.8.14 
/Users/runner/work/beam/beam/sdks/python/target/.tox/py38/bin/python
   
   self = 
<apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithDisabledCaching
 testMethod=test_reshuffle>
   
       def test_reshuffle(self):
         with self.create_pipeline() as p:
   >       assert_that(
               p | beam.Create([1, 2, 3]) | beam.Reshuffle(), equal_to([1, 2, 
3]))
   
   apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1052: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   apache_beam/pipeline.py:600: in __exit__
       self.result = self.run()
   apache_beam/pipeline.py:577: in run
       return self.runner.run_pipeline(self, self._options)
   apache_beam/runners/portability/fn_api_runner/fn_runner.py:201: in 
run_pipeline
       self._latest_run_result = self.run_via_runner_api(
   apache_beam/runners/portability/fn_api_runner/fn_runner.py:222: in 
run_via_runner_api
       return self.run_stages(stage_context, stages)
   apache_beam/runners/portability/fn_api_runner/fn_runner.py:453: in run_stages
       bundle_results = self._execute_bundle(
   apache_beam/runners/portability/fn_api_runner/fn_runner.py:781: in 
_execute_bundle
       self._run_bundle(
   apache_beam/runners/portability/fn_api_runner/fn_runner.py:1010: in 
_run_bundle
       result, splits = bundle_manager.process_bundle(
   
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1378:
 in process_bundle
       self.bundle_context_manager.get_buffer(
   apache_beam/runners/portability/fn_api_runner/execution.py:244: in append
       windowed_key_value = coder_impl.decode_from_stream(input_stream, True)
   apache_beam/coders/coder_impl.py:1472: in decode_from_stream
       value = self._value_coder.decode_from_stream(in_stream, nested)
   apache_beam/coders/coder_impl.py:1015: in decode_from_stream
       return self._construct_from_components([
   apache_beam/coders/coder_impl.py:1016: in <listcomp>
       c.decode_from_stream(
   apache_beam/coders/coder_impl.py:625: in decode_from_stream
       return in_stream.read_all(nested)
   apache_beam/coders/slow_stream.py:140: in read_all
       return self.read(self.read_var_int64() if nested else self.size())
   apache_beam/coders/slow_stream.py:151: in read_var_int64
       byte = self.read_byte()
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <apache_beam.coders.slow_stream.InputStream object at 0x12ff52d00>
   
       def read_byte(self):
         # type: () -> int
         self.pos += 1
   >     return self.data[self.pos - 1]
   E     IndexError: index out of range
   
   
apache_beam/coders/slow_stream.py:145:
 IndexError
   ----------------------------- Captured stderr call 
-----------------------------
   Traceback (most recent call last):
     File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 648, in process_bundle
       self.bundle_processor_cache.release(instruction_id)
     File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 528, in release
       self.active_bundle_processors.pop(instruction_id))
   KeyError: 'bundle_681'
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 287, in _execute
       response = task()
     File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 360, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 596, in do_instruction
       return getattr(self, request_type)(
     File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 652, in process_bundle
       self.bundle_processor_cache.discard(instruction_id)
     File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 507, in discard
       processor = self.active_bundle_processors[instruction_id][1]
   KeyError: 'bundle_681'
   
   ------------------------------ Captured log call 
-------------------------------
   ERROR    apache_beam.runners.worker.sdk_worker:sdk_worker.py:291 Error 
processing instruction bundle_681. Original traceback is
   Traceback (most recent call last):
     File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 648, in process_bundle
       self.bundle_processor_cache.release(instruction_id)
     File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 528, in release
       self.active_bundle_processors.pop(instruction_id))
   KeyError: 'bundle_681'
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 287, in _execute
       response = task()
     File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 360, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 596, in do_instruction
       return getattr(self, request_type)(
     File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 652, in process_bundle
       self.bundle_processor_cache.discard(instruction_id)
     File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 507, in discard
       processor = self.active_bundle_processors[instruction_id][1]
   KeyError: 'bundle_681'
   ```
   
   ### Issue Priority
   
   Priority: 1
   
   ### Issue Component
   
   Component: sdk-py-core


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to