rmetzger commented on a change in pull request #77: [FLINK-16730][docs] Python SDK Getting Started
URL: https://github.com/apache/flink-statefun/pull/77#discussion_r399477462
 
 

 ##########
 File path: docs/getting-started/walkthrough.md
 ##########
 @@ -24,177 +24,411 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+Stateful Functions offers a platform for building robust, stateful event-driven applications.
+It provides fine-grained control over state and time, which allows for the implementation of advanced systems.
+In this step-by-step guide, you’ll learn how to build stateful applications with the Stateful Functions API.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are You Building?
+
 Like all great introductions in software, this walkthrough will start at the beginning: saying hello.
 The application will run a simple function that accepts a request and responds with a greeting.
 It will not attempt to cover all the complexities of application development, but instead focus on building a stateful function — which is where you will implement your business logic.
 
-* This will be replaced by the TOC
-{:toc}
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Python, but you should be able to follow along even if you are coming from a different programming language.
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support resources](https://flink.apache.org/gettinghelp.html).
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly.
 
-## A Basic Hello
+## How to Follow Along
+
+If you want to follow along, you will need a computer with [Python 3](https://www.python.org/) and [Docker](https://www.docker.com/).
+
+{% panel **Note:** For brevity, code blocks within this walkthrough may not contain the full surrounding code.
+The full code is available [at the bottom of this page](#full-application). %}
+
+You can download a zip file with a skeleton project by clicking [here]({{ site.baseurl }}/downloads/walkthrough.zip).
+
+{% unless site.is_stable %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+    <b>Note</b>: The Stateful Functions project does not publish snapshot versions of the Python SDK to PyPI.
+    Please consider using a stable version of this guide.
+</p>
+{% endunless %}
+
+After unzipping the package, you will find a number of files.
+These include Dockerfiles and data generators to run this walkthrough in a local, self-contained environment.
+
+{% highlight bash %}
+$ tree statefun-walkthrough
+statefun-walkthrough
+├── Dockerfile
+├── README.md
+├── docker-compose.yml
+├── generator
+│   ├── Dockerfile
+│   ├── event-generator.py
+│   └── messages_pb2.py
+├── greeter
+│   ├── Dockerfile
+│   ├── greeter.py
+│   ├── messages.proto
+│   ├── messages_pb2.py
+│   └── requirements.txt
+└── module.yaml
+{% endhighlight %}
 
-Greeting actions are triggered by consuming, routing and passing messages that 
are defined using ProtoBuf.
+## Start With Events
+
+Stateful Functions is an event-driven system, so development begins by defining our events.
+The greeter application will define its events using [protocol buffers](https://developers.google.com/protocol-buffers).
+When a greet request for a particular user is ingested, it will be routed to the appropriate function.
+The response will be returned with an appropriate greeting.
+The third type, `SeenCount`, is a utility class that will be used later on to help manage the number of times a user has been seen so far.
 
 {% highlight proto %}
 syntax = "proto3";
 
+package example;
+
+// External request sent by a user who wants to be greeted
 message GreetRequest {
-    string who = 1;
+    // The name of the user to greet
+    string name = 1;
 }
-
+// A customized response sent to the user
 message GreetResponse {
-    string who = 1;
+    // The name of the user being greeted
+    string name = 1;
+    // The user's customized greeting
     string greeting = 2;
 }
+// An internal message used to store state
+message SeenCount {
+    // The number of times a user has been seen so far
+    int64 seen = 1;
+}
 {% endhighlight %}
 
-Under the hood, messages are processed using [stateful functions]({{ 
site.baseurl }}/sdk/java.html), by definition any class that implements the 
``StatefulFunction`` interface.
 
-{% highlight java %}
-package org.apache.flink.statefun.examples.greeter;
+## Our First Function
 
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
+Under the hood, messages are processed using [stateful functions]({{ site.baseurl }}/sdk/python.html), which are any two-argument functions bound to the Stateful Functions runtime.
+Functions are bound to the runtime with the `@functions.bind` decorator.
+When binding a function, it is annotated with a function type.
+This is the name used to reference this function when sending it messages.
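To make the idea of binding concrete, here is a toy, pure-Python sketch of what a bind-style decorator does conceptually: it records a callable under a function-type string so that messages can later be dispatched by that name. `FunctionRegistry`, `bind`, and `invoke` here are hypothetical illustrations, not the SDK's `StatefulFunctions` implementation.

```python
from typing import Any, Callable, Dict


class FunctionRegistry:
    """Hypothetical stand-in for a bind-style registry (not the real SDK class)."""

    def __init__(self) -> None:
        # Maps a function type such as "example/greeter" to its Python callable.
        self._functions: Dict[str, Callable[[Any, Any], Any]] = {}

    def bind(self, function_type: str) -> Callable:
        """Return a decorator that registers a two-argument function under function_type."""
        def decorator(fn: Callable[[Any, Any], Any]) -> Callable[[Any, Any], Any]:
            if function_type in self._functions:
                raise ValueError(f"{function_type} is already bound")
            self._functions[function_type] = fn
            return fn  # the decorated function itself is returned unchanged
        return decorator

    def invoke(self, function_type: str, context: Any, message: Any) -> Any:
        """Look up a callable by its function type and call it with (context, message)."""
        return self._functions[function_type](context, message)


registry = FunctionRegistry()


@registry.bind("example/greeter")
def greet(context, message):
    return "Hello {}".format(message)
```

With this sketch, `registry.invoke("example/greeter", None, "Jane")` returns `"Hello Jane"`; the function type string, not the Python function name, is what callers use to address it.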
 
-public final class GreetFunction implements StatefulFunction {
+When you open the file `greeter/greeter.py` you should see the following code.
 
-    @Override
-    public void invoke(Context context, Object input) {
-        GreetRequest greetMessage = (GreetRequest) input;
+{% highlight python %}
+from statefun import StatefulFunctions
 
-        GreetResponse response = GreetResponse.newBuilder()
-            .setWho(greetMessage.getWho())
-            .setGreeting("Hello " + greetMessage.getWho())
-            .build();
+functions = StatefulFunctions()
 
-        context.send(GreetingConstants.GREETING_EGRESS_ID, response);
-    }
-}
+@functions.bind("example/greeter")
+def greet(context, greet_request):
+    pass
 {% endhighlight %}
 
+A stateful function takes two arguments: a context and a message.
+The [context]({{ site.baseurl }}/sdk/python.html#context-reference) provides access to Stateful Functions runtime features such as state management and message passing.
+You will explore some of these features as you progress through this walkthrough.
 
-This function takes in a request and sends a response to an external system 
(or [egress]({{ site.baseurl }}/io-module/index.html#egress)).
-While this is nice, it does not show off the real power of stateful functions: 
handling state.
+The other parameter is the input message that has been passed to this function.
+By default, messages are passed around as protobuf [Any](https://developers.google.com/protocol-buffers/docs/reference/python-generated#wkt).
+If a function only accepts a known type, you can override the message type using Python 3 type hints.
+This way, you do not need to unwrap the message or check types.
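The following sketch illustrates, in plain Python, how a type hint on the second parameter can drive unwrapping. It is a hypothetical model, not how the SDK unpacks protobuf `Any`: `Envelope` stands in for an `Any` (a type name plus raw fields), `GreetRequest` is a dataclass stand-in for the generated protobuf class, and `dispatch` is an illustrative helper.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Dict, get_type_hints


@dataclass
class GreetRequest:
    # Hypothetical plain-Python stand-in for the generated protobuf class.
    name: str


@dataclass
class Envelope:
    # Hypothetical stand-in for a protobuf Any: a type name plus packed fields.
    type_name: str
    fields: Dict[str, Any]


def dispatch(fn: Callable[[Any, Any], Any], context: Any, envelope: Envelope) -> Any:
    """Unwrap the envelope if fn's message parameter is annotated; otherwise pass it through."""
    message_param = list(inspect.signature(fn).parameters)[1]
    expected = get_type_hints(fn).get(message_param)
    if expected is None:
        # No annotation: the function receives the raw envelope and must unwrap it itself.
        return fn(context, envelope)
    if envelope.type_name != expected.__name__:
        raise TypeError(f"{fn.__name__} expects {expected.__name__}, got {envelope.type_name}")
    return fn(context, expected(**envelope.fields))


def greet_typed(context, greet_request: GreetRequest):
    return "Hello " + greet_request.name


def greet_untyped(context, message):
    return message.type_name
```

Here `dispatch(greet_typed, None, Envelope("GreetRequest", {"name": "Jane"}))` returns `"Hello Jane"`, while `greet_untyped` receives the envelope itself and has to inspect it.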
 
-## A Stateful Hello
+{% highlight python %}
+from messages_pb2 import GreetRequest
+from statefun import StatefulFunctions
 
-Suppose you want to generate a personalized response for each user depending 
on how many times they have sent a request.
+functions = StatefulFunctions()
 
-{% highlight java %}
-private static String greetText(String name, int seen) {
-    switch (seen) {
-        case 0:
-            return String.format("Hello %s !", name);
-        case 1:
-            return String.format("Hello again %s !", name);
-        case 2:
-            return String.format("Third times the charm! %s!", name);
-        case 3:
-            return String.format("Happy to see you once again %s !", name);
-        default:
-            return String.format("Hello at the %d-th time %s", seen + 1, name);
-}
+@functions.bind("example/greeter")
+def greet(context, greet_request: GreetRequest):
+    pass
 {% endhighlight %}
 
-## Routing Messages
+## Sending A Response
 
-To send a user a personalized greeting, the system needs to keep track of how 
many times it has seen each user so far.
-Speaking in general terms, the simplest solution would be to create one 
function for every user and independently track the number of times they have 
been seen. Using most frameworks, this would be prohibitively expensive.
-However, stateful functions are virtual and do not consume any CPU or memory 
when not actively being invoked.
-That means your application can create as many functions as necessary — in 
this case, users — without worrying about resource consumption.
+Stateful Functions accept messages and can also send them out.
+Messages can be sent to other functions, as well as to external systems (or [egress]({{ site.baseurl }}/io-module/index.html#egress)).
 
-Whenever data is consumed from an external system (or [ingress]({{ 
site.baseurl }}/io-module/index.html#ingress)), it is routed to a specific 
function based on a given function type and identifier.
-The function type represents the Class of function to be invoked, such as the 
Greeter function, while the identifier (``GreetRequest#getWho``) scopes the 
call to a specific virtual instance based on some key.
+One popular external system is [Apache Kafka](http://kafka.apache.org/).
+As a first step, let's update our function to respond to each input by sending a greeting to a Kafka topic.
 
-{% highlight java %}
-package org.apache.flink.statefun.examples.greeter;
+{% highlight python %}
+from messages_pb2 import GreetRequest, GreetResponse
+from statefun import StatefulFunctions
+from statefun import kafka_egress_record
 
-import org.apache.flink.statefun.examples.kafka.generated.GreetRequest;
-import org.apache.flink.statefun.sdk.io.Router;
+functions = StatefulFunctions()
 
-final class GreetRouter implements Router<GreetRequest> {
+@functions.bind("example/greeter")
+def greet(context, message: GreetRequest):
+    response = GreetResponse()
+    response.name = message.name
+    response.greeting = "Hello {}".format(message.name)
+    
+    egress_message = kafka_egress_record(topic="greetings", key=message.name, value=response)
+    context.pack_and_send_egress("example/greets", egress_message)
+{% endhighlight %} 
 
-    @Override
-    public void route(GreetRequest message, Downstream<GreetRequest> 
downstream) {
-        downstream.forward(GreetingConstants.GREETER_FUNCTION_TYPE, 
message.getWho(), message);
-    }
-}
+For each message, a response is constructed and sent to a Kafka topic called `greetings`, partitioned by `name`.
+The `egress_message` is sent to an egress named `example/greets`.
+This identifier points to a particular Kafka cluster and is configured in `module.yaml`, described below.
+
+## A Stateful Hello
+
+This is a great start, but it does not show off the real power of stateful functions: working with state.
+Suppose you want to generate a personalized response for each user depending on how many times they have sent a request.
+
+{% highlight python %}
+def compute_greeting(name, seen):
+    """
+    Compute a personalized greeting, based on the number of times this @name had been seen before.
+    """
+    templates = ["", "Welcome %s", "Nice to see you again %s", "Third time is a charm %s"]
+    if seen < len(templates):
+        greeting = templates[seen] % name
+    else:
+        greeting = "Nice to see you at the %d-nth time %s!" % (seen, name)
+
+    response = GreetResponse()
+    response.name = name
+    response.greeting = greeting
+
+    return response
 {% endhighlight %}
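The template selection can be checked on its own, without protobuf. This sketch copies the logic of `compute_greeting` into a hypothetical helper, `greeting_text`, that returns only the greeting string instead of a `GreetResponse`.

```python
def greeting_text(name: str, seen: int) -> str:
    """Same template selection as compute_greeting, minus the protobuf response."""
    # Index 0 is a placeholder: the count is incremented before this is called,
    # so seen starts at 1.
    templates = ["", "Welcome %s", "Nice to see you again %s", "Third time is a charm %s"]
    if seen < len(templates):
        return templates[seen] % name
    return "Nice to see you at the %d-nth time %s!" % (seen, name)


for seen in range(1, 6):
    print(greeting_text("Jane", seen))
```

This prints `Welcome Jane`, `Nice to see you again Jane`, `Third time is a charm Jane`, then `Nice to see you at the 4-nth time Jane!` and `Nice to see you at the 5-nth time Jane!`. Note that calling it with `seen=0` raises a `TypeError`, because `"" % name` has no placeholder for the argument; the stateful function avoids this by always incrementing the count first.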
 
-So, if a message for a user named John comes in, it will be shipped to John’s 
dedicated Greeter function.
-In case there is a following message for a user named Jane, a new instance of 
the Greeter function will be spawned.
+To “remember” information across multiple greeting messages, you then need to associate a persisted state value (``seen_count``) with the greet function.
+For each user, functions can now track how many times they have been seen.
 
-## Persistence
+{% highlight python %}
+@functions.bind("example/greeter")
+def greet(context, greet_request: GreetRequest):
+    state = context.state('seen_count').unpack(SeenCount)
+    if not state:
+        state = SeenCount()
+        state.seen = 1
+    else:
+        state.seen += 1
+    context.state('seen_count').pack(state)
 
-[Persisted value]({{ site.baseurl }}/sdk/java.html#persistence) is a special 
data type that enables stateful functions to maintain fault-tolerant state 
scoped to their identifiers, so that each instance of a function can track 
state independently.
-To “remember” information across multiple greeting messages, you then need to 
associate a persisted value field (``count``) to the Greet function. For each 
user, functions can now track how many times they have been seen.
+    response = compute_greeting(greet_request.name, state.seen)
 
-{% highlight java %}
-package org.apache.flink.statefun.examples.greeter;
+    egress_message = kafka_egress_record(topic="greetings", key=greet_request.name, value=response)
+    context.pack_and_send_egress("example/greets", egress_message)
+{% endhighlight %}
 
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
+The state, `seen_count`, is always scoped to the current name, so it can track each user independently.
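A rough way to picture this scoping is an in-memory model where each function address, the pair of function type and id, owns its own state. The sketch below is hypothetical (`state_store` and `record_visit` are illustrative names, not SDK APIs); the real runtime stores this state fault-tolerantly in Flink rather than in a Python dictionary.

```python
from collections import defaultdict
from typing import Dict, Tuple

# Hypothetical model: one state dictionary per function address, where an
# address is (function type, id). In the walkthrough the id is the user's name.
Address = Tuple[str, str]
state_store: Dict[Address, Dict[str, int]] = defaultdict(dict)


def record_visit(function_type: str, name: str) -> int:
    """Increment and return seen_count for the instance addressed by (function_type, name)."""
    state = state_store[(function_type, name)]
    state["seen_count"] = state.get("seen_count", 0) + 1
    return state["seen_count"]


for name in ["John", "Jane", "John", "John", "Jane"]:
    print(name, record_visit("example/greeter", name))
```

This prints `John 1`, `Jane 1`, `John 2`, `John 3`, `Jane 2`: each name's counter advances independently, even though both are handled by the same function type.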
 
-public final class GreetFunction implements StatefulFunction {
+## Wiring It All Together
 
-    @Persisted
-    private final PersistedValue<Integer> count = PersistedValue.of("count", 
Integer.class);
+Stateful Functions applications communicate with the Apache Flink runtime over HTTP.
+The Python SDK ships with a ``RequestReplyHandler`` that automatically dispatches function calls based on RESTful HTTP ``POST`` requests.
+The ``RequestReplyHandler`` may be exposed using any HTTP framework.
 
-    @Override
-    public void invoke(Context context, Object input) {
-        GreetRequest greetMessage = (GreetRequest) input;
+One popular Python web framework is [Flask](https://palletsprojects.com/p/flask/).
+It can be used to quickly and easily expose an application to the Apache Flink runtime.
 
-        GreetResponse response = computePersonalizedGreeting(greetMessage);
+{% highlight python %}
+from messages_pb2 import GreetRequest
+from statefun import StatefulFunctions
+from statefun import RequestReplyHandler
 
-        context.send(GreetingConstants.GREETING_EGRESS_ID, response);
-    }
+functions = StatefulFunctions()
 
-    private GreetResponse computePersonalizedGreeting(GreetRequest 
greetMessage) {
-        final String name = greetMessage.getWho();
-        final int seen = count.getOrDefault(0);
-        count.set(seen + 1);
+@functions.bind("example/greeter")
+def greeter(context, message: GreetRequest):
+    pass
 
-        String greeting = greetText(name, seen);
+handler = RequestReplyHandler(functions)
 
-        return GreetResponse.newBuilder()
-            .setWho(name)
-            .setGreeting(greeting)
-            .build();
-    }
-}
+# Serve the endpoint
+
+from flask import request
+from flask import make_response
+from flask import Flask
+
+app = Flask(__name__)
+
+@app.route('/statefun', methods=['POST'])
+def handle():
+    response_data = handler(request.data)
+    response = make_response(response_data)
+    response.headers.set('Content-Type', 'application/octet-stream')
+    return response
+
+
+if __name__ == "__main__":
+    app.run()
 {% endhighlight %}
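Since any HTTP framework will do, here is a sketch of the same `/statefun` wiring using only the standard library's WSGI tools instead of Flask. It assumes, as the Flask example does, that the handler takes the raw request bytes and returns response bytes; `make_statefun_app` and `post_locally` are hypothetical helper names.

```python
from io import BytesIO
from typing import Callable, Dict, Iterable, Tuple
from wsgiref.util import setup_testing_defaults


def make_statefun_app(handler: Callable[[bytes], bytes]) -> Callable:
    """Wrap a bytes-in/bytes-out handler in a plain WSGI app that serves POST /statefun."""

    def app(environ, start_response) -> Iterable[bytes]:
        if environ.get("PATH_INFO") != "/statefun":
            start_response("404 Not Found", [("Content-Type", "text/plain")])
            return [b"not found"]
        if environ.get("REQUEST_METHOD") != "POST":
            start_response("405 Method Not Allowed", [("Content-Type", "text/plain"), ("Allow", "POST")])
            return [b"method not allowed"]
        # Read exactly the request body; the handler works on raw bytes.
        length = int(environ.get("CONTENT_LENGTH") or 0)
        response_body = handler(environ["wsgi.input"].read(length))
        start_response("200 OK", [
            ("Content-Type", "application/octet-stream"),
            ("Content-Length", str(len(response_body))),
        ])
        return [response_body]

    return app


def post_locally(app: Callable, path: str, body: bytes) -> Tuple[str, Dict[str, str], bytes]:
    """Run one in-process POST through the WSGI app, without starting a server."""
    environ: Dict = {}
    setup_testing_defaults(environ)
    environ.update({
        "REQUEST_METHOD": "POST",
        "PATH_INFO": path,
        "CONTENT_LENGTH": str(len(body)),
        "wsgi.input": BytesIO(body),
    })
    captured: Dict = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = dict(headers)

    response_body = b"".join(app(environ, start_response))
    return captured["status"], captured["headers"], response_body
```

To serve it locally, one option is `wsgiref.simple_server.make_server("", 8000, make_statefun_app(handler)).serve_forever()`, with `handler` being the `RequestReplyHandler` instance and port 8000 chosen to match the endpoint in `module.yaml`. Flask handles the 404/405 cases for you; here they are spelled out by hand, and `wsgiref` is only meant for local experiments.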
 
-Each time a message is processed, the function computes a personalized message 
for that user.
-It reads and updates the number of times that user has been seen and sends a 
greeting to the egress.
+## Configuring for Runtime
+
+The Apache Flink runtime calls the greeter function by sending HTTP requests to the `Flask` server.
+To do that, Flink needs to know which endpoint it can use to reach the server.
+This is also a good time to configure our connections to the input and output Kafka topics.
+The configuration is in a file called `module.yaml`.
+
+{% highlight yaml %}
+version: "1.0"
+module:
+  meta:
+    type: remote
+  spec:
+    functions:
+      - function:
+          meta:
+            kind: http
+            type: example/greeter
+          spec:
+            endpoint: http://python-worker:8000/statefun
+            states:
+              - seen_count
+            maxNumBatchRequests: 500
+            timeout: 2min
+    ingresses:
+      - ingress:
+          meta:
+            type: statefun.kafka.io/routable-protobuf-ingress
+            id: example/names
+          spec:
+            address: kafka-broker:9092
+            consumerGroupId: my-group-id
+            topics:
+              - topic: names
+                typeUrl: com.googleapis/example.GreetRequest
+                targets:
+                  - example/greeter
+    egresses:
+      - egress:
+          meta:
+            type: statefun.kafka.io/generic-egress
+            id: example/greets
+          spec:
+            address: kafka-broker:9092
+            deliverySemantic:
+              type: exactly-once
+              transactionTimeoutMillis: 100000
+{% endhighlight %}
 
-You can check the full code for the application described in this walkthrough 
[here]({{ site.github_url }}/tree/{{ site.github_branch 
}}/statefun-examples/statefun-greeter-example).
-In particular, take a look at the module ``GreetingModule``, which is the main 
entry point for the full application, to see how everything gets tied together.
-You can run this example locally using the provided Docker setup.
+This configuration does a few interesting things.
 
-{% highlight bash %}
-$ docker-compose build 
-$ docker-compose up
+The first is to declare our function, `example/greeter`.
+It includes the endpoint by which it is reachable, along with the states the function has access to.
+
+The ingress is the input Kafka topic that routes `GreetRequest` messages to the function.
+Along with basic properties like the broker address and consumer group, it contains a list of targets.
+These are the functions each message will be sent to.
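The fan-out to targets can be sketched as a small, hypothetical routing function. It assumes that the routable Kafka ingress uses each record's key as the function id, so a record keyed by a user's name is addressed to that user's instance of every listed target; `Address` and `route` are illustrative names, not SDK types.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Address:
    """A function address: the function type plus the id of one virtual instance."""
    function_type: str
    function_id: str


def route(record_key: str, targets: List[str]) -> List[Address]:
    """Hypothetical router: address one record to every target, using the record key as id."""
    return [Address(function_type, record_key) for function_type in targets]


# The `names` topic in module.yaml lists a single target, example/greeter.
print(route("Jane", ["example/greeter"]))
```

This prints `[Address(function_type='example/greeter', function_id='Jane')]`. Adding another function type to `targets` would deliver the same record to that function as well.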
+
+The egress is the output Kafka cluster.
+It contains broker-specific configuration but allows each message to be routed to any topic.
+
+## Deployment
+
+Now that the greeter application has been built, it is time to deploy.
+The simplest way to deploy a Stateful Functions application is to use the community-provided base image and load your module.
+The base image provides the Stateful Functions runtime and uses the provided `module.yaml` to configure it for this specific job.
+This can be found in the `Dockerfile` in the root directory.
+
+{% highlight docker %}
+FROM statefun
+
+RUN mkdir -p /opt/statefun/modules/greeter
+ADD module.yaml /opt/statefun/modules/greeter
 {% endhighlight %}
 
-Then, send some messages to the topic "names", and observe what comes out of 
"greetings".
+You can now run this application locally using the provided Docker setup.
 
 {% highlight bash %}
-$ docker-compose exec kafka-broker kafka-console-producer.sh \
-    --broker-list localhost:9092 \
-    --topic names
+$ docker-compose up -d
 
 Review comment:
   Thx. That worked
   
   ```
   Step 5/9 : RUN pip install -r requirements.txt
    ---> Running in 7a9ee7fda4f7
   ERROR: Could not find a version that satisfies the requirement apache-flink-statefun (from -r requirements.txt (line 17)) (from versions: none)
   ERROR: No matching distribution found for apache-flink-statefun (from -r requirements.txt (line 17))
   ERROR: Service 'python-worker' failed to build: The command '/bin/sh -c pip install -r requirements.txt' returned a non-zero code: 1
   ```
   
   How do I get `apache-flink-statefun` into that Docker build?
   
   I managed to create a `RELEASE_VERSION=2.0 ./create_python_sdk_release.sh` release

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
