This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git


The following commit(s) were added to refs/heads/dev by this push:
     new 766e548  [FLINK-21882] Add Python greeter example
766e548 is described below

commit 766e54898f86d29740bd819a7e80444172fff4ee
Author: Igal Shilman <[email protected]>
AuthorDate: Thu Mar 18 16:07:28 2021 +0100

    [FLINK-21882] Add Python greeter example
    
    This closes #5.
---
 python/greeter/.dockerignore                       |   3 +
 python/greeter/Dockerfile                          |  34 +++++++
 python/greeter/README.md                           |  55 +++++++++++
 python/greeter/arch.png                            | Bin 0 -> 35343 bytes
 python/greeter/docker-compose.yml                  | 101 +++++++++++++++++++
 python/greeter/functions.py                        | 107 +++++++++++++++++++++
 python/greeter/input-example.json                  |   2 +
 python/greeter/lib-dev/README.md                   |   4 +
 ...he_flink_statefun-2.3_SNAPSHOT-py3-none-any.whl | Bin 0 -> 32722 bytes
 python/greeter/module.yaml                         |  53 ++++++++++
 python/greeter/requirements.txt                    |  18 ++++
 11 files changed, 377 insertions(+)

diff --git a/python/greeter/.dockerignore b/python/greeter/.dockerignore
new file mode 100644
index 0000000..c1e502b
--- /dev/null
+++ b/python/greeter/.dockerignore
@@ -0,0 +1,3 @@
+.idea
+venv/
+checkpoint-dir/
diff --git a/python/greeter/Dockerfile b/python/greeter/Dockerfile
new file mode 100644
index 0000000..2bf2758
--- /dev/null
+++ b/python/greeter/Dockerfile
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM python:3.9-slim-buster
+
+RUN mkdir -p /app
+WORKDIR /app
+
+COPY requirements.txt /app
+RUN pip install -r requirements.txt
+
+COPY functions.py /app
+
+EXPOSE 8000
+
+# TODO: remove the following 2 lines, once apache_filink_statefun-3.0.0 will 
be released.
+ADD lib-dev/apache_flink_statefun-2.3_SNAPSHOT-py3-none-any.whl /app 
+RUN pip3 install /app/apache_flink_statefun-2.3_SNAPSHOT-py3-none-any.whl 
+
+CMD ["python3", "/app/functions.py"]
+
diff --git a/python/greeter/README.md b/python/greeter/README.md
new file mode 100644
index 0000000..3d160f2
--- /dev/null
+++ b/python/greeter/README.md
@@ -0,0 +1,55 @@
+# The Greeter Example
+
+This is a simple example of a stateful functions application implemented in 
`Python`.
+
+In this example, we imagine a service that computes personalized greetings. 
+Our service, consist out of the following components:
+
+* `kafka ingress` - This component forwards messages produced to the `names` 
kafka topic,
+to the `person` stateful function. Messages produced to this topic has the 
following 
+schema `{ "name" : "bob"}`.
+
+* `person` - This function is triggered by the ingress defined above.
+This function keeps track of the number of visits, and triggers the next 
functions:
+
+* `greeter` - This function, computes a personalized greeting, based on the 
name and the number
+of visits of that user. The output of that computation is forward to a Kafka 
egress defined below.
+ 
+* `kafka egress` - This wraps a Kafka producer that emits `utf-8` greetings to 
the `greetings` Kafka topic.
+
+
+![Flow](arch.png "Flow")
+
+## Running the example
+
+```
+docker-compose build
+docker-compose up
+```
+
+To observe the customized greeting, as they appear in the `greetings` Kafka 
topic, run in a separate terminal:
+
+```
+docker-compose exec kafka kafka-console-consumer \
+     --bootstrap-server kafka:9092 \
+     --isolation-level read_committed \
+     --from-beginning \
+     --topic greetings
+```
+
+Try adding few more input lines to [input-example.json](input-example.json), 
and restart
+the producer service.
+
+```
+docker-compose restart producer
+``` 
+
+Feeling curious? add the following print to the `person` function at 
[server.py](server.py):
+```print(f"Hello there {context.address.id}!", flush=True)```.
+
+Then, rebuild and restart only the `functions` service.
+
+```
+docker-compose build functions
+docker-compose up functions
+```
diff --git a/python/greeter/arch.png b/python/greeter/arch.png
new file mode 100644
index 0000000..1723613
Binary files /dev/null and b/python/greeter/arch.png differ
diff --git a/python/greeter/docker-compose.yml 
b/python/greeter/docker-compose.yml
new file mode 100644
index 0000000..37f97be
--- /dev/null
+++ b/python/greeter/docker-compose.yml
@@ -0,0 +1,101 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+version: "2.1"
+
+services:
+
+  ###############################################################
+  #    Functions service
+  ###############################################################
+
+  functions:
+    build:
+      context: ./
+    expose:
+      - "8000"
+
+  ###############################################################
+  #    StateFun runtime
+  ###############################################################
+
+  statefun-manager:
+    image: flink-statefun:2.3-SNAPSHOT
+    expose:
+      - "6123"
+    ports:
+      - "8081:8081"
+    environment:
+      ROLE: master
+      MASTER_HOST: statefun-manager
+    volumes:
+      - ./module.yaml:/opt/statefun/modules/greeter/module.yaml
+
+  statefun-worker:
+    image: flink-statefun:2.3-SNAPSHOT
+    expose:
+      - "6121"
+      - "6122"
+    depends_on:
+      - statefun-manager
+      - kafka
+    environment:
+      ROLE: worker
+      MASTER_HOST: statefun-manager
+    volumes:
+      - ./module.yaml:/opt/statefun/modules/greeter/module.yaml
+
+  ###############################################################
+  #    Kafka for ingress and egress
+  ###############################################################
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:5.4.3
+    environment:
+      ZOOKEEPER_CLIENT_PORT: "2181"
+    ports:
+      - "2181:2181"
+
+  kafka:
+    image: confluentinc/cp-kafka:5.4.3
+    ports:
+      - "9092:9092"
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+
+  ###############################################################
+  #    Simple Kafka JSON producer to simulate ingress events
+  ###############################################################
+
+  producer:
+    image: ververica/statefun-playground-producer:latest
+    depends_on:
+      - kafka
+      - statefun-worker
+    environment:
+      APP_PATH: /mnt/input-example.json
+      APP_KAFKA_HOST: kafka:9092
+      APP_KAFKA_TOPIC: names
+      APP_JSON_PATH: name
+      APP_DELAY_SECONDS: 1
+    volumes:
+      - ./input-example.json:/mnt/input-example.json
diff --git a/python/greeter/functions.py b/python/greeter/functions.py
new file mode 100644
index 0000000..04bf1a3
--- /dev/null
+++ b/python/greeter/functions.py
@@ -0,0 +1,107 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import json
+
+from statefun import *
+import asyncio
+from aiohttp import web
+
+functions = StatefulFunctions()
+
+
+def serialize_json_utf8(obj) -> bytes:
+    """
+    serialize the given object as a JSON utf-8 bytes.
+    """
+    str = json.dumps(obj, ensure_ascii=False)
+    return str.encode('utf-8')
+
+
+GREET_REQUEST_TYPE = simple_type(typename="example/GreetRequest",
+                                 serialize_fn=serialize_json_utf8,
+                                 deserialize_fn=json.loads)
+
+
[email protected](typename="example/person", specs=[ValueSpec(name="visits", 
type=IntType)])
+async def person(context: Context, message: Message):
+    # update the visit count.
+    visits = context.storage.visits or 0
+    visits += 1
+    context.storage.visits = visits
+
+    # enrich the request with the number of vists.
+    request = message.as_type(GREET_REQUEST_TYPE)
+    request['visits'] = visits
+
+    # next, we will forward a message to a special greeter function,
+    # that will compute a super-doper-personalized greeting based on the
+    # number of visits that this person has.
+    context.send(
+        message_builder(target_typename="example/greeter",
+                        target_id=request['name'],
+                        value=request,
+                        value_type=GREET_REQUEST_TYPE))
+
+
[email protected](typename="example/greeter")
+async def greeter(context, message):
+    request = message.as_type(GREET_REQUEST_TYPE)
+    person_name = request['name']
+    visits = request['visits']
+
+    greeting = await compute_fancy_greeting(person_name, visits)
+
+    context.send_egress(kafka_egress_message(typename="example/greets",
+                                             topic="greetings",
+                                             key=person_name,
+                                             value=greeting))
+
+
+async def compute_fancy_greeting(name: str, seen: int):
+    """
+    Compute a personalized greeting, based on the number of times this @name 
had been seen before.
+    """
+    templates = ["", "Welcome %s", "Nice to see you again %s", "Third time is 
a charm %s"]
+    if seen < len(templates):
+        greeting = templates[seen] % name
+    else:
+        greeting = f"Nice to see you at the {seen}-nth time {name}!"
+
+    await asyncio.sleep(1)
+    return greeting
+
+
+#
+# Serve the endpoint
+#
+
+
+handler = RequestReplyHandler(functions)
+
+
+async def handle(request):
+    req = await request.read()
+    res = await handler.handle_async(req)
+    return web.Response(body=res, content_type="application/octet-stream")
+
+
+app = web.Application()
+app.add_routes([web.post('/statefun', handle)])
+
+if __name__ == '__main__':
+    web.run_app(app, port=8000)
diff --git a/python/greeter/input-example.json 
b/python/greeter/input-example.json
new file mode 100644
index 0000000..ad72aa8
--- /dev/null
+++ b/python/greeter/input-example.json
@@ -0,0 +1,2 @@
+{"name" : "Bob"}
+{"name" : "Joe"}
diff --git a/python/greeter/lib-dev/README.md b/python/greeter/lib-dev/README.md
new file mode 100644
index 0000000..f7905bd
--- /dev/null
+++ b/python/greeter/lib-dev/README.md
@@ -0,0 +1,4 @@
+# Unreleased artifacts
+
+This directory contains unreleased artifacts.
+TODO: remove this once merging `dev` to `main`.
diff --git 
a/python/greeter/lib-dev/apache_flink_statefun-2.3_SNAPSHOT-py3-none-any.whl 
b/python/greeter/lib-dev/apache_flink_statefun-2.3_SNAPSHOT-py3-none-any.whl
new file mode 100644
index 0000000..13ef023
Binary files /dev/null and 
b/python/greeter/lib-dev/apache_flink_statefun-2.3_SNAPSHOT-py3-none-any.whl 
differ
diff --git a/python/greeter/module.yaml b/python/greeter/module.yaml
new file mode 100644
index 0000000..5b50885
--- /dev/null
+++ b/python/greeter/module.yaml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+version: "3.0"
+module:
+  meta:
+    type: remote
+  spec:
+    endpoints:
+      - endpoint:
+          meta:
+            kind: http
+          spec:
+            typename:
+              namespace: example
+            urlPathTemplate: http://functions:8000/statefun
+            timeouts:
+              call: 2min
+    ingresses:
+      - ingress:
+          meta:
+            type: statefun.kafka.io/routable-protobuf-ingress
+            id: example/names
+          spec:
+            address: kafka:9092
+            consumerGroupId: my-group-id
+            topics:
+              - topic: names
+                typeUrl: example/GreetRequest
+                targets:
+                  - example/person
+    egresses:
+      - egress:
+          meta:
+            type: statefun.kafka.io/generic-egress
+            id: example/greets
+          spec:
+            address: kafka:9092
+            deliverySemantic:
+              type: exactly-once
+              transactionTimeoutMillis: 100000
+
diff --git a/python/greeter/requirements.txt b/python/greeter/requirements.txt
new file mode 100644
index 0000000..0796d9b
--- /dev/null
+++ b/python/greeter/requirements.txt
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+aiohttp
+apache-flink-statefun

Reply via email to