igalshilman opened a new pull request #52: [FLINK-16490] Add a minimal Python SDK URL: https://github.com/apache/flink-statefun/pull/52 ## Background The JVM based stateful functions implementation, has a RequestReply extension (a protocol and an implementation) that allows calling into any HTTP endpoint that implements that protocol. Although it is possible to implement this protocol independently, we should provide a minimal library for the Python programing language, that: Allows users to define and declare their functions in a convenient way Dispatch an invocation request sent from the JVM to the appropriate function previously declared ## A mini tutorial ### define and declare a function ``` from statefun import StatefulFunctions functions = StatefulFunctions() @functions.bind("demo/greeter") def greet(context, message: LoginEvent): print("Hey " + message.user_name) ``` This code, declares a function with a `FunctionType("demo", "greeter")` and binds the `greet` Python instance to it. ### Expose it with a request reply handler ``` from statefun import RequestReplyHandler handler = RequestReplyHandler(functions) ``` ### Use the handler within your favorite `HTTP` serving framework For example `Flask` ``` @app.route('/statefun', methods=['POST']) def handle(): response_data = handler(request.data) response = make_response(response_data) response.headers.set('Content-Type', 'application/octet-stream') return response if __name__ == "__main__": app.run() ``` This creates an HTTP server that accepts requests from the Stateful Functions cluster and dispatches it to the handler. ### The reaming thing would be to declare this function type in a `module.yaml` ``` functions: function: meta: kind: http type: demo/greeter spec: endpoint: http://<end point url>/statefun states: - foo - bar - baz ``` ## Currently supported features: A `Context` classes has the following attributes/methods * `send(self, typename: str, id: str, message: Any)` - send a message to any function with the function type of the form `<namespace>/<type>` and the message is a `google.protobuf.Any` * `pack_and_send(self, typename: str, id: str, message)` - the same as above, but it would pack the protobuf message in an `Any` * `reply(self, message: Any)` - send a message to the invoking function. * `pack_and_reply(self, message)` - the same as above, but would pack to an `Any` * `send_after(self, delay: timedelta, typename: str, id: str, message: Any)` - send a message after a delay. * `pack_and_send_after(self, delay: timedelta, typename: str, id: str, message)` - the same as above, but would pack the message to an `Any` * `send_egress(self, typename, message: Any)` - emit a message to an egress with a typename of the form `<namespace>/<name>` * `pack_and_send_egress(self, typename, message)` - the same as above. ## Eager state registration The request reply protocol, requires that the state names would be registered at `module.yaml` under the `state:` section (see the example above). The state values could be absent (`None` or a `google.protobuf.Any` and they can be generally obtained via the `context` parameter: ``` @functions.bind("demo/greeter") def greet(context, message: LoginEvent): session = context['session'] if not session: session = start_session(message) context['session'] = session ... ``` ## How to test 1. create the virtual environment ``` python3 -m venv venv source venv/bin/activate ``` 2. Run unit tests ``` python3 -m unittest tests ``` 3. Look at ## Create the distribution ### With Docker `./build-distribution.sh` ### With venv ``` python3 -m pip install wheel python3 setup.py sdist bdist_wheel ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
