Stephan Ewen created FLINK-23261:
------------------------------------

Summary: StateFun - HTTP State Access Interface
Key: FLINK-23261
URL: https://issues.apache.org/jira/browse/FLINK-23261
Project: Flink
Issue Type: Improvement
Components: Stateful Functions
Reporter: Stephan Ewen
h2. Functionality

To improve operations of StateFun applications, we should offer an interface to query and manipulate state entries. This can be used for exploration and debugging, and to repair state if the application state needs to be corrected.

To make this simple, I would suggest an HTTP REST interface:
- GET: read the state entry for a key
- PUT: set/overwrite the state entry for a key
- DELETE: drop the state entry for a key

The URLs could be {{http(s)://statefun-service/state/<namespace>/<type>/<name>/<statename>}}, where the string {{<namespace>/<type>/<name>}} is the fully-qualified address of the target, and {{<statename>}} is the name under which that persistent state is stored. Keys are always UTF-8 strings in StateFun, so they can be encoded in the URL.

For the responses, we would use the common codes: 200 for GET/PUT/DELETE success and 404 for GET/DELETE not found.

The state values returned from GET requests would generally be just the bytes, not interpreted by the request handling.

The integration of the StateFun type system with HTTP content types (MIME types) is up for further discussion. One option is to set the content type response header to {{"statefun/<statefun_type>"}}, where all non-simple types map to {{Content-Type: application/octet-stream}}. We may make an exception for strings, which could be returned as {{Content-Type: text/plain; charset=UTF-8}}. Later refinement is possible, like auto-stringifying contents when the request indicates that it only accepts {{text/plain}} responses.

h2. Failure Guarantees

Initially, PUT/DELETE would offer no failure guarantees: the requests would be handled on a best-effort basis. We can easily extend this later in one of two ways:
- Delay responses to the HTTP requests until the next checkpoint is complete. That makes synchronous interaction easy and sounds like a good match for a more admin-style interface.
- Return the current checkpoint ID and offer a way to poll until the next checkpoint is completed.
This avoids blocking requests, but puts more burden on the caller.

Given that the expected use cases for PUT/DELETE are of an "admin/fix" nature, I would suggest going with synchronous requests, for simplicity.

h2. Implementation

There are two options to implement this:
(1) A Source/Sink (Ingress/Egress) pair
(2) An Operator Coordinator with HTTP requests

*Option (1) - Source/Sink pair*

We would implement a specific source that is both an ingress and an egress. The source would spawn an HTTP server (a singleton per TM process).

Requests would be handled as follows:
- An HTTP request gets a generated correlation ID.
- The source injects a new message type (a "control message") into the stream. That message holds the correlation ID, the parallel subtask index of the originating source, and the target address and state name.
- The function dispatcher handles these messages in a special way: it retrieves the state and sends an egress message with the correlation ID to the parallel subtask of the egress indicated by the message's subtask index.
- The egress (which is the same instance as the ingress source) uses the correlation ID to respond to the request.

Advantages:
- No changes necessary in Flink
- Might sustain higher throughput, due to multiple HTTP endpoints

Disadvantages:
- Additional HTTP servers and ports require more setup (like service definitions on K8s).
- Need to introduce a new control message type and extend the function dispatcher to handle it.
- Makes a hard assumption that sources run on all slots. Needs an "ugly" singleton hack to start only one server per TM process.

*Option (2) - Operator Coordinator*

Operator Coordinators are instances that run on the {{JobManager}} and can communicate with the Tasks via RPC. Coordinators can receive calls from HTTP handlers at the JobManager's HTTP endpoint. An example of this is result fetching through HTTP/OperatorCoordinator requests.
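Whichever option ends up hosting the endpoint, the request handling described under Functionality reduces to parsing the path and mapping the three methods onto keyed state. The following Python sketch shows that mapping against an in-memory dictionary standing in for StateFun's state backend. {{StateKey}}, {{parse_state_path}}, {{content_type_for}} and {{handle}} are hypothetical names; the 400/405 codes for malformed paths and unsupported methods go beyond the proposal and are assumptions; and {{io.statefun.types/string}} is assumed to be the name of StateFun's built-in string type.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple
from urllib.parse import unquote


@dataclass(frozen=True)
class StateKey:
    """Hypothetical identifier of one persisted value: target address + state name."""
    namespace: str
    type_name: str
    name: str        # target id; always a UTF-8 string in StateFun
    state_name: str


def parse_state_path(path: str) -> Optional[StateKey]:
    """Parse '/state/<namespace>/<type>/<name>/<statename>', or return None."""
    segments = path.strip("/").split("/")
    if len(segments) != 5 or segments[0] != "state":
        return None
    # Split first, then percent-decode, so an encoded '/' (%2F) inside a key
    # does not create an extra path segment.
    decoded = [unquote(s, encoding="utf-8") for s in segments[1:]]
    if any(not s for s in decoded):
        return None
    return StateKey(*decoded)


def content_type_for(statefun_type: str) -> str:
    """One of the content-type mappings under discussion (an assumption)."""
    if statefun_type == "io.statefun.types/string":  # assumed type name
        return "text/plain; charset=UTF-8"
    return "application/octet-stream"


Response = Tuple[int, bytes]


def handle(method: str, path: str, body: bytes,
           store: Dict[StateKey, bytes]) -> Response:
    """Map GET/PUT/DELETE onto the store; values stay uninterpreted bytes."""
    key = parse_state_path(path)
    if key is None:
        return 400, b""              # assumption: malformed path
    if method == "GET":
        if key not in store:
            return 404, b""
        return 200, store[key]       # raw bytes, not interpreted
    if method == "PUT":
        store[key] = body            # set or overwrite
        return 200, b""
    if method == "DELETE":
        if key not in store:
            return 404, b""
        del store[key]
        return 200, b""
    return 405, b""                  # assumption: unsupported method
```

Splitting before decoding is the design choice that matters here: because keys are arbitrary UTF-8 strings, a key containing {{/}} must arrive percent-encoded, and decoding only after splitting keeps it inside the {{<name>}} segment.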
We would need a patch to Flink to allow registering custom URLs and passing the path as a parameter to the request. The RPCs can be processed in the mailbox on the Tasks, making them thread-safe. This would also completely avoid the round-trip (source-to-sink) problem; the tasks simply need to send a response back to the RPC.

Advantages:
- Reuses the existing HTTP endpoint and port. There is no need for an additional HTTP server, port, and service; for these admin-style requests, this approach re-uses Flink's admin HTTP endpoint.
- No need for singleton HTTP server logic in Tasks
- Does not require the assumption that all TMs run an instance of all operators.
- No need for "control messages" and special handling logic for them
- No need for implementing round-trip routing

Disadvantages:
- Requires a change in Flink
- Only one HTTP server (on the JobManager) means it will not be a high-throughput solution.
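To illustrate why processing the RPCs in the task mailbox makes state access thread-safe without any round-trip routing, here is a minimal Python sketch. Each {{Task}} drains its mailbox on a single thread, and a hypothetical {{StateCoordinator}} (standing in for the Operator Coordinator) routes each request to the owning task and waits on a future for the reply. None of these classes are Flink APIs; the CRC32-based owner selection is an assumption that replaces Flink's actual key-group assignment, and the string key stands for a flattened address plus state name.

```python
import queue
import threading
import zlib
from concurrent.futures import Future
from typing import Any, Callable, Dict, List, Optional

StateMap = Dict[str, bytes]


class Task:
    """Hypothetical parallel subtask; its state is only touched by its mailbox thread."""

    def __init__(self) -> None:
        self.state: StateMap = {}
        self.mailbox: queue.Queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        # Single-threaded mailbox loop: actions run one at a time, in order.
        while True:
            action = self.mailbox.get()
            if action is None:  # poison pill: stop the loop
                return
            action()

    def submit(self, fn: Callable[[StateMap], Any]) -> Future:
        """Enqueue fn; the returned future is completed by the mailbox thread."""
        reply: Future = Future()

        def action() -> None:
            try:
                reply.set_result(fn(self.state))
            except Exception as exc:  # surface failures to the caller
                reply.set_exception(exc)

        self.mailbox.put(action)
        return reply

    def stop(self) -> None:
        self.mailbox.put(None)
        self._thread.join()


class StateCoordinator:
    """Hypothetical stand-in for the Operator Coordinator on the JobManager."""

    def __init__(self, tasks: List[Task], timeout: float = 5.0) -> None:
        self.tasks = tasks
        self.timeout = timeout

    def owner(self, key: str) -> Task:
        # Assumption: CRC32 modulo parallelism instead of Flink's key groups.
        return self.tasks[zlib.crc32(key.encode("utf-8")) % len(self.tasks)]

    def _call(self, key: str, fn: Callable[[StateMap], Any]) -> Any:
        # The "RPC": send to the owning task, block until it replies.
        return self.owner(key).submit(fn).result(timeout=self.timeout)

    def get(self, key: str) -> Optional[bytes]:
        return self._call(key, lambda state: state.get(key))

    def put(self, key: str, value: bytes) -> None:
        self._call(key, lambda state: state.update({key: value}))

    def delete(self, key: str) -> bool:
        # True would map to 200 and False to 404 in the HTTP layer.
        return self._call(key, lambda state: state.pop(key, None) is not None)
```

The caller blocks on the future, which matches the synchronous, admin-style interaction suggested for PUT/DELETE; a checkpoint-aware variant would complete the future only after the next checkpoint has finished.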