Stephan Ewen created FLINK-23261:
------------------------------------

             Summary: StateFun - HTTP State Access Interface 
                 Key: FLINK-23261
                 URL: https://issues.apache.org/jira/browse/FLINK-23261
             Project: Flink
          Issue Type: Improvement
          Components: Stateful Functions
            Reporter: Stephan Ewen


h2. Functionality

To make StateFun applications easier to operate, we should offer an interface to query and manipulate state entries.
This can be used for exploration and debugging, and to repair state if the application state needs to be corrected.

To make this simple, I would suggest an HTTP REST interface:
 - GET: read state entry for key
 - PUT: set/overwrite state for key
 - DELETE: drop state entry for key

The URLs could be {{http(s)://statefun-service/state/<namespace>/<type>/<name>/<statename>}}, where {{<namespace>/<type>/<name>}} is the fully-qualified address of the target and {{<statename>}} is the name under which that persistent state is stored.

Keys are always UTF-8 strings in StateFun, so they can be encoded in the URL.
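A minimal sketch of building and parsing these URLs, assuming that {{<name>}} is the function id (the key), as in StateFun addresses, and that every path segment is percent-encoded as UTF-8. {{BASE_URL}}, {{build_state_url}} and {{parse_state_path}} are hypothetical names for illustration, not an existing StateFun API.

```python
from urllib.parse import quote, unquote

# Hypothetical base URL; "statefun-service" is the placeholder host from the proposal.
BASE_URL = "https://statefun-service/state"


def build_state_url(namespace: str, type_: str, name: str, statename: str) -> str:
    """Build a state URL, percent-encoding every path segment as UTF-8.

    safe="" also encodes "/", so a key that contains a slash cannot be
    confused with a path separator.
    """
    segments = (namespace, type_, name, statename)
    return BASE_URL + "/" + "/".join(quote(s, safe="") for s in segments)


def parse_state_path(path: str) -> tuple:
    """Split '/state/<namespace>/<type>/<name>/<statename>' into decoded parts."""
    parts = path.strip("/").split("/")
    if len(parts) != 5 or parts[0] != "state":
        raise ValueError(f"not a state path: {path!r}")
    return tuple(unquote(p) for p in parts[1:])
```

With {{safe=""}}, a key such as {{user/ä}} stays a single path segment ({{user%2F%C3%A4}}). Some HTTP servers and proxies reject or pre-decode {{%2F}} in paths, so this would need checking against whichever server ends up handling the requests.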

For the responses, we would use the common status codes: 200 for GET/PUT/DELETE success and 404 for GET/DELETE when the entry is not found.
The state values returned by GET requests would generally be just the raw bytes, not interpreted by the request handling.
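To make the status-code mapping concrete, here is a sketch of the request handling against a plain in-memory dictionary that stands in for the keyed state backend (an assumption; a real handler would read and write Flink keyed state). Values are stored and returned as opaque bytes. {{StateAccessHandler}} is a hypothetical name, and the 405 response for other methods is an assumption not covered by the proposal.

```python
from typing import Dict, Optional, Tuple

# (address, statename) -> raw bytes; the handler never interprets the values.
StateKey = Tuple[str, str]


class StateAccessHandler:
    """Hypothetical GET/PUT/DELETE handler over an in-memory stand-in store."""

    def __init__(self) -> None:
        self._store: Dict[StateKey, bytes] = {}

    def handle(self, method: str, address: str, statename: str,
               body: Optional[bytes] = None) -> Tuple[int, bytes]:
        """Return (HTTP status, response body) for one request."""
        key = (address, statename)
        if method == "GET":
            if key not in self._store:
                return 404, b""
            return 200, self._store[key]  # raw bytes, uninterpreted
        if method == "PUT":
            self._store[key] = body if body is not None else b""
            return 200, b""
        if method == "DELETE":
            if self._store.pop(key, None) is None:
                return 404, b""
            return 200, b""
        return 405, b""  # assumption: any other method is rejected
```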

The integration of the StateFun type system with HTTP content types (MIME types) is up for further discussion.
One option is to set the content type response header to {{"statefun/<statefun_type>"}}, where all non-simple types map to {{Content-Type: application/octet-stream}}. We may make an exception for strings, which could be returned as {{Content-Type: text/plain; charset=UTF-8}}.
Later refinements are possible, such as auto-stringifying contents when the request indicates that it only accepts {{text/plain}} responses.
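One possible mapping for the string exception, as a sketch. The typename {{io.statefun.types/string}} is assumed to be the StateFun built-in string type; every other type falls back to {{application/octet-stream}}. {{content_type_for}} is hypothetical, and the {{"statefun/<statefun_type>"}} variant and Accept-based stringifying are deliberately left out.

```python
# Assumed typename of the StateFun built-in string type (an assumption for this sketch).
STRING_TYPENAME = "io.statefun.types/string"


def content_type_for(statefun_type: str) -> str:
    """Map a StateFun typename to an HTTP Content-Type header value."""
    if statefun_type == STRING_TYPENAME:
        # Strings are the one exception that is exposed as readable text.
        return "text/plain; charset=UTF-8"
    # All other (non-simple) types are returned as opaque bytes.
    return "application/octet-stream"
```

If the string type is persisted in a wrapped serialized form rather than as plain UTF-8 bytes, the handler would additionally need to unwrap it before answering with {{text/plain}}.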
h2. Failure Guarantees

Initially, PUT/DELETE would come with no failure guarantees: the requests would be handled on a best-effort basis.

We can easily extend this later in one of two ways:
 - Delay responses to the HTTP requests until the next checkpoint is complete. That makes synchronous interaction easy and sounds like a good match for a more admin-style interface.
 - Return the current checkpoint ID and offer a way to poll until the next checkpoint is completed. This avoids blocking requests, but puts more burden on the caller.

Given that the expected use cases for PUT/DELETE are of an "admin/fix" nature, I would suggest going with synchronous requests, for simplicity.
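Both extension options can be modelled with one small bookkeeping structure, sketched here under the assumption that the request handler learns about checkpoint triggers and completions (for example through Flink's {{CheckpointListener}} notifications). {{CheckpointGate}} and its methods are hypothetical. The synchronous variant blocks on the returned future; the polling variant hands the checkpoint ID to the caller and answers later status queries via {{is_durable}}.

```python
import threading
from concurrent.futures import Future
from typing import List, Tuple


class CheckpointGate:
    """Hypothetical helper that ties write acknowledgements to checkpoints.

    A write applied before the barrier of checkpoint N is durable once
    checkpoint N completes, so its response waits for that checkpoint.
    """

    def __init__(self, next_checkpoint_id: int = 1) -> None:
        self._lock = threading.Lock()
        self._next_checkpoint_id = next_checkpoint_id
        self._last_completed = 0
        self._pending: List[Tuple[int, Future]] = []

    def register_write(self) -> Tuple[int, Future]:
        """Call after applying a PUT/DELETE; returns (checkpoint id, future)."""
        fut: Future = Future()
        with self._lock:
            cp = self._next_checkpoint_id
            self._pending.append((cp, fut))
        return cp, fut

    def on_checkpoint_triggered(self, checkpoint_id: int) -> None:
        # Writes applied after this barrier belong to the following checkpoint.
        with self._lock:
            self._next_checkpoint_id = checkpoint_id + 1

    def on_checkpoint_complete(self, checkpoint_id: int) -> None:
        with self._lock:
            self._last_completed = max(self._last_completed, checkpoint_id)
            done = [f for cp, f in self._pending if cp <= checkpoint_id]
            self._pending = [(cp, f) for cp, f in self._pending if cp > checkpoint_id]
        for f in done:  # complete futures outside the lock
            f.set_result(checkpoint_id)

    def is_durable(self, checkpoint_id: int) -> bool:
        """Polling variant: has the given checkpoint completed yet?"""
        with self._lock:
            return checkpoint_id <= self._last_completed
```

A synchronous PUT handler would apply the write, call {{register_write()}}, and wait on {{fut.result(timeout=...)}} before returning 200.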

h2. Implementation

There are two options to implement this:
 (1) A Source/Sink (Ingress/Egress) pair
 (2) An Operator Coordinator with HTTP Requests

*Option (1) - Source/Sink pair*

We would implement a specific source that acts as both an ingress and an egress.
The source would spawn an HTTP server (a singleton per TM process).

Requests would be handled as follows:
 - An HTTP request gets a generated correlation ID.
 - The source injects a new message type (a "control message") into the stream. That message holds the correlation ID, the parallel subtask index of the originating source, and the target address and state name.
 - The function dispatcher handles these messages in a special way: it retrieves the state and sends an egress message with the correlation ID to the parallel subtask of the egress indicated by the message's subtask index.
 - The egress (which is the same instance as the ingress source) uses the correlation ID to respond to the request.
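The request flow above, sketched in plain Python with the stream replaced by direct calls. {{ControlMessage}}, {{ControlResponse}}, {{IngressEgress}} and {{dispatch}} are hypothetical names; the dictionary stands in for the function's keyed state, and only GET is shown.

```python
import uuid
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class ControlMessage:
    """Hypothetical control message injected by the ingress source."""
    correlation_id: str
    origin_subtask: int   # parallel index of the source that received the request
    target_address: str   # "<namespace>/<type>/<name>"
    state_name: str


@dataclass(frozen=True)
class ControlResponse:
    """Hypothetical egress message routed back to origin_subtask."""
    correlation_id: str
    origin_subtask: int
    value: Optional[bytes]  # None means "not found"


class IngressEgress:
    """One parallel instance acting as both the ingress and the egress."""

    def __init__(self, subtask_index: int) -> None:
        self.subtask_index = subtask_index
        self._pending: Dict[str, Future] = {}

    def on_http_get(self, address: str, state_name: str) -> Tuple[ControlMessage, Future]:
        cid = str(uuid.uuid4())  # generated correlation ID
        fut: Future = Future()
        self._pending[cid] = fut
        return ControlMessage(cid, self.subtask_index, address, state_name), fut

    def on_egress(self, resp: ControlResponse) -> None:
        # Complete the waiting HTTP request that matches the correlation ID.
        self._pending.pop(resp.correlation_id).set_result(resp.value)


def dispatch(msg: ControlMessage, state: Dict[Tuple[str, str], bytes]) -> ControlResponse:
    """Dispatcher special case: read state instead of invoking the function."""
    value = state.get((msg.target_address, msg.state_name))
    return ControlResponse(msg.correlation_id, msg.origin_subtask, value)
```

In a real job the control message would travel through the keyed stream to the subtask that owns the key, and the response would be routed to the egress subtask with index {{origin_subtask}}.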

Advantages:
 - No changes necessary in Flink
 - Might sustain higher throughput, due to multiple HTTP endpoints

Disadvantages:
 - Additional HTTP servers and ports require more setup (like service 
definitions on K8s).
 - Requires introducing a new control message type and extending the function dispatcher to handle it.
 - Makes a hard assumption that sources run on all slots, and needs an "ugly" singleton hack to start only one server per TM process.
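The "singleton hack" could look roughly like this: a process-wide, reference-counted instance, so that however many source subtasks share a TaskManager process, only one server is started. This is a sketch; {{SharedHttpServer}} is hypothetical, and the {{running}} flag stands in for actually starting and stopping an HTTP server.

```python
import threading
from typing import Optional


class SharedHttpServer:
    """Hypothetical process-wide singleton with reference counting.

    Every source subtask in the process calls acquire(); only the first
    call starts the server, and the last release() stops it.
    """

    _lock = threading.Lock()
    _instance: Optional["SharedHttpServer"] = None
    _refcount = 0

    def __init__(self, port: int) -> None:
        self.port = port
        self.running = False

    @classmethod
    def acquire(cls, port: int) -> "SharedHttpServer":
        with cls._lock:
            if cls._instance is None:
                # Only the first caller's port is used; later callers share it.
                cls._instance = cls(port)
                cls._instance.running = True  # stand-in for starting a real server
            cls._refcount += 1
            return cls._instance

    @classmethod
    def release(cls) -> None:
        with cls._lock:
            cls._refcount -= 1
            if cls._refcount == 0 and cls._instance is not None:
                cls._instance.running = False  # stand-in for shutting it down
                cls._instance = None
```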

*Option (2) - Operator Coordinator*

Operator Coordinators are instances that run on the {{JobManager}} and can 
communicate with the Tasks via RPC.

Coordinators can receive calls from HTTP handlers at the JobManager's HTTP endpoint.
An example of this is result fetching through HTTP/OperatorCoordinator requests.
We would need a patch to Flink to allow registering custom URLs and passing the path as a parameter to the request.

The RPCs can be processed in the mailbox of the Tasks, which makes them thread-safe.
This would also completely avoid the round-trip (source-to-sink) problem: the tasks simply send a response back to the RPC.
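A minimal model of why mailbox processing gives thread safety: a single thread drains a queue of actions, so an RPC that reads state runs strictly between other processing steps and needs no locks, and its result is simply the RPC response. {{TaskMailbox}} and {{handle_get_rpc}} are hypothetical stand-ins, not Flink's mailbox API; routing the request from the coordinator to the subtask that owns the key is left out.

```python
import queue
import threading
from concurrent.futures import Future
from typing import Callable, Dict, Optional, Tuple


class TaskMailbox:
    """Hypothetical task mailbox: one thread runs every queued action in order."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[Optional[Callable[[], None]]]" = queue.Queue()
        self.state: Dict[Tuple[str, str], bytes] = {}  # touched only by the mailbox thread
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        while True:
            action = self._queue.get()
            if action is None:  # poison pill stops the loop
                return
            action()

    def submit(self, fn: Callable[[], object]) -> Future:
        """Enqueue fn; the returned future carries the RPC response."""
        fut: Future = Future()

        def action() -> None:
            try:
                fut.set_result(fn())
            except Exception as e:  # report failures back to the caller
                fut.set_exception(e)

        self._queue.put(action)
        return fut

    def close(self) -> None:
        self._queue.put(None)
        self._thread.join()


def handle_get_rpc(task: TaskMailbox, address: str, state_name: str) -> Future:
    """A coordinator-issued GET: the future's result is the RPC response."""
    return task.submit(lambda: task.state.get((address, state_name)))
```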

Advantages:
 - Reuses the existing HTTP endpoint and port. For these admin-style requests, this approach reuses Flink's admin HTTP endpoint, so no additional HTTP server, port, or service is needed.
 - No need for singleton HTTP server logic in Tasks.
 - Does not require the assumption that all TMs run an instance of all operators.
 - No need for "control messages" and special handling logic for them.
 - No need to implement round-trip routing.

Disadvantages:
 - Requires a change in Flink.
 - Only one HTTP server (on the JobManager) means it will not be a high-throughput solution.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
