afedulov commented on a change in pull request #211:
URL: https://github.com/apache/flink-statefun/pull/211#discussion_r595074016



##########
File path: docs/content/docs/sdk/java.md
##########
@@ -29,451 +29,395 @@ under the License.
 Stateful functions are the building blocks of applications; they are atomic 
units of isolation, distribution, and persistence.
 As objects, they encapsulate the state of a single entity (e.g., a specific 
user, device, or session) and encode its behavior.
 Stateful functions can interact with each other, and external systems, through 
message passing.
-The Java SDK is supported as an [embedded module]({{< ref 
"docs/sdk/overview#embedded-module" >}}).
 
 To get started, add the Java SDK as a dependency to your application.
 
-{{< artifact statefun-sdk >}}
+{{< artifact statefun-sdk-java >}}
 
 ## Defining A Stateful Function
 
-A stateful function is any class that implements the ``StatefulFunction`` 
interface.
-The following is an example of a simple hello world function.
+A stateful function is any class that implements the `StatefulFunction` 
interface.
+In the following example, a `StatefulFunction` maintains a count for every user
+of an application, emitting a customized greeting.
 
 ```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnHelloWorld implements StatefulFunction {
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
 
-       @Override
-       public void invoke(Context context, Object input) {
-               System.out.println("Hello " + input.toString());
-       }
-}
-```
+public class GreeterFn implements StatefulFunction {
 
-Functions process each incoming message through their ``invoke`` method.
-Input's are untyped and passed through the system as a ``java.lang.Object`` so 
one function can potentially process multiple types of messages.
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/greeter");
 
-The ``Context`` provides metadata about the current message and function, and 
is how you can call other functions or external systems.
-Functions are invoked based on a function type and unique identifier.
+    static final TypeName INBOX = 
TypeName.forNameFromString("com.example.fns/inbox");

Review comment:
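       `forNameFromString` => `typeNameFromString`. The factory method on `TypeName` is `typeNameFromString`, which the `User` example later in this file already uses; the same applies to `TYPE` just above.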

##########
File path: docs/content/docs/sdk/java.md
##########
@@ -29,451 +29,395 @@ under the License.
 Stateful functions are the building blocks of applications; they are atomic 
units of isolation, distribution, and persistence.
 As objects, they encapsulate the state of a single entity (e.g., a specific 
user, device, or session) and encode its behavior.
 Stateful functions can interact with each other, and external systems, through 
message passing.
-The Java SDK is supported as an [embedded module]({{< ref 
"docs/sdk/overview#embedded-module" >}}).
 
 To get started, add the Java SDK as a dependency to your application.
 
-{{< artifact statefun-sdk >}}
+{{< artifact statefun-sdk-java >}}
 
 ## Defining A Stateful Function
 
-A stateful function is any class that implements the ``StatefulFunction`` 
interface.
-The following is an example of a simple hello world function.
+A stateful function is any class that implements the `StatefulFunction` 
interface.
+In the following example, a `StatefulFunction` maintains a count for every user
+of an application, emitting a customized greeting.
 
 ```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnHelloWorld implements StatefulFunction {
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
 
-       @Override
-       public void invoke(Context context, Object input) {
-               System.out.println("Hello " + input.toString());
-       }
-}
-```
+public class GreeterFn implements StatefulFunction {
 
-Functions process each incoming message through their ``invoke`` method.
-Input's are untyped and passed through the system as a ``java.lang.Object`` so 
one function can potentially process multiple types of messages.
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/greeter");
 
-The ``Context`` provides metadata about the current message and function, and 
is how you can call other functions or external systems.
-Functions are invoked based on a function type and unique identifier.
+    static final TypeName INBOX = 
TypeName.forNameFromString("com.example.fns/inbox");
 
-### Stateful Match Function 
+    static final ValueSpec<Integer> SEEN = 
ValueSpec.named("seen").withIntType();
 
-Stateful functions provide a powerful abstraction for working with events and 
state, allowing developers to build components that can react to any kind of 
message.
-Commonly, functions only need to handle a known set of message types, and the 
``StatefulMatchFunction`` interface provides an opinionated solution to that 
problem.
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message message) {
+        if (!message.is(User.TYPE)) {
+            throw new IllegalStateException("Unknown type");
+        }
 
-#### Simple Match Function
+        User user = message.as(User.TYPE);
+        String name = user.getName();
 
-Stateful match functions are an opinionated variant of stateful functions for 
precisely this pattern.
-Developers outline expected types, optional predicates, and well-typed 
business logic and let the system dispatch each input to the correct action.
-Variants are bound inside a ``configure`` method that is executed once the 
first time an instance is loaded.
+        var storage = context.storage();
+        var seen = storage.get(SEEN).orElse(0);
+        storage.set(SEEN, seen + 1);
 
-```java
-package org.apache.flink.statefun.docs.match;
+        context.send(
+            MessageBuilder.forAddress(INBOX, name)
+                .withValue("Hello " + name + " for the " + seen + "th time!")
+                .build());
 
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
+        return context.done();
+    }
+}
+```
 
-public class FnMatchGreeter extends StatefulMatchFunction {
+This code declares a greeter function that will be 
[registered](#exposing-functions) under the logical type name 
`com.example.fns/greeter`. Type names must take the form `<namespace>/<name>`.
+It contains a single `ValueSpec`, which is implicitly scoped to the current 
address and stores an integer.
 
-       @Override
-       public void configure(MatchBinder binder) {
-               binder
-                       .predicate(Customer.class, this::greetCustomer)
-                       .predicate(Employee.class, Employee::isManager, 
this::greetManager)
-                       .predicate(Employee.class, this::greetEmployee);
-       }
+Every time a message is sent to a greeter, it first validates that the message 
contains a `User` and extracts its name. Both messages and state are strongly 
typed: either one of the default [built-in types]({{< ref 
"docs/sdk/appendix#types" >}}) or a [custom type](#types), as in this case.
 
-       private void greetCustomer(Context context, Customer message) {
-               System.out.println("Hello customer " + message.getName());
-       }
+The function finally builds a custom greeting for the user.
+The number of times that particular user has been seen so far is queried from 
the state store and updated,
+and the greeting is sent to the user's inbox (another function type). 
 
-       private void greetEmployee(Context context, Employee message) {
-               System.out.println("Hello employee " + message.getEmployeeId());
-       }
+## Types
 
-       private void greetManager(Context context, Employee message) {
-               System.out.println("Hello manager " + message.getEmployeeId());
-       }
-}
-```
+Stateful Functions strongly types all messages and state values. 
+Because functions run in a distributed manner and state values are persisted to 
stable storage, Stateful Functions aims to provide efficient and easy-to-use 
serializers. 
 
-#### Making Your Function Complete
+Out of the box, all SDKs offer a set of highly optimized serializers for 
common primitive types: boolean, numerics, and strings.
+Additionally, users are encouraged to plug in custom types to model more 
complex data structures. 
 
-Similar to the first example, match functions are partial by default and will 
throw an ``IllegalStateException`` on any input that does not match any branch.
-They can be made complete by providing an ``otherwise`` clause that serves as 
a catch-all for unmatched input, think of it as a default clause in a Java 
switch statement.
-The ``otherwise`` action takes its message as an untyped ``java.lang.Object``, 
allowing you to handle any unexpected messages.
+In the [example above](#defining-a-stateful-function), the greeter function 
consumes `User` messages, a POJO type containing several fields.
+By defining a custom type, this object can be passed transparently between 
functions and stored in state.
+And because the type is tied to a logical typename, instead of the physical 
Java class, it can be passed to functions written in other language SDKs. 
 
 ```java
-package org.apache.flink.statefun.docs.match;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
-
-public class FnMatchGreeterWithCatchAll extends StatefulMatchFunction {
-
-       @Override
-       public void configure(MatchBinder binder) {
-               binder
-                       .predicate(Customer.class, this::greetCustomer)
-                       .predicate(Employee.class, Employee::isManager, 
this::greetManager)
-                       .predicate(Employee.class, this::greetEmployee)
-                       .otherwise(this::catchAll);
-       }
-
-       private void greetCustomer(Context context, Customer message) {
-               System.out.println("Hello customer " + message.getName());
-       }
-
-       private void greetEmployee(Context context, Employee message) {
-               System.out.println("Hello employee " + message.getEmployeeId());
-       }
-
-       private void greetManager(Context context, Employee message) {
-               System.out.println("Hello manager " + message.getEmployeeId());
-       }
-
-       private void catchAll(Context context, Object message) {
-               System.out.println("Hello unexpected message");
-       }
-}
-```
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.types.SimpleType;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import java.util.Objects;
 
-#### Action Resolution Order
+public class User {
 
-Match functions will always match actions from most to least specific using 
the following resolution rules.
+    private static final ObjectMapper mapper = new ObjectMapper();
 
-First, find an action that matches the type and predicate. If two predicates 
will return true for a particular input, the one registered in the binder first 
wins.
-Next, search for an action that matches the type but does not have an 
associated predicate.
-Finally, if a catch-all exists, it will be executed or an 
``IllegalStateException`` will be thrown.
+    public static final Type<User> TYPE = SimpleType.simpleImmutableTypeFrom(
+        TypeName.typeNameFromString("com.example/User"),
+        mapper::writeValueAsBytes,
+        bytes -> mapper.readValue(bytes, User.class));
 
-## Function Types and Messaging
+    private final String name;
 
-In Java, function types are defined as logical pointers composed of a 
namespace and name.
-The type is bound to the implementing class in the [module]({{< ref 
"docs/sdk/overview#embedded-module" >}}) definition.
-Below is an example function type for the hello world function.
+    private final String favoriteColor;
 
-```java
-package org.apache.flink.statefun.docs;
+    @JsonCreator
+    public User(
+        @JsonProperty("name") String name,
+        @JsonProperty("favorite_color") String favoriteColor) {
 
-import org.apache.flink.statefun.sdk.FunctionType;
+        this.name = Objects.requireNonNull(name);
+        this.favoriteColor = Objects.requireNonNull(favoriteColor);
+    }
 
-/** A function type that will be bound to {@link FnHelloWorld}. */
-public class Identifiers {
+    public String getName() {
+        return name;
+    }
 
-  public static final FunctionType HELLO_TYPE = new 
FunctionType("apache/flink", "hello");
-}
+    public String getFavoriteColor() {
+        return favoriteColor;
+    }
+
+    @Override
+    public String toString() {
+        return "User{name=" + name + ",favoriteColor=" + favoriteColor + "}";
+    }
+}
 ```
 
-This type can then be referenced from other functions to create an address and 
message a particular instance.
+## State
 
-```java
-package org.apache.flink.statefun.docs;
+Stateful Functions treats state as a first-class citizen, so all functions 
can easily define state that is automatically made fault tolerant by the 
runtime.
+State declaration is as simple as defining one or more `ValueSpec`s 
describing your state values.
+Value specifications are defined with a unique (to the function) name and 
[type](#types).
+At runtime, functions can `get`, `set`, and `remove` state values scoped to 
the address of the current message.
 
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-/** A simple stateful function that sends a message to the user with id 
"user1" */
-public class FnCaller implements StatefulFunction {
+{{< hint info >}}
+All value specifications must be eagerly registered in the 
`StatefulFunctionSpec` when composing
+the application's [RequestReplyHandler](#exposing-functions).
+{{< /hint >}}
 
-  @Override
-  public void invoke(Context context, Object input) {
-    context.send(Identifiers.HELLO_TYPE, "user1", new MyUserMessage());
-  }
-}
+```java
+// Value specification for a state named `seen` 
+// with the primitive integer type
+ValueSpec
+    .named("seen")
+    .withIntType();
+
+// Value specification with a custom type
+ValueSpec
+    .named("user")
+    .withCustomType(User.TYPE);
 ```
 
-## Sending Delayed Messages
+### State Expiration
 
-Functions are able to send messages on a delay so that they will arrive after 
some duration.
-Functions may even send themselves delayed messages that can serve as a 
callback.
-The delayed message is non-blocking so functions will continue to process 
records between the time a delayed message is sent and received.
+By default, state values are persisted until manually `remove`d by the user.
+Optionally, they may be configured to expire and be automatically deleted 
after a specified duration.
 
 ```java
-package org.apache.flink.statefun.docs.delay;
-
-import java.time.Duration;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnDelayedMessage implements StatefulFunction {
-
-       @Override
-       public void invoke(Context context, Object input) {
-               if (input instanceof Message) {
-                       System.out.println("Hello");
-                       context.sendAfter(Duration.ofMinutes(1), 
context.self(), new DelayedMessage());
-               }
-
-               if (input instanceof DelayedMessage) {
-                       System.out.println("Welcome to the future!");
-               }
-       }
-}
+// Value specification that will automatically
+// delete the value if the function instance goes 
+// more than 1 day without being called
+ValueSpec
+    .named("seen")
+    .thatExpiresAfterCall(Duration.ofDays(1))
+    .withIntType();
+
+// Value specification that will automatically
+// delete the value if it goes more than 1 day
+// without being written
+ValueSpec
+    .named("seen")
+    .thatExpireAfterWrite(Duration.ofDays(1))
+    .withIntType();
 ```
 
-## Completing Async Requests
-
-When interacting with external systems, such as a database or API, one needs 
to take care that communication delay with the external system does not 
dominate the application’s total work.
-Stateful Functions allows registering a Java ``CompletableFuture`` that will 
resolve to a value at some point in the future.
-Future's are registered along with a metadata object that provides additional 
context about the caller.
+## Sending Delayed Messages
 
-When the future completes, either successfully or exceptionally, the caller 
function type and id will be invoked with a ``AsyncOperationResult``.
-An asynchronous result can complete in one of three states:
+Functions can send messages on a delay so that they will arrive after some 
duration.
+They may even send themselves delayed messages that can serve as a callback.
+The delayed message is non-blocking, so functions will continue to process 
records between when a delayed message is sent and received.
+Additionally, delayed messages are fault-tolerant and never lost, even when 
recovering from failure. 
 
-### Success
+This example sends a response back to the calling function after a 30-minute 
delay.
 
-The asynchronous operation has succeeded, and the produced result can be 
obtained via ``AsyncOperationResult#value``.
+```java
+import java.util.concurrent.CompletableFuture;
+import java.time.Duration;
 
-### Failure
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.Message;
 
-The asynchronous operation has failed, and the cause can be obtained via 
``AsyncOperationResult#throwable``.
+public class DelayedFn implements StatefulFunction {
 
-### Unknown
+    private static final Logger LOG = LoggerFactory.getLogger(DelayedFn.class);
 
-The stateful function was restarted, possibly on a different machine, before 
the ``CompletableFuture`` was completed, therefore it is unknown what is the 
status of the asynchronous operation.
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/delayed");

Review comment:
       `forNameFromString` => `typeNameFromString` here as well, matching the `User` example earlier in this file.
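
       For reference, a minimal sketch of what the `<namespace>/<name>` convention described in the doc implies for parsing. `TypeNameSketch` and its accessors are hypothetical stand-ins written for this comment, not the SDK's `TypeName`; only the method name `typeNameFromString` is taken from the `User` example in this diff, and splitting on the last `/` is an assumption.

```java
import java.util.Objects;

// Hypothetical stand-in for illustration only; this is NOT the SDK's TypeName class.
final class TypeNameSketch {
    private final String namespace;
    private final String name;

    private TypeNameSketch(String namespace, String name) {
        this.namespace = namespace;
        this.name = name;
    }

    // Uses the method name the review asks for (typeNameFromString).
    // Assumption: the input is split on the LAST '/' into namespace and name.
    static TypeNameSketch typeNameFromString(String typeName) {
        Objects.requireNonNull(typeName, "typeName");
        int slash = typeName.lastIndexOf('/');
        // Reject a missing separator, an empty namespace, or an empty name.
        if (slash <= 0 || slash == typeName.length() - 1) {
            throw new IllegalArgumentException(
                "Expected <namespace>/<name> but got: " + typeName);
        }
        return new TypeNameSketch(
            typeName.substring(0, slash), typeName.substring(slash + 1));
    }

    String namespace() {
        return namespace;
    }

    String name() {
        return name;
    }

    // Rebuilds the canonical <namespace>/<name> string.
    String asTypeNameString() {
        return namespace + "/" + name;
    }
}
```

       With this sketch, `TypeNameSketch.typeNameFromString("com.example.fns/delayed")` yields the namespace `com.example.fns` and the name `delayed`, while a string without a `/` is rejected with an `IllegalArgumentException`.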

##########
File path: docs/content/docs/sdk/java.md
##########
@@ -29,451 +29,395 @@ under the License.
 Stateful functions are the building blocks of applications; they are atomic 
units of isolation, distribution, and persistence.
 As objects, they encapsulate the state of a single entity (e.g., a specific 
user, device, or session) and encode its behavior.
 Stateful functions can interact with each other, and external systems, through 
message passing.
-The Java SDK is supported as an [embedded module]({{< ref 
"docs/sdk/overview#embedded-module" >}}).
 
 To get started, add the Java SDK as a dependency to your application.
 
-{{< artifact statefun-sdk >}}
+{{< artifact statefun-sdk-java >}}
 
 ## Defining A Stateful Function
 
-A stateful function is any class that implements the ``StatefulFunction`` 
interface.
-The following is an example of a simple hello world function.
+A stateful function is any class that implements the `StatefulFunction` 
interface.
+In the following example, a `StatefulFunction` maintains a count for every user
+of an application, emitting a customized greeting.
 
 ```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnHelloWorld implements StatefulFunction {
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
 
-       @Override
-       public void invoke(Context context, Object input) {
-               System.out.println("Hello " + input.toString());
-       }
-}
-```
+public class GreeterFn implements StatefulFunction {
 
-Functions process each incoming message through their ``invoke`` method.
-Input's are untyped and passed through the system as a ``java.lang.Object`` so 
one function can potentially process multiple types of messages.
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/greeter");
 
-The ``Context`` provides metadata about the current message and function, and 
is how you can call other functions or external systems.
-Functions are invoked based on a function type and unique identifier.
+    static final TypeName INBOX = 
TypeName.forNameFromString("com.example.fns/inbox");
 
-### Stateful Match Function 
+    static final ValueSpec<Integer> SEEN = 
ValueSpec.named("seen").withIntType();
 
-Stateful functions provide a powerful abstraction for working with events and 
state, allowing developers to build components that can react to any kind of 
message.
-Commonly, functions only need to handle a known set of message types, and the 
``StatefulMatchFunction`` interface provides an opinionated solution to that 
problem.
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message message) {
+        if (!message.is(User.TYPE)) {
+            throw new IllegalStateException("Unknown type");
+        }
 
-#### Simple Match Function
+        User user = message.as(User.TYPE);
+        String name = user.getName();
 
-Stateful match functions are an opinionated variant of stateful functions for 
precisely this pattern.
-Developers outline expected types, optional predicates, and well-typed 
business logic and let the system dispatch each input to the correct action.
-Variants are bound inside a ``configure`` method that is executed once the 
first time an instance is loaded.
+        var storage = context.storage();
+        var seen = storage.get(SEEN).orElse(0);
+        storage.set(SEEN, seen + 1);
 
-```java
-package org.apache.flink.statefun.docs.match;
+        context.send(
+            MessageBuilder.forAddress(INBOX, name)
+                .withValue("Hello " + name + " for the " + seen + "th time!")
+                .build());
 
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
+        return context.done();
+    }
+}
+```
 
-public class FnMatchGreeter extends StatefulMatchFunction {
+This code declares a greeter function that will be 
[registered](#exposing-functions) under the logical type name 
`com.example.fns/greeter`. Type names must take the form `<namespace>/<name>`.
+It contains a single `ValueSpec`, which is implicitly scoped to the current 
address and stores an integer.
 
-       @Override
-       public void configure(MatchBinder binder) {
-               binder
-                       .predicate(Customer.class, this::greetCustomer)
-                       .predicate(Employee.class, Employee::isManager, 
this::greetManager)
-                       .predicate(Employee.class, this::greetEmployee);
-       }
+Every time a message is sent to a greeter, it first validates that the message 
contains a `User` and extracts its name. Both messages and state are strongly 
typed: either one of the default [built-in types]({{< ref 
"docs/sdk/appendix#types" >}}) or a [custom type](#types), as in this case.
 
-       private void greetCustomer(Context context, Customer message) {
-               System.out.println("Hello customer " + message.getName());
-       }
+The function finally builds a custom greeting for the user.
+The number of times that particular user has been seen so far is queried from 
the state store and updated,
+and the greeting is sent to the user's inbox (another function type). 
 
-       private void greetEmployee(Context context, Employee message) {
-               System.out.println("Hello employee " + message.getEmployeeId());
-       }
+## Types
 
-       private void greetManager(Context context, Employee message) {
-               System.out.println("Hello manager " + message.getEmployeeId());
-       }
-}
-```
+Stateful Functions strongly types all messages and state values. 
+Because functions run in a distributed manner and state values are persisted to 
stable storage, Stateful Functions aims to provide efficient and easy-to-use 
serializers. 
 
-#### Making Your Function Complete
+Out of the box, all SDKs offer a set of highly optimized serializers for 
common primitive types: boolean, numerics, and strings.
+Additionally, users are encouraged to plug in custom types to model more 
complex data structures. 
 
-Similar to the first example, match functions are partial by default and will 
throw an ``IllegalStateException`` on any input that does not match any branch.
-They can be made complete by providing an ``otherwise`` clause that serves as 
a catch-all for unmatched input, think of it as a default clause in a Java 
switch statement.
-The ``otherwise`` action takes its message as an untyped ``java.lang.Object``, 
allowing you to handle any unexpected messages.
+In the [example above](#defining-a-stateful-function), the greeter function 
consumes `User` messages, a POJO type containing several fields.
+By defining a custom type, this object can be passed transparently between 
functions and stored in state.
+And because the type is tied to a logical typename, instead of the physical 
Java class, it can be passed to functions written in other language SDKs. 
 
 ```java
-package org.apache.flink.statefun.docs.match;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
-
-public class FnMatchGreeterWithCatchAll extends StatefulMatchFunction {
-
-       @Override
-       public void configure(MatchBinder binder) {
-               binder
-                       .predicate(Customer.class, this::greetCustomer)
-                       .predicate(Employee.class, Employee::isManager, 
this::greetManager)
-                       .predicate(Employee.class, this::greetEmployee)
-                       .otherwise(this::catchAll);
-       }
-
-       private void greetCustomer(Context context, Customer message) {
-               System.out.println("Hello customer " + message.getName());
-       }
-
-       private void greetEmployee(Context context, Employee message) {
-               System.out.println("Hello employee " + message.getEmployeeId());
-       }
-
-       private void greetManager(Context context, Employee message) {
-               System.out.println("Hello manager " + message.getEmployeeId());
-       }
-
-       private void catchAll(Context context, Object message) {
-               System.out.println("Hello unexpected message");
-       }
-}
-```
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.types.SimpleType;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import java.util.Objects;
 
-#### Action Resolution Order
+public class User {
 
-Match functions will always match actions from most to least specific using 
the following resolution rules.
+    private static final ObjectMapper mapper = new ObjectMapper();
 
-First, find an action that matches the type and predicate. If two predicates 
will return true for a particular input, the one registered in the binder first 
wins.
-Next, search for an action that matches the type but does not have an 
associated predicate.
-Finally, if a catch-all exists, it will be executed or an 
``IllegalStateException`` will be thrown.
+    public static final Type<User> TYPE = SimpleType.simpleImmutableTypeFrom(
+        TypeName.typeNameFromString("com.example/User"),
+        mapper::writeValueAsBytes,
+        bytes -> mapper.readValue(bytes, User.class));
 
-## Function Types and Messaging
+    private final String name;
 
-In Java, function types are defined as logical pointers composed of a 
namespace and name.
-The type is bound to the implementing class in the [module]({{< ref 
"docs/sdk/overview#embedded-module" >}}) definition.
-Below is an example function type for the hello world function.
+    private final String favoriteColor;
 
-```java
-package org.apache.flink.statefun.docs;
+    @JsonCreator
+    public User(
+        @JsonProperty("name") String name,
+        @JsonProperty("favorite_color") String favoriteColor) {
 
-import org.apache.flink.statefun.sdk.FunctionType;
+        this.name = Objects.requireNonNull(name);
+        this.favoriteColor = Objects.requireNonNull(favoriteColor);
+    }
 
-/** A function type that will be bound to {@link FnHelloWorld}. */
-public class Identifiers {
+    public String getName() {
+        return name;
+    }
 
-  public static final FunctionType HELLO_TYPE = new 
FunctionType("apache/flink", "hello");
-}
+    public String getFavoriteColor() {
+        return favoriteColor;
+    }
+
+    @Override
+    public String toString() {
+        return "User{name=" + name + ",favoriteColor=" + favoriteColor + "}";
+    }
+}
 ```
 
-This type can then be referenced from other functions to create an address and 
message a particular instance.
+## State
 
-```java
-package org.apache.flink.statefun.docs;
+Stateful Functions treats state as a first-class citizen, so all functions 
can easily define state that is automatically made fault tolerant by the 
runtime.
+State declaration is as simple as defining one or more `ValueSpec`s 
describing your state values.
+Value specifications are defined with a unique (to the function) name and 
[type](#types).
+At runtime, functions can `get`, `set`, and `remove` state values scoped to 
the address of the current message.
 
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-/** A simple stateful function that sends a message to the user with id 
"user1" */
-public class FnCaller implements StatefulFunction {
+{{< hint info >}}
+All value specifications must be eagerly registered in the 
`StatefulFunctionSpec` when composing
+the application's [RequestReplyHandler](#exposing-functions).
+{{< /hint >}}
 
-  @Override
-  public void invoke(Context context, Object input) {
-    context.send(Identifiers.HELLO_TYPE, "user1", new MyUserMessage());
-  }
-}
+```java
+// Value specification for a state named `seen` 
+// with the primitive integer type
+ValueSpec
+    .named("seen")
+    .withIntType();
+
+// Value specification with a custom type
+ValueSpec
+    .named("user")
+    .withCustomType(User.TYPE);
 ```
 
-## Sending Delayed Messages
+### State Expiration
 
-Functions are able to send messages on a delay so that they will arrive after 
some duration.
-Functions may even send themselves delayed messages that can serve as a 
callback.
-The delayed message is non-blocking so functions will continue to process 
records between the time a delayed message is sent and received.
+By default, state values are persisted until manually `remove`d by the user.
+Optionally, they may be configured to expire and be automatically deleted 
after a specified duration.
 
 ```java
-package org.apache.flink.statefun.docs.delay;
-
-import java.time.Duration;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnDelayedMessage implements StatefulFunction {
-
-       @Override
-       public void invoke(Context context, Object input) {
-               if (input instanceof Message) {
-                       System.out.println("Hello");
-                       context.sendAfter(Duration.ofMinutes(1), 
context.self(), new DelayedMessage());
-               }
-
-               if (input instanceof DelayedMessage) {
-                       System.out.println("Welcome to the future!");
-               }
-       }
-}
+// Value specification that will automatically
+// delete the value if the function instance goes 
+// more than 1 day without being called
+ValueSpec
+    .named("seen")
+    .thatExpiresAfterCall(Duration.ofDays(1))
+    .withIntType();
+
+// Value specification that will automatically
+// delete the value if it goes more than 1 day
+// without being written
+ValueSpec
+    .named("seen")
+    .thatExpireAfterWrite(Duration.ofDays(1))
+    .withIntType();
 ```
 
-## Completing Async Requests
-
-When interacting with external systems, such as a database or API, one needs 
to take care that communication delay with the external system does not 
dominate the application’s total work.
-Stateful Functions allows registering a Java ``CompletableFuture`` that will 
resolve to a value at some point in the future.
-Future's are registered along with a metadata object that provides additional 
context about the caller.
+## Sending Delayed Messages
 
-When the future completes, either successfully or exceptionally, the caller 
function type and id will be invoked with a ``AsyncOperationResult``.
-An asynchronous result can complete in one of three states:
+Functions can send messages on a delay so that they will arrive after some 
duration.
+They may even send themselves delayed messages that can serve as a callback.
+The delayed message is non-blocking, so functions will continue to process 
records between when a delayed message is sent and received.
+Additionally, they are fault-tolerant and never lost, even when recovering 
from failure. 
 
-### Success
+This example sends a response back to the calling function after a 30-minute delay.
 
-The asynchronous operation has succeeded, and the produced result can be 
obtained via ``AsyncOperationResult#value``.
+```java
+import java.util.concurrent.CompletableFuture;
+import java.time.Duration;
 
-### Failure
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
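+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;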
 
-The asynchronous operation has failed, and the cause can be obtained via 
``AsyncOperationResult#throwable``.
+public class DelayedFn implements StatefulFunction {
 
-### Unknown
+    private static final Logger LOG = LoggerFactory.getLogger(DelayedFn.class);
 
-The stateful function was restarted, possibly on a different machine, before 
the ``CompletableFuture`` was completed, therefore it is unknown what is the 
status of the asynchronous operation.
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/delayed");
 
-```java
-package org.apache.flink.statefun.docs.async;
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message message) {
+        if (!context.caller().isPresent()) {
+            LOG.debug("Message has no known caller, meaning it was sent directly from an ingress");
+            return context.done();
+        }
 
-import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.sdk.AsyncOperationResult;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-@SuppressWarnings("unchecked")
-public class EnrichmentFunction implements StatefulFunction {
-
-       private final QueryService client;
-
-       public EnrichmentFunction(QueryService client) {
-               this.client = client;
-       }
-
-       @Override
-       public void invoke(Context context, Object input) {
-               if (input instanceof User) {
-                       onUser(context, (User) input);
-               } else if (input instanceof AsyncOperationResult) {
-                       onAsyncResult((AsyncOperationResult) input);
-               }
-       }
-
-       private void onUser(Context context, User user) {
-               CompletableFuture<UserEnrichment> future = 
client.getDataAsync(user.getUserId());
-               context.registerAsyncOperation(user, future);
-       }
-
-       private void onAsyncResult(AsyncOperationResult<User, UserEnrichment> 
result) {
-               if (result.successful()) {
-                       User metadata = result.metadata();
-                       UserEnrichment value = result.value();
-                       System.out.println(
-                               String.format("Successfully completed future: 
%s %s", metadata, value));
-               } else if (result.failure()) {
-                       System.out.println(
-                               String.format("Something has gone terribly 
wrong %s", result.throwable()));
-               } else {
-                       System.out.println("Not sure what happened, maybe 
retry");
-               }
-       }
+        var caller = context.caller().get();
+        context.sendAfter(Duration.ofMinutes(30), MessageBuilder
+            .forAddress(caller)
+            .withValue("Hello from the future!")
+            .build());
+        return context.done();
+    }
 }
 ```
 
-## Persistence
-
-Stateful Functions treats state as a first class citizen and so all stateful 
functions can easily define state that is automatically made fault tolerant by 
the runtime.
-All stateful functions may contain state by merely defining one or more 
persisted fields.
+## Egress
 
-The simplest way to get started is with a ``PersistedValue``, which is defined 
by its name and the class of the type that it stores.
-The data is always scoped to a specific function type and identifier.
-Below is a stateful function that greets users based on the number of times 
they have been seen.
-
-{{< hint info >}}
-All **PersistedValue**, **PersistedTable**, and **PersistedAppendingBuffer** 
fields must be marked with a **@Persisted** annotation or they will not be made 
fault tolerant by the runtime.
-{{< /hint >}}
+Functions can message other stateful functions and egresses, exit points for 
sending messages to the outside world.
+As with other messages, egress messages are always well-typed. 
+Additionally, they contain metadata pertinent to the specific egress type.
 
+{{< tabs "egress" >}}
+{{< tab "Apache Kafka" >}}
 ```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-public class FnUserGreeter implements StatefulFunction {
-
-       public static FunctionType TYPE = new FunctionType("example", 
"greeter");
-
-       @Persisted
-       private final PersistedValue<Integer> count = 
PersistedValue.of("count", Integer.class);
-
-       public void invoke(Context context, Object input) {
-               String userId = context.self().id();
-               int seen = count.getOrDefault(0);
-
-               switch (seen) {
-                       case 0:
-                               System.out.println(String.format("Hello %s!", 
userId));
-                               break;
-                       case 1:
-                               System.out.println("Hello Again!");
-                               break;
-                       case 2:
-                               System.out.println("Third time is the charm 
:)");
-                               break;
-                       default:
-                               System.out.println(String.format("Hello for the 
%d-th time", seen + 1));
-               }
-
-               count.set(seen + 1);
-       }
-}
-```
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
 
-``PersistedValue`` comes with the right primitive methods to build powerful 
stateful applications.
-Calling ``PersistedValue#get`` will return the current value of an object 
stored in state, or ``null`` if nothing is set.
-Conversely, ``PersistedValue#set`` will update the value in state and 
``PersistedValue#clear`` will delete the value from state.
+public class GreeterFn implements StatefulFunction {
 
-### Collection Types
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/greeter");

Review comment:
       `TypeName.forNameFromString` should be `TypeName.typeNameFromString`.
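
For context on the `<namespace>/<name>` format that these type name strings follow, here is a minimal, self-contained sketch. `TypeNameSketch` is a hypothetical stand-in written for this comment, not the SDK's `TypeName` class, and splitting on the last `/` is an assumption about how such a string could be parsed.

```java
import java.util.Objects;

/** Hypothetical stand-in for illustration only; this is not the SDK's TypeName class. */
final class TypeNameSketch {
    private final String namespace;
    private final String name;

    private TypeNameSketch(String namespace, String name) {
        this.namespace = namespace;
        this.name = name;
    }

    /**
     * Parses a "<namespace>/<name>" string.
     * Assumption: the split happens on the last '/', and both parts must be non-empty.
     */
    static TypeNameSketch typeNameFromString(String typeName) {
        Objects.requireNonNull(typeName, "typeName");
        int pos = typeName.lastIndexOf('/');
        if (pos <= 0 || pos == typeName.length() - 1) {
            throw new IllegalArgumentException(
                typeName + " does not conform to the <namespace>/<name> format");
        }
        return new TypeNameSketch(typeName.substring(0, pos), typeName.substring(pos + 1));
    }

    String namespace() {
        return namespace;
    }

    String name() {
        return name;
    }

    @Override
    public String toString() {
        return namespace + "/" + name;
    }
}
```

With this sketch, `TypeNameSketch.typeNameFromString("com.example.fns/greeter")` yields the namespace `com.example.fns` and the name `greeter`, while a string without a `/` is rejected.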

##########
File path: docs/content/docs/sdk/java.md
##########
@@ -29,451 +29,395 @@ under the License.
 Stateful functions are the building blocks of applications; they are atomic 
units of isolation, distribution, and persistence.
 As objects, they encapsulate the state of a single entity (e.g., a specific 
user, device, or session) and encode its behavior.
 Stateful functions can interact with each other, and external systems, through 
message passing.
-The Java SDK is supported as an [embedded module]({{< ref 
"docs/sdk/overview#embedded-module" >}}).
 
 To get started, add the Java SDK as a dependency to your application.
 
-{{< artifact statefun-sdk >}}
+{{< artifact statefun-sdk-java >}}
 
 ## Defining A Stateful Function
 
-A stateful function is any class that implements the ``StatefulFunction`` 
interface.
-The following is an example of a simple hello world function.
+A stateful function is any class that implements the `StatefulFunction` 
interface.
+In the following example, a `StatefulFunction` maintains a count for every user
+of an application, emitting a customized greeting.
 
 ```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnHelloWorld implements StatefulFunction {
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
 
-       @Override
-       public void invoke(Context context, Object input) {
-               System.out.println("Hello " + input.toString());
-       }
-}
-```
+public class GreeterFn implements StatefulFunction {
 
-Functions process each incoming message through their ``invoke`` method.
-Input's are untyped and passed through the system as a ``java.lang.Object`` so 
one function can potentially process multiple types of messages.
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/greeter");
 
-The ``Context`` provides metadata about the current message and function, and 
is how you can call other functions or external systems.
-Functions are invoked based on a function type and unique identifier.
+    static final TypeName INBOX = 
TypeName.forNameFromString("com.example.fns/inbox");
 
-### Stateful Match Function 
+    static final ValueSpec<Integer> SEEN = 
ValueSpec.named("seen").withIntType();
 
-Stateful functions provide a powerful abstraction for working with events and 
state, allowing developers to build components that can react to any kind of 
message.
-Commonly, functions only need to handle a known set of message types, and the 
``StatefulMatchFunction`` interface provides an opinionated solution to that 
problem.
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message message) {
+        if (!message.is(User.TYPE)) {
+            throw new IllegalStateException("Unknown type");
+        }
 
-#### Simple Match Function
+        User user = message.as(User.TYPE);
+        String name = user.getName();
 
-Stateful match functions are an opinionated variant of stateful functions for 
precisely this pattern.
-Developers outline expected types, optional predicates, and well-typed 
business logic and let the system dispatch each input to the correct action.
-Variants are bound inside a ``configure`` method that is executed once the 
first time an instance is loaded.
+        var storage = context.storage();
+        var seen = storage.get(SEEN).orElse(0);
+        storage.set(SEEN, seen + 1);
 
-```java
-package org.apache.flink.statefun.docs.match;
+        context.send(
+            MessageBuilder.forAddress(INBOX, name)
+                .withValue("Hello " + name + " for the " + seen + "th time!")
+                .build());
 
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
+        return context.done();
+    }
+}
+```
 
-public class FnMatchGreeter extends StatefulMatchFunction {
+This code declares a greeter function that will be [registered](#exposing-functions) under the logical type name `com.example.fns/greeter`. Type names must take the form `<namespace>/<name>`.
+It contains a single `ValueSpec`, which is implicitly scoped to the current address and stores an integer.
 
-       @Override
-       public void configure(MatchBinder binder) {
-               binder
-                       .predicate(Customer.class, this::greetCustomer)
-                       .predicate(Employee.class, Employee::isManager, 
this::greetManager)
-                       .predicate(Employee.class, this::greetEmployee);
-       }
+Every time a message is sent to a greeter, it first validates that the message contains a `User` and extracts the user's name. Both messages and state are strongly typed, using either one of the default [built-in types]({{< ref "docs/sdk/appendix#types" >}}) or a [custom type](#types), as in this case.
 
-       private void greetCustomer(Context context, Customer message) {
-               System.out.println("Hello customer " + message.getName());
-       }
+The function finally builds a custom greeting for the user.
+The number of times that particular user has been seen so far is read from the state store and updated,
+and the greeting is sent to the user's inbox (another function type).
 
-       private void greetEmployee(Context context, Employee message) {
-               System.out.println("Hello employee " + message.getEmployeeId());
-       }
+## Types
 
-       private void greetManager(Context context, Employee message) {
-               System.out.println("Hello manager " + message.getEmployeeId());
-       }
-}
-```
+Stateful Functions strongly types all messages and state values.
+Because functions run in a distributed manner and state values are persisted to stable storage, Stateful Functions aims to provide efficient and easy-to-use serializers.
 
-#### Making Your Function Complete
+Out of the box, all SDKs offer a set of highly optimized serializers for common primitive types: booleans, numerics, and strings.
+Additionally, users are encouraged to plug in custom types to model more complex data structures.
 
-Similar to the first example, match functions are partial by default and will 
throw an ``IllegalStateException`` on any input that does not match any branch.
-They can be made complete by providing an ``otherwise`` clause that serves as 
a catch-all for unmatched input, think of it as a default clause in a Java 
switch statement.
-The ``otherwise`` action takes its message as an untyped ``java.lang.Object``, 
allowing you to handle any unexpected messages.
+In the [example above](#defining-a-stateful-function), the greeter function 
consumes `User` messages, a POJO type containing several fields.
+By defining a custom type, this object can be passed transparently between 
functions and stored in state.
+And because the type is tied to a logical typename instead of the physical Java class, it can be passed to functions written in other language SDKs.
 
 ```java
-package org.apache.flink.statefun.docs.match;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
-
-public class FnMatchGreeterWithCatchAll extends StatefulMatchFunction {
-
-       @Override
-       public void configure(MatchBinder binder) {
-               binder
-                       .predicate(Customer.class, this::greetCustomer)
-                       .predicate(Employee.class, Employee::isManager, 
this::greetManager)
-                       .predicate(Employee.class, this::greetEmployee)
-                       .otherwise(this::catchAll);
-       }
-
-       private void greetCustomer(Context context, Customer message) {
-               System.out.println("Hello customer " + message.getName());
-       }
-
-       private void greetEmployee(Context context, Employee message) {
-               System.out.println("Hello employee " + message.getEmployeeId());
-       }
-
-       private void greetManager(Context context, Employee message) {
-               System.out.println("Hello manager " + message.getEmployeeId());
-       }
-
-       private void catchAll(Context context, Object message) {
-               System.out.println("Hello unexpected message");
-       }
-}
-```
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.types.SimpleType;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import java.util.Objects;
 
-#### Action Resolution Order
+public class User {
 
-Match functions will always match actions from most to least specific using 
the following resolution rules.
+    private static final ObjectMapper mapper = new ObjectMapper();
 
-First, find an action that matches the type and predicate. If two predicates 
will return true for a particular input, the one registered in the binder first 
wins.
-Next, search for an action that matches the type but does not have an 
associated predicate.
-Finally, if a catch-all exists, it will be executed or an 
``IllegalStateException`` will be thrown.
+    public static final Type<User> TYPE = SimpleType.simpleImmutableTypeFrom(
+        TypeName.typeNameFromString("com.example/User"),
+        mapper::writeValueAsBytes,
+        bytes -> mapper.readValue(bytes, User.class));
 
-## Function Types and Messaging
+    private final String name;
 
-In Java, function types are defined as logical pointers composed of a 
namespace and name.
-The type is bound to the implementing class in the [module]({{< ref 
"docs/sdk/overview#embedded-module" >}}) definition.
-Below is an example function type for the hello world function.
+    private final String favoriteColor;
 
-```java
-package org.apache.flink.statefun.docs;
+    @JsonCreator
+    public User(
+        @JsonProperty("name") String name,
+        @JsonProperty("favorite_color") String favoriteColor) {
 
-import org.apache.flink.statefun.sdk.FunctionType;
+        this.name = Objects.requireNonNull(name);
+        this.favoriteColor = Objects.requireNonNull(favoriteColor);
+    }
 
-/** A function type that will be bound to {@link FnHelloWorld}. */
-public class Identifiers {
+    public String getName() {
+        return name;
+    }
 
-  public static final FunctionType HELLO_TYPE = new 
FunctionType("apache/flink", "hello");
-}
+    public String getFavoriteColor() {
+        return favoriteColor;
+    }
+
+    @Override
+    public String toString() {
+        return "User{name=" + name + ",favoriteColor=" + favoriteColor + "}";
+    }
+}
 ```
 
-This type can then be referenced from other functions to create an address and 
message a particular instance.
+## State
 
-```java
-package org.apache.flink.statefun.docs;
+Stateful Functions treats state as a first-class citizen, so every function can define state that the runtime automatically makes fault tolerant.
+Declaring state is as simple as defining one or more `ValueSpec`s that describe your state values.
+Each value specification is defined by a name, unique within the function, and a [type](#types).
+At runtime, functions can `get`, `set`, and `remove` state values scoped to the address of the current message.
 
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-/** A simple stateful function that sends a message to the user with id 
"user1" */
-public class FnCaller implements StatefulFunction {
+{{< hint info >}}
+All value specifications must be eagerly registered in the `StatefulFunctionSpec` when composing
+the application's [RequestReplyHandler](#exposing-functions).
+{{< /hint >}}
 
-  @Override
-  public void invoke(Context context, Object input) {
-    context.send(Identifiers.HELLO_TYPE, "user1", new MyUserMessage());
-  }
-}
+```java
+// Value specification for a state named `seen` 
+// with the primitive integer type
+ValueSpec
+    .named("seen")
+    .withIntType();
+
+// Value specification with a custom type
+ValueSpec
+    .named("user")
+    .withCustomType(User.TYPE);
 ```
 
-## Sending Delayed Messages
+### State Expiration
 
-Functions are able to send messages on a delay so that they will arrive after 
some duration.
-Functions may even send themselves delayed messages that can serve as a 
callback.
-The delayed message is non-blocking so functions will continue to process 
records between the time a delayed message is sent and received.
+By default, state values are persisted until manually `remove`d by the user.
+Optionally, they may be configured to expire and be automatically deleted 
after a specified duration.
 
 ```java
-package org.apache.flink.statefun.docs.delay;
-
-import java.time.Duration;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnDelayedMessage implements StatefulFunction {
-
-       @Override
-       public void invoke(Context context, Object input) {
-               if (input instanceof Message) {
-                       System.out.println("Hello");
-                       context.sendAfter(Duration.ofMinutes(1), 
context.self(), new DelayedMessage());
-               }
-
-               if (input instanceof DelayedMessage) {
-                       System.out.println("Welcome to the future!");
-               }
-       }
-}
+// Value specification that will automatically
+// delete the value if the function instance goes 
+// more than 1 day without being called
+ValueSpec
+    .named("seen")
+    .thatExpiresAfterCall(Duration.ofDays(1))
+    .withIntType();
+
+// Value specification that will automatically
+// delete the value if it goes more than 1 day
+// without being written
+ValueSpec
+    .named("seen")
+    .thatExpireAfterWrite(Duration.ofDays(1))
+    .withIntType();
 ```
 
-## Completing Async Requests
-
-When interacting with external systems, such as a database or API, one needs 
to take care that communication delay with the external system does not 
dominate the application’s total work.
-Stateful Functions allows registering a Java ``CompletableFuture`` that will 
resolve to a value at some point in the future.
-Future's are registered along with a metadata object that provides additional 
context about the caller.
+## Sending Delayed Messages
 
-When the future completes, either successfully or exceptionally, the caller 
function type and id will be invoked with a ``AsyncOperationResult``.
-An asynchronous result can complete in one of three states:
+Functions can send messages on a delay so that they will arrive after some 
duration.
+They may even send themselves delayed messages that can serve as a callback.
+The delayed message is non-blocking, so functions will continue to process 
records between when a delayed message is sent and received.
+Additionally, they are fault-tolerant and never lost, even when recovering 
from failure. 
 
-### Success
+This example sends a response back to the calling function after a 30-minute delay.
 
-The asynchronous operation has succeeded, and the produced result can be 
obtained via ``AsyncOperationResult#value``.
+```java
+import java.util.concurrent.CompletableFuture;
+import java.time.Duration;
 
-### Failure
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
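+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;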
 
-The asynchronous operation has failed, and the cause can be obtained via 
``AsyncOperationResult#throwable``.
+public class DelayedFn implements StatefulFunction {
 
-### Unknown
+    private static final Logger LOG = LoggerFactory.getLogger(DelayedFn.class);
 
-The stateful function was restarted, possibly on a different machine, before 
the ``CompletableFuture`` was completed, therefore it is unknown what is the 
status of the asynchronous operation.
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/delayed");
 
-```java
-package org.apache.flink.statefun.docs.async;
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message message) {
+        if (!context.caller().isPresent()) {
+            LOG.debug("Message has no known caller, meaning it was sent directly from an ingress");
+            return context.done();
+        }
 
-import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.sdk.AsyncOperationResult;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-@SuppressWarnings("unchecked")
-public class EnrichmentFunction implements StatefulFunction {
-
-       private final QueryService client;
-
-       public EnrichmentFunction(QueryService client) {
-               this.client = client;
-       }
-
-       @Override
-       public void invoke(Context context, Object input) {
-               if (input instanceof User) {
-                       onUser(context, (User) input);
-               } else if (input instanceof AsyncOperationResult) {
-                       onAsyncResult((AsyncOperationResult) input);
-               }
-       }
-
-       private void onUser(Context context, User user) {
-               CompletableFuture<UserEnrichment> future = 
client.getDataAsync(user.getUserId());
-               context.registerAsyncOperation(user, future);
-       }
-
-       private void onAsyncResult(AsyncOperationResult<User, UserEnrichment> 
result) {
-               if (result.successful()) {
-                       User metadata = result.metadata();
-                       UserEnrichment value = result.value();
-                       System.out.println(
-                               String.format("Successfully completed future: 
%s %s", metadata, value));
-               } else if (result.failure()) {
-                       System.out.println(
-                               String.format("Something has gone terribly 
wrong %s", result.throwable()));
-               } else {
-                       System.out.println("Not sure what happened, maybe 
retry");
-               }
-       }
+        var caller = context.caller().get();
+        context.sendAfter(Duration.ofMinutes(30), MessageBuilder
+            .forAddress(caller)
+            .withValue("Hello from the future!")
+            .build());
+        return context.done();
+    }
 }
 ```
 
-## Persistence
-
-Stateful Functions treats state as a first class citizen and so all stateful 
functions can easily define state that is automatically made fault tolerant by 
the runtime.
-All stateful functions may contain state by merely defining one or more 
persisted fields.
+## Egress
 
-The simplest way to get started is with a ``PersistedValue``, which is defined 
by its name and the class of the type that it stores.
-The data is always scoped to a specific function type and identifier.
-Below is a stateful function that greets users based on the number of times 
they have been seen.
-
-{{< hint info >}}
-All **PersistedValue**, **PersistedTable**, and **PersistedAppendingBuffer** 
fields must be marked with a **@Persisted** annotation or they will not be made 
fault tolerant by the runtime.
-{{< /hint >}}
+Functions can message other stateful functions and egresses, exit points for 
sending messages to the outside world.
+As with other messages, egress messages are always well-typed. 
+Additionally, they contain metadata pertinent to the specific egress type.
 
+{{< tabs "egress" >}}
+{{< tab "Apache Kafka" >}}
 ```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-public class FnUserGreeter implements StatefulFunction {
-
-       public static FunctionType TYPE = new FunctionType("example", 
"greeter");
-
-       @Persisted
-       private final PersistedValue<Integer> count = 
PersistedValue.of("count", Integer.class);
-
-       public void invoke(Context context, Object input) {
-               String userId = context.self().id();
-               int seen = count.getOrDefault(0);
-
-               switch (seen) {
-                       case 0:
-                               System.out.println(String.format("Hello %s!", 
userId));
-                               break;
-                       case 1:
-                               System.out.println("Hello Again!");
-                               break;
-                       case 2:
-                               System.out.println("Third time is the charm 
:)");
-                               break;
-                       default:
-                               System.out.println(String.format("Hello for the 
%d-th time", seen + 1));
-               }
-
-               count.set(seen + 1);
-       }
-}
-```
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
 
-``PersistedValue`` comes with the right primitive methods to build powerful 
stateful applications.
-Calling ``PersistedValue#get`` will return the current value of an object 
stored in state, or ``null`` if nothing is set.
-Conversely, ``PersistedValue#set`` will update the value in state and 
``PersistedValue#clear`` will delete the value from state.
+public class GreeterFn implements StatefulFunction {
 
-### Collection Types
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/greeter");
 
-Along with ``PersistedValue``, the Java SDK supports two persisted collection 
types.
-``PersistedTable`` is a collection of keys and values, and 
``PersistedAppendingBuffer`` is an append-only buffer.
+    static final TypeName KAFKA_EGRESS = 
TypeName.forNameFromString("com.example/greets");
 
-These types are functionally equivalent to ``PersistedValue<Map>`` and 
``PersistedValue<Collection>`` respectively but may provide better performance 
in some situations.
+    static final ValueSpec<Integer> SEEN = 
ValueSpec.named("seen").withIntType();
 
-```java
-@Persisted
-PersistedTable<String, Integer> table = PersistedTable.of("my-table", 
String.class, Integer.class);
+    @Override 
+    public CompletableFuture<Void> apply(Context context, Message message) {
+        if (!message.is(User.TYPE)) {
+            throw new IllegalStateException("Unknown type");
+        }
 
-@Persisted
-PersistedAppendingBuffer<Integer> buffer = 
PersistedAppendingBuffer.of("my-buffer", Integer.class);
-```
+        User user = message.as(User.TYPE);
+        String name = user.getName();
 
-### Dynamic State Registration
+        var storage = context.storage();
+        var seen = storage.get(SEEN).orElse(0);
+        storage.set(SEEN, seen + 1);
 
-Using the above state types, a function's persisted state must be defined 
eagerly. You cannot use those state types to
-register a new persisted state during invocations (i.e., in the ``invoke`` 
method) or after the function instance is created.
+        context.send(
+            KafkaEgressMessage.forEgress(KAFKA_EGRESS)
+                .withTopic("greetings")
+                .withUtf8Key(name)
+                .withUtf8Value("Hello " + name + " for the " + seen + "th 
time!")
+                .build());
 
-If dynamic state registration is required, it can be achieved using a 
``PersistedStateRegistry``:
-
-```java
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedStateRegistry;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-public class MyFunction implements StatefulFunction {
-
-       @Persisted
-       private final PersistedStateRegistry registry = new 
PersistedStateRegistry();
-
-       private PersistedValue<Integer> value;
-
-       public void invoke(Context context, Object input) {
-               if (value == null) {
-                       value = PersistedValue.of("my-value", Integer.class);
-                       registry.registerValue(value);
-               }
-               int count = value.getOrDefault(0);
-               // ...
-       }
+        return context.done();
+    }
 }
 ```
-
-Note how the ``PersistedValue`` field doesn't need to be annotated with the 
``@Persisted`` annotations, and is initially
-empty. The state object is dynamically created during invocation and 
registered with the ``PersistedStateRegistry`` so
-that the system picks it up to be managed for fault-tolerance.
-
-### State Expiration
-
-Persisted states may be configured to expire and be deleted after a specified 
duration.
-This is supported by all types of state:
-
+{{< /tab >}}
+{{< tab "Amazon Kinesis" >}}
 ```java
-@Persisted
-PersistedValue<Integer> value = PersistedValue.of(
-    "my-value",
-    Integer.class,
-    Expiration.expireAfterWriting(Duration.ofHours(1)));
-
-@Persisted
-PersistedTable<String, Integer> table = PersistedTable.of(
-    "my-table",
-    String.class,
-    Integer.class,
-    Expiration.expireAfterWriting(Duration.ofMinutes(5)));
-
-@Persisted
-PersistedAppendingBuffer<Integer> buffer = PersistedAppendingBuffer.of(
-    "my-buffer",
-    Integer.class,
-    Expiration.expireAfterWriting(Duration.ofSeconds(30)));
-```
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.io.KinesisEgressMessage;
 
-There are two expiration modes supported:
+public class GreeterFn implements StatefulFunction {
 
-```java
-Expiration.expireAfterWriting(...)
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/greeter");
 
-Expiration.expireAfterReadingOrWriting(...)
-```
+    static final TypeName KINESIS_EGRESS = 
TypeName.forNameFromString("com.example/greets");

Review comment:
       `forNameFromString` should be `typeNameFromString`, as already used for `User.TYPE` in this diff.
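
For illustration, the corrected line would read `TypeName.typeNameFromString("com.example/greets")`. The sketch below is a self-contained stand-in, not the SDK's `TypeName`: the class `TypeNameSketch` is hypothetical, and splitting on the last `/` is an assumption based on the `<namespace>/<name>` form described in the docs text under review. It only shows what a factory with this name is expected to accept.

```java
import java.util.Objects;

// Hypothetical stand-in for illustration only; NOT the SDK's TypeName class.
public final class TypeNameSketch {
    private final String namespace;
    private final String name;

    private TypeNameSketch(String namespace, String name) {
        this.namespace = namespace;
        this.name = name;
    }

    // Uses the factory name requested in the review.
    // Assumption: the input has the form "<namespace>/<name>" and is split on the last '/'.
    public static TypeNameSketch typeNameFromString(String typeName) {
        Objects.requireNonNull(typeName, "typeName");
        int slash = typeName.lastIndexOf('/');
        if (slash <= 0 || slash == typeName.length() - 1) {
            throw new IllegalArgumentException(
                "Expected <namespace>/<name> but got: " + typeName);
        }
        return new TypeNameSketch(
            typeName.substring(0, slash), typeName.substring(slash + 1));
    }

    public String namespace() {
        return namespace;
    }

    public String name() {
        return name;
    }

    public static void main(String[] args) {
        TypeNameSketch egress = typeNameFromString("com.example/greets");
        System.out.println(egress.namespace() + " | " + egress.name());
    }
}
```

A string without a namespace, such as `"greets"`, is rejected with an `IllegalArgumentException` in this sketch; whether the SDK behaves the same way is not something this diff shows.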

##########
File path: docs/content/docs/sdk/java.md
##########
@@ -29,451 +29,395 @@ under the License.
 Stateful functions are the building blocks of applications; they are atomic 
units of isolation, distribution, and persistence.
 As objects, they encapsulate the state of a single entity (e.g., a specific 
user, device, or session) and encode its behavior.
 Stateful functions can interact with each other, and external systems, through 
message passing.
-The Java SDK is supported as an [embedded module]({{< ref 
"docs/sdk/overview#embedded-module" >}}).
 
 To get started, add the Java SDK as a dependency to your application.
 
-{{< artifact statefun-sdk >}}
+{{< artifact statefun-sdk-java >}}
 
 ## Defining A Stateful Function
 
-A stateful function is any class that implements the ``StatefulFunction`` 
interface.
-The following is an example of a simple hello world function.
+A stateful function is any class that implements the `StatefulFunction` 
interface.
+In the following example, a `StatefulFunction` maintains a count for every user
+of an application, emitting a customized greeting.
 
 ```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnHelloWorld implements StatefulFunction {
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
 
-       @Override
-       public void invoke(Context context, Object input) {
-               System.out.println("Hello " + input.toString());
-       }
-}
-```
+public class GreeterFn implements StatefulFunction {
 
-Functions process each incoming message through their ``invoke`` method.
-Input's are untyped and passed through the system as a ``java.lang.Object`` so 
one function can potentially process multiple types of messages.
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/greeter");
 
-The ``Context`` provides metadata about the current message and function, and 
is how you can call other functions or external systems.
-Functions are invoked based on a function type and unique identifier.
+    static final TypeName INBOX = 
TypeName.forNameFromString("com.example.fns/inbox");
 
-### Stateful Match Function 
+    static final ValueSpec<Integer> SEEN = 
ValueSpec.named("seen").withIntType();
 
-Stateful functions provide a powerful abstraction for working with events and 
state, allowing developers to build components that can react to any kind of 
message.
-Commonly, functions only need to handle a known set of message types, and the 
``StatefulMatchFunction`` interface provides an opinionated solution to that 
problem.
+    @Override 
+    public CompletableFuture<Void> apply(Context context, Message message) {
+        if (!message.is(User.TYPE)) {
+            throw new IllegalStateException("Unknown type");
+        }
 
-#### Simple Match Function
+        User user = message.as(User.TYPE);
+        String name = user.getName();
 
-Stateful match functions are an opinionated variant of stateful functions for 
precisely this pattern.
-Developers outline expected types, optional predicates, and well-typed 
business logic and let the system dispatch each input to the correct action.
-Variants are bound inside a ``configure`` method that is executed once the 
first time an instance is loaded.
+        var storage = context.storage();
+        var seen = storage.get(SEEN).orElse(0);
+        storage.set(SEEN, seen + 1);
 
-```java
-package org.apache.flink.statefun.docs.match;
+        context.send(
+            MessageBuilder.forAddress(INBOX, name)
+                .withValue("Hello " + name + " for the " + seen + "th time!")
+                .build());
 
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
+        return context.done();
+    }
+}
+```
 
-public class FnMatchGreeter extends StatefulMatchFunction {
+This code declares a greeter function that will be 
[registered](#exposing-functions) under the logical type name 
`com.example.fns/greeter`. Type names must take the form `<namespace>/<name>`.
+It contains a single `ValueSpec`, which is implicitly scoped to the current 
address and stores an integer.
 
-       @Override
-       public void configure(MatchBinder binder) {
-               binder
-                       .predicate(Customer.class, this::greetCustomer)
-                       .predicate(Employee.class, Employee::isManager, 
this::greetManager)
-                       .predicate(Employee.class, this::greetEmployee);
-       }
+Every time a message is sent to a greeter, it first validates that the message 
contains a `User` and extracts its name. Both messages and state are strongly 
typed: either one of the default [built-in types]({{< ref 
"docs/sdk/appendix#types" >}}) or a [custom type](#types), as in this case.
 
-       private void greetCustomer(Context context, Customer message) {
-               System.out.println("Hello customer " + message.getName());
-       }
+The function finally builds a custom greeting for the user.
+The number of times that particular user has been seen so far is queried from 
the state store and updated,
+and the greeting is sent to the user's inbox (another function type). 
 
-       private void greetEmployee(Context context, Employee message) {
-               System.out.println("Hello employee " + message.getEmployeeId());
-       }
+## Types
 
-       private void greetManager(Context context, Employee message) {
-               System.out.println("Hello manager " + message.getEmployeeId());
-       }
-}
-```
+Stateful Functions strongly types all messages and state values. 
+Because functions run in a distributed manner and state values are persisted to 
stable storage, Stateful Functions aims to provide efficient and easy-to-use 
serializers. 
 
-#### Making Your Function Complete
+Out of the box, all SDKs offer a set of highly optimized serializers for 
common primitive types: booleans, numerics, and strings.
+Additionally, users are encouraged to plug in custom types to model more 
complex data structures. 
 
-Similar to the first example, match functions are partial by default and will 
throw an ``IllegalStateException`` on any input that does not match any branch.
-They can be made complete by providing an ``otherwise`` clause that serves as 
a catch-all for unmatched input, think of it as a default clause in a Java 
switch statement.
-The ``otherwise`` action takes its message as an untyped ``java.lang.Object``, 
allowing you to handle any unexpected messages.
+In the [example above](#defining-a-stateful-function), the greeter function 
consumes `User` messages, a POJO type containing several fields.
+By defining a custom type, this object can be passed transparently between 
functions and stored in state.
+And because the type is tied to a logical typename, instead of the physical 
Java class, it can be passed to functions written in other language SDKs. 
 
 ```java
-package org.apache.flink.statefun.docs.match;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
-
-public class FnMatchGreeterWithCatchAll extends StatefulMatchFunction {
-
-       @Override
-       public void configure(MatchBinder binder) {
-               binder
-                       .predicate(Customer.class, this::greetCustomer)
-                       .predicate(Employee.class, Employee::isManager, 
this::greetManager)
-                       .predicate(Employee.class, this::greetEmployee)
-                       .otherwise(this::catchAll);
-       }
-
-       private void greetCustomer(Context context, Customer message) {
-               System.out.println("Hello customer " + message.getName());
-       }
-
-       private void greetEmployee(Context context, Employee message) {
-               System.out.println("Hello employee " + message.getEmployeeId());
-       }
-
-       private void greetManager(Context context, Employee message) {
-               System.out.println("Hello manager " + message.getEmployeeId());
-       }
-
-       private void catchAll(Context context, Object message) {
-               System.out.println("Hello unexpected message");
-       }
-}
-```
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.types.SimpleType;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import java.util.Objects;
 
-#### Action Resolution Order
+public class User {
 
-Match functions will always match actions from most to least specific using 
the following resolution rules.
+    private static final ObjectMapper mapper = new ObjectMapper();
 
-First, find an action that matches the type and predicate. If two predicates 
will return true for a particular input, the one registered in the binder first 
wins.
-Next, search for an action that matches the type but does not have an 
associated predicate.
-Finally, if a catch-all exists, it will be executed or an 
``IllegalStateException`` will be thrown.
+    public static final Type<User> TYPE = SimpleType.simpleImmutableTypeFrom(
+        TypeName.typeNameFromString("com.example/User"),
+        mapper::writeValueAsBytes,
+        bytes -> mapper.readValue(bytes, User.class));
 
-## Function Types and Messaging
+    private final String name;
 
-In Java, function types are defined as logical pointers composed of a 
namespace and name.
-The type is bound to the implementing class in the [module]({{< ref 
"docs/sdk/overview#embedded-module" >}}) definition.
-Below is an example function type for the hello world function.
+    private final String favoriteColor;
 
-```java
-package org.apache.flink.statefun.docs;
+    @JsonCreator
+    public User(
+        @JsonProperty("name") String name,
+        @JsonProperty("favorite_color") String favoriteColor) {
 
-import org.apache.flink.statefun.sdk.FunctionType;
+        this.name = Objects.requireNonNull(name);
+        this.favoriteColor = Objects.requireNonNull(favoriteColor);
+    }
 
-/** A function type that will be bound to {@link FnHelloWorld}. */
-public class Identifiers {
+    public String getName() {
+        return name;
+    }
 
-  public static final FunctionType HELLO_TYPE = new 
FunctionType("apache/flink", "hello");
-}
+    public String getFavoriteColor() {
+        return favoriteColor;
+    }
+
+    @Override
+    public String toString() {
+        return "User{name=" + name + ",favoriteColor=" + favoriteColor + "}";
+    }
+}
 ```
 
-This type can then be referenced from other functions to create an address and 
message a particular instance.
+## State
 
-```java
-package org.apache.flink.statefun.docs;
+Stateful Functions treats state as a first-class citizen, so all functions 
can easily define state that is automatically made fault tolerant by the 
runtime.
+State declaration is as simple as defining one or more `ValueSpec`s 
describing your state values.
+Value specifications are defined with a unique (to the function) name and 
[type](#types).
+At runtime, functions can `get`, `set`, and `remove` state values scoped to 
the address of the current message.
 
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-/** A simple stateful function that sends a message to the user with id 
"user1" */
-public class FnCaller implements StatefulFunction {
+{{< hint info >}}
+All value specifications must be eagerly registered in the 
`StatefulFunctionSpec` when composing
+the application's [RequestReplyHandler](#exposing-functions).
+{{< /hint >}}
 
-  @Override
-  public void invoke(Context context, Object input) {
-    context.send(Identifiers.HELLO_TYPE, "user1", new MyUserMessage());
-  }
-}
+```java
+// Value specification for a state named `seen` 
+// with the primitive integer type
+ValueSpec
+    .named("seen")
+    .withIntType();
+
+// Value specification with a custom type
+ValueSpec
+    .named("user")
+    .withCustomType(User.TYPE);
 ```
 
-## Sending Delayed Messages
+### State Expiration
 
-Functions are able to send messages on a delay so that they will arrive after 
some duration.
-Functions may even send themselves delayed messages that can serve as a 
callback.
-The delayed message is non-blocking so functions will continue to process 
records between the time a delayed message is sent and received.
+By default, state values are persisted until manually `remove`d by the user.
+Optionally, they may be configured to expire and be automatically deleted 
after a specified duration.
 
 ```java
-package org.apache.flink.statefun.docs.delay;
-
-import java.time.Duration;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnDelayedMessage implements StatefulFunction {
-
-       @Override
-       public void invoke(Context context, Object input) {
-               if (input instanceof Message) {
-                       System.out.println("Hello");
-                       context.sendAfter(Duration.ofMinutes(1), 
context.self(), new DelayedMessage());
-               }
-
-               if (input instanceof DelayedMessage) {
-                       System.out.println("Welcome to the future!");
-               }
-       }
-}
+// Value specification that will automatically
+// delete the value if the function instance goes 
+// more than 1 day without being called
+ValueSpec
+    .named("seen")
+    .thatExpiresAfterCall(Duration.ofDays(1))
+    .withIntType();
+
+// Value specification that will automatically
+// delete the value if it goes more than 1 day
+// without being written
+ValueSpec
+    .named("seen")
+    .thatExpireAfterWrite(Duration.ofDays(1))
+    .withIntType();
 ```
 
-## Completing Async Requests
-
-When interacting with external systems, such as a database or API, one needs 
to take care that communication delay with the external system does not 
dominate the application’s total work.
-Stateful Functions allows registering a Java ``CompletableFuture`` that will 
resolve to a value at some point in the future.
-Future's are registered along with a metadata object that provides additional 
context about the caller.
+## Sending Delayed Messages
 
-When the future completes, either successfully or exceptionally, the caller 
function type and id will be invoked with a ``AsyncOperationResult``.
-An asynchronous result can complete in one of three states:
+Functions can send messages on a delay so that they will arrive after some 
duration.
+They may even send themselves delayed messages that can serve as a callback.
+The delayed message is non-blocking, so functions will continue to process 
records between when a delayed message is sent and received.
+Additionally, they are fault-tolerant and never lost, even when recovering 
from failure. 
 
-### Success
+This example sends a response back to the calling function after a 30-minute 
delay.
 
-The asynchronous operation has succeeded, and the produced result can be 
obtained via ``AsyncOperationResult#value``.
+```java
+import java.util.concurrent.CompletableFuture;
+import java.time.Duration;
 
-### Failure
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
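+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;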
 
-The asynchronous operation has failed, and the cause can be obtained via 
``AsyncOperationResult#throwable``.
+public class DelayedFn implements StatefulFunction {
 
-### Unknown
+    private static final Logger LOG = LoggerFactory.getLogger(DelayedFn.class);
 
-The stateful function was restarted, possibly on a different machine, before 
the ``CompletableFuture`` was completed, therefore it is unknown what is the 
status of the asynchronous operation.
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/delayed");
 
-```java
-package org.apache.flink.statefun.docs.async;
+    @Override 
+    public CompletableFuture<Void> apply(Context context, Message message) {
+        if (!context.caller().isPresent()) {
+            LOG.debug("Message has no known caller meaning it was sent 
directly from an ingress");
+            return context.done();
+        }
 
-import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.sdk.AsyncOperationResult;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-@SuppressWarnings("unchecked")
-public class EnrichmentFunction implements StatefulFunction {
-
-       private final QueryService client;
-
-       public EnrichmentFunction(QueryService client) {
-               this.client = client;
-       }
-
-       @Override
-       public void invoke(Context context, Object input) {
-               if (input instanceof User) {
-                       onUser(context, (User) input);
-               } else if (input instanceof AsyncOperationResult) {
-                       onAsyncResult((AsyncOperationResult) input);
-               }
-       }
-
-       private void onUser(Context context, User user) {
-               CompletableFuture<UserEnrichment> future = 
client.getDataAsync(user.getUserId());
-               context.registerAsyncOperation(user, future);
-       }
-
-       private void onAsyncResult(AsyncOperationResult<User, UserEnrichment> 
result) {
-               if (result.successful()) {
-                       User metadata = result.metadata();
-                       UserEnrichment value = result.value();
-                       System.out.println(
-                               String.format("Successfully completed future: 
%s %s", metadata, value));
-               } else if (result.failure()) {
-                       System.out.println(
-                               String.format("Something has gone terribly 
wrong %s", result.throwable()));
-               } else {
-                       System.out.println("Not sure what happened, maybe 
retry");
-               }
-       }
+        var caller = context.caller().get();
+        context.sendAfter(Duration.ofMinutes(30), MessageBuilder
+            .forAddress(caller)
+            .withValue("Hello from the future!")
+            .build());
+        return context.done();
+    }
 }
 ```
 
-## Persistence
-
-Stateful Functions treats state as a first class citizen and so all stateful 
functions can easily define state that is automatically made fault tolerant by 
the runtime.
-All stateful functions may contain state by merely defining one or more 
persisted fields.
+## Egress
 
-The simplest way to get started is with a ``PersistedValue``, which is defined 
by its name and the class of the type that it stores.
-The data is always scoped to a specific function type and identifier.
-Below is a stateful function that greets users based on the number of times 
they have been seen.
-
-{{< hint info >}}
-All **PersistedValue**, **PersistedTable**, and **PersistedAppendingBuffer** 
fields must be marked with a **@Persisted** annotation or they will not be made 
fault tolerant by the runtime.
-{{< /hint >}}
+Functions can message other stateful functions and egresses, exit points for 
sending messages to the outside world.
+As with other messages, egress messages are always well-typed. 
+Additionally, they contain metadata pertinent to the specific egress type.
 
+{{< tabs "egress" >}}
+{{< tab "Apache Kafka" >}}
 ```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-public class FnUserGreeter implements StatefulFunction {
-
-       public static FunctionType TYPE = new FunctionType("example", 
"greeter");
-
-       @Persisted
-       private final PersistedValue<Integer> count = 
PersistedValue.of("count", Integer.class);
-
-       public void invoke(Context context, Object input) {
-               String userId = context.self().id();
-               int seen = count.getOrDefault(0);
-
-               switch (seen) {
-                       case 0:
-                               System.out.println(String.format("Hello %s!", 
userId));
-                               break;
-                       case 1:
-                               System.out.println("Hello Again!");
-                               break;
-                       case 2:
-                               System.out.println("Third time is the charm 
:)");
-                               break;
-                       default:
-                               System.out.println(String.format("Hello for the 
%d-th time", seen + 1));
-               }
-
-               count.set(seen + 1);
-       }
-}
-```
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
 
-``PersistedValue`` comes with the right primitive methods to build powerful 
stateful applications.
-Calling ``PersistedValue#get`` will return the current value of an object 
stored in state, or ``null`` if nothing is set.
-Conversely, ``PersistedValue#set`` will update the value in state and 
``PersistedValue#clear`` will delete the value from state.
+public class GreeterFn implements StatefulFunction {
 
-### Collection Types
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/greeter");
 
-Along with ``PersistedValue``, the Java SDK supports two persisted collection 
types.
-``PersistedTable`` is a collection of keys and values, and 
``PersistedAppendingBuffer`` is an append-only buffer.
+    static final TypeName KAFKA_EGRESS = 
TypeName.forNameFromString("com.example/greets");

Review comment:
       `forNameFromString` should be `typeNameFromString`.

##########
File path: docs/content/docs/sdk/java.md
##########
@@ -29,451 +29,395 @@ under the License.
 Stateful functions are the building blocks of applications; they are atomic 
units of isolation, distribution, and persistence.
 As objects, they encapsulate the state of a single entity (e.g., a specific 
user, device, or session) and encode its behavior.
 Stateful functions can interact with each other, and external systems, through 
message passing.
-The Java SDK is supported as an [embedded module]({{< ref 
"docs/sdk/overview#embedded-module" >}}).
 
 To get started, add the Java SDK as a dependency to your application.
 
-{{< artifact statefun-sdk >}}
+{{< artifact statefun-sdk-java >}}
 
 ## Defining A Stateful Function
 
-A stateful function is any class that implements the ``StatefulFunction`` 
interface.
-The following is an example of a simple hello world function.
+A stateful function is any class that implements the `StatefulFunction` 
interface.
+In the following example, a `StatefulFunction` maintains a count for every user
+of an application, emitting a customized greeting.
 
 ```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnHelloWorld implements StatefulFunction {
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
 
-       @Override
-       public void invoke(Context context, Object input) {
-               System.out.println("Hello " + input.toString());
-       }
-}
-```
+public class GreeterFn implements StatefulFunction {
 
-Functions process each incoming message through their ``invoke`` method.
-Input's are untyped and passed through the system as a ``java.lang.Object`` so 
one function can potentially process multiple types of messages.
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/greeter");
 
-The ``Context`` provides metadata about the current message and function, and 
is how you can call other functions or external systems.
-Functions are invoked based on a function type and unique identifier.
+    static final TypeName INBOX = 
TypeName.forNameFromString("com.example.fns/inbox");
 
-### Stateful Match Function 
+    static final ValueSpec<Integer> SEEN = 
ValueSpec.named("seen").withIntType();
 
-Stateful functions provide a powerful abstraction for working with events and 
state, allowing developers to build components that can react to any kind of 
message.
-Commonly, functions only need to handle a known set of message types, and the 
``StatefulMatchFunction`` interface provides an opinionated solution to that 
problem.
+    @Override 
+    public CompletableFuture<Void> apply(Context context, Message message) {
+        if (!message.is(User.TYPE)) {
+            throw new IllegalStateException("Unknown type");
+        }
 
-#### Simple Match Function
+        User user = message.as(User.TYPE);
+        String name = user.getName();
 
-Stateful match functions are an opinionated variant of stateful functions for 
precisely this pattern.
-Developers outline expected types, optional predicates, and well-typed 
business logic and let the system dispatch each input to the correct action.
-Variants are bound inside a ``configure`` method that is executed once the 
first time an instance is loaded.
+        var storage = context.storage();
+        var seen = storage.get(SEEN).orElse(0);
+        storage.set(SEEN, seen + 1);
 
-```java
-package org.apache.flink.statefun.docs.match;
+        context.send(
+            MessageBuilder.forAddress(INBOX, name)
+                .withValue("Hello " + name + " for the " + seen + "th time!")
+                .build());
 
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
+        return context.done();
+    }
+}
+```
 
-public class FnMatchGreeter extends StatefulMatchFunction {
+This code declares a greeter function that will be 
[registered](#exposing-functions) under the logical type name 
`com.example.fns/greeter`. Type names must take the form `<namespace>/<name>`.
+It contains a single `ValueSpec`, which is implicitly scoped to the current 
address and stores an integer.
 
-       @Override
-       public void configure(MatchBinder binder) {
-               binder
-                       .predicate(Customer.class, this::greetCustomer)
-                       .predicate(Employee.class, Employee::isManager, 
this::greetManager)
-                       .predicate(Employee.class, this::greetEmployee);
-       }
+Every time a message is sent to a greeter, it first validates that the message 
contains a `User` and extracts its name. Both messages and state are strongly 
typed: either one of the default [built-in types]({{< ref 
"docs/sdk/appendix#types" >}}) or a [custom type](#types), as in this case.
 
-       private void greetCustomer(Context context, Customer message) {
-               System.out.println("Hello customer " + message.getName());
-       }
+The function finally builds a custom greeting for the user.
+The number of times that particular user has been seen so far is queried from 
the state store and updated,
+and the greeting is sent to the user's inbox (another function type).
 
-       private void greetEmployee(Context context, Employee message) {
-               System.out.println("Hello employee " + message.getEmployeeId());
-       }
+## Types
 
-       private void greetManager(Context context, Employee message) {
-               System.out.println("Hello manager " + message.getEmployeeId());
-       }
-}
-```
+Stateful Functions strongly types all messages and state values.
+Because functions run in a distributed manner and state values are persisted to 
stable storage, Stateful Functions aims to provide efficient and easy-to-use 
serializers.
 
-#### Making Your Function Complete
+Out of the box, all SDKs offer a set of highly optimized serializers for 
common primitive types: booleans, numerics, and strings.
+Additionally, users are encouraged to plug in custom types to model more 
complex data structures.
 
-Similar to the first example, match functions are partial by default and will 
throw an ``IllegalStateException`` on any input that does not match any branch.
-They can be made complete by providing an ``otherwise`` clause that serves as 
a catch-all for unmatched input, think of it as a default clause in a Java 
switch statement.
-The ``otherwise`` action takes its message as an untyped ``java.lang.Object``, 
allowing you to handle any unexpected messages.
+In the [example above](#defining-a-stateful-function), the greeter function 
consumes `User` messages, a POJO type containing several fields.
+By defining a custom type, this object can be passed transparently between 
functions and stored in state.
+And because the type is tied to a logical type name, instead of the physical 
Java class, it can be passed to functions written in other language SDKs.
 
 ```java
-package org.apache.flink.statefun.docs.match;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
-
-public class FnMatchGreeterWithCatchAll extends StatefulMatchFunction {
-
-       @Override
-       public void configure(MatchBinder binder) {
-               binder
-                       .predicate(Customer.class, this::greetCustomer)
-                       .predicate(Employee.class, Employee::isManager, 
this::greetManager)
-                       .predicate(Employee.class, this::greetEmployee)
-                       .otherwise(this::catchAll);
-       }
-
-       private void greetCustomer(Context context, Customer message) {
-               System.out.println("Hello customer " + message.getName());
-       }
-
-       private void greetEmployee(Context context, Employee message) {
-               System.out.println("Hello employee " + message.getEmployeeId());
-       }
-
-       private void greetManager(Context context, Employee message) {
-               System.out.println("Hello manager " + message.getEmployeeId());
-       }
-
-       private void catchAll(Context context, Object message) {
-               System.out.println("Hello unexpected message");
-       }
-}
-```
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.types.SimpleType;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import java.util.Objects;
 
-#### Action Resolution Order
+public class User {
 
-Match functions will always match actions from most to least specific using 
the following resolution rules.
+    private static final ObjectMapper mapper = new ObjectMapper();
 
-First, find an action that matches the type and predicate. If two predicates 
will return true for a particular input, the one registered in the binder first 
wins.
-Next, search for an action that matches the type but does not have an 
associated predicate.
-Finally, if a catch-all exists, it will be executed or an 
``IllegalStateException`` will be thrown.
+    public static final Type<User> TYPE = SimpleType.simpleImmutableTypeFrom(
+        TypeName.typeNameFromString("com.example/User"),
+        mapper::writeValueAsBytes,
+        bytes -> mapper.readValue(bytes, User.class));
 
-## Function Types and Messaging
+    private final String name;
 
-In Java, function types are defined as logical pointers composed of a 
namespace and name.
-The type is bound to the implementing class in the [module]({{< ref 
"docs/sdk/overview#embedded-module" >}}) definition.
-Below is an example function type for the hello world function.
+    private final String favoriteColor;
 
-```java
-package org.apache.flink.statefun.docs;
+    @JsonCreator
+    public User(
+        @JsonProperty("name") String name,
+        @JsonProperty("favorite_color") String favoriteColor) {
 
-import org.apache.flink.statefun.sdk.FunctionType;
+        this.name = Objects.requireNonNull(name);
+        this.favoriteColor = Objects.requireNonNull(favoriteColor);
+    }
 
-/** A function type that will be bound to {@link FnHelloWorld}. */
-public class Identifiers {
+    public String getName() {
+        return name;
+    }
 
-  public static final FunctionType HELLO_TYPE = new 
FunctionType("apache/flink", "hello");
-}
+    public String getFavoriteColor() {
+        return favoriteColor;
+    }
+
+    @Override
+    public String toString() {
+        return "User{name=" + name + ",favoriteColor=" + favoriteColor + "}";
+    }
+}
 ```
 
-This type can then be referenced from other functions to create an address and 
message a particular instance.
+## State
 
-```java
-package org.apache.flink.statefun.docs;
+Stateful Functions treats state as a first class citizen and so all functions 
can easily define state that is automatically made fault tolerant by the 
runtime.
+State declaration is as simple as defining one or more `ValueSpec`s 
describing your state values.
+Value specifications are defined with a unique (to the function) name and 
[type](#types).
+At runtime, functions can `get`, `set`, and `remove` state values scoped to 
the address of the current message.
 
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-/** A simple stateful function that sends a message to the user with id 
"user1" */
-public class FnCaller implements StatefulFunction {
+{{< hint info >}}
+All value specifications must be eagerly registered in the 
`StatefulFunctionSpec` when composing
+the application's [RequestReplyHandler](#exposing-functions).
+{{< /hint >}}
 
-  @Override
-  public void invoke(Context context, Object input) {
-    context.send(Identifiers.HELLO_TYPE, "user1", new MyUserMessage());
-  }
-}
+```java
+// Value specification for a state named `seen` 
+// with the primitive integer type
+ValueSpec
+    .named("seen")
+    .withIntType();
+
+// Value specification with a custom type
+ValueSpec
+    .named("user")
+    .withCustomType(User.TYPE);
 ```
 
-## Sending Delayed Messages
+### State Expiration
 
-Functions are able to send messages on a delay so that they will arrive after 
some duration.
-Functions may even send themselves delayed messages that can serve as a 
callback.
-The delayed message is non-blocking so functions will continue to process 
records between the time a delayed message is sent and received.
+By default, state values are persisted until manually `remove`d by the user.
+Optionally, they may be configured to expire and be automatically deleted 
after a specified duration.
 
 ```java
-package org.apache.flink.statefun.docs.delay;
-
-import java.time.Duration;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-public class FnDelayedMessage implements StatefulFunction {
-
-       @Override
-       public void invoke(Context context, Object input) {
-               if (input instanceof Message) {
-                       System.out.println("Hello");
-                       context.sendAfter(Duration.ofMinutes(1), 
context.self(), new DelayedMessage());
-               }
-
-               if (input instanceof DelayedMessage) {
-                       System.out.println("Welcome to the future!");
-               }
-       }
-}
+// Value specification that will automatically
+// delete the value if the function instance goes 
+// more than 30 minutes without being called
+ValueSpec
+    .named("seen")
+    .thatExpireAfterCall(Duration.ofMinutes(30))
+    .withIntType();
+
+// Value specification that will automatically
+// delete the value if it goes more than 1 day
+// without being written
+ValueSpec
+    .named("seen")
+    .thatExpireAfterWrite(Duration.ofDays(1))
+    .withIntType();
 ```
 
-## Completing Async Requests
-
-When interacting with external systems, such as a database or API, one needs 
to take care that communication delay with the external system does not 
dominate the application’s total work.
-Stateful Functions allows registering a Java ``CompletableFuture`` that will 
resolve to a value at some point in the future.
-Future's are registered along with a metadata object that provides additional 
context about the caller.
+## Sending Delayed Messages
 
-When the future completes, either successfully or exceptionally, the caller 
function type and id will be invoked with a ``AsyncOperationResult``.
-An asynchronous result can complete in one of three states:
+Functions can send messages on a delay so that they will arrive after some 
duration.
+They may even send themselves delayed messages that can serve as a callback.
+The delayed message is non-blocking, so functions will continue to process 
records between when a delayed message is sent and received.
+Additionally, they are fault-tolerant and never lost, even when recovering 
from failure. 
 
-### Success
+This example sends a response back to the calling function after a 30-minute 
delay.
 
-The asynchronous operation has succeeded, and the produced result can be 
obtained via ``AsyncOperationResult#value``.
+```java
+import java.util.concurrent.CompletableFuture;
+import java.time.Duration;
 
-### Failure
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
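+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;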
 
-The asynchronous operation has failed, and the cause can be obtained via 
``AsyncOperationResult#throwable``.
+public class DelayedFn implements StatefulFunction {
 
-### Unknown
+    private static final Logger LOG = LoggerFactory.getLogger(DelayedFn.class);
 
-The stateful function was restarted, possibly on a different machine, before 
the ``CompletableFuture`` was completed, therefore it is unknown what is the 
status of the asynchronous operation.
+    static final TypeName TYPE = 
TypeName.typeNameFromString("com.example.fns/delayed");
 
-```java
-package org.apache.flink.statefun.docs.async;
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message message) {
+        if (!context.caller().isPresent()) {
+            LOG.debug("Message has no known caller meaning it was sent 
directly from an ingress");
+            return context.done();
+        }
 
-import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.sdk.AsyncOperationResult;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-@SuppressWarnings("unchecked")
-public class EnrichmentFunction implements StatefulFunction {
-
-       private final QueryService client;
-
-       public EnrichmentFunction(QueryService client) {
-               this.client = client;
-       }
-
-       @Override
-       public void invoke(Context context, Object input) {
-               if (input instanceof User) {
-                       onUser(context, (User) input);
-               } else if (input instanceof AsyncOperationResult) {
-                       onAsyncResult((AsyncOperationResult) input);
-               }
-       }
-
-       private void onUser(Context context, User user) {
-               CompletableFuture<UserEnrichment> future = 
client.getDataAsync(user.getUserId());
-               context.registerAsyncOperation(user, future);
-       }
-
-       private void onAsyncResult(AsyncOperationResult<User, UserEnrichment> 
result) {
-               if (result.successful()) {
-                       User metadata = result.metadata();
-                       UserEnrichment value = result.value();
-                       System.out.println(
-                               String.format("Successfully completed future: 
%s %s", metadata, value));
-               } else if (result.failure()) {
-                       System.out.println(
-                               String.format("Something has gone terribly 
wrong %s", result.throwable()));
-               } else {
-                       System.out.println("Not sure what happened, maybe 
retry");
-               }
-       }
+        var caller = context.caller().get();
+        context.sendAfter(Duration.ofMinutes(30), MessageBuilder
+            .forAddress(caller)
+            .withValue("Hello from the future!")
+            .build());
+
+        return context.done();
+    }
 }
 ```
 
-## Persistence
-
-Stateful Functions treats state as a first class citizen and so all stateful 
functions can easily define state that is automatically made fault tolerant by 
the runtime.
-All stateful functions may contain state by merely defining one or more 
persisted fields.
+## Egress
 
-The simplest way to get started is with a ``PersistedValue``, which is defined 
by its name and the class of the type that it stores.
-The data is always scoped to a specific function type and identifier.
-Below is a stateful function that greets users based on the number of times 
they have been seen.
-
-{{< hint info >}}
-All **PersistedValue**, **PersistedTable**, and **PersistedAppendingBuffer** 
fields must be marked with a **@Persisted** annotation or they will not be made 
fault tolerant by the runtime.
-{{< /hint >}}
+Functions can message other stateful functions and egresses, exit points for 
sending messages to the outside world.
+As with other messages, egress messages are always well-typed. 
+Additionally, they contain metadata pertinent to the specific egress type.
 
+{{< tabs "egress" >}}
+{{< tab "Apache Kafka" >}}
 ```java
-package org.apache.flink.statefun.docs;
-
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-public class FnUserGreeter implements StatefulFunction {
-
-       public static FunctionType TYPE = new FunctionType("example", 
"greeter");
-
-       @Persisted
-       private final PersistedValue<Integer> count = 
PersistedValue.of("count", Integer.class);
-
-       public void invoke(Context context, Object input) {
-               String userId = context.self().id();
-               int seen = count.getOrDefault(0);
-
-               switch (seen) {
-                       case 0:
-                               System.out.println(String.format("Hello %s!", 
userId));
-                               break;
-                       case 1:
-                               System.out.println("Hello Again!");
-                               break;
-                       case 2:
-                               System.out.println("Third time is the charm 
:)");
-                               break;
-                       default:
-                               System.out.println(String.format("Hello for the 
%d-th time", seen + 1));
-               }
-
-               count.set(seen + 1);
-       }
-}
-```
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
 
-``PersistedValue`` comes with the right primitive methods to build powerful 
stateful applications.
-Calling ``PersistedValue#get`` will return the current value of an object 
stored in state, or ``null`` if nothing is set.
-Conversely, ``PersistedValue#set`` will update the value in state and 
``PersistedValue#clear`` will delete the value from state.
+public class GreeterFn implements StatefulFunction {
 
-### Collection Types
+    static final TypeName TYPE = 
TypeName.typeNameFromString("com.example.fns/greeter");
 
-Along with ``PersistedValue``, the Java SDK supports two persisted collection 
types.
-``PersistedTable`` is a collection of keys and values, and 
``PersistedAppendingBuffer`` is an append-only buffer.
+    static final TypeName KAFKA_EGRESS = 
TypeName.typeNameFromString("com.example/greets");
 
-These types are functionally equivalent to ``PersistedValue<Map>`` and 
``PersistedValue<Collection>`` respectively but may provide better performance 
in some situations.
+    static final ValueSpec<Integer> SEEN = 
ValueSpec.named("seen").withIntType();
 
-```java
-@Persisted
-PersistedTable<String, Integer> table = PersistedTable.of("my-table", 
String.class, Integer.class);
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message message) {
+        if (!message.is(User.TYPE)) {
+            throw new IllegalStateException("Unknown type");
+        }
 
-@Persisted
-PersistedAppendingBuffer<Integer> buffer = 
PersistedAppendingBuffer.of("my-buffer", Integer.class);
-```
+        User user = message.as(User.TYPE);
+        String name = user.getName();
 
-### Dynamic State Registration
+        var storage = context.storage();
+        var seen = storage.get(SEEN).orElse(0);
+        storage.set(SEEN, seen + 1);
 
-Using the above state types, a function's persisted state must be defined 
eagerly. You cannot use those state types to
-register a new persisted state during invocations (i.e., in the ``invoke`` 
method) or after the function instance is created.
+        context.send(
+            KafkaEgressMessage.forEgress(KAFKA_EGRESS)
+                .withTopic("greetings")
+                .withUtf8Key(name)
+                .withUtf8Value("Hello " + name + " for the " + seen + "th 
time!")
+                .build());
 
-If dynamic state registration is required, it can be achieved using a 
``PersistedStateRegistry``:
-
-```java
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedStateRegistry;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-public class MyFunction implements StatefulFunction {
-
-       @Persisted
-       private final PersistedStateRegistry registry = new 
PersistedStateRegistry();
-
-       private PersistedValue<Integer> value;
-
-       public void invoke(Context context, Object input) {
-               if (value == null) {
-                       value = PersistedValue.of("my-value", Integer.class);
-                       registry.registerValue(value);
-               }
-               int count = value.getOrDefault(0);
-               // ...
-       }
+        return context.done();
+    }
 }
 ```
-
-Note how the ``PersistedValue`` field doesn't need to be annotated with the 
``@Persisted`` annotations, and is initially
-empty. The state object is dynamically created during invocation and 
registered with the ``PersistedStateRegistry`` so
-that the system picks it up to be managed for fault-tolerance.
-
-### State Expiration
-
-Persisted states may be configured to expire and be deleted after a specified 
duration.
-This is supported by all types of state:
-
+{{< /tab >}}
+{{< tab "Amazon Kinesis" >}}
 ```java
-@Persisted
-PersistedValue<Integer> value = PersistedValue.of(
-    "my-value",
-    Integer.class,
-    Expiration.expireAfterWriting(Duration.ofHours(1)));
-
-@Persisted
-PersistedTable<String, Integer> table = PersistedTable.of(
-    "my-table",
-    String.class,
-    Integer.class,
-    Expiration.expireAfterWriting(Duration.ofMinutes(5)));
-
-@Persisted
-PersistedAppendingBuffer<Integer> buffer = PersistedAppendingBuffer.of(
-    "my-buffer",
-    Integer.class,
-    Expiration.expireAfterWriting(Duration.ofSeconds(30)));
-```
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.io.KinesisEgressMessage;
 
-There are two expiration modes supported:
+public class GreeterFn implements StatefulFunction {
 
-```java
-Expiration.expireAfterWriting(...)
+    static final TypeName TYPE = 
TypeName.forNameFromString("com.example.fns/greeter");

Review comment:
       forNameFromString  => typeNameFromString
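
For context on what the renamed factory is expected to accept: the docs in this diff state that type names take the form `<namespace>/<name>`, and `User.TYPE` already calls `TypeName.typeNameFromString("com.example/User")`. The sketch below only illustrates that string format with plain Java. `LogicalTypeName` and `parse` are hypothetical names, not part of the SDK, and splitting on the last `/` is an assumption rather than the SDK's documented behavior.

```java
import java.util.Objects;

/** Hypothetical stand-in for illustration only; this is not the SDK's TypeName class. */
final class LogicalTypeName {
    private final String namespace;
    private final String name;

    private LogicalTypeName(String namespace, String name) {
        this.namespace = namespace;
        this.name = name;
    }

    /**
     * Parses a string of the form "<namespace>/<name>".
     * Assumption: the string is split on the last '/', and both parts must be non-empty.
     */
    static LogicalTypeName parse(String typeName) {
        Objects.requireNonNull(typeName, "typeName");
        int slash = typeName.lastIndexOf('/');
        if (slash <= 0 || slash == typeName.length() - 1) {
            throw new IllegalArgumentException(
                "Expected <namespace>/<name> but got: " + typeName);
        }
        return new LogicalTypeName(
            typeName.substring(0, slash), typeName.substring(slash + 1));
    }

    String namespace() {
        return namespace;
    }

    String name() {
        return name;
    }
}
```

Under these assumptions, `LogicalTypeName.parse("com.example.fns/greeter")` yields the namespace `com.example.fns` and the name `greeter`, while a string without a `/` is rejected.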




