[
https://issues.apache.org/jira/browse/FLINK-28747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574267#comment-17574267
]
Till Rohrmann edited comment on FLINK-28747 at 8/2/22 2:57 PM:
---------------------------------------------------------------
Thanks for reporting this issue [~stepweiwu]. Without knowing the exact details
of your StateFun job, I suspect that it could be caused by Kafka messages whose
key is an empty string. This could also explain why you
are seeing the problem appear across different services consuming from the same
topic.
For some context, StateFun uses the key of a Kafka message as the target id for
the StatefulFunction invocation. The target id identifies an instance of the
StatefulFunction so that the runtime can associate state with it.
Empty strings should be valid keys, so I believe that this is a bug in the
Python SDK, where the check {{if not target_id: raise ValueError}} also
fires when the key is an empty string
(https://github.com/apache/flink-statefun/blob/master/statefun-sdk-python/statefun/messages.py#L41).
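To illustrate the suspected problem, here is a minimal sketch. It is not the SDK's actual code, and {{check_truthy}}, {{check_not_none}} and {{is_rejected}} are hypothetical names. It shows how a truthiness check rejects an empty-string id, whereas an explicit {{None}} check would accept it:

```python
from typing import Callable, Optional


def check_truthy(target_id: Optional[str]) -> str:
    # Same kind of check as quoted above: every falsy value, including "",
    # is treated as a missing id.
    if not target_id:
        raise ValueError("target_id can not be missing")
    return target_id


def check_not_none(target_id: Optional[str]) -> str:
    # Hypothetical alternative: only None counts as missing, "" is accepted.
    if target_id is None:
        raise ValueError("target_id can not be missing")
    return target_id


def is_rejected(check: Callable[[Optional[str]], str],
                target_id: Optional[str]) -> bool:
    # Helper for the demonstration: True if the check raises ValueError.
    try:
        check(target_id)
    except ValueError:
        return True
    return False
```

With these definitions, {{is_rejected(check_truthy, "")}} is true while {{is_rejected(check_not_none, "")}} is false. Whether a {{None}} check is the right fix for the SDK is an assumption that would need to be verified against the rest of the code.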
As a workaround (if you can control it), you could ensure that your Kafka
messages always have a non-empty key. The proper fix would be to fix the Python SDK.
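If you control the producers, the workaround could be enforced with a small guard along these lines. This is only a sketch: {{require_non_empty_key}} is a hypothetical helper, not part of any Kafka client, and it assumes keys are passed around as {{str}} or {{bytes}} before being handed to whichever producer you use.

```python
from typing import Optional, Union


def require_non_empty_key(key: Optional[Union[str, bytes]]) -> bytes:
    """Return the key as bytes, or raise if it is missing or empty."""
    if key is None or len(key) == 0:
        raise ValueError("Kafka message key must be non-empty")
    # Encode str keys as UTF-8; bytes keys are passed through unchanged.
    return key.encode("utf-8") if isinstance(key, str) else key
```

Calling this right before each send would make records with an empty key fail on the producer side instead of in the function endpoint.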
> "target_id can not be missing" in HTTP statefun request
> -------------------------------------------------------
>
> Key: FLINK-28747
> URL: https://issues.apache.org/jira/browse/FLINK-28747
> Project: Flink
> Issue Type: Bug
> Components: Stateful Functions
> Affects Versions: statefun-3.0.0, statefun-3.2.0, statefun-3.1.1
> Reporter: Stephan Weinwurm
> Priority: Major
>
> Hi all,
> We've suddenly started to see the following exception in our HTTP StateFun
> function endpoints:
> {code}Traceback (most recent call last):
>   File "/src/.venv/lib/python3.9/site-packages/uvicorn/protocols/http/h11_impl.py", line 403, in run_asgi
>     result = await app(self.scope, self.receive, self.send)
>   File "/src/.venv/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
>     return await self.app(scope, receive, send)
>   File "/src/worker/baseplate_asgi/asgi/baseplate_asgi_middleware.py", line 37, in __call__
>     await span_processor.execute()
>   File "/src/worker/baseplate_asgi/asgi/asgi_http_span_processor.py", line 61, in execute
>     raise e
>   File "/src/worker/baseplate_asgi/asgi/asgi_http_span_processor.py", line 57, in execute
>     await self.app(self.scope, self.receive, self.send)
>   File "/src/.venv/lib/python3.9/site-packages/starlette/applications.py", line 124, in __call__
>     await self.middleware_stack(scope, receive, send)
>   File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/errors.py", line 184, in __call__
>     raise exc
>   File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/errors.py", line 162, in __call__
>     await self.app(scope, receive, _send)
>   File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
>     raise exc
>   File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
>     await self.app(scope, receive, sender)
>   File "/src/.venv/lib/python3.9/site-packages/starlette/routing.py", line 680, in __call__
>     await route.handle(scope, receive, send)
>   File "/src/.venv/lib/python3.9/site-packages/starlette/routing.py", line 275, in handle
>     await self.app(scope, receive, send)
>   File "/src/.venv/lib/python3.9/site-packages/starlette/routing.py", line 65, in app
>     response = await func(request)
>   File "/src/worker/baseplate_statefun/server/asgi/make_statefun_handler.py", line 25, in statefun_handler
>     result = await handler.handle_async(request_body)
>   File "/src/.venv/lib/python3.9/site-packages/statefun/request_reply_v3.py", line 262, in handle_async
>     msg = Message(target_typename=sdk_address.typename, target_id=sdk_address.id,
>   File "/src/.venv/lib/python3.9/site-packages/statefun/messages.py", line 42, in __init__
>     raise ValueError("target_id can not be missing"){code}
> Interestingly, this has started to happen in three separate Flink deployments
> at the very same time. The only thing in common between the three deployments
> is that they consume the same Kafka topics.
> No deployments took place when the issue started, which was on
> July 28th at 3:05 PM. We have been seeing the error continuously since then.
> We were also able to extract the request that Flink sends to the HTTP
> statefun endpoint:
> {code}
> {'invocation': {'target': {'namespace': 'com.x.dummy', 'type': 'dummy'},
>                 'invocations': [{'argument': {'typename': 'type.googleapis.com/v2_event.Event',
>                                               'has_value': True,
>                                               'value': '-redacted-'}}]}}
> {code}
> As you can see, either the `id` field is absent from the `invocation.target`
> object or the `target_id` was an empty string.
>
> This is our module.yaml from one of the Flink deployments:
>
> {code}
> version: "3.0"
> module:
>   meta:
>     type: remote
>   spec:
>     endpoints:
>       - endpoint:
>           meta:
>             kind: io.statefun.endpoints.v1/http
>           spec:
>             functions: com.x.dummy/dummy
>             urlPathTemplate: http://x-worker-dummy.x-functions:9090/statefun
>             timeouts:
>               call: 2 min
>               read: 2 min
>               write: 2 min
>             maxNumBatchRequests: 100
>     ingresses:
>       - ingress:
>           meta:
>             type: io.statefun.kafka/ingress
>             id: com.x/ingress
>           spec:
>             address: x-kafka-0.x.ue1.x.net:9092
>             consumerGroupId: x-worker-dummy
>             topics:
>               - topic: v2_post_events
>                 valueType: type.googleapis.com/v2_event.Event
>                 targets:
>                   - com.x.dummy/dummy
>             startupPosition:
>               type: group-offsets
>             autoOffsetResetPosition: earliest
> {code}
>
> Can you please help us investigate as this is critically impacting our prod
> setup?