This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git
The following commit(s) were added to refs/heads/dev by this push:
new 766e548 [FLINK-21882] Add Python greeter example
766e548 is described below
commit 766e54898f86d29740bd819a7e80444172fff4ee
Author: Igal Shilman <[email protected]>
AuthorDate: Thu Mar 18 16:07:28 2021 +0100
[FLINK-21882] Add Python greeter example
This closes #5.
---
python/greeter/.dockerignore | 3 +
python/greeter/Dockerfile | 34 +++++++
python/greeter/README.md | 55 +++++++++++
python/greeter/arch.png | Bin 0 -> 35343 bytes
python/greeter/docker-compose.yml | 101 +++++++++++++++++++
python/greeter/functions.py | 107 +++++++++++++++++++++
python/greeter/input-example.json | 2 +
python/greeter/lib-dev/README.md | 4 +
...he_flink_statefun-2.3_SNAPSHOT-py3-none-any.whl | Bin 0 -> 32722 bytes
python/greeter/module.yaml | 53 ++++++++++
python/greeter/requirements.txt | 18 ++++
11 files changed, 377 insertions(+)
diff --git a/python/greeter/.dockerignore b/python/greeter/.dockerignore
new file mode 100644
index 0000000..c1e502b
--- /dev/null
+++ b/python/greeter/.dockerignore
@@ -0,0 +1,3 @@
+.idea
+venv/
+checkpoint-dir/
diff --git a/python/greeter/Dockerfile b/python/greeter/Dockerfile
new file mode 100644
index 0000000..2bf2758
--- /dev/null
+++ b/python/greeter/Dockerfile
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM python:3.9-slim-buster
+
+RUN mkdir -p /app
+WORKDIR /app
+
+COPY requirements.txt /app
+RUN pip install -r requirements.txt
+
+COPY functions.py /app
+
+EXPOSE 8000
+
+# TODO: remove the following 2 lines, once apache_filink_statefun-3.0.0 will
be released.
+ADD lib-dev/apache_flink_statefun-2.3_SNAPSHOT-py3-none-any.whl /app
+RUN pip3 install /app/apache_flink_statefun-2.3_SNAPSHOT-py3-none-any.whl
+
+CMD ["python3", "/app/functions.py"]
+
diff --git a/python/greeter/README.md b/python/greeter/README.md
new file mode 100644
index 0000000..3d160f2
--- /dev/null
+++ b/python/greeter/README.md
@@ -0,0 +1,55 @@
+# The Greeter Example
+
+This is a simple example of a stateful functions application implemented in
`Python`.
+
+In this example, we imagine a service that computes personalized greetings.
+Our service, consist out of the following components:
+
+* `kafka ingress` - This component forwards messages produced to the `names`
kafka topic,
+to the `person` stateful function. Messages produced to this topic has the
following
+schema `{ "name" : "bob"}`.
+
+* `person` - This function is triggered by the ingress defined above.
+This function keeps track of the number of visits, and triggers the next
functions:
+
+* `greeter` - This function, computes a personalized greeting, based on the
name and the number
+of visits of that user. The output of that computation is forward to a Kafka
egress defined below.
+
+* `kafka egress` - This wraps a Kafka producer that emits `utf-8` greetings to
the `greetings` Kafka topic.
+
+
+
+
+## Running the example
+
+```
+docker-compose build
+docker-compose up
+```
+
+To observe the customized greeting, as they appear in the `greetings` Kafka
topic, run in a separate terminal:
+
+```
+docker-compose exec kafka kafka-console-consumer \
+ --bootstrap-server kafka:9092 \
+ --isolation-level read_committed \
+ --from-beginning \
+ --topic greetings
+```
+
+Try adding few more input lines to [input-example.json](input-example.json),
and restart
+the producer service.
+
+```
+docker-compose restart producer
+```
+
+Feeling curious? add the following print to the `person` function at
[server.py](server.py):
+```print(f"Hello there {context.address.id}!", flush=True)```.
+
+Then, rebuild and restart only the `functions` service.
+
+```
+docker-compose build functions
+docker-compose up functions
+```
diff --git a/python/greeter/arch.png b/python/greeter/arch.png
new file mode 100644
index 0000000..1723613
Binary files /dev/null and b/python/greeter/arch.png differ
diff --git a/python/greeter/docker-compose.yml
b/python/greeter/docker-compose.yml
new file mode 100644
index 0000000..37f97be
--- /dev/null
+++ b/python/greeter/docker-compose.yml
@@ -0,0 +1,101 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+version: "2.1"
+
+services:
+
+ ###############################################################
+ # Functions service
+ ###############################################################
+
+ functions:
+ build:
+ context: ./
+ expose:
+ - "8000"
+
+ ###############################################################
+ # StateFun runtime
+ ###############################################################
+
+ statefun-manager:
+ image: flink-statefun:2.3-SNAPSHOT
+ expose:
+ - "6123"
+ ports:
+ - "8081:8081"
+ environment:
+ ROLE: master
+ MASTER_HOST: statefun-manager
+ volumes:
+ - ./module.yaml:/opt/statefun/modules/greeter/module.yaml
+
+ statefun-worker:
+ image: flink-statefun:2.3-SNAPSHOT
+ expose:
+ - "6121"
+ - "6122"
+ depends_on:
+ - statefun-manager
+ - kafka
+ environment:
+ ROLE: worker
+ MASTER_HOST: statefun-manager
+ volumes:
+ - ./module.yaml:/opt/statefun/modules/greeter/module.yaml
+
+ ###############################################################
+ # Kafka for ingress and egress
+ ###############################################################
+
+ zookeeper:
+ image: confluentinc/cp-zookeeper:5.4.3
+ environment:
+ ZOOKEEPER_CLIENT_PORT: "2181"
+ ports:
+ - "2181:2181"
+
+ kafka:
+ image: confluentinc/cp-kafka:5.4.3
+ ports:
+ - "9092:9092"
+ depends_on:
+ - zookeeper
+ environment:
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+
+ ###############################################################
+ # Simple Kafka JSON producer to simulate ingress events
+ ###############################################################
+
+ producer:
+ image: ververica/statefun-playground-producer:latest
+ depends_on:
+ - kafka
+ - statefun-worker
+ environment:
+ APP_PATH: /mnt/input-example.json
+ APP_KAFKA_HOST: kafka:9092
+ APP_KAFKA_TOPIC: names
+ APP_JSON_PATH: name
+ APP_DELAY_SECONDS: 1
+ volumes:
+ - ./input-example.json:/mnt/input-example.json
diff --git a/python/greeter/functions.py b/python/greeter/functions.py
new file mode 100644
index 0000000..04bf1a3
--- /dev/null
+++ b/python/greeter/functions.py
@@ -0,0 +1,107 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import json
+
+from statefun import *
+import asyncio
+from aiohttp import web
+
+functions = StatefulFunctions()
+
+
+def serialize_json_utf8(obj) -> bytes:
+ """
+ serialize the given object as a JSON utf-8 bytes.
+ """
+ str = json.dumps(obj, ensure_ascii=False)
+ return str.encode('utf-8')
+
+
+GREET_REQUEST_TYPE = simple_type(typename="example/GreetRequest",
+ serialize_fn=serialize_json_utf8,
+ deserialize_fn=json.loads)
+
+
[email protected](typename="example/person", specs=[ValueSpec(name="visits",
type=IntType)])
+async def person(context: Context, message: Message):
+ # update the visit count.
+ visits = context.storage.visits or 0
+ visits += 1
+ context.storage.visits = visits
+
+ # enrich the request with the number of vists.
+ request = message.as_type(GREET_REQUEST_TYPE)
+ request['visits'] = visits
+
+ # next, we will forward a message to a special greeter function,
+ # that will compute a super-doper-personalized greeting based on the
+ # number of visits that this person has.
+ context.send(
+ message_builder(target_typename="example/greeter",
+ target_id=request['name'],
+ value=request,
+ value_type=GREET_REQUEST_TYPE))
+
+
[email protected](typename="example/greeter")
+async def greeter(context, message):
+ request = message.as_type(GREET_REQUEST_TYPE)
+ person_name = request['name']
+ visits = request['visits']
+
+ greeting = await compute_fancy_greeting(person_name, visits)
+
+ context.send_egress(kafka_egress_message(typename="example/greets",
+ topic="greetings",
+ key=person_name,
+ value=greeting))
+
+
+async def compute_fancy_greeting(name: str, seen: int):
+ """
+ Compute a personalized greeting, based on the number of times this @name
had been seen before.
+ """
+ templates = ["", "Welcome %s", "Nice to see you again %s", "Third time is
a charm %s"]
+ if seen < len(templates):
+ greeting = templates[seen] % name
+ else:
+ greeting = f"Nice to see you at the {seen}-nth time {name}!"
+
+ await asyncio.sleep(1)
+ return greeting
+
+
+#
+# Serve the endpoint
+#
+
+
+handler = RequestReplyHandler(functions)
+
+
+async def handle(request):
+ req = await request.read()
+ res = await handler.handle_async(req)
+ return web.Response(body=res, content_type="application/octet-stream")
+
+
+app = web.Application()
+app.add_routes([web.post('/statefun', handle)])
+
+if __name__ == '__main__':
+ web.run_app(app, port=8000)
diff --git a/python/greeter/input-example.json
b/python/greeter/input-example.json
new file mode 100644
index 0000000..ad72aa8
--- /dev/null
+++ b/python/greeter/input-example.json
@@ -0,0 +1,2 @@
+{"name" : "Bob"}
+{"name" : "Joe"}
diff --git a/python/greeter/lib-dev/README.md b/python/greeter/lib-dev/README.md
new file mode 100644
index 0000000..f7905bd
--- /dev/null
+++ b/python/greeter/lib-dev/README.md
@@ -0,0 +1,4 @@
+# Unreleased artifacts
+
+This directory contains unreleased artifacts.
+TODO: remove this once merging `dev` to `main`.
diff --git
a/python/greeter/lib-dev/apache_flink_statefun-2.3_SNAPSHOT-py3-none-any.whl
b/python/greeter/lib-dev/apache_flink_statefun-2.3_SNAPSHOT-py3-none-any.whl
new file mode 100644
index 0000000..13ef023
Binary files /dev/null and
b/python/greeter/lib-dev/apache_flink_statefun-2.3_SNAPSHOT-py3-none-any.whl
differ
diff --git a/python/greeter/module.yaml b/python/greeter/module.yaml
new file mode 100644
index 0000000..5b50885
--- /dev/null
+++ b/python/greeter/module.yaml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+version: "3.0"
+module:
+ meta:
+ type: remote
+ spec:
+ endpoints:
+ - endpoint:
+ meta:
+ kind: http
+ spec:
+ typename:
+ namespace: example
+ urlPathTemplate: http://functions:8000/statefun
+ timeouts:
+ call: 2min
+ ingresses:
+ - ingress:
+ meta:
+ type: statefun.kafka.io/routable-protobuf-ingress
+ id: example/names
+ spec:
+ address: kafka:9092
+ consumerGroupId: my-group-id
+ topics:
+ - topic: names
+ typeUrl: example/GreetRequest
+ targets:
+ - example/person
+ egresses:
+ - egress:
+ meta:
+ type: statefun.kafka.io/generic-egress
+ id: example/greets
+ spec:
+ address: kafka:9092
+ deliverySemantic:
+ type: exactly-once
+ transactionTimeoutMillis: 100000
+
diff --git a/python/greeter/requirements.txt b/python/greeter/requirements.txt
new file mode 100644
index 0000000..0796d9b
--- /dev/null
+++ b/python/greeter/requirements.txt
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+aiohttp
+apache-flink-statefun