tvalentyn opened a new issue, #24178: URL: https://github.com/apache/beam/issues/24178
### What happened? Saw in https://github.com/apache/beam/pull/24106. @damccorm I haven't seen this error before. Could you please check whether this is a new error by trying to repro it over several runs? Note that this test uses slow coders (as per the stacktrace), so to repro we may need to install apache-beam from sources in an environment without cython. If we can't repro and the cause of the error isn't obvious, we can wait and see. Thanks. ``` ______________ FnApiRunnerTestWithDisabledCaching.test_reshuffle _______________ [gw2] darwin -- Python 3.8.14 /Users/runner/work/beam/beam/sdks/python/target/.tox/py38/bin/python self = <apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithDisabledCaching testMethod=test_reshuffle> def test_reshuffle(self): with self.create_pipeline() as p: > assert_that( p | beam.Create([1, 2, 3]) | beam.Reshuffle(), equal_to([1, 2, 3])) apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1052: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ apache_beam/pipeline.py:600: in __exit__ self.result = self.run() apache_beam/pipeline.py:577: in run return self.runner.run_pipeline(self, self._options) apache_beam/runners/portability/fn_api_runner/fn_runner.py:201: in run_pipeline self._latest_run_result = self.run_via_runner_api( apache_beam/runners/portability/fn_api_runner/fn_runner.py:222: in run_via_runner_api return self.run_stages(stage_context, stages) apache_beam/runners/portability/fn_api_runner/fn_runner.py:453: in run_stages bundle_results = self._execute_bundle( apache_beam/runners/portability/fn_api_runner/fn_runner.py:781: in _execute_bundle self._run_bundle( apache_beam/runners/portability/fn_api_runner/fn_runner.py:1010: in _run_bundle result, splits = bundle_manager.process_bundle( apache_beam/runners/portability/fn_api_runner/fn_runner.py:1378: in process_bundle 
self.bundle_context_manager.get_buffer( apache_beam/runners/portability/fn_api_runner/execution.py:244: in append windowed_key_value = coder_impl.decode_from_stream(input_stream, True) apache_beam/coders/coder_impl.py:1472: in decode_from_stream value = self._value_coder.decode_from_stream(in_stream, nested) apache_beam/coders/coder_impl.py:1015: in decode_from_stream return self._construct_from_components([ apache_beam/coders/coder_impl.py:1016: in <listcomp> c.decode_from_stream( apache_beam/coders/coder_impl.py:625: in decode_from_stream return in_stream.read_all(nested) apache_beam/coders/slow_stream.py:140: in read_all return self.read(self.read_var_int64() if nested else self.size()) apache_beam/coders/slow_stream.py:151: in read_var_int64 byte = self.read_byte() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.coders.slow_stream.InputStream object at 0x12ff52d00> def read_byte(self): # type: () -> int self.pos += 1 > return self.data[self.pos - 1] E IndexError: index out of range apache_beam/coders/slow_stream.py:145: IndexError ----------------------------- Captured stderr call ----------------------------- Traceback (most recent call last): File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 648, in process_bundle self.bundle_processor_cache.release(instruction_id) File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 528, in release self.active_bundle_processors.pop(instruction_id)) KeyError: 'bundle_681' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 287, in _execute response = task() File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 360, in 
<lambda> lambda: self.create_worker().do_instruction(request), request) File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction return getattr(self, request_type)( File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 652, in process_bundle self.bundle_processor_cache.discard(instruction_id) File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 507, in discard processor = self.active_bundle_processors[instruction_id][1] KeyError: 'bundle_681' ------------------------------ Captured log call ------------------------------- ERROR apache_beam.runners.worker.sdk_worker:sdk_worker.py:291 Error processing instruction bundle_681. Original traceback is Traceback (most recent call last): File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 648, in process_bundle self.bundle_processor_cache.release(instruction_id) File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 528, in release self.active_bundle_processors.pop(instruction_id)) KeyError: 'bundle_681' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 287, in _execute response = task() File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 360, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction return getattr(self, request_type)( File "/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 652, in process_bundle self.bundle_processor_cache.discard(instruction_id) File 
"/Users/runner/work/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 507, in discard processor = self.active_bundle_processors[instruction_id][1] KeyError: 'bundle_681' ``` ### Issue Priority Priority: 1 ### Issue Component Component: sdk-py-core -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
