This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 2e7cdfbcc0af23f80a283885db9fc97545fd4124 Author: Igal Shilman <igalshil...@gmail.com> AuthorDate: Mon Mar 23 12:34:11 2020 +0100 [FLINK-16722] Add Python SDK walkthrough This closes #66. --- .../statefun-python-walkthrough/README.md | 44 ++++ .../statefun-python-walkthrough/example_utils.py | 38 ++++ .../statefun-python-walkthrough/requirements.txt | 20 ++ .../statefun-python-walkthrough/run-example.py | 176 ++++++++++++++++ .../statefun-python-walkthrough/walkthrough.proto | 42 ++++ .../statefun-python-walkthrough/walkthrough.py | 157 ++++++++++++++ .../statefun-python-walkthrough/walkthrough_pb2.py | 229 +++++++++++++++++++++ 7 files changed, 706 insertions(+) diff --git a/statefun-examples/statefun-python-walkthrough/README.md b/statefun-examples/statefun-python-walkthrough/README.md new file mode 100644 index 0000000..fd75f1c --- /dev/null +++ b/statefun-examples/statefun-python-walkthrough/README.md @@ -0,0 +1,44 @@ +# Apache Stateful Functions - Python SDK Walkthrough + +## Setup + +* Create a virtual env + +``` +python3 -m venv venv +source venv/bin/activate +``` + +* Install the requirements + +``` +pip3 install -r requirements.txt +``` + +If you are building from source, first build the +distribution (by running `statefun-python-sdk/build-distribution.sh`), +then copy `statefun-python-sdk/dist/apache_flink_statefun-<version>-py3-none-any.whl` here and +run + +``` +pip3 install apache_flink_statefun-<version>-py3-none-any.whl +``` + +## Examples + +* Check out the walkthrough examples in [walkthrough.py](walkthrough.py) +* To invoke one of the example functions and observe its result, first start the function server: +``` +python3 walkthrough.py +``` + +Then, from another terminal, run: +``` +python3 run-example.py <example name> +``` + +e.g. 
+ +``` +python3 run-example.py walkthrough/hello +``` diff --git a/statefun-examples/statefun-python-walkthrough/example_utils.py b/statefun-examples/statefun-python-walkthrough/example_utils.py new file mode 100644 index 0000000..9409350 --- /dev/null +++ b/statefun-examples/statefun-python-walkthrough/example_utils.py @@ -0,0 +1,38 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +from flask import request +from flask import make_response +from flask import Flask + +from statefun import RequestReplyHandler + + +def flask_server(endpoint, functions): + app = Flask(__name__) + + handler = RequestReplyHandler(functions) + + @app.route(endpoint, methods=['POST']) + def handle(): + response_data = handler(request.data) + response = make_response(response_data) + response.headers.set('Content-Type', 'application/octet-stream') + return response + + app.run() diff --git a/statefun-examples/statefun-python-walkthrough/requirements.txt b/statefun-examples/statefun-python-walkthrough/requirements.txt new file mode 100644 index 0000000..580d40e --- /dev/null +++ b/statefun-examples/statefun-python-walkthrough/requirements.txt @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +flask +protobuf>=3.11.3,<4.0.0 +requests + diff --git a/statefun-examples/statefun-python-walkthrough/run-example.py b/statefun-examples/statefun-python-walkthrough/run-example.py new file mode 100644 index 0000000..7f25099 --- /dev/null +++ b/statefun-examples/statefun-python-walkthrough/run-example.py @@ -0,0 +1,176 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +import sys + +import pprint +import requests +from google.protobuf.json_format import MessageToDict +from google.protobuf.any_pb2 import Any + +from statefun.request_reply_pb2 import ToFunction, FromFunction + +from walkthrough_pb2 import Hello, AnotherHello, Counter + + +class InvocationBuilder(object): + """builder for the ToFunction message""" + + def __init__(self): + self.to_function = ToFunction() + + def with_target(self, ns, type, id): + InvocationBuilder.set_address(ns, type, id, self.to_function.invocation.target) + return self + + def with_state(self, name, value=None): + state = self.to_function.invocation.state.add() + state.state_name = name + if value: + any = Any() + any.Pack(value) + state.state_value = any.SerializeToString() + return self + + def with_invocation(self, arg, caller=None): + invocation = self.to_function.invocation.invocations.add() + if caller: + (ns, type, id) = caller + InvocationBuilder.set_address(ns, type, id, invocation.caller) + invocation.argument.Pack(arg) + return self + + def SerializeToString(self): + return self.to_function.SerializeToString() + + @staticmethod + def set_address(namespace, type, id, address): + address.namespace = namespace + address.type = type + address.id = id + + +def post(data): + return requests.post(url='http://localhost:5000/statefun', + data=data, + headers={'Content-Type': 'application/octet-stream'}) + + +# -------------------------------------------------------------------------------------------------------------- +# example +# --------------------------------------------------------------------------------------------------------------- + +class Examples(object): + def __init__(self): + self.examples = {} + + def bind(self, typename): + def wrapper(fn): + self.examples[typename] = fn + return fn + + return wrapper + + def invoke(self, typename): + fn = self.examples[typename] + builder = InvocationBuilder() 
+ type, name = typename.split("/") + builder.with_target(type, name, "some id") + fn(builder) + result = post(builder.SerializeToString()) + from_fn = FromFunction() + from_fn.ParseFromString(result.content) + pprint.pprint(MessageToDict(from_fn, preserving_proto_field_name=True, including_default_value_fields=True)) + + +examples = Examples() + + +@examples.bind("walkthrough/hello") +def hello(builder): + msg = Hello() + msg.world = "Hello world!" + builder.with_invocation(msg) + + +@examples.bind("walkthrough/any") +def any_example(builder): + hello(builder) + + +@examples.bind("walkthrough/type-hint") +def typehint(builder): + hello(builder) + + +@examples.bind("walkthrough/union-type-hint") +def union_type_hint(builder): + hello = Hello() + builder.with_invocation(hello) + + another_hello = AnotherHello() + builder.with_invocation(another_hello) + + +@examples.bind("walkthrough/state_access") +def state1(builder): + builder.with_state("counter") + builder.with_invocation(Hello()) + + +@examples.bind("walkthrough/state_access_unpack") +def state2(builder): + counter = Counter() + counter.value = 1 + builder.with_state("counter", counter) + builder.with_invocation(Hello()) + + +@examples.bind("walkthrough/state_access_del") +def state3(builder): + counter = Counter() + counter.value = 1 + builder.with_state("counter", counter) + builder.with_invocation(Hello()) + + +@examples.bind("walkthrough/send") +def send(builder): + hello(builder) + + +@examples.bind("walkthrough/reply") +def reply(builder): + reply_to = ("example-runner", "reply", "0") + builder.with_invocation(Hello(), reply_to) + + +@examples.bind("walkthrough/egress") +def egress(builder): + hello(builder) + + +def main(): + if len(sys.argv) != 2: + print("usage: run-example.py <ns/name>") + sys.exit(1) + example = sys.argv[1] + examples.invoke(example) + + +if __name__ == "__main__": + main() diff --git a/statefun-examples/statefun-python-walkthrough/walkthrough.proto 
b/statefun-examples/statefun-python-walkthrough/walkthrough.proto new file mode 100644 index 0000000..8f2897f --- /dev/null +++ b/statefun-examples/statefun-python-walkthrough/walkthrough.proto @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; + +// protoc *.proto --python_out=. + +package walkthrough; + +message Hello { + string world = 1; +} + +message AnotherHello { +} + +message Counter { + int64 value = 1; +} + +message HelloReply { + string message = 1; +} + +message Event { +} + + diff --git a/statefun-examples/statefun-python-walkthrough/walkthrough.py b/statefun-examples/statefun-python-walkthrough/walkthrough.py new file mode 100644 index 0000000..61d8911 --- /dev/null +++ b/statefun-examples/statefun-python-walkthrough/walkthrough.py @@ -0,0 +1,157 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import typing + +from statefun import StatefulFunctions, kafka_egress_record +from google.protobuf.any_pb2 import Any + +# +# @functions is the entry point that allows us to register +# stateful functions identified via a namespace and a name pair +# of the form "<namespace>/<name>". +# +from walkthrough_pb2 import HelloReply, Hello, Counter, AnotherHello, Event + +functions = StatefulFunctions() + + +# +# The following statement binds the Python function instance hello to a namespaced name +# "walkthrough/hello". This is also known as a function type, in stateful functions terms. +# i.e. the function type of hello is FunctionType(namespace="walkthrough", type="hello") +# Messages addressed to this function type are dispatched to this function instance. 
+# +@functions.bind("walkthrough/hello") +def hello(context, message): + print(message) + + +# ----------------------------------------------------------------------------------------------------------------- +# Message Types +# ----------------------------------------------------------------------------------------------------------------- + + +@functions.bind("walkthrough/any") +def any_example(context, any_message): + # messages sent to a Python function are always packed into a google.protobuf.Any + # (https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/Any.html) + # Therefore the first thing we need to do is to unpack it. + if not any_message.Is(Hello.DESCRIPTOR): + raise TypeError('Unexpected message type') + + hello = Hello() + any_message.Unpack(hello) + print(hello) + + +@functions.bind("walkthrough/type-hint") +def typehint(context, message: Hello): + # Although messages that are sent to a Python function are always packed into a google.protobuf.Any + # StateFun can deduce type hints and can unpack the message for you automatically. + print(message.world) + + +@functions.bind("walkthrough/union-type-hint") +def union_type_hint(context, message: typing.Union[Hello, AnotherHello]): + # StateFun can deduce type hints and can unpack the message for you automatically, even + # when you are expecting more than one message type. + print(message) # <-- would be either an instance of Hello or an instance of AnotherHello + + +@functions.bind("walkthrough/state_access") +def state1(context, message): + # state can be accessed directly by getting the state name (as registered in a module.yaml). 
remember that the + # state has to be a valid Protocol Buffers message, and has to be packed into a google.protobuf.Any + pb_any = context['counter'] + if pb_any: + # state was previously stored for this address + counter = Counter() + pb_any.Unpack(counter) + counter.value += 1 + pb_any.Pack(counter) + context['counter'] = pb_any + else: + # state was not stored for this address + counter = Counter() + counter.value = 1 + pb_any = Any() + pb_any.Pack(counter) + context['counter'] = pb_any + + +# ----------------------------------------------------------------------------------------------------------------- +# State management +# ----------------------------------------------------------------------------------------------------------------- + +@functions.bind("walkthrough/state_access_unpack") +def state2(context, message): + # statefun can help you to unpack/pack the values directly, removing some of the boilerplate + # associated with google.protobuf.Any. + counter = context.state('counter').unpack(Counter) + if counter: + counter.value += 1 + else: + counter = Counter() + counter.value = 1 + context.state('counter').pack(counter) + + +@functions.bind("walkthrough/state_access_del") +def state3(context, message): + # state can be deleted easily by using the del keyword. + del context['counter'] + + +# ----------------------------------------------------------------------------------------------------------------- +# Sending Messages +# ----------------------------------------------------------------------------------------------------------------- + +@functions.bind("walkthrough/send") +def send(context, message): + # context allows you to send messages to other functions, as long as you + # know their address. An address is composed of a function type and an id. + any = Any() + any.Pack(Hello()) + context.send("walkthrough/reply", "some-id", any) # see reply() below. 
+ + # you can also use the convenience alternative, which packs the argument into a google.protobuf.Any for you + context.pack_and_send("walkthrough/reply", "some-id", Hello()) + + +@functions.bind("walkthrough/reply") +def reply(context, message): + # directly reply to the sender! + reply = HelloReply() + reply.message = "This is a reply!" + context.pack_and_reply(reply) + + +@functions.bind("walkthrough/egress") +def egress(context, message): + # send a message to an external system via an egress. Egresses need to be defined in a module.yaml + # and can be referenced by type. + # The following two lines prepare a message to send to the pre-built Kafka egress. + key = context.address.identity # use the identity part of our own address as the target Kafka key. + record = kafka_egress_record(topic="events", key=key, value=Event()) + context.pack_and_send_egress("walkthrough/events-egress", record) + + +if __name__ == "__main__": + from example_utils import flask_server + + flask_server("/statefun", functions) diff --git a/statefun-examples/statefun-python-walkthrough/walkthrough_pb2.py b/statefun-examples/statefun-python-walkthrough/walkthrough_pb2.py new file mode 100644 index 0000000..c974860 --- /dev/null +++ b/statefun-examples/statefun-python-walkthrough/walkthrough_pb2.py @@ -0,0 +1,229 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: walkthrough.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='walkthrough.proto', + package='walkthrough', + syntax='proto3', + serialized_options=None, + serialized_pb=_b('\n\x11walkthrough.proto\x12\x0bwalkthrough\"\x16\n\x05Hello\x12\r\n\x05world\x18\x01 \x01(\t\"\x0e\n\x0c\x41notherHello\"\x18\n\x07\x43ounter\x12\r\n\x05value\x18\x01 \x01(\x03\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t\"\x07\n\x05\x45ventb\x06proto3') +) + + + + +_HELLO = _descriptor.Descriptor( + name='Hello', + full_name='walkthrough.Hello', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='world', full_name='walkthrough.Hello.world', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + 
enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=34, + serialized_end=56, +) + + +_ANOTHERHELLO = _descriptor.Descriptor( + name='AnotherHello', + full_name='walkthrough.AnotherHello', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=58, + serialized_end=72, +) + + +_COUNTER = _descriptor.Descriptor( + name='Counter', + full_name='walkthrough.Counter', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='value', full_name='walkthrough.Counter.value', index=0, + number=1, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=74, + serialized_end=98, +) + + +_HELLOREPLY = _descriptor.Descriptor( + name='HelloReply', + full_name='walkthrough.HelloReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='message', full_name='walkthrough.HelloReply.message', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + 
serialized_start=100, + serialized_end=129, +) + + +_EVENT = _descriptor.Descriptor( + name='Event', + full_name='walkthrough.Event', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=131, + serialized_end=138, +) + +DESCRIPTOR.message_types_by_name['Hello'] = _HELLO +DESCRIPTOR.message_types_by_name['AnotherHello'] = _ANOTHERHELLO +DESCRIPTOR.message_types_by_name['Counter'] = _COUNTER +DESCRIPTOR.message_types_by_name['HelloReply'] = _HELLOREPLY +DESCRIPTOR.message_types_by_name['Event'] = _EVENT +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +Hello = _reflection.GeneratedProtocolMessageType('Hello', (_message.Message,), dict( + DESCRIPTOR = _HELLO, + __module__ = 'walkthrough_pb2' + # @@protoc_insertion_point(class_scope:walkthrough.Hello) + )) +_sym_db.RegisterMessage(Hello) + +AnotherHello = _reflection.GeneratedProtocolMessageType('AnotherHello', (_message.Message,), dict( + DESCRIPTOR = _ANOTHERHELLO, + __module__ = 'walkthrough_pb2' + # @@protoc_insertion_point(class_scope:walkthrough.AnotherHello) + )) +_sym_db.RegisterMessage(AnotherHello) + +Counter = _reflection.GeneratedProtocolMessageType('Counter', (_message.Message,), dict( + DESCRIPTOR = _COUNTER, + __module__ = 'walkthrough_pb2' + # @@protoc_insertion_point(class_scope:walkthrough.Counter) + )) +_sym_db.RegisterMessage(Counter) + +HelloReply = _reflection.GeneratedProtocolMessageType('HelloReply', (_message.Message,), dict( + DESCRIPTOR = _HELLOREPLY, + __module__ = 'walkthrough_pb2' + # @@protoc_insertion_point(class_scope:walkthrough.HelloReply) + )) +_sym_db.RegisterMessage(HelloReply) + +Event = _reflection.GeneratedProtocolMessageType('Event', (_message.Message,), dict( + DESCRIPTOR = _EVENT, + __module__ = 'walkthrough_pb2' + # @@protoc_insertion_point(class_scope:walkthrough.Event) + 
)) +_sym_db.RegisterMessage(Event) + + +# @@protoc_insertion_point(module_scope)