afedulov commented on a change in pull request #211:
URL: https://github.com/apache/flink-statefun/pull/211#discussion_r595074016
##########
File path: docs/content/docs/sdk/java.md
##########
@@ -29,451 +29,395 @@ under the License.
Stateful functions are the building blocks of applications; they are atomic
units of isolation, distribution, and persistence.
As objects, they encapsulate the state of a single entity (e.g., a specific
user, device, or session) and encode its behavior.
Stateful functions can interact with each other, and external systems, through
message passing.
-The Java SDK is supported as an [embedded module]({{< ref
"docs/sdk/overview#embedded-module" >}}).
To get started, add the Java SDK as a dependency to your application.
-{{< artifact statefun-sdk >}}
+{{< artifact statefun-sdk-java >}}
## Defining A Stateful Function
-A stateful function is any class that implements the ``StatefulFunction``
interface.
-The following is an example of a simple hello world function.
+A stateful function is any class that implements the `StatefulFunction`
interface.
+In the following example, a `StatefulFunction` maintains a count for every user
+of an application, emitting a customized greeting.
```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnHelloWorld implements StatefulFunction {
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
- @Override
- public void invoke(Context context, Object input) {
- System.out.println("Hello " + input.toString());
- }
-}
-```
+public class GreeterFn implements StatefulFunction {
-Functions process each incoming message through their ``invoke`` method.
-Input's are untyped and passed through the system as a ``java.lang.Object`` so
one function can potentially process multiple types of messages.
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/greeter");
-The ``Context`` provides metadata about the current message and function, and
is how you can call other functions or external systems.
-Functions are invoked based on a function type and unique identifier.
+ static final TypeName INBOX =
TypeName.forNameFromString("com.example.fns/inbox");
Review comment:
`forNameFromString` should be `typeNameFromString`.
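For context on the `<namespace>/<name>` form that both constants rely on, here is a minimal sketch of how such a string might be split. `TypeNameSketch` and `parse` are hypothetical names used only for illustration; this is not the SDK's `TypeName` class, and splitting at the last `/` is an assumption.

```java
// Hypothetical stand-in for illustration only; NOT the SDK's TypeName class.
public final class TypeNameSketch {
  private final String namespace;
  private final String name;

  private TypeNameSketch(String namespace, String name) {
    this.namespace = namespace;
    this.name = name;
  }

  // Splits "<namespace>/<name>" at the last '/' (an assumption) and
  // rejects input where either part would be empty.
  public static TypeNameSketch parse(String typeName) {
    int slash = typeName.lastIndexOf('/');
    if (slash <= 0 || slash == typeName.length() - 1) {
      throw new IllegalArgumentException(
          "Expected <namespace>/<name> but got: " + typeName);
    }
    return new TypeNameSketch(
        typeName.substring(0, slash), typeName.substring(slash + 1));
  }

  public String namespace() {
    return namespace;
  }

  public String name() {
    return name;
  }

  public static void main(String[] args) {
    TypeNameSketch inbox = parse("com.example.fns/inbox");
    // prints "com.example.fns | inbox"
    System.out.println(inbox.namespace() + " | " + inbox.name());
  }
}
```

In the docs themselves the constants would still be built with `TypeName.typeNameFromString(...)`, as the `User` type example in the same file already does.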
##########
File path: docs/content/docs/sdk/java.md
##########
@@ -29,451 +29,395 @@ under the License.
Stateful functions are the building blocks of applications; they are atomic
units of isolation, distribution, and persistence.
As objects, they encapsulate the state of a single entity (e.g., a specific
user, device, or session) and encode its behavior.
Stateful functions can interact with each other, and external systems, through
message passing.
-The Java SDK is supported as an [embedded module]({{< ref
"docs/sdk/overview#embedded-module" >}}).
To get started, add the Java SDK as a dependency to your application.
-{{< artifact statefun-sdk >}}
+{{< artifact statefun-sdk-java >}}
## Defining A Stateful Function
-A stateful function is any class that implements the ``StatefulFunction``
interface.
-The following is an example of a simple hello world function.
+A stateful function is any class that implements the `StatefulFunction`
interface.
+In the following example, a `StatefulFunction` maintains a count for every user
+of an application, emitting a customized greeting.
```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnHelloWorld implements StatefulFunction {
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
- @Override
- public void invoke(Context context, Object input) {
- System.out.println("Hello " + input.toString());
- }
-}
-```
+public class GreeterFn implements StatefulFunction {
-Functions process each incoming message through their ``invoke`` method.
-Input's are untyped and passed through the system as a ``java.lang.Object`` so
one function can potentially process multiple types of messages.
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/greeter");
-The ``Context`` provides metadata about the current message and function, and
is how you can call other functions or external systems.
-Functions are invoked based on a function type and unique identifier.
+ static final TypeName INBOX =
TypeName.forNameFromString("com.example.fns/inbox");
-### Stateful Match Function
+ static final ValueSpec<Integer> SEEN =
ValueSpec.named("seen").withIntType();
-Stateful functions provide a powerful abstraction for working with events and
state, allowing developers to build components that can react to any kind of
message.
-Commonly, functions only need to handle a known set of message types, and the
``StatefulMatchFunction`` interface provides an opinionated solution to that
problem.
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ if (!message.is(User.TYPE)) {
+ throw new IllegalStateException("Unknown type");
+ }
-#### Simple Match Function
+ User user = message.as(User.TYPE);
+ String name = user.getName();
-Stateful match functions are an opinionated variant of stateful functions for
precisely this pattern.
-Developers outline expected types, optional predicates, and well-typed
business logic and let the system dispatch each input to the correct action.
-Variants are bound inside a ``configure`` method that is executed once the
first time an instance is loaded.
+ var storage = context.storage();
+ var seen = storage.get(SEEN).orElse(0);
+ storage.set(SEEN, seen + 1);
-```java
-package org.apache.flink.statefun.docs.match;
+ context.send(
+ MessageBuilder.forAddress(INBOX, name)
+ .withValue("Hello " + name + " for the " + seen + "th time!")
+ .build());
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
+ return context.done();
+ }
+}
+```
-public class FnMatchGreeter extends StatefulMatchFunction {
+This code declares a greeter function that will be
[registered](#exposing-functions) under the logical type name
`com.example.fns/greeter`. Type names must take the form `<namespace>/<name>`.
+It contains a single `ValueSpec`, which is implicitly scoped to the current
address and stores an integer.
- @Override
- public void configure(MatchBinder binder) {
- binder
- .predicate(Customer.class, this::greetCustomer)
- .predicate(Employee.class, Employee::isManager,
this::greetManager)
- .predicate(Employee.class, this::greetEmployee);
- }
+Every time a message is sent to a greeter, it first validates that the message
contains a `User` and extracts its name. Both messages and state are strongly
typed: either one of the default [built-in types]({{< ref
"docs/sdk/appendix#types" >}}) or a [custom type](#types), as in this case.
- private void greetCustomer(Context context, Customer message) {
- System.out.println("Hello customer " + message.getName());
- }
+The function finally builds a custom greeting for the user.
+The number of times that particular user has been seen so far is queried from
the state store and updated,
+and the greeting is sent to the user's inbox (another function type).
- private void greetEmployee(Context context, Employee message) {
- System.out.println("Hello employee " + message.getEmployeeId());
- }
+## Types
- private void greetManager(Context context, Employee message) {
- System.out.println("Hello manager " + message.getEmployeeId());
- }
-}
-```
+Stateful Functions strongly types all messages and state values.
+Because they run in a distributed manner and state values are persisted to
stable storage, Stateful Functions aims to provide efficient and easy-to-use
serializers.
-#### Making Your Function Complete
+Out of the box, all SDKs offer a set of highly optimized serializers for
common primitive types: booleans, numerics, and strings.
+Additionally, users are encouraged to plug in custom types to model more
complex data structures.
-Similar to the first example, match functions are partial by default and will
throw an ``IllegalStateException`` on any input that does not match any branch.
-They can be made complete by providing an ``otherwise`` clause that serves as
a catch-all for unmatched input, think of it as a default clause in a Java
switch statement.
-The ``otherwise`` action takes its message as an untyped ``java.lang.Object``,
allowing you to handle any unexpected messages.
+In the [example above](#defining-a-stateful-function), the greeter function
consumes `User` messages, a POJO type containing several fields.
+By defining a custom type, this object can be passed transparently between
functions and stored in state.
+And because the type is tied to a logical typename, instead of the physical
Java class, it can be passed to functions written in other language SDKs.
```java
-package org.apache.flink.statefun.docs.match;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
-
-public class FnMatchGreeterWithCatchAll extends StatefulMatchFunction {
-
- @Override
- public void configure(MatchBinder binder) {
- binder
- .predicate(Customer.class, this::greetCustomer)
- .predicate(Employee.class, Employee::isManager,
this::greetManager)
- .predicate(Employee.class, this::greetEmployee)
- .otherwise(this::catchAll);
- }
-
- private void greetCustomer(Context context, Customer message) {
- System.out.println("Hello customer " + message.getName());
- }
-
- private void greetEmployee(Context context, Employee message) {
- System.out.println("Hello employee " + message.getEmployeeId());
- }
-
- private void greetManager(Context context, Employee message) {
- System.out.println("Hello manager " + message.getEmployeeId());
- }
-
- private void catchAll(Context context, Object message) {
- System.out.println("Hello unexpected message");
- }
-}
-```
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.types.SimpleType;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import java.util.Objects;
-#### Action Resolution Order
+public class User {
-Match functions will always match actions from most to least specific using
the following resolution rules.
+ private static final ObjectMapper mapper = new ObjectMapper();
-First, find an action that matches the type and predicate. If two predicates
will return true for a particular input, the one registered in the binder first
wins.
-Next, search for an action that matches the type but does not have an
associated predicate.
-Finally, if a catch-all exists, it will be executed or an
``IllegalStateException`` will be thrown.
+ public static final Type<User> TYPE = SimpleType.simpleImmutableTypeFrom(
+ TypeName.typeNameFromString("com.example/User"),
+ mapper::writeValueAsBytes,
+ bytes -> mapper.readValue(bytes, User.class));
-## Function Types and Messaging
+ private final String name;
-In Java, function types are defined as logical pointers composed of a
namespace and name.
-The type is bound to the implementing class in the [module]({{< ref
"docs/sdk/overview#embedded-module" >}}) definition.
-Below is an example function type for the hello world function.
+ private final String favoriteColor;
-```java
-package org.apache.flink.statefun.docs;
+ @JsonCreator
+ public User(
+ @JsonProperty("name") String name,
+ @JsonProperty("favorite_color") String favoriteColor) {
-import org.apache.flink.statefun.sdk.FunctionType;
+ this.name = Objects.requireNonNull(name);
+ this.favoriteColor = Objects.requireNonNull(favoriteColor);
+ }
-/** A function type that will be bound to {@link FnHelloWorld}. */
-public class Identifiers {
+ public String getName() {
+ return name;
+ }
- public static final FunctionType HELLO_TYPE = new
FunctionType("apache/flink", "hello");
-}
+ public String getFavoriteColor() {
+ return favoriteColor;
+ }
+
+ @Override
+ public String toString() {
+ return "User{name=" + name + ",favoriteColor=" + favoriteColor + "}";
+ }
+}
```
-This type can then be referenced from other functions to create an address and
message a particular instance.
+## State
-```java
-package org.apache.flink.statefun.docs;
+Stateful Functions treats state as a first-class citizen, so all functions
can easily define state that is automatically made fault tolerant by the
runtime.
+State declaration is as simple as defining one or more `ValueSpec`s
describing your state values.
+Value specifications are defined with a unique (to the function) name and
[type](#types).
+At runtime, functions can `get`, `set`, and `remove` state values scoped to
the address of the current message.
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-/** A simple stateful function that sends a message to the user with id
"user1" */
-public class FnCaller implements StatefulFunction {
+{{< hint info >}}
+All value specifications must be eagerly registered in the
`StatefulFunctionSpec` when composing
+the application's [RequestReplyHandler](#exposing-functions).
+{{< /hint >}}
- @Override
- public void invoke(Context context, Object input) {
- context.send(Identifiers.HELLO_TYPE, "user1", new MyUserMessage());
- }
-}
+```java
+// Value specification for a state named `seen`
+// with the primitive integer type
+ValueSpec
+ .named("seen")
+ .withIntType();
+
+// Value specification with a custom type
+ValueSpec
+ .named("user")
+ .withCustomType(User.TYPE);
```
-## Sending Delayed Messages
+### State Expiration
-Functions are able to send messages on a delay so that they will arrive after
some duration.
-Functions may even send themselves delayed messages that can serve as a
callback.
-The delayed message is non-blocking so functions will continue to process
records between the time a delayed message is sent and received.
+By default, state values are persisted until manually `remove`d by the user.
+Optionally, they may be configured to expire and be automatically deleted
after a specified duration.
```java
-package org.apache.flink.statefun.docs.delay;
-
-import java.time.Duration;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnDelayedMessage implements StatefulFunction {
-
- @Override
- public void invoke(Context context, Object input) {
- if (input instanceof Message) {
- System.out.println("Hello");
- context.sendAfter(Duration.ofMinutes(1),
context.self(), new DelayedMessage());
- }
-
- if (input instanceof DelayedMessage) {
- System.out.println("Welcome to the future!");
- }
- }
-}
+// Value specification that will automatically
+// delete the value if the function instance goes
+// more than 1 day without being called
+ValueSpec
+ .named("seen")
+ .thatExpiresAfterCall(Duration.ofDays(1))
+ .withIntType();
+
+// Value specification that will automatically
+// delete the value if it goes more than 1 day
+// without being written
+ValueSpec
+ .named("seen")
+ .thatExpireAfterWrite(Duration.ofDays(1))
+ .withIntType();
```
-## Completing Async Requests
-
-When interacting with external systems, such as a database or API, one needs
to take care that communication delay with the external system does not
dominate the application’s total work.
-Stateful Functions allows registering a Java ``CompletableFuture`` that will
resolve to a value at some point in the future.
-Future's are registered along with a metadata object that provides additional
context about the caller.
+## Sending Delayed Messages
-When the future completes, either successfully or exceptionally, the caller
function type and id will be invoked with a ``AsyncOperationResult``.
-An asynchronous result can complete in one of three states:
+Functions can send messages on a delay so that they will arrive after some
duration.
+They may even send themselves delayed messages that can serve as a callback.
+The delayed message is non-blocking, so functions will continue to process
records between when a delayed message is sent and received.
+Additionally, they are fault-tolerant and never lost, even when recovering
from failure.
-### Success
+This example sends a response back to the calling function after a 30-minute
delay.
-The asynchronous operation has succeeded, and the produced result can be
obtained via ``AsyncOperationResult#value``.
+```java
+import java.util.concurrent.CompletableFuture;
+import java.time.Duration;
-### Failure
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.message.Message;
-The asynchronous operation has failed, and the cause can be obtained via
``AsyncOperationResult#throwable``.
+public class DelayedFn implements StatefulFunction {
-### Unknown
+ private static final Logger LOG = LoggerFactory.getLogger(DelayedFn.class);
-The stateful function was restarted, possibly on a different machine, before
the ``CompletableFuture`` was completed, therefore it is unknown what is the
status of the asynchronous operation.
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/delayed");
Review comment:
`forNameFromString` should be `typeNameFromString`.
##########
File path: docs/content/docs/sdk/java.md
##########
@@ -29,451 +29,395 @@ under the License.
Stateful functions are the building blocks of applications; they are atomic
units of isolation, distribution, and persistence.
As objects, they encapsulate the state of a single entity (e.g., a specific
user, device, or session) and encode its behavior.
Stateful functions can interact with each other, and external systems, through
message passing.
-The Java SDK is supported as an [embedded module]({{< ref
"docs/sdk/overview#embedded-module" >}}).
To get started, add the Java SDK as a dependency to your application.
-{{< artifact statefun-sdk >}}
+{{< artifact statefun-sdk-java >}}
## Defining A Stateful Function
-A stateful function is any class that implements the ``StatefulFunction``
interface.
-The following is an example of a simple hello world function.
+A stateful function is any class that implements the `StatefulFunction`
interface.
+In the following example, a `StatefulFunction` maintains a count for every user
+of an application, emitting a customized greeting.
```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnHelloWorld implements StatefulFunction {
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
- @Override
- public void invoke(Context context, Object input) {
- System.out.println("Hello " + input.toString());
- }
-}
-```
+public class GreeterFn implements StatefulFunction {
-Functions process each incoming message through their ``invoke`` method.
-Input's are untyped and passed through the system as a ``java.lang.Object`` so
one function can potentially process multiple types of messages.
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/greeter");
-The ``Context`` provides metadata about the current message and function, and
is how you can call other functions or external systems.
-Functions are invoked based on a function type and unique identifier.
+ static final TypeName INBOX =
TypeName.forNameFromString("com.example.fns/inbox");
-### Stateful Match Function
+ static final ValueSpec<Integer> SEEN =
ValueSpec.named("seen").withIntType();
-Stateful functions provide a powerful abstraction for working with events and
state, allowing developers to build components that can react to any kind of
message.
-Commonly, functions only need to handle a known set of message types, and the
``StatefulMatchFunction`` interface provides an opinionated solution to that
problem.
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ if (!message.is(User.TYPE)) {
+ throw new IllegalStateException("Unknown type");
+ }
-#### Simple Match Function
+ User user = message.as(User.TYPE);
+ String name = user.getName();
-Stateful match functions are an opinionated variant of stateful functions for
precisely this pattern.
-Developers outline expected types, optional predicates, and well-typed
business logic and let the system dispatch each input to the correct action.
-Variants are bound inside a ``configure`` method that is executed once the
first time an instance is loaded.
+ var storage = context.storage();
+ var seen = storage.get(SEEN).orElse(0);
+ storage.set(SEEN, seen + 1);
-```java
-package org.apache.flink.statefun.docs.match;
+ context.send(
+ MessageBuilder.forAddress(INBOX, name)
+ .withValue("Hello " + name + " for the " + seen + "th time!")
+ .build());
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
+ return context.done();
+ }
+}
+```
-public class FnMatchGreeter extends StatefulMatchFunction {
+This code declares a greeter function that will be
[registered](#exposing-functions) under the logical type name
`com.example.fns/greeter`. Type names must take the form `<namespace>/<name>`.
+It contains a single `ValueSpec`, which is implicitly scoped to the current
address and stores an integer.
- @Override
- public void configure(MatchBinder binder) {
- binder
- .predicate(Customer.class, this::greetCustomer)
- .predicate(Employee.class, Employee::isManager,
this::greetManager)
- .predicate(Employee.class, this::greetEmployee);
- }
+Every time a message is sent to a greeter, it first validates that the message
contains a `User` and extracts its name. Both messages and state are strongly
typed: either one of the default [built-in types]({{< ref
"docs/sdk/appendix#types" >}}) or a [custom type](#types), as in this case.
- private void greetCustomer(Context context, Customer message) {
- System.out.println("Hello customer " + message.getName());
- }
+The function finally builds a custom greeting for the user.
+The number of times that particular user has been seen so far is queried from
the state store and updated,
+and the greeting is sent to the user's inbox (another function type).
- private void greetEmployee(Context context, Employee message) {
- System.out.println("Hello employee " + message.getEmployeeId());
- }
+## Types
- private void greetManager(Context context, Employee message) {
- System.out.println("Hello manager " + message.getEmployeeId());
- }
-}
-```
+Stateful Functions strongly types all messages and state values.
+Because they run in a distributed manner and state values are persisted to
stable storage, Stateful Functions aims to provide efficient and easy-to-use
serializers.
-#### Making Your Function Complete
+Out of the box, all SDKs offer a set of highly optimized serializers for
common primitive types: booleans, numerics, and strings.
+Additionally, users are encouraged to plug in custom types to model more
complex data structures.
-Similar to the first example, match functions are partial by default and will
throw an ``IllegalStateException`` on any input that does not match any branch.
-They can be made complete by providing an ``otherwise`` clause that serves as
a catch-all for unmatched input, think of it as a default clause in a Java
switch statement.
-The ``otherwise`` action takes its message as an untyped ``java.lang.Object``,
allowing you to handle any unexpected messages.
+In the [example above](#defining-a-stateful-function), the greeter function
consumes `User` messages, a POJO type containing several fields.
+By defining a custom type, this object can be passed transparently between
functions and stored in state.
+And because the type is tied to a logical typename, instead of the physical
Java class, it can be passed to functions written in other language SDKs.
```java
-package org.apache.flink.statefun.docs.match;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
-
-public class FnMatchGreeterWithCatchAll extends StatefulMatchFunction {
-
- @Override
- public void configure(MatchBinder binder) {
- binder
- .predicate(Customer.class, this::greetCustomer)
- .predicate(Employee.class, Employee::isManager,
this::greetManager)
- .predicate(Employee.class, this::greetEmployee)
- .otherwise(this::catchAll);
- }
-
- private void greetCustomer(Context context, Customer message) {
- System.out.println("Hello customer " + message.getName());
- }
-
- private void greetEmployee(Context context, Employee message) {
- System.out.println("Hello employee " + message.getEmployeeId());
- }
-
- private void greetManager(Context context, Employee message) {
- System.out.println("Hello manager " + message.getEmployeeId());
- }
-
- private void catchAll(Context context, Object message) {
- System.out.println("Hello unexpected message");
- }
-}
-```
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.types.SimpleType;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import java.util.Objects;
-#### Action Resolution Order
+public class User {
-Match functions will always match actions from most to least specific using
the following resolution rules.
+ private static final ObjectMapper mapper = new ObjectMapper();
-First, find an action that matches the type and predicate. If two predicates
will return true for a particular input, the one registered in the binder first
wins.
-Next, search for an action that matches the type but does not have an
associated predicate.
-Finally, if a catch-all exists, it will be executed or an
``IllegalStateException`` will be thrown.
+ public static final Type<User> TYPE = SimpleType.simpleImmutableTypeFrom(
+ TypeName.typeNameFromString("com.example/User"),
+ mapper::writeValueAsBytes,
+ bytes -> mapper.readValue(bytes, User.class));
-## Function Types and Messaging
+ private final String name;
-In Java, function types are defined as logical pointers composed of a
namespace and name.
-The type is bound to the implementing class in the [module]({{< ref
"docs/sdk/overview#embedded-module" >}}) definition.
-Below is an example function type for the hello world function.
+ private final String favoriteColor;
-```java
-package org.apache.flink.statefun.docs;
+ @JsonCreator
+ public User(
+ @JsonProperty("name") String name,
+ @JsonProperty("favorite_color") String favoriteColor) {
-import org.apache.flink.statefun.sdk.FunctionType;
+ this.name = Objects.requireNonNull(name);
+ this.favoriteColor = Objects.requireNonNull(favoriteColor);
+ }
-/** A function type that will be bound to {@link FnHelloWorld}. */
-public class Identifiers {
+ public String getName() {
+ return name;
+ }
- public static final FunctionType HELLO_TYPE = new
FunctionType("apache/flink", "hello");
-}
+ public String getFavoriteColor() {
+ return favoriteColor;
+ }
+
+ @Override
+ public String toString() {
+ return "User{name=" + name + ",favoriteColor=" + favoriteColor + "}";
+ }
+}
```
-This type can then be referenced from other functions to create an address and
message a particular instance.
+## State
-```java
-package org.apache.flink.statefun.docs;
+Stateful Functions treats state as a first-class citizen, so all functions
can easily define state that is automatically made fault tolerant by the
runtime.
+State declaration is as simple as defining one or more `ValueSpec`s
describing your state values.
+Value specifications are defined with a unique (to the function) name and
[type](#types).
+At runtime, functions can `get`, `set`, and `remove` state values scoped to
the address of the current message.
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-/** A simple stateful function that sends a message to the user with id
"user1" */
-public class FnCaller implements StatefulFunction {
+{{< hint info >}}
+All value specifications must be eagerly registered in the
`StatefulFunctionSpec` when composing
+the application's [RequestReplyHandler](#exposing-functions).
+{{< /hint >}}
- @Override
- public void invoke(Context context, Object input) {
- context.send(Identifiers.HELLO_TYPE, "user1", new MyUserMessage());
- }
-}
+```java
+// Value specification for a state named `seen`
+// with the primitive integer type
+ValueSpec
+ .named("seen")
+ .withIntType();
+
+// Value specification with a custom type
+ValueSpec
+ .named("user")
+ .withCustomType(User.TYPE);
```
-## Sending Delayed Messages
+### State Expiration
-Functions are able to send messages on a delay so that they will arrive after
some duration.
-Functions may even send themselves delayed messages that can serve as a
callback.
-The delayed message is non-blocking so functions will continue to process
records between the time a delayed message is sent and received.
+By default, state values are persisted until manually `remove`d by the user.
+Optionally, they may be configured to expire and be automatically deleted
after a specified duration.
```java
-package org.apache.flink.statefun.docs.delay;
-
-import java.time.Duration;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnDelayedMessage implements StatefulFunction {
-
- @Override
- public void invoke(Context context, Object input) {
- if (input instanceof Message) {
- System.out.println("Hello");
- context.sendAfter(Duration.ofMinutes(1),
context.self(), new DelayedMessage());
- }
-
- if (input instanceof DelayedMessage) {
- System.out.println("Welcome to the future!");
- }
- }
-}
+// Value specification that will automatically
+// delete the value if the function instance goes
+// more than 1 day without being called
+ValueSpec
+ .named("seen")
+ .thatExpiresAfterCall(Duration.ofDays(1))
+ .withIntType();
+
+// Value specification that will automatically
+// delete the value if it goes more than 1 day
+// without being written
+ValueSpec
+ .named("seen")
+ .thatExpireAfterWrite(Duration.ofDays(1))
+ .withIntType();
```
-## Completing Async Requests
-
-When interacting with external systems, such as a database or API, one needs
to take care that communication delay with the external system does not
dominate the application’s total work.
-Stateful Functions allows registering a Java ``CompletableFuture`` that will
resolve to a value at some point in the future.
-Future's are registered along with a metadata object that provides additional
context about the caller.
+## Sending Delayed Messages
-When the future completes, either successfully or exceptionally, the caller
function type and id will be invoked with a ``AsyncOperationResult``.
-An asynchronous result can complete in one of three states:
+Functions can send messages on a delay so that they will arrive after some
duration.
+They may even send themselves delayed messages that can serve as a callback.
+The delayed message is non-blocking, so functions will continue to process
records between when a delayed message is sent and received.
+Additionally, delayed messages are fault-tolerant and never lost, even when
recovering from failure.
-### Success
+This example sends a response back to the calling function after a 30-minute
delay.
-The asynchronous operation has succeeded, and the produced result can be
obtained via ``AsyncOperationResult#value``.
+```java
+import java.util.concurrent.CompletableFuture;
+import java.time.Duration;
-### Failure
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.message.Message;
-The asynchronous operation has failed, and the cause can be obtained via
``AsyncOperationResult#throwable``.
+public class DelayedFn implements StatefulFunction {
-### Unknown
+ private static final Logger LOG = LoggerFactory.getLogger(DelayedFn.class);
-The stateful function was restarted, possibly on a different machine, before
the ``CompletableFuture`` was completed, therefore it is unknown what is the
status of the asynchronous operation.
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/delayed");
-```java
-package org.apache.flink.statefun.docs.async;
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ if (!context.caller().isPresent()) {
+ LOG.debug("Message has no known caller, meaning it was sent directly from an ingress");
+ return context.done();
+ }
-import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.sdk.AsyncOperationResult;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-@SuppressWarnings("unchecked")
-public class EnrichmentFunction implements StatefulFunction {
-
- private final QueryService client;
-
- public EnrichmentFunction(QueryService client) {
- this.client = client;
- }
-
- @Override
- public void invoke(Context context, Object input) {
- if (input instanceof User) {
- onUser(context, (User) input);
- } else if (input instanceof AsyncOperationResult) {
- onAsyncResult((AsyncOperationResult) input);
- }
- }
-
- private void onUser(Context context, User user) {
- CompletableFuture<UserEnrichment> future =
client.getDataAsync(user.getUserId());
- context.registerAsyncOperation(user, future);
- }
-
- private void onAsyncResult(AsyncOperationResult<User, UserEnrichment>
result) {
- if (result.successful()) {
- User metadata = result.metadata();
- UserEnrichment value = result.value();
- System.out.println(
- String.format("Successfully completed future:
%s %s", metadata, value));
- } else if (result.failure()) {
- System.out.println(
- String.format("Something has gone terribly
wrong %s", result.throwable()));
- } else {
- System.out.println("Not sure what happened, maybe
retry");
- }
- }
+ var caller = context.caller().get();
+ context.sendAfter(Duration.ofMinutes(30), MessageBuilder
+ .forAddress(caller)
+ .withValue("Hello from the future!")
+ .build());
+ return context.done();
+ }
}
```
-## Persistence
-
-Stateful Functions treats state as a first class citizen and so all stateful
functions can easily define state that is automatically made fault tolerant by
the runtime.
-All stateful functions may contain state by merely defining one or more
persisted fields.
+## Egress
-The simplest way to get started is with a ``PersistedValue``, which is defined
by its name and the class of the type that it stores.
-The data is always scoped to a specific function type and identifier.
-Below is a stateful function that greets users based on the number of times
they have been seen.
-
-{{< hint info >}}
-All **PersistedValue**, **PersistedTable**, and **PersistedAppendingBuffer**
fields must be marked with a **@Persisted** annotation or they will not be made
fault tolerant by the runtime.
-{{< /hint >}}
+Functions can message other stateful functions and egresses, exit points for
sending messages to the outside world.
+As with other messages, egress messages are always well-typed.
+Additionally, they contain metadata pertinent to the specific egress type.
+{{< tabs "egress" >}}
+{{< tab "Apache Kafka" >}}
```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-public class FnUserGreeter implements StatefulFunction {
-
- public static FunctionType TYPE = new FunctionType("example",
"greeter");
-
- @Persisted
- private final PersistedValue<Integer> count =
PersistedValue.of("count", Integer.class);
-
- public void invoke(Context context, Object input) {
- String userId = context.self().id();
- int seen = count.getOrDefault(0);
-
- switch (seen) {
- case 0:
- System.out.println(String.format("Hello %s!",
userId));
- break;
- case 1:
- System.out.println("Hello Again!");
- break;
- case 2:
- System.out.println("Third time is the charm
:)");
- break;
- default:
- System.out.println(String.format("Hello for the
%d-th time", seen + 1));
- }
-
- count.set(seen + 1);
- }
-}
-```
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
-``PersistedValue`` comes with the right primitive methods to build powerful
stateful applications.
-Calling ``PersistedValue#get`` will return the current value of an object
stored in state, or ``null`` if nothing is set.
-Conversely, ``PersistedValue#set`` will update the value in state and
``PersistedValue#clear`` will delete the value from state.
+public class GreeterFn implements StatefulFunction {
-### Collection Types
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/greeter");
Review comment:
`forNameFromString` should be `typeNameFromString`.
##########
File path: docs/content/docs/sdk/java.md
##########
@@ -29,451 +29,395 @@ under the License.
Stateful functions are the building blocks of applications; they are atomic
units of isolation, distribution, and persistence.
As objects, they encapsulate the state of a single entity (e.g., a specific
user, device, or session) and encode its behavior.
Stateful functions can interact with each other, and external systems, through
message passing.
-The Java SDK is supported as an [embedded module]({{< ref
"docs/sdk/overview#embedded-module" >}}).
To get started, add the Java SDK as a dependency to your application.
-{{< artifact statefun-sdk >}}
+{{< artifact statefun-sdk-java >}}
## Defining A Stateful Function
-A stateful function is any class that implements the ``StatefulFunction``
interface.
-The following is an example of a simple hello world function.
+A stateful function is any class that implements the `StatefulFunction`
interface.
+In the following example, a `StatefulFunction` maintains a count for every user
+of an application, emitting a customized greeting.
```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnHelloWorld implements StatefulFunction {
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
- @Override
- public void invoke(Context context, Object input) {
- System.out.println("Hello " + input.toString());
- }
-}
-```
+public class GreeterFn implements StatefulFunction {
-Functions process each incoming message through their ``invoke`` method.
-Input's are untyped and passed through the system as a ``java.lang.Object`` so
one function can potentially process multiple types of messages.
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/greeter");
-The ``Context`` provides metadata about the current message and function, and
is how you can call other functions or external systems.
-Functions are invoked based on a function type and unique identifier.
+ static final TypeName INBOX =
TypeName.forNameFromString("com.example.fns/inbox");
-### Stateful Match Function
+ static final ValueSpec<Integer> SEEN =
ValueSpec.named("seen").withIntType();
-Stateful functions provide a powerful abstraction for working with events and
state, allowing developers to build components that can react to any kind of
message.
-Commonly, functions only need to handle a known set of message types, and the
``StatefulMatchFunction`` interface provides an opinionated solution to that
problem.
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ if (!message.is(User.TYPE)) {
+ throw new IllegalStateException("Unknown type");
+ }
-#### Simple Match Function
+ User user = message.as(User.TYPE);
+ String name = user.getName();
-Stateful match functions are an opinionated variant of stateful functions for
precisely this pattern.
-Developers outline expected types, optional predicates, and well-typed
business logic and let the system dispatch each input to the correct action.
-Variants are bound inside a ``configure`` method that is executed once the
first time an instance is loaded.
+ var storage = context.storage();
+ var seen = storage.get(SEEN).orElse(0);
+ storage.set(SEEN, seen + 1);
-```java
-package org.apache.flink.statefun.docs.match;
+ context.send(
+ MessageBuilder.forAddress(INBOX, name)
+ .withValue("Hello " + name + " for the " + seen + "th time!")
+ .build());
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
+ return context.done();
+ }
+}
+```
-public class FnMatchGreeter extends StatefulMatchFunction {
+This code declares a greeter function that will be
[registered](#exposing-functions) under the logical type name
`com.example.fns/greeter`. Type names must take the form `<namespace>/<name>`.
+It contains a single `ValueSpec`, which is implicitly scoped to the current
address and stores an integer.
- @Override
- public void configure(MatchBinder binder) {
- binder
- .predicate(Customer.class, this::greetCustomer)
- .predicate(Employee.class, Employee::isManager,
this::greetManager)
- .predicate(Employee.class, this::greetEmployee);
- }
+Every time a message is sent to a greeter, it first validates that the message
contains a `User` and extracts the user's name. Both messages and state are strongly
typed: either one of the default [built-in types]({{< ref
"docs/sdk/appendix#types" >}}) or a [custom type](#types), as in this case.
- private void greetCustomer(Context context, Customer message) {
- System.out.println("Hello customer " + message.getName());
- }
+The function finally builds a custom greeting for the user.
+The number of times that particular user has been seen so far is queried from
the state store and updated,
+and the greeting is sent to the user's inbox (another function type).
- private void greetEmployee(Context context, Employee message) {
- System.out.println("Hello employee " + message.getEmployeeId());
- }
+## Types
- private void greetManager(Context context, Employee message) {
- System.out.println("Hello manager " + message.getEmployeeId());
- }
-}
-```
+Stateful Functions strongly types all messages and state values.
+Because functions run in a distributed manner and state values are persisted to
stable storage, Stateful Functions aims to provide efficient and easy-to-use
serializers.
-#### Making Your Function Complete
+Out of the box, all SDKs offer a set of highly optimized serializers for
common primitive types: booleans, numerics, and strings.
+Additionally, users are encouraged to plug in custom types to model more
complex data structures.
-Similar to the first example, match functions are partial by default and will
throw an ``IllegalStateException`` on any input that does not match any branch.
-They can be made complete by providing an ``otherwise`` clause that serves as
a catch-all for unmatched input, think of it as a default clause in a Java
switch statement.
-The ``otherwise`` action takes its message as an untyped ``java.lang.Object``,
allowing you to handle any unexpected messages.
+In the [example above](#defining-a-stateful-function), the greeter function
consumes `User` messages, a POJO type containing several fields.
+By defining a custom type, this object can be passed transparently between
functions and stored in state.
+And because the type is tied to a logical typename, instead of the physical
Java class, it can be passed to functions written in other language SDKs.
```java
-package org.apache.flink.statefun.docs.match;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
-
-public class FnMatchGreeterWithCatchAll extends StatefulMatchFunction {
-
- @Override
- public void configure(MatchBinder binder) {
- binder
- .predicate(Customer.class, this::greetCustomer)
- .predicate(Employee.class, Employee::isManager,
this::greetManager)
- .predicate(Employee.class, this::greetEmployee)
- .otherwise(this::catchAll);
- }
-
- private void greetCustomer(Context context, Customer message) {
- System.out.println("Hello customer " + message.getName());
- }
-
- private void greetEmployee(Context context, Employee message) {
- System.out.println("Hello employee " + message.getEmployeeId());
- }
-
- private void greetManager(Context context, Employee message) {
- System.out.println("Hello manager " + message.getEmployeeId());
- }
-
- private void catchAll(Context context, Object message) {
- System.out.println("Hello unexpected message");
- }
-}
-```
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.types.SimpleType;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import java.util.Objects;
-#### Action Resolution Order
+public class User {
-Match functions will always match actions from most to least specific using
the following resolution rules.
+ private static final ObjectMapper mapper = new ObjectMapper();
-First, find an action that matches the type and predicate. If two predicates
will return true for a particular input, the one registered in the binder first
wins.
-Next, search for an action that matches the type but does not have an
associated predicate.
-Finally, if a catch-all exists, it will be executed or an
``IllegalStateException`` will be thrown.
+ public static final Type<User> TYPE = SimpleType.simpleImmutableTypeFrom(
+ TypeName.typeNameFromString("com.example/User"),
+ mapper::writeValueAsBytes,
+ bytes -> mapper.readValue(bytes, User.class));
-## Function Types and Messaging
+ private final String name;
-In Java, function types are defined as logical pointers composed of a
namespace and name.
-The type is bound to the implementing class in the [module]({{< ref
"docs/sdk/overview#embedded-module" >}}) definition.
-Below is an example function type for the hello world function.
+ private final String favoriteColor;
-```java
-package org.apache.flink.statefun.docs;
+ @JsonCreator
+ public User(
+ @JsonProperty("name") String name,
+ @JsonProperty("favorite_color") String favoriteColor) {
-import org.apache.flink.statefun.sdk.FunctionType;
+ this.name = Objects.requireNonNull(name);
+ this.favoriteColor = Objects.requireNonNull(favoriteColor);
+ }
-/** A function type that will be bound to {@link FnHelloWorld}. */
-public class Identifiers {
+ public String getName() {
+ return name;
+ }
- public static final FunctionType HELLO_TYPE = new
FunctionType("apache/flink", "hello");
-}
+ public String getFavoriteColor() {
+ return favoriteColor;
+ }
+
+ @Override
+ public String toString() {
+ return "User{name=" + name + ",favoriteColor=" + favoriteColor + "}";
+ }
+}
```
-This type can then be referenced from other functions to create an address and
message a particular instance.
+## State
-```java
-package org.apache.flink.statefun.docs;
+Stateful Functions treats state as a first-class citizen, so all functions
can easily define state that is automatically made fault tolerant by the
runtime.
+State declaration is as simple as defining one or more `ValueSpec`s
describing your state values.
+Value specifications are defined with a unique (to the function) name and
[type](#types).
+At runtime, functions can `get`, `set`, and `remove` state values scoped to
the address of the current message.
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-/** A simple stateful function that sends a message to the user with id
"user1" */
-public class FnCaller implements StatefulFunction {
+{{< hint info >}}
+All value specifications must be eagerly registered in the
`StatefulFunctionSpec` when composing
+the application's [RequestReplyHandler](#exposing-functions).
+{{< /hint >}}
- @Override
- public void invoke(Context context, Object input) {
- context.send(Identifiers.HELLO_TYPE, "user1", new MyUserMessage());
- }
-}
+```java
+// Value specification for a state named `seen`
+// with the primitive integer type
+ValueSpec
+ .named("seen")
+ .withIntType();
+
+// Value specification with a custom type
+ValueSpec
+ .named("user")
+ .withCustomType(User.TYPE);
```
-## Sending Delayed Messages
+### State Expiration
-Functions are able to send messages on a delay so that they will arrive after
some duration.
-Functions may even send themselves delayed messages that can serve as a
callback.
-The delayed message is non-blocking so functions will continue to process
records between the time a delayed message is sent and received.
+By default, state values are persisted until manually `remove`d by the user.
+Optionally, they may be configured to expire and be automatically deleted
after a specified duration.
```java
-package org.apache.flink.statefun.docs.delay;
-
-import java.time.Duration;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnDelayedMessage implements StatefulFunction {
-
- @Override
- public void invoke(Context context, Object input) {
- if (input instanceof Message) {
- System.out.println("Hello");
- context.sendAfter(Duration.ofMinutes(1),
context.self(), new DelayedMessage());
- }
-
- if (input instanceof DelayedMessage) {
- System.out.println("Welcome to the future!");
- }
- }
-}
+// Value specification that will automatically
+// delete the value if the function instance goes
+// more than 1 day without being called
+ValueSpec
+ .named("seen")
+ .thatExpiresAfterCall(Duration.ofDays(1))
+ .withIntType();
+
+// Value specification that will automatically
+// delete the value if it goes more than 1 day
+// without being written
+ValueSpec
+ .named("seen")
+ .thatExpireAfterWrite(Duration.ofDays(1))
+ .withIntType();
```
-## Completing Async Requests
-
-When interacting with external systems, such as a database or API, one needs
to take care that communication delay with the external system does not
dominate the application’s total work.
-Stateful Functions allows registering a Java ``CompletableFuture`` that will
resolve to a value at some point in the future.
-Future's are registered along with a metadata object that provides additional
context about the caller.
+## Sending Delayed Messages
-When the future completes, either successfully or exceptionally, the caller
function type and id will be invoked with a ``AsyncOperationResult``.
-An asynchronous result can complete in one of three states:
+Functions can send messages on a delay so that they will arrive after some
duration.
+They may even send themselves delayed messages that can serve as a callback.
+The delayed message is non-blocking, so functions will continue to process
records between when a delayed message is sent and received.
+Additionally, delayed messages are fault-tolerant and never lost, even when
recovering from failure.
-### Success
+This example sends a response back to the calling function after a 30-minute
delay.
-The asynchronous operation has succeeded, and the produced result can be
obtained via ``AsyncOperationResult#value``.
+```java
+import java.util.concurrent.CompletableFuture;
+import java.time.Duration;
-### Failure
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.message.Message;
-The asynchronous operation has failed, and the cause can be obtained via
``AsyncOperationResult#throwable``.
+public class DelayedFn implements StatefulFunction {
-### Unknown
+ private static final Logger LOG = LoggerFactory.getLogger(DelayedFn.class);
-The stateful function was restarted, possibly on a different machine, before
the ``CompletableFuture`` was completed, therefore it is unknown what is the
status of the asynchronous operation.
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/delayed");
-```java
-package org.apache.flink.statefun.docs.async;
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ if (!context.caller().isPresent()) {
+ LOG.debug("Message has no known caller, meaning it was sent directly from an ingress");
+ return context.done();
+ }
-import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.sdk.AsyncOperationResult;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-@SuppressWarnings("unchecked")
-public class EnrichmentFunction implements StatefulFunction {
-
- private final QueryService client;
-
- public EnrichmentFunction(QueryService client) {
- this.client = client;
- }
-
- @Override
- public void invoke(Context context, Object input) {
- if (input instanceof User) {
- onUser(context, (User) input);
- } else if (input instanceof AsyncOperationResult) {
- onAsyncResult((AsyncOperationResult) input);
- }
- }
-
- private void onUser(Context context, User user) {
- CompletableFuture<UserEnrichment> future =
client.getDataAsync(user.getUserId());
- context.registerAsyncOperation(user, future);
- }
-
- private void onAsyncResult(AsyncOperationResult<User, UserEnrichment>
result) {
- if (result.successful()) {
- User metadata = result.metadata();
- UserEnrichment value = result.value();
- System.out.println(
- String.format("Successfully completed future:
%s %s", metadata, value));
- } else if (result.failure()) {
- System.out.println(
- String.format("Something has gone terribly
wrong %s", result.throwable()));
- } else {
- System.out.println("Not sure what happened, maybe
retry");
- }
- }
+ var caller = context.caller().get();
+ context.sendAfter(Duration.ofMinutes(30), MessageBuilder
+ .forAddress(caller)
+ .withValue("Hello from the future!")
+ .build());
+ return context.done();
+ }
}
```
-## Persistence
-
-Stateful Functions treats state as a first class citizen and so all stateful
functions can easily define state that is automatically made fault tolerant by
the runtime.
-All stateful functions may contain state by merely defining one or more
persisted fields.
+## Egress
-The simplest way to get started is with a ``PersistedValue``, which is defined
by its name and the class of the type that it stores.
-The data is always scoped to a specific function type and identifier.
-Below is a stateful function that greets users based on the number of times
they have been seen.
-
-{{< hint info >}}
-All **PersistedValue**, **PersistedTable**, and **PersistedAppendingBuffer**
fields must be marked with a **@Persisted** annotation or they will not be made
fault tolerant by the runtime.
-{{< /hint >}}
+Functions can message other stateful functions and egresses, exit points for
sending messages to the outside world.
+As with other messages, egress messages are always well-typed.
+Additionally, they contain metadata pertinent to the specific egress type.
+{{< tabs "egress" >}}
+{{< tab "Apache Kafka" >}}
```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-public class FnUserGreeter implements StatefulFunction {
-
- public static FunctionType TYPE = new FunctionType("example",
"greeter");
-
- @Persisted
- private final PersistedValue<Integer> count =
PersistedValue.of("count", Integer.class);
-
- public void invoke(Context context, Object input) {
- String userId = context.self().id();
- int seen = count.getOrDefault(0);
-
- switch (seen) {
- case 0:
- System.out.println(String.format("Hello %s!",
userId));
- break;
- case 1:
- System.out.println("Hello Again!");
- break;
- case 2:
- System.out.println("Third time is the charm
:)");
- break;
- default:
- System.out.println(String.format("Hello for the
%d-th time", seen + 1));
- }
-
- count.set(seen + 1);
- }
-}
-```
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
-``PersistedValue`` comes with the right primitive methods to build powerful
stateful applications.
-Calling ``PersistedValue#get`` will return the current value of an object
stored in state, or ``null`` if nothing is set.
-Conversely, ``PersistedValue#set`` will update the value in state and
``PersistedValue#clear`` will delete the value from state.
+public class GreeterFn implements StatefulFunction {
-### Collection Types
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/greeter");
-Along with ``PersistedValue``, the Java SDK supports two persisted collection
types.
-``PersistedTable`` is a collection of keys and values, and
``PersistedAppendingBuffer`` is an append-only buffer.
+ static final TypeName KAFKA_EGRESS =
TypeName.forNameFromString("com.example/greets");
-These types are functionally equivalent to ``PersistedValue<Map>`` and
``PersistedValue<Collection>`` respectively but may provide better performance
in some situations.
+ static final ValueSpec<Integer> SEEN =
ValueSpec.named("seen").withIntType();
-```java
-@Persisted
-PersistedTable<String, Integer> table = PersistedTable.of("my-table",
String.class, Integer.class);
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ if (!message.is(User.TYPE)) {
+ throw new IllegalStateException("Unknown type");
+ }
-@Persisted
-PersistedAppendingBuffer<Integer> buffer =
PersistedAppendingBuffer.of("my-buffer", Integer.class);
-```
+ User user = message.as(User.TYPE);
+ String name = user.getName();
-### Dynamic State Registration
+ var storage = context.storage();
+ var seen = storage.get(SEEN).orElse(0);
+ storage.set(SEEN, seen + 1);
-Using the above state types, a function's persisted state must be defined
eagerly. You cannot use those state types to
-register a new persisted state during invocations (i.e., in the ``invoke``
method) or after the function instance is created.
+ context.send(
+ KafkaEgressMessage.forEgress(KAFKA_EGRESS)
+ .withTopic("greetings")
+ .withUtf8Key(name)
+ .withUtf8Value("Hello " + name + " for the " + seen + "th time!")
+ .build());
-If dynamic state registration is required, it can be achieved using a
``PersistedStateRegistry``:
-
-```java
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedStateRegistry;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-public class MyFunction implements StatefulFunction {
-
- @Persisted
- private final PersistedStateRegistry registry = new
PersistedStateRegistry();
-
- private PersistedValue<Integer> value;
-
- public void invoke(Context context, Object input) {
- if (value == null) {
- value = PersistedValue.of("my-value", Integer.class);
- registry.registerValue(value);
- }
- int count = value.getOrDefault(0);
- // ...
- }
+ return context.done();
+ }
}
```
-
-Note how the ``PersistedValue`` field doesn't need to be annotated with the
``@Persisted`` annotations, and is initially
-empty. The state object is dynamically created during invocation and
registered with the ``PersistedStateRegistry`` so
-that the system picks it up to be managed for fault-tolerance.
-
-### State Expiration
-
-Persisted states may be configured to expire and be deleted after a specified
duration.
-This is supported by all types of state:
-
+{{< /tab >}}
+{{< tab "Amazon Kinesis" >}}
```java
-@Persisted
-PersistedValue<Integer> value = PersistedValue.of(
- "my-value",
- Integer.class,
- Expiration.expireAfterWriting(Duration.ofHours(1)));
-
-@Persisted
-PersistedTable<String, Integer> table = PersistedTable.of(
- "my-table",
- String.class,
- Integer.class,
- Expiration.expireAfterWriting(Duration.ofMinutes(5)));
-
-@Persisted
-PersistedAppendingBuffer<Integer> buffer = PersistedAppendingBuffer.of(
- "my-buffer",
- Integer.class,
- Expiration.expireAfterWriting(Duration.ofSeconds(30)));
-```
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.io.KinesisEgressMessage;
-There are two expiration modes supported:
+public class GreeterFn implements StatefulFunction {
-```java
-Expiration.expireAfterWriting(...)
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/greeter");
-Expiration.expireAfterReadingOrWriting(...)
-```
+ static final TypeName KINESIS_EGRESS =
TypeName.forNameFromString("com.example/greets");
Review comment:
`forNameFromString` should be `typeNameFromString`.
##########
File path: docs/content/docs/sdk/java.md
##########
@@ -29,451 +29,395 @@ under the License.
Stateful functions are the building blocks of applications; they are atomic
units of isolation, distribution, and persistence.
As objects, they encapsulate the state of a single entity (e.g., a specific
user, device, or session) and encode its behavior.
Stateful functions can interact with each other, and external systems, through
message passing.
-The Java SDK is supported as an [embedded module]({{< ref
"docs/sdk/overview#embedded-module" >}}).
To get started, add the Java SDK as a dependency to your application.
-{{< artifact statefun-sdk >}}
+{{< artifact statefun-sdk-java >}}
## Defining A Stateful Function
-A stateful function is any class that implements the ``StatefulFunction``
interface.
-The following is an example of a simple hello world function.
+A stateful function is any class that implements the `StatefulFunction`
interface.
+In the following example, a `StatefulFunction` maintains a count for every user
+of an application, emitting a customized greeting.
```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnHelloWorld implements StatefulFunction {
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
- @Override
- public void invoke(Context context, Object input) {
- System.out.println("Hello " + input.toString());
- }
-}
-```
+public class GreeterFn implements StatefulFunction {
-Functions process each incoming message through their ``invoke`` method.
-Input's are untyped and passed through the system as a ``java.lang.Object`` so
one function can potentially process multiple types of messages.
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/greeter");
-The ``Context`` provides metadata about the current message and function, and
is how you can call other functions or external systems.
-Functions are invoked based on a function type and unique identifier.
+ static final TypeName INBOX =
TypeName.forNameFromString("com.example.fns/inbox");
-### Stateful Match Function
+ static final ValueSpec<Integer> SEEN =
ValueSpec.named("seen").withIntType();
-Stateful functions provide a powerful abstraction for working with events and
state, allowing developers to build components that can react to any kind of
message.
-Commonly, functions only need to handle a known set of message types, and the
``StatefulMatchFunction`` interface provides an opinionated solution to that
problem.
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ if (!message.is(User.TYPE)) {
+ throw new IllegalStateException("Unknown type");
+ }
-#### Simple Match Function
+ User user = message.as(User.TYPE);
+ String name = user.getName();
-Stateful match functions are an opinionated variant of stateful functions for
precisely this pattern.
-Developers outline expected types, optional predicates, and well-typed
business logic and let the system dispatch each input to the correct action.
-Variants are bound inside a ``configure`` method that is executed once the
first time an instance is loaded.
+ var storage = context.storage();
+ var seen = storage.get(SEEN).orElse(0);
+ storage.set(SEEN, seen + 1);
-```java
-package org.apache.flink.statefun.docs.match;
+ context.send(
+ MessageBuilder.forAddress(INBOX, name)
+ .withValue("Hello " + name + " for the " + seen + "th time!")
+ .build());
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
+ return context.done();
+ }
+}
+```
-public class FnMatchGreeter extends StatefulMatchFunction {
+This code declares a greeter function that will be
[registered](#exposing-functions) under the logical type name
`com.example.fns/greeter`. Type names must take the form `<namespace>/<name>`.
+It contains a single `ValueSpec`, which is implicitly scoped to the current
address and stores an integer.
- @Override
- public void configure(MatchBinder binder) {
- binder
- .predicate(Customer.class, this::greetCustomer)
- .predicate(Employee.class, Employee::isManager,
this::greetManager)
- .predicate(Employee.class, this::greetEmployee);
- }
+Every time a message is sent to a greeter, it first validates that the message
contains a `User` and extracts its name. Both messages and state are strongly
typed: either one of the default [built-in types]({{< ref
"docs/sdk/appendix#types" >}}) or a [custom type](#types), as in this case.
- private void greetCustomer(Context context, Customer message) {
- System.out.println("Hello customer " + message.getName());
- }
+The function finally builds a custom greeting for the user.
+The number of times that particular user has been seen so far is queried from
the state store and updated,
+and the greeting is sent to the user's inbox (another function type).
- private void greetEmployee(Context context, Employee message) {
- System.out.println("Hello employee " + message.getEmployeeId());
- }
+## Types
- private void greetManager(Context context, Employee message) {
- System.out.println("Hello manager " + message.getEmployeeId());
- }
-}
-```
+Stateful Functions strongly types all messages and state values.
+Because functions run in a distributed manner and state values are persisted to
stable storage, Stateful Functions aims to provide efficient and easy-to-use
serializers.
-#### Making Your Function Complete
+Out of the box, all SDKs offer a set of highly optimized serializers for
common primitive types: booleans, numerics, and strings.
+Additionally, users are encouraged to plug in custom types to model more
complex data structures.
-Similar to the first example, match functions are partial by default and will
throw an ``IllegalStateException`` on any input that does not match any branch.
-They can be made complete by providing an ``otherwise`` clause that serves as
a catch-all for unmatched input, think of it as a default clause in a Java
switch statement.
-The ``otherwise`` action takes its message as an untyped ``java.lang.Object``,
allowing you to handle any unexpected messages.
+In the [example above](#defining-a-stateful-function), the greeter function
consumes `User` messages, a POJO type containing several fields.
+By defining a custom type, this object can be passed transparently between
functions and stored in state.
+And because the type is tied to a logical typename, instead of the physical
Java class, it can be passed to functions written in other language SDKs.
```java
-package org.apache.flink.statefun.docs.match;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
-
-public class FnMatchGreeterWithCatchAll extends StatefulMatchFunction {
-
- @Override
- public void configure(MatchBinder binder) {
- binder
- .predicate(Customer.class, this::greetCustomer)
- .predicate(Employee.class, Employee::isManager,
this::greetManager)
- .predicate(Employee.class, this::greetEmployee)
- .otherwise(this::catchAll);
- }
-
- private void greetCustomer(Context context, Customer message) {
- System.out.println("Hello customer " + message.getName());
- }
-
- private void greetEmployee(Context context, Employee message) {
- System.out.println("Hello employee " + message.getEmployeeId());
- }
-
- private void greetManager(Context context, Employee message) {
- System.out.println("Hello manager " + message.getEmployeeId());
- }
-
- private void catchAll(Context context, Object message) {
- System.out.println("Hello unexpected message");
- }
-}
-```
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.types.SimpleType;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import java.util.Objects;
-#### Action Resolution Order
+public class User {
-Match functions will always match actions from most to least specific using
the following resolution rules.
+ private static final ObjectMapper mapper = new ObjectMapper();
-First, find an action that matches the type and predicate. If two predicates
will return true for a particular input, the one registered in the binder first
wins.
-Next, search for an action that matches the type but does not have an
associated predicate.
-Finally, if a catch-all exists, it will be executed or an
``IllegalStateException`` will be thrown.
+ public static final Type<User> TYPE = SimpleType.simpleImmutableTypeFrom(
+ TypeName.typeNameFromString("com.example/User"),
+ mapper::writeValueAsBytes,
+ bytes -> mapper.readValue(bytes, User.class));
-## Function Types and Messaging
+ private final String name;
-In Java, function types are defined as logical pointers composed of a
namespace and name.
-The type is bound to the implementing class in the [module]({{< ref
"docs/sdk/overview#embedded-module" >}}) definition.
-Below is an example function type for the hello world function.
+ private final String favoriteColor;
-```java
-package org.apache.flink.statefun.docs;
+ @JsonCreator
+ public User(
+ @JsonProperty("name") String name,
+ @JsonProperty("favorite_color") String favoriteColor) {
-import org.apache.flink.statefun.sdk.FunctionType;
+ this.name = Objects.requireNonNull(name);
+ this.favoriteColor = Objects.requireNonNull(favoriteColor);
+ }
-/** A function type that will be bound to {@link FnHelloWorld}. */
-public class Identifiers {
+ public String getName() {
+ return name;
+ }
- public static final FunctionType HELLO_TYPE = new
FunctionType("apache/flink", "hello");
-}
+ public String getFavoriteColor() {
+ return favoriteColor;
+ }
+
+ @Override
+ public String toString() {
+ return "User{name=" + name + ",favoriteColor=" + favoriteColor + "}";
+ }
+}
```
-This type can then be referenced from other functions to create an address and
message a particular instance.
+## State
-```java
-package org.apache.flink.statefun.docs;
+Stateful Functions treats state as a first-class citizen, so all functions
can easily define state that is automatically made fault tolerant by the
runtime.
+State declaration is as simple as defining one or more `ValueSpec`s
describing your state values.
+Value specifications are defined with a unique (to the function) name and
[type](#types).
+At runtime, functions can `get`, `set`, and `remove` state values scoped to
the address of the current message.
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-/** A simple stateful function that sends a message to the user with id
"user1" */
-public class FnCaller implements StatefulFunction {
+{{< hint info >}}
+All value specifications must be eagerly registered in the
`StatefulFunctionSpec` when composing
+the application's [RequestReplyHandler](#exposing-functions).
+{{< /hint >}}
- @Override
- public void invoke(Context context, Object input) {
- context.send(Identifiers.HELLO_TYPE, "user1", new MyUserMessage());
- }
-}
+```java
+// Value specification for a state named `seen`
+// with the primitive integer type
+ValueSpec
+ .named("seen")
+ .withIntType();
+
+// Value specification with a custom type
+ValueSpec
+ .named("user")
+ .withCustomType(User.TYPE);
```
-## Sending Delayed Messages
+### State Expiration
-Functions are able to send messages on a delay so that they will arrive after
some duration.
-Functions may even send themselves delayed messages that can serve as a
callback.
-The delayed message is non-blocking so functions will continue to process
records between the time a delayed message is sent and received.
+By default, state values are persisted until manually `remove`d by the user.
+Optionally, they may be configured to expire and be automatically deleted
after a specified duration.
```java
-package org.apache.flink.statefun.docs.delay;
-
-import java.time.Duration;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnDelayedMessage implements StatefulFunction {
-
- @Override
- public void invoke(Context context, Object input) {
- if (input instanceof Message) {
- System.out.println("Hello");
- context.sendAfter(Duration.ofMinutes(1),
context.self(), new DelayedMessage());
- }
-
- if (input instanceof DelayedMessage) {
- System.out.println("Welcome to the future!");
- }
- }
-}
+// Value specification that will automatically
+// delete the value if the function instance goes
+// more than 1 day without being called
+ValueSpec
+ .named("seen")
+ .thatExpiresAfterCall(Duration.ofDays(1))
+ .withIntType();
+
+// Value specification that will automatically
+// delete the value if it goes more than 1 day
+// without being written
+ValueSpec
+ .named("seen")
+ .thatExpireAfterWrite(Duration.ofDays(1))
+ .withIntType();
```
-## Completing Async Requests
-
-When interacting with external systems, such as a database or API, one needs
to take care that communication delay with the external system does not
dominate the application’s total work.
-Stateful Functions allows registering a Java ``CompletableFuture`` that will
resolve to a value at some point in the future.
-Future's are registered along with a metadata object that provides additional
context about the caller.
+## Sending Delayed Messages
-When the future completes, either successfully or exceptionally, the caller
function type and id will be invoked with a ``AsyncOperationResult``.
-An asynchronous result can complete in one of three states:
+Functions can send messages on a delay so that they will arrive after some
duration.
+They may even send themselves delayed messages that can serve as a callback.
+The delayed message is non-blocking, so functions will continue to process
records between when a delayed message is sent and received.
+Additionally, they are fault-tolerant and never lost, even when recovering
from failure.
-### Success
+This example sends a response back to the calling function after a 30-minute
delay.
-The asynchronous operation has succeeded, and the produced result can be
obtained via ``AsyncOperationResult#value``.
+```java
+import java.util.concurrent.CompletableFuture;
+import java.time.Duration;
-### Failure
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
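+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;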
-The asynchronous operation has failed, and the cause can be obtained via
``AsyncOperationResult#throwable``.
+public class DelayedFn implements StatefulFunction {
-### Unknown
+ private static final Logger LOG = LoggerFactory.getLogger(DelayedFn.class);
-The stateful function was restarted, possibly on a different machine, before
the ``CompletableFuture`` was completed, therefore it is unknown what is the
status of the asynchronous operation.
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/delayed");
-```java
-package org.apache.flink.statefun.docs.async;
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ if (!context.caller().isPresent()) {
+ LOG.debug("Message has no known caller, meaning it was sent directly from an ingress");
+ return context.done();
+ }
+ }
-import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.sdk.AsyncOperationResult;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-@SuppressWarnings("unchecked")
-public class EnrichmentFunction implements StatefulFunction {
-
- private final QueryService client;
-
- public EnrichmentFunction(QueryService client) {
- this.client = client;
- }
-
- @Override
- public void invoke(Context context, Object input) {
- if (input instanceof User) {
- onUser(context, (User) input);
- } else if (input instanceof AsyncOperationResult) {
- onAsyncResult((AsyncOperationResult) input);
- }
- }
-
- private void onUser(Context context, User user) {
- CompletableFuture<UserEnrichment> future =
client.getDataAsync(user.getUserId());
- context.registerAsyncOperation(user, future);
- }
-
- private void onAsyncResult(AsyncOperationResult<User, UserEnrichment>
result) {
- if (result.successful()) {
- User metadata = result.metadata();
- UserEnrichment value = result.value();
- System.out.println(
- String.format("Successfully completed future:
%s %s", metadata, value));
- } else if (result.failure()) {
- System.out.println(
- String.format("Something has gone terribly
wrong %s", result.throwable()));
- } else {
- System.out.println("Not sure what happened, maybe
retry");
- }
- }
+ var caller = context.caller().get();
+ context.sendAfter(Duration.ofMinutes(30), MessageBuilder
+ .forAddress(caller)
+ .withValue("Hello from the future!")
+ .build());
+ return context.done();
+ }
}
```
-## Persistence
-
-Stateful Functions treats state as a first class citizen and so all stateful
functions can easily define state that is automatically made fault tolerant by
the runtime.
-All stateful functions may contain state by merely defining one or more
persisted fields.
+## Egress
-The simplest way to get started is with a ``PersistedValue``, which is defined
by its name and the class of the type that it stores.
-The data is always scoped to a specific function type and identifier.
-Below is a stateful function that greets users based on the number of times
they have been seen.
-
-{{< hint info >}}
-All **PersistedValue**, **PersistedTable**, and **PersistedAppendingBuffer**
fields must be marked with a **@Persisted** annotation or they will not be made
fault tolerant by the runtime.
-{{< /hint >}}
+Functions can message other stateful functions and egresses, exit points for
sending messages to the outside world.
+As with other messages, egress messages are always well-typed.
+Additionally, they contain metadata pertinent to the specific egress type.
+{{< tabs "egress" >}}
+{{< tab "Apache Kafka" >}}
```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-public class FnUserGreeter implements StatefulFunction {
-
- public static FunctionType TYPE = new FunctionType("example",
"greeter");
-
- @Persisted
- private final PersistedValue<Integer> count =
PersistedValue.of("count", Integer.class);
-
- public void invoke(Context context, Object input) {
- String userId = context.self().id();
- int seen = count.getOrDefault(0);
-
- switch (seen) {
- case 0:
- System.out.println(String.format("Hello %s!",
userId));
- break;
- case 1:
- System.out.println("Hello Again!");
- break;
- case 2:
- System.out.println("Third time is the charm
:)");
- break;
- default:
- System.out.println(String.format("Hello for the
%d-th time", seen + 1));
- }
-
- count.set(seen + 1);
- }
-}
-```
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
-``PersistedValue`` comes with the right primitive methods to build powerful
stateful applications.
-Calling ``PersistedValue#get`` will return the current value of an object
stored in state, or ``null`` if nothing is set.
-Conversely, ``PersistedValue#set`` will update the value in state and
``PersistedValue#clear`` will delete the value from state.
+public class GreeterFn implements StatefulFunction {
-### Collection Types
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/greeter");
-Along with ``PersistedValue``, the Java SDK supports two persisted collection
types.
-``PersistedTable`` is a collection of keys and values, and
``PersistedAppendingBuffer`` is an append-only buffer.
+ static final TypeName KAFKA_EGRESS =
TypeName.forNameFromString("com.example/greets");
Review comment:
`forNameFromString` should be `typeNameFromString`, i.e. `TypeName.typeNameFromString(...)`.
##########
File path: docs/content/docs/sdk/java.md
##########
@@ -29,451 +29,395 @@ under the License.
Stateful functions are the building blocks of applications; they are atomic
units of isolation, distribution, and persistence.
As objects, they encapsulate the state of a single entity (e.g., a specific
user, device, or session) and encode its behavior.
Stateful functions can interact with each other, and external systems, through
message passing.
-The Java SDK is supported as an [embedded module]({{< ref
"docs/sdk/overview#embedded-module" >}}).
To get started, add the Java SDK as a dependency to your application.
-{{< artifact statefun-sdk >}}
+{{< artifact statefun-sdk-java >}}
## Defining A Stateful Function
-A stateful function is any class that implements the ``StatefulFunction``
interface.
-The following is an example of a simple hello world function.
+A stateful function is any class that implements the `StatefulFunction`
interface.
+In the following example, a `StatefulFunction` maintains a count for every user
+of an application, emitting a customized greeting.
```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnHelloWorld implements StatefulFunction {
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
- @Override
- public void invoke(Context context, Object input) {
- System.out.println("Hello " + input.toString());
- }
-}
-```
+public class GreeterFn implements StatefulFunction {
-Functions process each incoming message through their ``invoke`` method.
-Input's are untyped and passed through the system as a ``java.lang.Object`` so
one function can potentially process multiple types of messages.
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/greeter");
-The ``Context`` provides metadata about the current message and function, and
is how you can call other functions or external systems.
-Functions are invoked based on a function type and unique identifier.
+ static final TypeName INBOX =
TypeName.forNameFromString("com.example.fns/inbox");
-### Stateful Match Function
+ static final ValueSpec<Integer> SEEN =
ValueSpec.named("seen").withIntType();
-Stateful functions provide a powerful abstraction for working with events and
state, allowing developers to build components that can react to any kind of
message.
-Commonly, functions only need to handle a known set of message types, and the
``StatefulMatchFunction`` interface provides an opinionated solution to that
problem.
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ if (!message.is(User.TYPE)) {
+ throw new IllegalStateException("Unknown type");
+ }
-#### Simple Match Function
+ User user = message.as(User.TYPE);
+ String name = user.getName();
-Stateful match functions are an opinionated variant of stateful functions for
precisely this pattern.
-Developers outline expected types, optional predicates, and well-typed
business logic and let the system dispatch each input to the correct action.
-Variants are bound inside a ``configure`` method that is executed once the
first time an instance is loaded.
+ var storage = context.storage();
+ var seen = storage.get(SEEN).orElse(0);
+ storage.set(SEEN, seen + 1);
-```java
-package org.apache.flink.statefun.docs.match;
+ context.send(
+ MessageBuilder.forAddress(INBOX, name)
+ .withValue("Hello " + name + " for the " + seen + "th time!")
+ .build());
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
+ return context.done();
+ }
+}
+```
-public class FnMatchGreeter extends StatefulMatchFunction {
+This code declares a greeter function that will be
[registered](#exposing-functions) under the logical type name
`com.example.fns/greeter`. Type names must take the form `<namespace>/<name>`.
+It contains a single `ValueSpec`, which is implicitly scoped to the current
address and stores an integer.
- @Override
- public void configure(MatchBinder binder) {
- binder
- .predicate(Customer.class, this::greetCustomer)
- .predicate(Employee.class, Employee::isManager,
this::greetManager)
- .predicate(Employee.class, this::greetEmployee);
- }
+Every time a message is sent to a greeter, it first validates that the message
contains a `User` and extracts its name. Both messages and state are strongly
typed: either one of the default [built-in types]({{< ref
"docs/sdk/appendix#types" >}}) or a [custom type](#types), as in this case.
- private void greetCustomer(Context context, Customer message) {
- System.out.println("Hello customer " + message.getName());
- }
+The function finally builds a custom greeting for the user.
+The number of times that particular user has been seen so far is queried from
the state store and updated,
+and the greeting is sent to the user's inbox (another function type).
- private void greetEmployee(Context context, Employee message) {
- System.out.println("Hello employee " + message.getEmployeeId());
- }
+## Types
- private void greetManager(Context context, Employee message) {
- System.out.println("Hello manager " + message.getEmployeeId());
- }
-}
-```
+Stateful Functions strongly types all messages and state values.
+Because functions run in a distributed manner and state values are persisted to
stable storage, Stateful Functions aims to provide efficient and easy-to-use
serializers.
-#### Making Your Function Complete
+Out of the box, all SDKs offer a set of highly optimized serializers for
common primitive types: booleans, numerics, and strings.
+Additionally, users are encouraged to plug in custom types to model more
complex data structures.
-Similar to the first example, match functions are partial by default and will
throw an ``IllegalStateException`` on any input that does not match any branch.
-They can be made complete by providing an ``otherwise`` clause that serves as
a catch-all for unmatched input, think of it as a default clause in a Java
switch statement.
-The ``otherwise`` action takes its message as an untyped ``java.lang.Object``,
allowing you to handle any unexpected messages.
+In the [example above](#defining-a-stateful-function), the greeter function
consumes `User` messages, a POJO type containing several fields.
+By defining a custom type, this object can be passed transparently between
functions and stored in state.
+And because the type is tied to a logical typename, instead of the physical
Java class, it can be passed to functions written in other language SDKs.
```java
-package org.apache.flink.statefun.docs.match;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
-
-public class FnMatchGreeterWithCatchAll extends StatefulMatchFunction {
-
- @Override
- public void configure(MatchBinder binder) {
- binder
- .predicate(Customer.class, this::greetCustomer)
- .predicate(Employee.class, Employee::isManager,
this::greetManager)
- .predicate(Employee.class, this::greetEmployee)
- .otherwise(this::catchAll);
- }
-
- private void greetCustomer(Context context, Customer message) {
- System.out.println("Hello customer " + message.getName());
- }
-
- private void greetEmployee(Context context, Employee message) {
- System.out.println("Hello employee " + message.getEmployeeId());
- }
-
- private void greetManager(Context context, Employee message) {
- System.out.println("Hello manager " + message.getEmployeeId());
- }
-
- private void catchAll(Context context, Object message) {
- System.out.println("Hello unexpected message");
- }
-}
-```
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.types.SimpleType;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import java.util.Objects;
-#### Action Resolution Order
+public class User {
-Match functions will always match actions from most to least specific using
the following resolution rules.
+ private static final ObjectMapper mapper = new ObjectMapper();
-First, find an action that matches the type and predicate. If two predicates
will return true for a particular input, the one registered in the binder first
wins.
-Next, search for an action that matches the type but does not have an
associated predicate.
-Finally, if a catch-all exists, it will be executed or an
``IllegalStateException`` will be thrown.
+ public static final Type<User> TYPE = SimpleType.simpleImmutableTypeFrom(
+ TypeName.typeNameFromString("com.example/User"),
+ mapper::writeValueAsBytes,
+ bytes -> mapper.readValue(bytes, User.class));
-## Function Types and Messaging
+ private final String name;
-In Java, function types are defined as logical pointers composed of a
namespace and name.
-The type is bound to the implementing class in the [module]({{< ref
"docs/sdk/overview#embedded-module" >}}) definition.
-Below is an example function type for the hello world function.
+ private final String favoriteColor;
-```java
-package org.apache.flink.statefun.docs;
+ @JsonCreator
+ public User(
+ @JsonProperty("name") String name,
+ @JsonProperty("favorite_color") String favoriteColor) {
-import org.apache.flink.statefun.sdk.FunctionType;
+ this.name = Objects.requireNonNull(name);
+ this.favoriteColor = Objects.requireNonNull(favoriteColor);
+ }
-/** A function type that will be bound to {@link FnHelloWorld}. */
-public class Identifiers {
+ public String getName() {
+ return name;
+ }
- public static final FunctionType HELLO_TYPE = new
FunctionType("apache/flink", "hello");
-}
+ public String getFavoriteColor() {
+ return favoriteColor;
+ }
+
+ @Override
+ public String toString() {
+ return "User{name=" + name + ",favoriteColor=" + favoriteColor + "}";
+ }
+}
```
-This type can then be referenced from other functions to create an address and
message a particular instance.
+## State
-```java
-package org.apache.flink.statefun.docs;
+Stateful Functions treats state as a first-class citizen, so all functions
can easily define state that is automatically made fault tolerant by the
runtime.
+State declaration is as simple as defining one or more `ValueSpec`s
describing your state values.
+Value specifications are defined with a unique (to the function) name and
[type](#types).
+At runtime, functions can `get`, `set`, and `remove` state values scoped to
the address of the current message.
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-/** A simple stateful function that sends a message to the user with id
"user1" */
-public class FnCaller implements StatefulFunction {
+{{< hint info >}}
+All value specifications must be eagerly registered in the
`StatefulFunctionSpec` when composing
+the application's [RequestReplyHandler](#exposing-functions).
+{{< /hint >}}
- @Override
- public void invoke(Context context, Object input) {
- context.send(Identifiers.HELLO_TYPE, "user1", new MyUserMessage());
- }
-}
+```java
+// Value specification for a state named `seen`
+// with the primitive integer type
+ValueSpec
+ .named("seen")
+ .withIntType();
+
+// Value specification with a custom type
+ValueSpec
+ .named("user")
+ .withCustomType(User.TYPE);
```
-## Sending Delayed Messages
+### State Expiration
-Functions are able to send messages on a delay so that they will arrive after
some duration.
-Functions may even send themselves delayed messages that can serve as a
callback.
-The delayed message is non-blocking so functions will continue to process
records between the time a delayed message is sent and received.
+By default, state values are persisted until manually `remove`d by the user.
+Optionally, they may be configured to expire and be automatically deleted
after a specified duration.
```java
-package org.apache.flink.statefun.docs.delay;
-
-import java.time.Duration;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnDelayedMessage implements StatefulFunction {
-
- @Override
- public void invoke(Context context, Object input) {
- if (input instanceof Message) {
- System.out.println("Hello");
- context.sendAfter(Duration.ofMinutes(1),
context.self(), new DelayedMessage());
- }
-
- if (input instanceof DelayedMessage) {
- System.out.println("Welcome to the future!");
- }
- }
-}
+// Value specification that will automatically
+// delete the value if the function instance goes
+// more than 30 minutes without being called
+ValueSpec
+ .named("seen")
+ .thatExpiresAfterCall(Duration.ofMinutes(30))
+ .withIntType();
+
+// Value specification that will automatically
+// delete the value if it goes more than 1 day
+// without being written
+ValueSpec
+ .named("seen")
+ .thatExpireAfterWrite(Duration.ofDays(1))
+ .withIntType();
```
-## Completing Async Requests
-
-When interacting with external systems, such as a database or API, one needs
to take care that communication delay with the external system does not
dominate the application’s total work.
-Stateful Functions allows registering a Java ``CompletableFuture`` that will
resolve to a value at some point in the future.
-Future's are registered along with a metadata object that provides additional
context about the caller.
+## Sending Delayed Messages
-When the future completes, either successfully or exceptionally, the caller
function type and id will be invoked with a ``AsyncOperationResult``.
-An asynchronous result can complete in one of three states:
+Functions can send messages on a delay so that they will arrive after some
duration.
+They may even send themselves delayed messages that can serve as a callback.
+The delayed message is non-blocking, so functions will continue to process
records between the time a delayed message is sent and when it is received.
+Additionally, delayed messages are fault-tolerant and never lost, even when
recovering from failure.
-### Success
+This example sends a response back to the calling function after a 30-minute
delay.
-The asynchronous operation has succeeded, and the produced result can be
obtained via ``AsyncOperationResult#value``.
+```java
+import java.util.concurrent.CompletableFuture;
+import java.time.Duration;
-### Failure
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
-The asynchronous operation has failed, and the cause can be obtained via
``AsyncOperationResult#throwable``.
+public class DelayedFn implements StatefulFunction {
-### Unknown
+ private static final Logger LOG = LoggerFactory.getLogger(DelayedFn.class);
-The stateful function was restarted, possibly on a different machine, before
the ``CompletableFuture`` was completed, therefore it is unknown what is the
status of the asynchronous operation.
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/delayed");
-```java
-package org.apache.flink.statefun.docs.async;
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ if (!context.caller().isPresent()) {
+ LOG.debug("Message has no known caller, meaning it was sent directly from an ingress");
+ return context.done();
+ }
-import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.sdk.AsyncOperationResult;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-@SuppressWarnings("unchecked")
-public class EnrichmentFunction implements StatefulFunction {
-
- private final QueryService client;
-
- public EnrichmentFunction(QueryService client) {
- this.client = client;
- }
-
- @Override
- public void invoke(Context context, Object input) {
- if (input instanceof User) {
- onUser(context, (User) input);
- } else if (input instanceof AsyncOperationResult) {
- onAsyncResult((AsyncOperationResult) input);
- }
- }
-
- private void onUser(Context context, User user) {
- CompletableFuture<UserEnrichment> future =
client.getDataAsync(user.getUserId());
- context.registerAsyncOperation(user, future);
- }
-
- private void onAsyncResult(AsyncOperationResult<User, UserEnrichment>
result) {
- if (result.successful()) {
- User metadata = result.metadata();
- UserEnrichment value = result.value();
- System.out.println(
- String.format("Successfully completed future:
%s %s", metadata, value));
- } else if (result.failure()) {
- System.out.println(
- String.format("Something has gone terribly
wrong %s", result.throwable()));
- } else {
- System.out.println("Not sure what happened, maybe
retry");
- }
- }
+ var caller = context.caller().get();
+ context.sendAfter(Duration.ofMinutes(30), MessageBuilder
+ .forAddress(caller)
+ .withValue("Hello from the future!")
+ .build());
+ return context.done();
+ }
}
```
-## Persistence
-
-Stateful Functions treats state as a first class citizen and so all stateful
functions can easily define state that is automatically made fault tolerant by
the runtime.
-All stateful functions may contain state by merely defining one or more
persisted fields.
+## Egress
-The simplest way to get started is with a ``PersistedValue``, which is defined
by its name and the class of the type that it stores.
-The data is always scoped to a specific function type and identifier.
-Below is a stateful function that greets users based on the number of times
they have been seen.
-
-{{< hint info >}}
-All **PersistedValue**, **PersistedTable**, and **PersistedAppendingBuffer**
fields must be marked with a **@Persisted** annotation or they will not be made
fault tolerant by the runtime.
-{{< /hint >}}
+Functions can message not only other stateful functions but also egresses,
which are exit points for sending messages to the outside world.
+As with other messages, egress messages are always well-typed.
+Additionally, they contain metadata pertinent to the specific egress type.
+{{< tabs "egress" >}}
+{{< tab "Apache Kafka" >}}
```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-public class FnUserGreeter implements StatefulFunction {
-
- public static FunctionType TYPE = new FunctionType("example",
"greeter");
-
- @Persisted
- private final PersistedValue<Integer> count =
PersistedValue.of("count", Integer.class);
-
- public void invoke(Context context, Object input) {
- String userId = context.self().id();
- int seen = count.getOrDefault(0);
-
- switch (seen) {
- case 0:
- System.out.println(String.format("Hello %s!",
userId));
- break;
- case 1:
- System.out.println("Hello Again!");
- break;
- case 2:
- System.out.println("Third time is the charm
:)");
- break;
- default:
- System.out.println(String.format("Hello for the
%d-th time", seen + 1));
- }
-
- count.set(seen + 1);
- }
-}
-```
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
-``PersistedValue`` comes with the right primitive methods to build powerful
stateful applications.
-Calling ``PersistedValue#get`` will return the current value of an object
stored in state, or ``null`` if nothing is set.
-Conversely, ``PersistedValue#set`` will update the value in state and
``PersistedValue#clear`` will delete the value from state.
+public class GreeterFn implements StatefulFunction {
-### Collection Types
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/greeter");
-Along with ``PersistedValue``, the Java SDK supports two persisted collection
types.
-``PersistedTable`` is a collection of keys and values, and
``PersistedAppendingBuffer`` is an append-only buffer.
+ static final TypeName KAFKA_EGRESS =
TypeName.forNameFromString("com.example/greets");
-These types are functionally equivalent to ``PersistedValue<Map>`` and
``PersistedValue<Collection>`` respectively but may provide better performance
in some situations.
+ static final ValueSpec<Integer> SEEN =
ValueSpec.named("seen").withIntType();
-```java
-@Persisted
-PersistedTable<String, Integer> table = PersistedTable.of("my-table",
String.class, Integer.class);
+ @Override
+ public CompletableFuture<Void> apply(Context context, Message message) {
+ if (!message.is(User.TYPE)) {
+ throw new IllegalStateException("Unknown type");
+ }
-@Persisted
-PersistedAppendingBuffer<Integer> buffer =
PersistedAppendingBuffer.of("my-buffer", Integer.class);
-```
+ User user = message.as(User.TYPE);
+ String name = user.getName();
-### Dynamic State Registration
+ var storage = context.storage();
+ var seen = storage.get(SEEN).orElse(0);
+ storage.set(SEEN, seen + 1);
-Using the above state types, a function's persisted state must be defined
eagerly. You cannot use those state types to
-register a new persisted state during invocations (i.e., in the ``invoke``
method) or after the function instance is created.
+ context.send(
+ KafkaEgressMessage.forEgress(KAFKA_EGRESS)
+ .withTopic("greetings")
+ .withUtf8Key(name)
+ .withUtf8Value("Hello " + name + " for the " + (seen + 1) + "th time!")
+ .build());
-If dynamic state registration is required, it can be achieved using a
``PersistedStateRegistry``:
-
-```java
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedStateRegistry;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-public class MyFunction implements StatefulFunction {
-
- @Persisted
- private final PersistedStateRegistry registry = new
PersistedStateRegistry();
-
- private PersistedValue<Integer> value;
-
- public void invoke(Context context, Object input) {
- if (value == null) {
- value = PersistedValue.of("my-value", Integer.class);
- registry.registerValue(value);
- }
- int count = value.getOrDefault(0);
- // ...
- }
+ return context.done();
+ }
}
```
-
-Note how the ``PersistedValue`` field doesn't need to be annotated with the
``@Persisted`` annotations, and is initially
-empty. The state object is dynamically created during invocation and
registered with the ``PersistedStateRegistry`` so
-that the system picks it up to be managed for fault-tolerance.
-
-### State Expiration
-
-Persisted states may be configured to expire and be deleted after a specified
duration.
-This is supported by all types of state:
-
+{{< /tab >}}
+{{< tab "Amazon Kinesis" >}}
```java
-@Persisted
-PersistedValue<Integer> value = PersistedValue.of(
- "my-value",
- Integer.class,
- Expiration.expireAfterWriting(Duration.ofHours(1)));
-
-@Persisted
-PersistedTable<String, Integer> table = PersistedTable.of(
- "my-table",
- String.class,
- Integer.class,
- Expiration.expireAfterWriting(Duration.ofMinutes(5)));
-
-@Persisted
-PersistedAppendingBuffer<Integer> buffer = PersistedAppendingBuffer.of(
- "my-buffer",
- Integer.class,
- Expiration.expireAfterWriting(Duration.ofSeconds(30)));
-```
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.io.KinesisEgressMessage;
-There are two expiration modes supported:
+public class GreeterFn implements StatefulFunction {
-```java
-Expiration.expireAfterWriting(...)
+ static final TypeName TYPE =
TypeName.forNameFromString("com.example.fns/greeter");
Review comment:
forNameFromString => typeNameFromString
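
For context on the rename, here is a minimal sketch of what the string form passed to that method is expected to encode. `TypeNameSketch` is a hypothetical stand-in written only for this comment, not the SDK's `TypeName` class, and it assumes the string has the shape `<namespace>/<name>` and is split at the last `/`.

```java
import java.util.Objects;

// Hypothetical stand-in for the SDK's TypeName, for illustration only.
final class TypeNameSketch {
  private final String namespace;
  private final String name;

  private TypeNameSketch(String namespace, String name) {
    this.namespace = namespace;
    this.name = name;
  }

  // Assumption: the string form is "<namespace>/<name>", split at the last '/'.
  static TypeNameSketch typeNameFromString(String typeNameString) {
    Objects.requireNonNull(typeNameString, "typeNameString");
    int pos = typeNameString.lastIndexOf('/');
    // Reject strings with no '/', an empty namespace, or an empty name.
    if (pos <= 0 || pos == typeNameString.length() - 1) {
      throw new IllegalArgumentException(
          "Expected <namespace>/<name>, got: " + typeNameString);
    }
    return new TypeNameSketch(
        typeNameString.substring(0, pos), typeNameString.substring(pos + 1));
  }

  String namespace() {
    return namespace;
  }

  String name() {
    return name;
  }

  public static void main(String[] args) {
    TypeNameSketch type = typeNameFromString("com.example.fns/greeter");
    // prints "com.example.fns | greeter"
    System.out.println(type.namespace() + " | " + type.name());
  }
}
```

Under that assumption, `"com.example.fns/greeter"` and `"com.example/greets"` from the snippets above both parse cleanly, so only the method name in the docs needs to change.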