[ 
https://issues.apache.org/jira/browse/BEAM-8157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reopened BEAM-8157:
--------------------------------------

Reopening. Unfortunately, this is still an issue: setting the encoding 
scheme to NESTED by default did not fix it. I get the following error when 
Python's {{FastPrimitivesCoder}} is used to encode the key for a state 
request:

{noformat}
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: 
Error received from SDK harness for instruction 4: Traceback (most recent call 
last):
  File 
"/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 168, in _execute
    response = task()
  File 
"/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 201, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File 
"/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 356, in do_instruction
    request.instruction_id)
  File 
"/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 382, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File 
"/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 667, in process_bundle
    data.ptransform_id].process_encoded(data.data)
  File 
"/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 143, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 255, in 
apache_beam.runners.worker.operations.Operation.output
    def output(self, windowed_value, output_index=0):
  File "apache_beam/runners/worker/operations.py", line 256, in 
apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 143, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 593, in 
apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 594, in 
apache_beam.runners.worker.operations.DoOperation.process
    delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 776, in 
apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 782, in 
apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 849, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 780, in 
apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 587, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
    self._invoke_process_per_window(
  File "apache_beam/runners/common.py", line 659, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 880, in 
apache_beam.runners.common._OutputProcessor.process_outputs
    def process_outputs(self, windowed_input_element, results):
  File "apache_beam/runners/common.py", line 895, in 
apache_beam.runners.common._OutputProcessor.process_outputs
    for result in results:
  File "pricingrealtime/event_processing/stateful_event_processing.py", line 
55, in process
    recent_events_map = 
StatefulEventDoFn._load_recent_events_map(recent_events_state)
  File "pricingrealtime/event_processing/stateful_event_processing.py", line 
127, in _load_recent_events_map
    items_in_recent_events_bag = [e for e in recent_events_state.read()]
  File 
"/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 335, in __iter__
    for elem in self.first:
  File 
"/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 723, in _materialize_iter
    self._underlying.get_raw(state_key, continuation_token)
  File 
"/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 603, in get_raw
    continuation_token=continuation_token)))
  File 
"/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 637, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: java.lang.IllegalStateException: The current key '[1, -104, -97, 
-93, -34, -73, -128, -42, 36]' with key group index '274' does not belong to 
the key group range 'KeyGroupRange{startKeyGroup=153, endKeyGroup=154}'. Runner 
KeyCoder: LengthPrefixCoder(ByteArrayCoder). Ptransformid: 
ref_AppliedPTransform_process_events_with_stateful_dofn_23 Userstateid: 
recent_events
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:531)
        at 
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.prepareStateBackend(ExecutableStageDoFnOperator.java:387)
        at 
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.get(ExecutableStageDoFnOperator.java:309)
        at 
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.get(ExecutableStageDoFnOperator.java:303)
        at 
org.apache.beam.runners.fnexecution.state.StateRequestHandlers$ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handleGetRequest(StateRequestHandlers.java:468)
        at 
org.apache.beam.runners.fnexecution.state.StateRequestHandlers$ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handle(StateRequestHandlers.java:415)
        at 
org.apache.beam.runners.fnexecution.state.StateRequestHandlers$StateKeyTypeDelegatingStateRequestHandler.handle(StateRequestHandlers.java:206)
        at 
org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onNext(GrpcStateService.java:130)
        at 
org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onNext(GrpcStateService.java:118)
        at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
        at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
        at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
        at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
 [while running 'process_events_with_stateful_dofn']
{noformat}

These errors do not occur as long as standard coders are used. With an 
SDK-specific coder, nested encoding does not work properly in Python: Python 
does not add a length prefix, but the Runner side expects one because it uses 
{{LengthPrefixCoder(ByteArrayCoder)}}. The only way to fix this is to remove 
the length prefix coder on the Runner side and use OUTER encoding for the key 
in state requests. This gives correct results for both known and unknown coders.
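
To illustrate the mismatch, here is a minimal sketch in plain Python (not Beam 
code). It assumes the length prefix is a base-128 varint of the payload length. 
The names {{encode_outer}}, {{encode_nested}} and {{key_group}} are hypothetical, 
the CRC32-based {{key_group}} is only a stand-in for Flink's real key group 
assignment, and the max parallelism of 512 is an arbitrary value. The sample key 
is the byte sequence from the error message above.

```python
import zlib


def encode_varint(n):
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # more bytes follow
        else:
            out.append(bits)
            return bytes(out)


def encode_outer(key_bytes):
    """OUTER context: the raw key bytes, no length prefix."""
    return key_bytes


def encode_nested(key_bytes):
    """NESTED context: varint length prefix followed by the key bytes."""
    return encode_varint(len(key_bytes)) + key_bytes


def key_group(encoded_key, max_parallelism=512):
    """Stand-in for a key group assignment (NOT Flink's actual hash).

    The point is only that the group is derived from the encoded bytes,
    so a stray length prefix can move the key to a different group.
    """
    return zlib.crc32(encoded_key) % max_parallelism


# Key bytes from the error message, converted from signed Java bytes.
SAMPLE_KEY = bytes(b & 0xFF for b in [1, -104, -97, -93, -34, -73, -128, -42, 36])

outer = encode_outer(SAMPLE_KEY)
nested = encode_nested(SAMPLE_KEY)
print("outer :", list(outer), "-> group", key_group(outer))
print("nested:", list(nested), "-> group", key_group(nested))
```

The two encodings differ by the single prefix byte. Because the key group is 
computed from the encoded bytes, the two sides can disagree about which key 
group a key belongs to unless both use the same encoding.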

> Key encoding for state requests is not consistent across SDKs
> -------------------------------------------------------------
>
>                 Key: BEAM-8157
>                 URL: https://issues.apache.org/jira/browse/BEAM-8157
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.13.0
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>             Fix For: 2.17.0
>
>          Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> The Flink runner requires the internal key to be encoded without a length 
> prefix (OUTER context). The user state request handler exposes a serialized 
> version of the key to the Runner. This key is encoded with the NESTED context 
> which may add a length prefix. We need to convert it to OUTER context to 
> match the Flink runner's key encoding.
> So far this has not caused the Flink Runner to behave incorrectly. However, 
> with the upcoming support for Flink 1.9, the state backend will not accept 
> requests for keys not part of any key group/partition of the operator. This 
> is very likely to happen with the encoding not being consistent.
> **NOTE** This is only applicable to the Java SDK, as the Python SDK uses 
> OUTER encoding for the key in state requests.
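
The NESTED to OUTER conversion described in the quoted issue can be sketched as 
follows. This is a plain-Python illustration rather than the Runner's actual 
code, again assuming the prefix is a base-128 varint of the payload length. 
{{decode_varint}} and {{nested_to_outer}} are hypothetical helper names.

```python
def decode_varint(data, pos=0):
    """Decode a base-128 varint at data[pos]; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        if pos >= len(data):
            raise ValueError("truncated varint")
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            return result, pos
        shift += 7


def nested_to_outer(nested_key):
    """Strip the length prefix so that only the raw key bytes remain."""
    length, start = decode_varint(nested_key)
    payload = nested_key[start:]
    if len(payload) != length:
        raise ValueError(
            "length prefix says %d bytes, found %d" % (length, len(payload)))
    return payload
```

The length check makes the sketch fail loudly when the input was not actually 
length-prefixed, which is the situation described above for SDK-specific coders 
in Python.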



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
