[ https://issues.apache.org/jira/browse/FLINK-31043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dian Fu reassigned FLINK-31043:
-------------------------------
Assignee: Xingbo Huang
> KeyError exception is thrown in CachedMapState
> ----------------------------------------------
>
> Key: FLINK-31043
> URL: https://issues.apache.org/jira/browse/FLINK-31043
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.15.0
> Reporter: Dian Fu
> Assignee: Xingbo Huang
> Priority: Major
> Fix For: 1.17.0, 1.15.4, 1.16.2
>
>
> The following exception has been observed in a PyFlink job running on Flink 1.15.
> It occurs occasionally and may indicate a bug in the state cache of MapState:
> {code:java}
> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 131: Traceback (most recent call last):
>   File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
>     response = task()
>   File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
>     element.data)
>   File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 158, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 170, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 417, in finish_bundle
>     return self.group_agg_function.finish_bundle()
>   File "pyflink/fn_execution/table/aggregate_fast.pyx", line 597, in pyflink.fn_execution.table.aggregate_fast.GroupTableAggFunction.finish_bundle
>   File "pyflink/fn_execution/table/aggregate_fast.pyx", line 652, in pyflink.fn_execution.table.aggregate_fast.GroupTableAggFunction.finish_bundle
>   File "pyflink/fn_execution/table/aggregate_fast.pyx", line 389, in pyflink.fn_execution.table.aggregate_fast.SimpleTableAggsHandleFunction.emit_value
>   File "/tmp/pyflink/17360444-8c0b-46a5-90a4-689c376ea4ed/0e2967b5-181c-4663-bd7a-267d47509cf5/whms_dws_stock_python_sps_1_output.py", line 29, in emit_value
>   File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/table/state_data_view.py", line 147, in get
>     return self._map_state.get(key)
>   File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 915, in get
>     return self.get_internal_state().get(key)
>   File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 773, in get
>     self._state_key, map_key, self._map_key_encoder, self._map_value_decoder)
>   File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 418, in blocking_get
>     cached_map_state.put(map_key, (exists, value))
>   File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 319, in put
>     super(CachedMapState, self).put(key, exists_and_value)
>   File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 68, in put
>     self._on_evict(name, value)
>   File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 305, in on_evict
>     self._cached_keys.remove(key)
> KeyError: 'SPAREPARTS_M11F010L4L1_01'
> {code}
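The bottom frames show the failure path: `CachedMapState.put` delegates to its parent cache's `put`, that `put` evicts an entry and calls `self._on_evict(name, value)`, and `on_evict` calls `self._cached_keys.remove(key)`, which raises `KeyError` when the evicted key is no longer in that set. The sketch below is a hypothetical, simplified model of that pattern, not PyFlink's actual code: the class names `EvictingLRUCache` and `TrackingMapCache` and the `strict` flag are invented for illustration. Because this report does not say how the key set and the cache drift apart in PyFlink, the sketch forces the divergence by hand. It also shows that `set.discard` tolerates the missing key, which would only hide the symptom, not fix the underlying inconsistency.

```python
from collections import OrderedDict


class EvictingLRUCache:
    """Hypothetical LRU cache that calls on_evict(key, value) for every entry it drops."""

    def __init__(self, max_entries, on_evict=None):
        self._max_entries = max_entries
        self._entries = OrderedDict()
        self._on_evict = on_evict

    def put(self, key, value):
        self._entries[key] = value
        self._entries.move_to_end(key)  # mark as most recently used
        while len(self._entries) > self._max_entries:
            # Drop the least recently used entry and notify the callback.
            old_key, old_value = self._entries.popitem(last=False)
            if self._on_evict is not None:
                self._on_evict(old_key, old_value)

    def __contains__(self, key):
        return key in self._entries


class TrackingMapCache(EvictingLRUCache):
    """Hypothetical cache that mirrors the keys of existing entries in a set,
    similar in shape to the CachedMapState frames in the traceback."""

    def __init__(self, max_entries, strict=True):
        super().__init__(max_entries, on_evict=self._evict)
        self.cached_keys = set()
        self._strict = strict

    def _evict(self, key, exists_and_value):
        exists, _ = exists_and_value
        if exists:
            if self._strict:
                # Raises KeyError if the set and the cache have diverged.
                self.cached_keys.remove(key)
            else:
                # Silently ignores a key that is missing from the set.
                self.cached_keys.discard(key)

    def put(self, key, exists_and_value):
        if exists_and_value[0]:
            self.cached_keys.add(key)
        super().put(key, exists_and_value)


def demo(strict):
    cache = TrackingMapCache(max_entries=1, strict=strict)
    cache.put("k1", (True, "v1"))
    cache.cached_keys.clear()  # simulate the key set drifting out of sync with the cache
    cache.put("k2", (True, "v2"))  # exceeds capacity, so "k1" is evicted via _evict
    return cache


if __name__ == "__main__":
    try:
        demo(strict=True)
    except KeyError as exc:
        print("KeyError:", exc)  # prints KeyError: 'k1'
```

The strict variant reproduces the same kind of `KeyError` as the last frame of the trace; a real fix would have to keep the key set consistent with the cache rather than merely swallow the error.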
--
This message was sent by Atlassian Jira
(v8.20.10#820010)