This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git
The following commit(s) were added to refs/heads/dev by this push:
new dce2fa7 [FLINK-21865] Add a Docker Compose greeter example
dce2fa7 is described below
commit dce2fa7db176039a43612f966a3536a8a72edc32
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Fri Mar 19 13:18:50 2021 +0800
[FLINK-21865] Add a Docker Compose greeter example
This closes #4.
---
java/greeter/Dockerfile | 36 ++++++
java/greeter/README.md | 74 +++++++++++
java/greeter/docker-compose.yml | 110 ++++++++++++++++
java/greeter/module.yaml | 53 ++++++++
java/greeter/pom.xml | 138 +++++++++++++++++++++
.../playground/java/greeter/GreeterAppServer.java | 49 ++++++++
.../playground/java/greeter/GreetingsFn.java | 75 +++++++++++
.../statefun/playground/java/greeter/UserFn.java | 84 +++++++++++++
.../playground/java/greeter/types/Types.java | 27 ++++
.../playground/java/greeter/types/UserLogin.java | 60 +++++++++
.../java/greeter/undertow/UndertowHttpHandler.java | 62 +++++++++
java/greeter/src/main/protobuf/messages.proto | 31 +++++
java/greeter/statefun-sdk-java-2.3-SNAPSHOT.jar | Bin 0 -> 1855003 bytes
java/greeter/user-logins.txt | 74 +++++++++++
java/showcase/README.md | 9 ++
15 files changed, 882 insertions(+)
diff --git a/java/greeter/Dockerfile b/java/greeter/Dockerfile
new file mode 100644
index 0000000..ef8a3bc
--- /dev/null
+++ b/java/greeter/Dockerfile
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Build the functions code ...
+FROM maven:3.6.3-jdk-11 AS builder
+COPY src /usr/src/app/src
+COPY pom.xml /usr/src/app
+# TODO remove these commented lines and the jar; this is needed now only because we don't have the latest Java SDK published to Maven central yet
+# COPY statefun-sdk-java-2.3-SNAPSHOT.jar /usr/src/app
+# RUN mvn install:install-file \
+# -Dfile=/usr/src/app/statefun-sdk-java-2.3-SNAPSHOT.jar \
+# -DgroupId=org.apache.flink \
+# -DartifactId=statefun-sdk-java \
+# -Dversion=2.3-SNAPSHOT \
+# -Dpackaging=jar \
+# -DgeneratePom=true
+RUN mvn -f /usr/src/app/pom.xml clean package
+
+# ... and run the web server!
+FROM openjdk:8
+WORKDIR /
+COPY --from=builder /usr/src/app/target/greeter-functions-app*jar-with-dependencies.jar greeter-functions-app.jar
+EXPOSE 1108
+CMD java -jar greeter-functions-app.jar
diff --git a/java/greeter/README.md b/java/greeter/README.md
new file mode 100644
index 0000000..3e1cec3
--- /dev/null
+++ b/java/greeter/README.md
@@ -0,0 +1,74 @@
+# Greeter Example with Docker Compose
+
+This example is intended as a follow-up after completion of the [Java SDK Showcase Tutorial](../showcase). If you're
+already familiar with the Java SDK fundamentals and would like to get a better understanding of what a realistic StateFun
+application looks like, then you're in the right place! Otherwise, we highly suggest taking a look at the Showcase
+tutorial first.
+
+This example works with Docker Compose, and runs a few services that build up an end-to-end StateFun application:
+- Functions service that runs your functions and exposes them through an HTTP endpoint.
+- StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as
+  well as function state storage in a consistent and fault-tolerant manner.
+- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well,
+  and you can also extend to connect with other systems.
+
+To motivate this example, we'll implement a simple user greeter application, which has two functions - a `UserFn` that
+expects `UserLogin` JSON events from an ingress and keeps in state storage information about users, and a `GreetingsFn`
+that accepts user information to generate personalized greeting messages that are sent to users via an egress.
+
+## Directory structure
+
+- `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds
+  our functions service, hosting the `UserFn` and `GreetingsFn` behind an HTTP endpoint. Check out the source code under
+  `src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service.
+- `user-logins.txt`: A file of JSON objects, one per line; these are used as test events produced to our application ingress.
+- `module.yaml`: The [Module Specification]() file to be mounted to the StateFun runtime process containers. This
+  configures a few things for a StateFun application, such as the service endpoints of the application's functions, as
+  well as definitions of [Ingresses and Egresses]() which the application will use.
+- `docker-compose.yml`: Docker Compose file to spin up everything.
+
+## Prerequisites
+
+- Docker
+- Docker Compose
+
+## Running the example
+
+First, let's build the example. From this directory, execute:
+
+```
+$ docker-compose build
+```
+
+This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can
+take a few minutes as it also needs to build the function's Java project.
+
+After the build completes, start running all the services:
+
+```
+$ docker-compose up
+```
+
+## Play around!
+
+You can take a look at what messages are being sent to the Kafka egress:
+
+```
+$ docker-compose exec kafka kafka-console-consumer \
+ --bootstrap-server kafka:9092 \
+ --topic greetings \
+ --from-beginning
+```
+
+You can also try modifying the function code in the `src/main/java` directory, and do a zero-downtime upgrade of the
+functions. Some ideas you can try out:
+- Add some more state to be persisted by the `UserFn`. For example, let it additionally keep track of the user's previous login location.
+- Or simply change the greetings message generated by the `GreetingsFn`!
+
+After you've finished changing the function code, you can do a hot redeploy of your functions service:
+
+```
+$ docker-compose up -d --build greeter-functions
+```
+
+This rebuilds the functions service image with the updated code, and restarts the service with the new image.
diff --git a/java/greeter/docker-compose.yml b/java/greeter/docker-compose.yml
new file mode 100644
index 0000000..dc57374
--- /dev/null
+++ b/java/greeter/docker-compose.yml
@@ -0,0 +1,110 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+version: "2.1"
+
+services:
+
+ ###############################################################
+ # Functions service
+ ###############################################################
+
+ greeter-functions:
+ build:
+ dockerfile: Dockerfile
+ context: .
+ expose:
+ - "1108"
+
+ ###############################################################
+ # StateFun runtime
+ ###############################################################
+
+ statefun-manager:
+ image: flink-statefun:2.3-SNAPSHOT
+ expose:
+ - "6123"
+ ports:
+ - "8081:8081"
+ environment:
+ ROLE: master
+ MASTER_HOST: statefun-manager
+ volumes:
+ - ./module.yaml:/opt/statefun/modules/greeter/module.yaml
+
+ statefun-worker:
+ image: flink-statefun:2.3-SNAPSHOT
+ expose:
+ - "6121"
+ - "6122"
+ depends_on:
+ - statefun-manager
+ - kafka
+ - greeter-functions
+ links:
+ - "statefun-manager:statefun-manager"
+ - "kafka:kafka"
+ - "greeter-functions:greeter-functions"
+ environment:
+ ROLE: worker
+ MASTER_HOST: statefun-manager
+ volumes:
+ - ./module.yaml:/opt/statefun/modules/greeter/module.yaml
+
+ ###############################################################
+ # Kafka for ingress and egress
+ ###############################################################
+
+ zookeeper:
+ image: confluentinc/cp-zookeeper:5.4.3
+ environment:
+ ZOOKEEPER_CLIENT_PORT: "2181"
+ ports:
+ - "2181:2181"
+
+ kafka:
+ image: confluentinc/cp-kafka:5.4.3
+ ports:
+ - "9092:9092"
+ depends_on:
+ - zookeeper
+ links:
+ - "zookeeper:zookeeper"
+ environment:
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+
+ ###############################################################
+ # Simple Kafka JSON producer to simulate ingress events
+ ###############################################################
+
+ user-logins-producer:
+ image: ververica/statefun-playground-producer:latest
+ depends_on:
+ - kafka
+ - statefun-worker
+ links:
+ - "kafka:kafka"
+ environment:
+ APP_PATH: /mnt/user-logins.txt
+ APP_KAFKA_HOST: kafka:9092
+ APP_KAFKA_TOPIC: user-logins
+ APP_JSON_PATH: user_id
+ volumes:
+ - ./user-logins.txt:/mnt/user-logins.txt
diff --git a/java/greeter/module.yaml b/java/greeter/module.yaml
new file mode 100644
index 0000000..9a06195
--- /dev/null
+++ b/java/greeter/module.yaml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3.0"
+
+module:
+ meta:
+ type: remote
+ spec:
+ endpoints:
+ - endpoint:
+ meta:
+ kind: http
+ spec:
+ typename:
+ namespace: greeter.fns
+ urlPathTemplate: http://greeter-functions:1108/
+ ingresses:
+ - ingress:
+ meta:
+ type: statefun.kafka.io/routable-protobuf-ingress
+ id: greeter.io/user-logins
+ spec:
+ address: kafka:9092
+ consumerGroupId: greeter
+ startupPosition:
+ type: earliest
+ topics:
+ - topic: user-logins
+ typeUrl: greeter.types/org.apache.flink.statefun.playground.java.greeter.types.UserLogin
+ targets:
+ - greeter.fns/user
+ egresses:
+ - egress:
+ meta:
+ type: statefun.kafka.io/generic-egress
+ id: greeter.io/user-greetings
+ spec:
+ address: kafka:9092
+ deliverySemantic:
+ type: at-least-once
diff --git a/java/greeter/pom.xml b/java/greeter/pom.xml
new file mode 100644
index 0000000..c30299e
--- /dev/null
+++ b/java/greeter/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.flink</groupId>
+ <artifactId>greeter-functions-app</artifactId>
+ <version>3.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <properties>
+ <statefun.version>2.3-SNAPSHOT</statefun.version>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <!-- StateFun Java SDK -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-sdk-java</artifactId>
+ <version>${statefun.version}</version>
+ </dependency>
+
+ <!-- For custom type serialization (JSON and Protobuf) -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.12.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>3.11.4</version>
+ </dependency>
+
+ <!-- Undertow web server -->
+ <dependency>
+ <groupId>io.undertow</groupId>
+ <artifactId>undertow-core</artifactId>
+ <version>1.4.18.Final</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Build a fat executable jar -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.3.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.flink.statefun.playground.java.greeter.GreeterAppServer</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Java code style -->
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>1.20.0</version>
+ <configuration>
+ <java>
+ <googleJavaFormat>
+ <version>1.7</version>
+ <style>GOOGLE</style>
+ </googleJavaFormat>
+ <removeUnusedImports/>
+ </java>
+ </configuration>
+ <executions>
+ <execution>
+ <id>spotless-check</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Protoc plugin to auto-generate Protobuf message files -->
+ <plugin>
+ <groupId>com.github.os72</groupId>
+ <artifactId>protoc-jar-maven-plugin</artifactId>
+ <version>3.11.1</version>
+ <executions>
+ <execution>
+ <id>generate-protobuf-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <includeStdTypes>true</includeStdTypes>
+ <protocVersion>3.11.4</protocVersion>
+ <cleanOutputFolder>true</cleanOutputFolder>
+ <outputDirectory>${basedir}/target/generated-sources/protoc-jar</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java
new file mode 100644
index 0000000..0da66fa
--- /dev/null
+++ b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.playground.java.greeter;
+
+import io.undertow.Undertow;
+import org.apache.flink.statefun.playground.java.greeter.undertow.UndertowHttpHandler;
+import org.apache.flink.statefun.sdk.java.StatefulFunctions;
+import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
+
+/**
+ * Entry point to start an {@link Undertow} web server that exposes the functions that build up our
+ * User Greeter application, {@link UserFn} and {@link GreetingsFn}.
+ *
+ * <p>Here we are using the {@link Undertow} web server just to show a very simple demonstration.
+ * You may choose any web server that can handle HTTP requests and responses, for example, Spring,
+ * Micronaut, or even deploy your functions on popular FaaS platforms, like AWS Lambda.
+ */
+public final class GreeterAppServer {
+
+ public static void main(String[] args) {
+ final StatefulFunctions functions = new StatefulFunctions();
+ functions.withStatefulFunction(UserFn.SPEC);
+ functions.withStatefulFunction(GreetingsFn.SPEC);
+
+ final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
+ final Undertow httpServer =
+ Undertow.builder()
+ .addHttpListener(1108, "0.0.0.0")
+ .setHandler(new UndertowHttpHandler(requestReplyHandler))
+ .build();
+ httpServer.start();
+ }
+}
diff --git a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreetingsFn.java b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreetingsFn.java
new file mode 100644
index 0000000..d37d712
--- /dev/null
+++ b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreetingsFn.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.playground.java.greeter;
+
+import static org.apache.flink.statefun.playground.java.greeter.types.Types.USER_PROFILE_PROTOBUF_TYPE;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.playground.java.greeter.types.generated.UserProfile;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * A simple function that computes personalized greetings messages based on a given {@link
+ * UserProfile}. Then, it sends the greetings message back to the user via an egress Kafka topic.
+ */
+final class GreetingsFn implements StatefulFunction {
+
+ private static final String[] GREETINGS_TEMPLATES =
+ new String[] {"Welcome %s!", "Nice to see you again %s.", "Third time is a charm %s!"};
+
+ static final TypeName TYPENAME = TypeName.typeNameOf("greeter.fns", "greetings");
+ static final StatefulFunctionSpec SPEC =
+ StatefulFunctionSpec.builder(TYPENAME).withSupplier(GreetingsFn::new).build();
+
+ private static final TypeName KAFKA_EGRESS = TypeName.typeNameOf("greeter.io", "user-greetings");
+
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ if (message.is(USER_PROFILE_PROTOBUF_TYPE)) {
+ final UserProfile profile = message.as(USER_PROFILE_PROTOBUF_TYPE);
+ final String greetings = createGreetingsMessage(profile);
+
+ final String userId = context.self().id();
+ context.send(
+ KafkaEgressMessage.forEgress(KAFKA_EGRESS)
+ .withTopic("greetings")
+ .withUtf8Key(userId)
+ .withUtf8Value(greetings)
+ .build());
+ }
+ return context.done();
+ }
+
+ private static String createGreetingsMessage(UserProfile profile) {
+ final int seenCount = profile.getSeenCount();
+
+ if (seenCount <= GREETINGS_TEMPLATES.length) {
+ return String.format(GREETINGS_TEMPLATES[seenCount - 1], profile.getName());
+ } else {
+ return String.format(
+ "Nice to see you for the %dth time, %s! It has been %d milliseconds since we last saw you.",
+ seenCount, profile.getName(), profile.getLastSeenDeltaMs());
+ }
+ }
+}
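Note on `createGreetingsMessage` in `GreetingsFn` above: the login count is 1-based, so the first three logins map onto the three templates and every later login falls through to the generic message. A minimal, JDK-only sketch of that selection follows. `GreetingSketch` and its `greeting` method are hypothetical names used only for illustration, and the guard against counts below 1 is an added assumption that the commit itself does not include.

```java
// Standalone sketch; GreetingSketch is a hypothetical name, not part of the commit.
final class GreetingSketch {

  private static final String[] TEMPLATES = {
    "Welcome %s!", "Nice to see you again %s.", "Third time is a charm %s!"
  };

  /** Picks a template by 1-based login count, falling back to a generic message. */
  static String greeting(int seenCount, String name, long lastSeenDeltaMs) {
    // Assumption: reject non-positive counts instead of indexing out of bounds.
    if (seenCount < 1) {
      throw new IllegalArgumentException("seenCount must be >= 1, got " + seenCount);
    }
    if (seenCount <= TEMPLATES.length) {
      return String.format(TEMPLATES[seenCount - 1], name);
    }
    return String.format(
        "Nice to see you for the %dth time, %s! It has been %d milliseconds since we last saw you.",
        seenCount, name, lastSeenDeltaMs);
  }

  public static void main(String[] args) {
    // Print the greeting for the first four logins of a made-up user.
    for (int count = 1; count <= 4; count++) {
      System.out.println(greeting(count, "Joe", 1500L));
    }
  }
}
```

The `%dth` suffix is kept as in the commit, so counts such as 21 or 22 would still read "21th" and "22th".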
diff --git a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/UserFn.java b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/UserFn.java
new file mode 100644
index 0000000..92f68ec
--- /dev/null
+++ b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/UserFn.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.playground.java.greeter;
+
+import static org.apache.flink.statefun.playground.java.greeter.types.Types.USER_LOGIN_JSON_TYPE;
+import static org.apache.flink.statefun.playground.java.greeter.types.Types.USER_PROFILE_PROTOBUF_TYPE;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.playground.java.greeter.types.UserLogin;
+import org.apache.flink.statefun.playground.java.greeter.types.generated.UserProfile;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
+
+/**
+ * A stateful function that is invoked with {@link UserLogin} events, and persists in state storage
+ * the number of times we've seen the user log in as well as the user's last seen timestamp. This
+ * function in turn invokes {@link GreetingsFn} with the user information to generate a personalized
+ * greetings message.
+ */
+final class UserFn implements StatefulFunction {
+
+ private static final ValueSpec<Integer> SEEN_COUNT = ValueSpec.named("seen_count").withIntType();
+ private static final ValueSpec<Long> SEEN_TIMESTAMP_MS =
+ ValueSpec.named("seen_timestamp_ms").withLongType();
+
+ static final TypeName TYPENAME = TypeName.typeNameOf("greeter.fns", "user");
+ static final StatefulFunctionSpec SPEC =
+ StatefulFunctionSpec.builder(TYPENAME)
+ .withValueSpecs(SEEN_COUNT, SEEN_TIMESTAMP_MS)
+ .withSupplier(UserFn::new)
+ .build();
+
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ if (message.is(USER_LOGIN_JSON_TYPE)) {
+ final UserLogin login = message.as(USER_LOGIN_JSON_TYPE);
+
+ int seenCount = context.storage().get(SEEN_COUNT).orElse(0);
+ seenCount++;
+
+ final long nowMs = System.currentTimeMillis();
+ final long lastSeenTimestampMs = context.storage().get(SEEN_TIMESTAMP_MS).orElse(nowMs);
+
+ context.storage().set(SEEN_COUNT, seenCount);
+ context.storage().set(SEEN_TIMESTAMP_MS, nowMs);
+
+ final UserProfile profile =
+ UserProfile.newBuilder()
+ .setName(login.getUserName())
+ .setLoginLocation(login.getLoginType().name())
+ .setSeenCount(seenCount)
+ .setLastSeenDeltaMs(nowMs - lastSeenTimestampMs)
+ .build();
+ context.send(
+ MessageBuilder.forAddress(GreetingsFn.TYPENAME, login.getUserId())
+ .withCustomType(USER_PROFILE_PROTOBUF_TYPE, profile)
+ .build());
+ } else {
+ throw new IllegalArgumentException("Unexpected message type: " + message.valueTypeName());
+ }
+ return context.done();
+ }
+}
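Note on the state handling in `UserFn` above: each login increments a per-user counter and replaces the stored timestamp, and the delta passed on to `GreetingsFn` is zero on a user's first login because the missing timestamp defaults to the current time. The sketch below imitates that bookkeeping with plain in-memory maps. `UserStateSketch` and `onLogin` are hypothetical names, and the maps only stand in for StateFun's per-address storage; this is not the SDK's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for per-user StateFun storage; not the SDK's API.
final class UserStateSketch {

  private final Map<String, Integer> seenCount = new HashMap<>();
  private final Map<String, Long> seenTimestampMs = new HashMap<>();

  /** Records a login and returns {new seen count, milliseconds since the previous login}. */
  long[] onLogin(String userId, long nowMs) {
    // A missing counter starts at 0; a missing timestamp defaults to "now", giving a delta of 0.
    int count = seenCount.getOrDefault(userId, 0) + 1;
    long lastSeenMs = seenTimestampMs.getOrDefault(userId, nowMs);

    seenCount.put(userId, count);
    seenTimestampMs.put(userId, nowMs);

    return new long[] {count, nowMs - lastSeenMs};
  }

  public static void main(String[] args) {
    UserStateSketch state = new UserStateSketch();
    long[] first = state.onLogin("user-1", 1_000L);
    long[] second = state.onLogin("user-1", 1_750L);
    System.out.println(first[0] + " login(s), delta " + first[1] + " ms");
    System.out.println(second[0] + " login(s), delta " + second[1] + " ms");
  }
}
```

Passing the clock value in as a parameter, rather than calling `System.currentTimeMillis()` inside the method as the commit does, is a choice made here only to keep the sketch deterministic.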
diff --git a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
new file mode 100644
index 0000000..3ceaeef
--- /dev/null
+++ b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
@@ -0,0 +1,27 @@
+package org.apache.flink.statefun.playground.java.greeter.types;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.playground.java.greeter.types.generated.UserProfile;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.types.SimpleType;
+import org.apache.flink.statefun.sdk.java.types.Type;
+
+public final class Types {
+
+ private Types() {}
+
+ private static final ObjectMapper JSON_OBJ_MAPPER = new ObjectMapper();
+ private static final String TYPES_NAMESPACE = "greeter.types";
+
+ public static final Type<UserLogin> USER_LOGIN_JSON_TYPE =
+ SimpleType.simpleImmutableTypeFrom(
+ TypeName.typeNameOf(TYPES_NAMESPACE, UserLogin.class.getName()),
+ JSON_OBJ_MAPPER::writeValueAsBytes,
+ bytes -> JSON_OBJ_MAPPER.readValue(bytes, UserLogin.class));
+
+ public static final Type<UserProfile> USER_PROFILE_PROTOBUF_TYPE =
+ SimpleType.simpleImmutableTypeFrom(
+ TypeName.typeNameOf(TYPES_NAMESPACE, UserProfile.getDescriptor().getFullName()),
+ UserProfile::toByteArray,
+ UserProfile::parseFrom);
+}
diff --git a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/UserLogin.java b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/UserLogin.java
new file mode 100644
index 0000000..7557678
--- /dev/null
+++ b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/UserLogin.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.playground.java.greeter.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.statefun.sdk.java.types.Type;
+
+/**
+ * Java Bean class for user login events, which are passed around in JSON form.
+ *
+ * <p>Please take a look at {@link Types#USER_LOGIN_JSON_TYPE} on how a custom StateFun {@link Type}
+ * can be created to let the Java SDK understand how to marshal and unmarshal JSON strings (either
+ * used as invocation messages or state values) to this Java Bean class.
+ */
+public final class UserLogin {
+
+ public enum LoginType {
+ WEB,
+ MOBILE
+ }
+
+ @JsonProperty("user_id")
+ private String userId;
+
+ @JsonProperty("user_name")
+ private String userName;
+
+ @JsonProperty("login_type")
+ private LoginType loginType;
+
+ public UserLogin() {}
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public LoginType getLoginType() {
+ return loginType;
+ }
+}
diff --git a/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java
new file mode 100644
index 0000000..6be5a42
--- /dev/null
+++ b/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.playground.java.greeter.undertow;
+
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.Headers;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+
+/**
+ * A simple Undertow {@link HttpHandler} that delegates requests from StateFun runtime processes to
+ * a StateFun {@link RequestReplyHandler}.
+ */
+public final class UndertowHttpHandler implements HttpHandler {
+  private final RequestReplyHandler handler;
+
+  public UndertowHttpHandler(RequestReplyHandler handler) {
+    this.handler = Objects.requireNonNull(handler);
+  }
+
+  @Override
+  public void handleRequest(HttpServerExchange exchange) {
+    exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
+  }
+
+  private void onRequestBody(HttpServerExchange exchange, byte[] requestBytes) {
+    exchange.dispatch();
+    CompletableFuture<Slice> future = handler.handle(Slices.wrap(requestBytes));
+    future.whenComplete((response, exception) -> onComplete(exchange, response, exception));
+  }
+
+  private void onComplete(HttpServerExchange exchange, Slice responseBytes, Throwable ex) {
+    if (ex != null) {
+      ex.printStackTrace(System.out);
+      exchange.setStatusCode(500);
+      exchange.endExchange();
+      return;
+    }
+    exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream");
+    exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer());
+  }
+}
diff --git a/java/greeter/src/main/protobuf/messages.proto b/java/greeter/src/main/protobuf/messages.proto
new file mode 100644
index 0000000..9b2febc
--- /dev/null
+++ b/java/greeter/src/main/protobuf/messages.proto
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.playground.java.greeter.types;
+
+option java_package = "org.apache.flink.statefun.playground.java.greeter.types.generated";
+option java_multiple_files = true;
+
+message UserProfile {
+    string name = 1;
+    int64 last_seen_delta_ms = 2;
+    string login_location = 3;
+    int32 seen_count = 4;
+}
diff --git a/java/greeter/statefun-sdk-java-2.3-SNAPSHOT.jar b/java/greeter/statefun-sdk-java-2.3-SNAPSHOT.jar
new file mode 100644
index 0000000..60fbd1d
Binary files /dev/null and b/java/greeter/statefun-sdk-java-2.3-SNAPSHOT.jar differ
diff --git a/java/greeter/user-logins.txt b/java/greeter/user-logins.txt
new file mode 100644
index 0000000..73499c0
--- /dev/null
+++ b/java/greeter/user-logins.txt
@@ -0,0 +1,74 @@
+{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
+{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
+{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
+{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
+{"user_id": "5", "user_name": "Stephan", "login_type": "MOBILE"}
+{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
+{"user_id": "3", "user_name": "Marta", "login_type": "WEB"}
+{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
+{"user_id": "5", "user_name": "Stephan", "login_type": "MOBILE"}
+{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
+{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
+{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
+{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
+{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
+{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
+{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
+{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
+{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
+{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
+{"user_id": "5", "user_name": "Stephan", "login_type": "MOBILE"}
+{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
+{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
+{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
+{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
+{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
+{"user_id": "5", "user_name": "Stephan", "login_type": "MOBILE"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
+{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
+{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
+{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
+{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
+{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
+{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
+{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
+{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
+{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
+{"user_id": "4", "user_name": "Seth", "login_type": "MOBILE"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
+{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
+{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
+{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
+{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
+{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
+{"user_id": "2", "user_name": "Igal", "login_type": "WEB"}
+{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
+{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
+{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
+{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
+{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
+{"user_id": "6", "user_name": "Konstantin", "login_type": "WEB"}
+{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
+{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
+{"user_id": "6", "user_name": "Konstantin", "login_type": "MOBILE"}
+{"user_id": "5", "user_name": "Stephan", "login_type": "WEB"}
+{"user_id": "4", "user_name": "Seth", "login_type": "WEB"}
+{"user_id": "3", "user_name": "Marta", "login_type": "MOBILE"}
+{"user_id": "2", "user_name": "Igal", "login_type": "MOBILE"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "MOBILE"}
+{"user_id": "4", "user_name": "Seth", "login_type": "MOBILE"}
+{"user_id": "1", "user_name": "Gordon", "login_type": "WEB"}
\ No newline at end of file
diff --git a/java/showcase/README.md b/java/showcase/README.md
index c9b9e87..bb1ef49 100644
--- a/java/showcase/README.md
+++ b/java/showcase/README.md
@@ -39,3 +39,12 @@ from remote databases.
The grand finale of this tutorial series. In this last part of the tutorial, you will serve functions via a HTTP web
server, so that it is reachable by the StateFun runtime. This final part of the tutorial provides a runnable demo
experience, so we highly recommend taking a look to see everything in action!
+
+## Next Steps
+
+The setup you executed in the last part of this tutorial is not how you'd normally deploy StateFun processes
+and functions. It's a rather simplified setup to allow you to explore the interaction between
+functions and the StateFun processes by setting debugger breakpoints in the function code in your IDE.
+
+We now recommend taking a look at a slightly more realistic setup, using Docker Compose, in the
+[Greeter Docker Compose Example](../greeter).