This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git
The following commit(s) were added to refs/heads/release-3.2 by this push:
new 1ba46ba [FLINK-26155] Add Playground ingress/egress that allows to
ingest and consume messages via curl
1ba46ba is described below
commit 1ba46baa2c645bc70883c5f435fee96a6b576230
Author: Till Rohrmann <[email protected]>
AuthorDate: Sat Feb 12 17:48:50 2022 +0100
[FLINK-26155] Add Playground ingress/egress that allows to ingest and
consume messages via curl
The PlaygroundIngress and PlaygroundEgress each spin up a web server that
allows messages, which are kept in memory, to be ingested and consumed
via curl.
The way to ingest new messages is by using curl:
curl -X PUT -H "Content-Type: application/vnd.<TYPE_URL>" -d '<DATA>'
localhost:8090/<NAMESPACE>/<FUNCTION>/<ID>
Messages can be consumed from the egress via curl:
curl -X GET localhost:8091/<TOPIC>
In order to use the ingress it needs to be configured in the module.yaml
via:
kind: io.statefun.playground.v1/ingress
spec:
port: 8090
In order to use the egress it needs to be configured in the module.yaml via:
kind: io.statefun.playground.v1/egress
spec:
port: 8091
topics:
- greetings
This closes #25.
---
...pache.flink.statefun.extensions.ExtensionModule | 10 ++
.../statefun-playground-entrypoint/pom.xml | 45 ++++++++
.../entrypoint/LocalEnvironmentEntrypoint.java | 3 +-
.../statefun/playground/internal/io/Constants.java | 29 +++++
.../playground/internal/io/binders/Utils.java | 34 ++++++
.../internal/io/binders/egress/v1/Module.java | 13 +++
.../binders/egress/v1/PlaygroundEgressBinder.java | 24 ++++
.../internal/io/binders/ingress/v1/Module.java | 13 +++
.../ingress/v1/PlaygroundIngressBinder.java | 27 +++++
.../playground/internal/io/flink/EgressRecord.java | 21 ++++
.../internal/io/flink/EgressWebServer.java | 85 ++++++++++++++
.../internal/io/flink/IngressWebServer.java | 122 +++++++++++++++++++++
.../internal/io/flink/ParseException.java | 7 ++
.../internal/io/flink/PlaygroundEgress.java | 66 +++++++++++
.../internal/io/flink/PlaygroundFlinkIoModule.java | 15 +++
.../internal/io/flink/PlaygroundIngress.java | 50 +++++++++
.../internal/io/flink/PlaygroundSinkProvider.java | 22 ++++
.../io/flink/PlaygroundSourceProvider.java | 23 ++++
.../internal/io/flink/RefCountedContainer.java | 68 ++++++++++++
.../internal/io/spec/PlaygroundEgressSpec.java | 59 ++++++++++
.../internal/io/spec/PlaygroundIngressSpec.java | 49 +++++++++
.../LocalEnvironmentEntrypointITCase.java | 13 +++
.../src/test/resources/module.yaml | 31 ++++++
23 files changed, 828 insertions(+), 1 deletion(-)
diff --git
a/playground-internal/statefun-playground-entrypoint/META-INF/services/org.apache.flink.statefun.extensions.ExtensionModule
b/playground-internal/statefun-playground-entrypoint/META-INF/services/org.apache.flink.statefun.extensions.ExtensionModule
new file mode 100644
index 0000000..0966f54
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/META-INF/services/org.apache.flink.statefun.extensions.ExtensionModule
@@ -0,0 +1,10 @@
+org.apache.flink.statefun.playground.internal.io.binders.egress.v1.Module
+org.apache.flink.statefun.playground.internal.io.binders.ingress.v1.Module
+org.apache.flink.statefun.flink.core.httpfn.TransportClientsModule
+org.apache.flink.statefun.flink.core.httpfn.binders.v1.Module
+org.apache.flink.statefun.flink.core.httpfn.binders.v2.Module
+org.apache.flink.statefun.flink.core.nettyclient.NettyTransportModule
+org.apache.flink.statefun.flink.io.kafka.binders.egress.v1.Module
+org.apache.flink.statefun.flink.io.kafka.binders.ingress.v1.Module
+org.apache.flink.statefun.flink.io.kinesis.binders.egress.v1.Module
+org.apache.flink.statefun.flink.io.kinesis.binders.ingress.v1.Module
diff --git a/playground-internal/statefun-playground-entrypoint/pom.xml
b/playground-internal/statefun-playground-entrypoint/pom.xml
index 77cd8ca..b7ccd8d 100644
--- a/playground-internal/statefun-playground-entrypoint/pom.xml
+++ b/playground-internal/statefun-playground-entrypoint/pom.xml
@@ -31,6 +31,7 @@ under the License.
<slf4j.version>1.7.35</slf4j.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
+ <auto-service.version>1.0-rc6</auto-service.version>
</properties>
<dependencies>
@@ -47,6 +48,12 @@ under the License.
<version>${statefun.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-flink-io</artifactId>
+ <version>${statefun.version}</version>
+ </dependency>
+
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -74,6 +81,18 @@ under the License.
<version>${flink.version}</version>
</dependency>
+ <!-- Misc -->
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service-annotations</artifactId>
+ <version>${auto-service.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.undertow</groupId>
+ <artifactId>undertow-core</artifactId>
+ <version>2.2.16.Final</version>
+ </dependency>
+
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
@@ -85,10 +104,36 @@ under the License.
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
+
+ <!-- Testing -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <version>5.8.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
+ <!-- Java compiler -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.1</version>
+ <configuration>
+ <source>11</source>
+ <target>11</target>
+ <annotationProcessorPaths>
+ <path>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <version>${auto-service.version}</version>
+ </path>
+ </annotationProcessorPaths>
+ </configuration>
+ </plugin>
+
<!-- Build a fat executable jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypoint.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypoint.java
index 8503cf2..0e4805f 100644
---
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypoint.java
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypoint.java
@@ -74,7 +74,8 @@ public final class LocalEnvironmentEntrypoint {
if (splits.length != 2) {
throw new IllegalArgumentException(
- String.format("The '--%s' value must have the form 'key=value'",
CONFIGURATION_OPTION));
+ String.format(
+ "The '--%s' value must have the form 'key=value'",
CONFIGURATION_OPTION));
}
final String key = splits[0];
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/Constants.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/Constants.java
new file mode 100644
index 0000000..6334280
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/Constants.java
@@ -0,0 +1,29 @@
+package org.apache.flink.statefun.playground.internal.io;
+
+import com.google.protobuf.Message;
+import org.apache.flink.statefun.sdk.EgressType;
+import org.apache.flink.statefun.sdk.IngressType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+/** Identifiers and type names shared by the playground ingress and egress. */
+public class Constants {
+ // Building blocks for the ingress/egress types and identifiers declared below.
+ private static final String NAMESPACE = "io.statefun.playground";
+ private static final String EGRESS = "egress";
+ private static final String INGRESS = "ingress";
+
+ // Types under which the playground sink and source providers are registered.
+ public static final EgressType EGRESS_TYPE = new EgressType(NAMESPACE,
EGRESS);
+ public static final IngressType INGRESS_TYPE = new IngressType(NAMESPACE,
INGRESS);
+ public static final EgressIdentifier<TypedValue> EGRESS_IDENTIFIER =
+ new EgressIdentifier<>(NAMESPACE, EGRESS, TypedValue.class);
+ public static final IngressIdentifier<Message> INGRESS_IDENTIFIER =
+ new IngressIdentifier<>(Message.class, NAMESPACE, INGRESS);
+
+ // Type URL used by the ingress when no Content-Type value carries the StateFun prefix.
+ public static final String DEFAULT_INGRESS_TYPE = "io.statefun.types/string";
+ // Content-Type prefix; the ingress uses the text following it as the message's type URL.
+ public static final String STATEFUN_CONTENT_TYPE_PREFIX = "application/vnd.";
+ // Typename a TypedValue must carry for the egress to read it as an EgressRecord.
+ public static final String PLAYGROUND_EGRESS_RECORD =
"io.statefun.playground/EgressRecord";
+
+ // Utility holder: construction always fails.
+ private Constants() {
+ throw new UnsupportedOperationException("Should not be instantiated.");
+ }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/Utils.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/Utils.java
new file mode 100644
index 0000000..6bf54c2
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/Utils.java
@@ -0,0 +1,34 @@
+package org.apache.flink.statefun.playground.internal.io.binders;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.extensions.ComponentJsonObject;
+import org.apache.flink.statefun.flink.common.json.StateFunObjectMapper;
+import org.apache.flink.statefun.sdk.TypeName;
+
+/** Shared helpers for the playground ingress and egress component binders. */
+public class Utils {
+  private static final ObjectMapper OBJECT_MAPPER = StateFunObjectMapper.create();
+
+  private Utils() {
+    throw new UnsupportedOperationException("Should not be instantiated.");
+  }
+
+  /**
+   * Verifies that a component was handed to the binder responsible for its kind.
+   *
+   * @param component the component whose binder type name is checked
+   * @param expected the binder type name the calling binder handles
+   * @throws IllegalArgumentException if the component declares a different binder type name
+   */
+  public static void validateComponent(ComponentJsonObject component, TypeName expected) {
+    final TypeName typeName = component.binderTypename();
+
+    if (!typeName.equals(expected)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Binder handles types %s but was called for type %s.", expected, typeName));
+    }
+  }
+
+  /**
+   * Converts a JSON tree into an instance of the given class.
+   *
+   * @param jsonNode the JSON tree to convert
+   * @param clazz the target class
+   * @param <T> the target type
+   * @return the converted instance
+   * @throws RuntimeException if the conversion fails; the Jackson exception is attached as cause
+   */
+  public static <T> T parseJson(JsonNode jsonNode, Class<T> clazz) {
+    try {
+      return OBJECT_MAPPER.treeToValue(jsonNode, clazz);
+    } catch (JsonProcessingException e) {
+      // Attach the Jackson exception as cause; without it the message alone does not say which
+      // field or value made the conversion fail.
+      throw new RuntimeException(
+          String.format("Could not parse the %s from %s.", clazz.getSimpleName(), jsonNode), e);
+    }
+  }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/egress/v1/Module.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/egress/v1/Module.java
new file mode 100644
index 0000000..ffec0ef
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/egress/v1/Module.java
@@ -0,0 +1,13 @@
+package org.apache.flink.statefun.playground.internal.io.binders.egress.v1;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.extensions.ExtensionModule;
+
+/** Extension module that registers {@link PlaygroundEgressBinder} under its kind type name. */
+@AutoService(ExtensionModule.class)
+public final class Module implements ExtensionModule {
+ // The global configuration is not consulted; the binder is always registered.
+ @Override
+ public void configure(Map<String, String> globalConfigurations, Binder
binder) {
+ binder.bindExtension(PlaygroundEgressBinder.KIND_TYPE,
PlaygroundEgressBinder.INSTANCE);
+ }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/egress/v1/PlaygroundEgressBinder.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/egress/v1/PlaygroundEgressBinder.java
new file mode 100644
index 0000000..ae30a8a
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/egress/v1/PlaygroundEgressBinder.java
@@ -0,0 +1,24 @@
+package org.apache.flink.statefun.playground.internal.io.binders.egress.v1;
+
+import org.apache.flink.statefun.extensions.ComponentBinder;
+import org.apache.flink.statefun.extensions.ComponentJsonObject;
+import org.apache.flink.statefun.playground.internal.io.binders.Utils;
+import
org.apache.flink.statefun.playground.internal.io.spec.PlaygroundEgressSpec;
+import org.apache.flink.statefun.sdk.TypeName;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+/**
+ * Component binder for the {@code io.statefun.playground.v1/egress} kind: reads the component's
+ * spec as a {@link PlaygroundEgressSpec} and binds it as an egress.
+ */
+public class PlaygroundEgressBinder implements ComponentBinder {
+
+  static final PlaygroundEgressBinder INSTANCE = new PlaygroundEgressBinder();
+  static final TypeName KIND_TYPE = TypeName.parseFrom("io.statefun.playground.v1/egress");
+
+  /**
+   * Binds the egress described by the given component.
+   *
+   * @param component the component to bind; its binder type name must equal {@link #KIND_TYPE}
+   * @param remoteModuleBinder the binder that receives the parsed egress spec
+   */
+  @Override
+  public void bind(
+      ComponentJsonObject component, StatefulFunctionModule.Binder remoteModuleBinder) {
+    // Reject components of another kind before looking at the spec.
+    Utils.validateComponent(component, KIND_TYPE);
+
+    remoteModuleBinder.bindEgress(
+        Utils.parseJson(component.specJsonNode(), PlaygroundEgressSpec.class));
+  }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/ingress/v1/Module.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/ingress/v1/Module.java
new file mode 100644
index 0000000..e83db60
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/ingress/v1/Module.java
@@ -0,0 +1,13 @@
+package org.apache.flink.statefun.playground.internal.io.binders.ingress.v1;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.extensions.ExtensionModule;
+
+/** Extension module that registers {@link PlaygroundIngressBinder} under its kind type name. */
+@AutoService(ExtensionModule.class)
+public final class Module implements ExtensionModule {
+ // The global configuration is not consulted; the binder is always registered.
+ @Override
+ public void configure(Map<String, String> globalConfigurations, Binder
binder) {
+ binder.bindExtension(PlaygroundIngressBinder.KIND_TYPE,
PlaygroundIngressBinder.INSTANCE);
+ }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/ingress/v1/PlaygroundIngressBinder.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/ingress/v1/PlaygroundIngressBinder.java
new file mode 100644
index 0000000..8539a64
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/ingress/v1/PlaygroundIngressBinder.java
@@ -0,0 +1,27 @@
+package org.apache.flink.statefun.playground.internal.io.binders.ingress.v1;
+
+import org.apache.flink.statefun.extensions.ComponentBinder;
+import org.apache.flink.statefun.extensions.ComponentJsonObject;
+import org.apache.flink.statefun.flink.io.common.AutoRoutableProtobufRouter;
+import org.apache.flink.statefun.playground.internal.io.binders.Utils;
+import
org.apache.flink.statefun.playground.internal.io.spec.PlaygroundIngressSpec;
+import org.apache.flink.statefun.sdk.TypeName;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+/**
+ * Component binder for the {@code io.statefun.playground.v1/ingress} kind: reads the component's
+ * spec as a {@link PlaygroundIngressSpec}, binds it as an ingress and attaches an
+ * {@link AutoRoutableProtobufRouter} to it.
+ */
+public class PlaygroundIngressBinder implements ComponentBinder {
+
+  static final PlaygroundIngressBinder INSTANCE = new PlaygroundIngressBinder();
+  static final TypeName KIND_TYPE = TypeName.parseFrom("io.statefun.playground.v1/ingress");
+
+  /**
+   * Binds the ingress described by the given component together with its router.
+   *
+   * @param component the component to bind; its binder type name must equal {@link #KIND_TYPE}
+   * @param remoteModuleBinder the binder that receives the ingress spec and the router
+   */
+  @Override
+  public void bind(
+      ComponentJsonObject component, StatefulFunctionModule.Binder remoteModuleBinder) {
+    // Reject components of another kind before looking at the spec.
+    Utils.validateComponent(component, KIND_TYPE);
+
+    final PlaygroundIngressSpec spec =
+        Utils.parseJson(component.specJsonNode(), PlaygroundIngressSpec.class);
+    final AutoRoutableProtobufRouter router = new AutoRoutableProtobufRouter();
+
+    remoteModuleBinder.bindIngress(spec);
+    remoteModuleBinder.bindIngressRouter(spec.id(), router);
+  }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/EgressRecord.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/EgressRecord.java
new file mode 100644
index 0000000..e53d2f2
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/EgressRecord.java
@@ -0,0 +1,21 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** JSON-mapped record naming a target topic and the payload to publish to it. */
+public class EgressRecord {
+ // Topic the payload is addressed to.
+ @JsonProperty("topic")
+ private String topic;
+
+ // Message content as text.
+ @JsonProperty("payload")
+ private String payload;
+
+ // No-argument constructor; the fields have no setters in this class.
+ public EgressRecord() {}
+
+ /** Returns the topic this record is addressed to. */
+ public String getTopic() {
+ return topic;
+ }
+
+ /** Returns the payload of this record. */
+ public String getPayload() {
+ return payload;
+ }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/EgressWebServer.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/EgressWebServer.java
new file mode 100644
index 0000000..dc29fa9
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/EgressWebServer.java
@@ -0,0 +1,85 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import com.google.protobuf.ByteString;
+import io.undertow.Undertow;
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.Headers;
+import io.undertow.util.StatusCodes;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class EgressWebServer implements AutoCloseable {
+ private static final Logger LOG =
LoggerFactory.getLogger(EgressWebServer.class);
+
+ private final ConcurrentMap<String, BlockingQueue<ByteString>> queues;
+ private final Undertow server;
+
+ EgressWebServer(int port, Set<String> topics) {
+ this.queues = createQueues(topics);
+ this.server =
+ Undertow.builder()
+ .addHttpListener(port, "0.0.0.0")
+ .setHandler(new EgressHttpHandler(queues))
+ .build();
+
+ server.start();
+ }
+
+ private static ConcurrentMap<String, BlockingQueue<ByteString>>
createQueues(Set<String> topics) {
+ return topics.stream()
+ .collect(
+ Collectors.toConcurrentMap(
+ Function.identity(), ignored -> new ArrayBlockingQueue<>(1 <<
20)));
+ }
+
+ @Override
+ public void close() {
+ server.stop();
+ }
+
+ public void offer(String topic, ByteString message) {
+ if (!Preconditions.checkNotNull(queues.get(topic)).offer(message)) {
+ LOG.info(
+ "Dropping message {} for topic {} because the queue is currently
full.", message, topic);
+ }
+ }
+
+ private static final class EgressHttpHandler implements HttpHandler {
+ private final Map<String, BlockingQueue<ByteString>> queues;
+
+ private EgressHttpHandler(Map<String, BlockingQueue<ByteString>> queues) {
+ this.queues = queues;
+ }
+
+ @Override
+ public void handleRequest(HttpServerExchange exchange) {
+ final String topic = exchange.getRelativePath().substring(1);
+
+ final BlockingQueue<ByteString> queue = queues.get(topic);
+
+ if (queue != null) {
+ final ByteString message = queue.poll();
+
+ if (message != null) {
+ exchange.getResponseHeaders().put(Headers.STATUS, StatusCodes.OK);
+ exchange.getResponseSender().send(message.asReadOnlyByteBuffer());
+ } else {
+ exchange.getResponseHeaders().put(Headers.STATUS,
StatusCodes.NOT_FOUND);
+ }
+ } else {
+ exchange.getResponseHeaders().put(Headers.STATUS,
StatusCodes.METHOD_NOT_ALLOWED);
+ }
+
+ exchange.endExchange();
+ }
+ }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/IngressWebServer.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/IngressWebServer.java
new file mode 100644
index 0000000..dc932b1
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/IngressWebServer.java
@@ -0,0 +1,122 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import com.google.protobuf.ByteString;
+import io.undertow.Undertow;
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.HeaderMap;
+import io.undertow.util.HeaderValues;
+import io.undertow.util.Headers;
+import io.undertow.util.StatusCodes;
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
+import org.apache.flink.statefun.playground.internal.io.Constants;
+
+/**
+ * HTTP server that accepts messages sent to {@code /<namespace>/<function_type>/<function_id>}
+ * and places them, wrapped as {@link AutoRoutable}s, into the queue it was created with.
+ */
+class IngressWebServer {
+  private final Undertow server;
+
+  /**
+   * Starts listening on all interfaces.
+   *
+   * @param port the port to listen on
+   * @param messageQueue the queue that receives every accepted message
+   */
+  IngressWebServer(int port, BlockingQueue<AutoRoutable> messageQueue) {
+    this.server =
+        Undertow.builder()
+            .addHttpListener(port, "0.0.0.0")
+            .setHandler(new IngressHttpHandler(messageQueue))
+            .build();
+
+    server.start();
+  }
+
+  /** Stops the HTTP server. */
+  void stop() {
+    server.stop();
+  }
+
+  /** Turns each request into an {@link AutoRoutable} and enqueues it. */
+  private static final class IngressHttpHandler implements HttpHandler {
+    // Fully qualified so that this class does not depend on additional import lines.
+    private static final org.slf4j.Logger LOG =
+        org.slf4j.LoggerFactory.getLogger(IngressHttpHandler.class);
+
+    private final BlockingQueue<AutoRoutable> messageQueue;
+
+    private IngressHttpHandler(BlockingQueue<AutoRoutable> messageQueue) {
+      this.messageQueue = messageQueue;
+    }
+
+    @Override
+    public void handleRequest(HttpServerExchange httpServerExchange) {
+      httpServerExchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
+    }
+
+    /**
+     * Enqueues the received body; answers 200 on success, 400 for an invalid URL and 500 if the
+     * thread is interrupted while waiting for space in the queue.
+     */
+    private void onRequestBody(HttpServerExchange exchange, byte[] payload) {
+      exchange.dispatch();
+
+      // The status must be set through setStatusCode; putting a "Status" header into the
+      // response leaves the actual HTTP status at its default of 200.
+      try {
+        final Address address = parseAddress(exchange.getRelativePath());
+        final String typeUrl = parseTypeUrl(exchange.getRequestHeaders());
+        final RoutingConfig routingConfig = createRoutingConfig(address, typeUrl);
+        final AutoRoutable autoRoutable = createAutoRoutable(payload, address, routingConfig);
+
+        messageQueue.put(autoRoutable);
+        exchange.setStatusCode(StatusCodes.OK);
+      } catch (ParseException e) {
+        LOG.warn("Rejecting request with invalid path {}.", exchange.getRelativePath(), e);
+        exchange.setStatusCode(StatusCodes.BAD_REQUEST);
+      } catch (InterruptedException e) {
+        // Restore the flag so that the owner of this thread still observes the interruption.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while enqueuing a message.", e);
+        exchange.setStatusCode(StatusCodes.INTERNAL_SERVER_ERROR);
+      }
+
+      exchange.endExchange();
+    }
+
+    /** Wraps the payload together with its target id and routing information. */
+    private AutoRoutable createAutoRoutable(
+        byte[] payload, Address address, RoutingConfig routingConfig) {
+      return AutoRoutable.newBuilder()
+          .setId(address.key)
+          .setConfig(routingConfig)
+          .setPayloadBytes(ByteString.copyFrom(payload))
+          .build();
+    }
+
+    /** Builds a routing config that targets exactly the function type named by the address. */
+    private RoutingConfig createRoutingConfig(Address address, String typeUrl) {
+      return RoutingConfig.newBuilder()
+          .setTypeUrl(typeUrl)
+          .addAllTargetFunctionTypes(
+              Collections.singleton(
+                  TargetFunctionType.newBuilder()
+                      .setNamespace(address.namespace)
+                      .setType(address.functionType)
+                      .build()))
+          .build();
+    }
+
+    /**
+     * Derives the type URL from the first Content-Type value that starts with the StateFun
+     * prefix; falls back to the default ingress type if there is none.
+     */
+    private String parseTypeUrl(HeaderMap requestHeaders) {
+      final HeaderValues headerValues = requestHeaders.get(Headers.CONTENT_TYPE);
+
+      // A request without a Content-Type header yields null here, not an empty collection.
+      if (headerValues == null) {
+        return Constants.DEFAULT_INGRESS_TYPE;
+      }
+
+      return headerValues.stream()
+          .filter(value -> value.startsWith(Constants.STATEFUN_CONTENT_TYPE_PREFIX))
+          .findFirst()
+          .map(type -> type.substring(Constants.STATEFUN_CONTENT_TYPE_PREFIX.length()))
+          .orElse(Constants.DEFAULT_INGRESS_TYPE);
+    }
+
+    /**
+     * Splits the request path into namespace, function type and function id.
+     *
+     * @throws ParseException if the path does not consist of exactly three non-empty segments
+     */
+    private Address parseAddress(String relativePath) throws ParseException {
+      final String path = relativePath.startsWith("/") ? relativePath.substring(1) : relativePath;
+      final String[] split = path.split("/");
+
+      boolean valid = split.length == 3;
+      for (int i = 0; valid && i < split.length; i++) {
+        valid = !split[i].isEmpty();
+      }
+
+      if (!valid) {
+        throw new ParseException(
+            "Invalid URL. Please specify '/namespace/function_type/function_id'.");
+      }
+
+      return new Address(split[0], split[1], split[2]);
+    }
+  }
+
+  /** Target of an ingested message as given by the request path. */
+  private static final class Address {
+    final String namespace;
+    final String functionType;
+    final String key;
+
+    private Address(String namespace, String functionType, String key) {
+      this.namespace = namespace;
+      this.functionType = functionType;
+      this.key = key;
+    }
+  }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/ParseException.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/ParseException.java
new file mode 100644
index 0000000..1666a6f
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/ParseException.java
@@ -0,0 +1,7 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+/** Checked exception signalling that a request could not be parsed. */
+class ParseException extends Exception {
+ // Only a message is accepted; no constructor takes a cause.
+ ParseException(String message) {
+ super(message);
+ }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundEgress.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundEgress.java
new file mode 100644
index 0000000..d98ff8c
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundEgress.java
@@ -0,0 +1,66 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.statefun.playground.internal.io.Constants;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.ByteString;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Sink that forwards {@link EgressRecord}s to an {@link EgressWebServer}, from which they can be
+ * fetched over HTTP. The server is obtained through a static {@link RefCountedContainer}.
+ *
+ * @param <T> the element type; only {@link TypedValue}s carrying an egress record are accepted
+ */
+class PlaygroundEgress<T> extends RichSinkFunction<T> {
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+  private static final RefCountedContainer<EgressWebServer> container =
+      new RefCountedContainer<>();
+
+  private final int port;
+  private final Set<String> topics;
+
+  // Acquired in open() and released in close(); null outside of that window.
+  private transient RefCountedContainer<EgressWebServer>.Lease handle;
+
+  /**
+   * @param port the port the web server listens on
+   * @param topics the topics records may be addressed to; copied defensively
+   */
+  PlaygroundEgress(int port, Set<String> topics) {
+    this.port = port;
+    this.topics = new HashSet<>(topics);
+    this.handle = null;
+  }
+
+  /** Acquires a lease on the web server, creating the server if the container has none. */
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    this.handle = container.getOrCreate(() -> new EgressWebServer(port, topics));
+  }
+
+  /**
+   * Hands the record contained in the value to the web server.
+   *
+   * @throws IllegalArgumentException if the value is not an egress record or names an unknown
+   *     topic
+   */
+  @Override
+  public void invoke(T value, Context context) throws Exception {
+    final EgressRecord egressRecord = asEgressRecord(value);
+
+    final String topic = egressRecord.getTopic();
+
+    if (!topics.contains(topic)) {
+      throw new IllegalArgumentException(
+          String.format("Message was targeted to unknown topic %s.", topic));
+    }
+
+    handle.deref().offer(topic, ByteString.copyFromUtf8(egressRecord.getPayload()));
+  }
+
+  /** Reads the JSON-encoded record from a {@link TypedValue} of the egress record type. */
+  private EgressRecord asEgressRecord(T value) throws IOException {
+    if (value instanceof TypedValue) {
+      final TypedValue typedValue = ((TypedValue) value);
+
+      if (typedValue.getTypename().equals(Constants.PLAYGROUND_EGRESS_RECORD)) {
+        return objectMapper.readValue(typedValue.getValue().toByteArray(), EgressRecord.class);
+      }
+    }
+    throw new IllegalArgumentException(String.format("Received unexpected value %s.", value));
+  }
+
+  /**
+   * Releases the lease on the web server. This happens in {@code close()} rather than in
+   * {@code finish()} because {@code finish()} is skipped when the job is cancelled or fails,
+   * which would leave the lease unreleased.
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      if (handle != null) {
+        handle.close();
+        // Guard against releasing the same lease twice.
+        handle = null;
+      }
+    } finally {
+      super.close();
+    }
+  }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundFlinkIoModule.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundFlinkIoModule.java
new file mode 100644
index 0000000..6c3357b
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundFlinkIoModule.java
@@ -0,0 +1,15 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.flink.io.spi.FlinkIoModule;
+import org.apache.flink.statefun.playground.internal.io.Constants;
+
+/** Flink I/O module that registers the playground source and sink providers. */
+@AutoService(FlinkIoModule.class)
+public class PlaygroundFlinkIoModule implements FlinkIoModule {
+ // The global configuration is not consulted; both providers are always registered.
+ @Override
+ public void configure(Map<String, String> globalConfiguration, Binder
binder) {
+ binder.bindSourceProvider(Constants.INGRESS_TYPE, new
PlaygroundSourceProvider());
+ binder.bindSinkProvider(Constants.EGRESS_TYPE, new
PlaygroundSinkProvider());
+ }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundIngress.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundIngress.java
new file mode 100644
index 0000000..e1b2be9
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundIngress.java
@@ -0,0 +1,50 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+class PlaygroundIngress<T> extends RichSourceFunction<T> {
+
+ private final int port;
+ private final BlockingQueue<AutoRoutable> messageQueue;
+ private transient IngressWebServer server;
+
+ private volatile boolean running = true;
+
+ PlaygroundIngress(int port) {
+ this.port = port;
+ this.messageQueue = new ArrayBlockingQueue<>(1 << 20);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ this.server = new IngressWebServer(port, messageQueue);
+ }
+
+ @Override
+ public void run(SourceContext<T> sourceContext) throws Exception {
+ while (running) {
+ final AutoRoutable message = messageQueue.poll(50L,
TimeUnit.MILLISECONDS);
+
+ if (message != null) {
+ synchronized (sourceContext.getCheckpointLock()) {
+ sourceContext.collect((T) message);
+ }
+ }
+ }
+
+ if (server != null) {
+ server.stop();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundSinkProvider.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundSinkProvider.java
new file mode 100644
index 0000000..d40b89b
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundSinkProvider.java
@@ -0,0 +1,22 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import org.apache.flink.statefun.flink.io.spi.SinkProvider;
+import
org.apache.flink.statefun.playground.internal.io.spec.PlaygroundEgressSpec;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/** {@link SinkProvider} that turns a {@link PlaygroundEgressSpec} into a playground egress sink. */
+public class PlaygroundSinkProvider implements SinkProvider {
+
+  /**
+   * Creates the sink for the given egress spec.
+   *
+   * @param spec the egress spec; must be a {@link PlaygroundEgressSpec}
+   * @return a {@code PlaygroundEgress} built from the spec's port and topics
+   * @throws IllegalArgumentException if {@code spec} is not a {@link PlaygroundEgressSpec}
+   */
+  @Override
+  public <T> SinkFunction<T> forSpec(EgressSpec<T> spec) {
+    // Reject foreign specs up front; everything below relies on the playground-specific getters.
+    if (!(spec instanceof PlaygroundEgressSpec)) {
+      throw new IllegalArgumentException(String.format("Unknown egress spec %s.", spec));
+    }
+
+    final PlaygroundEgressSpec playgroundSpec = (PlaygroundEgressSpec) spec;
+    return new PlaygroundEgress<>(playgroundSpec.getPort(), playgroundSpec.getTopics());
+  }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundSourceProvider.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundSourceProvider.java
new file mode 100644
index 0000000..1d0f06b
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundSourceProvider.java
@@ -0,0 +1,23 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import org.apache.flink.statefun.flink.io.spi.SourceProvider;
+import
org.apache.flink.statefun.playground.internal.io.spec.PlaygroundIngressSpec;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+/**
+ * {@link SourceProvider} that turns a {@link PlaygroundIngressSpec} into a playground ingress
+ * source.
+ */
+public class PlaygroundSourceProvider implements SourceProvider {
+
+  /**
+   * Creates the source for the given ingress spec.
+   *
+   * @param spec the ingress spec; must be a {@link PlaygroundIngressSpec}
+   * @return a {@code PlaygroundIngress} configured with the spec's port
+   * @throws IllegalArgumentException if {@code spec} is not a {@link PlaygroundIngressSpec}
+   */
+  @Override
+  public <T> SourceFunction<T> forSpec(IngressSpec<T> spec) {
+    // Reject foreign specs up front; only the playground spec carries the port we need.
+    if (!(spec instanceof PlaygroundIngressSpec)) {
+      throw new IllegalArgumentException(String.format("Unknown ingress spec %s", spec));
+    }
+
+    final PlaygroundIngressSpec playgroundSpec = (PlaygroundIngressSpec) spec;
+    return new PlaygroundIngress<T>(playgroundSpec.getPort());
+  }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/RefCountedContainer.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/RefCountedContainer.java
new file mode 100644
index 0000000..433217e
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/RefCountedContainer.java
@@ -0,0 +1,68 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Holds at most one lazily created value and closes it once the last {@link Lease} handed out for
+ * it has been closed.
+ *
+ * <p>All state, including the validity of individual leases, is guarded by a single lock. The
+ * value is closed while that lock is held, so a concurrent {@link #getOrCreate(Supplier)} cannot
+ * create a replacement before the previous value has finished closing.
+ *
+ * @param <T> the type of the shared value
+ */
+final class RefCountedContainer<T extends AutoCloseable> {
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
+  @Nullable
+  private T value;
+
+  @GuardedBy("lock")
+  private int refCounter;
+
+  RefCountedContainer() {
+    this.value = null;
+    this.refCounter = 0;
+  }
+
+  /**
+   * Returns a new lease on the shared value, creating the value first if none exists.
+   *
+   * @param supplier invoked under the container lock when no value is currently held; must not
+   *     return {@code null}
+   * @return a lease that keeps the value alive until it is closed
+   * @throws NullPointerException if the supplier returns {@code null}
+   */
+  Lease getOrCreate(Supplier<T> supplier) {
+    synchronized (lock) {
+      if (value == null) {
+        // Fail here rather than handing out a lease whose deref() can never succeed.
+        value = Preconditions.checkNotNull(supplier.get(), "Supplier must not return null.");
+      }
+
+      return new Lease();
+    }
+  }
+
+  /** A counted reference to the container's value. Closing it more than once has no effect. */
+  class Lease implements AutoCloseable {
+
+    @GuardedBy("lock")
+    private boolean valid;
+
+    private Lease() {
+      synchronized (lock) {
+        valid = true;
+        refCounter += 1;
+      }
+    }
+
+    /**
+     * Releases this lease and closes the shared value if this was the last valid lease.
+     *
+     * @throws Exception if closing the shared value fails; the lease is released and the
+     *     container is emptied regardless
+     */
+    @Override
+    public void close() throws Exception {
+      synchronized (lock) {
+        if (!valid) {
+          return;
+        }
+
+        // Update all bookkeeping before calling into the value so that a failing close() cannot
+        // leave a stale value behind or allow this lease to be counted down twice.
+        valid = false;
+        refCounter -= 1;
+
+        if (refCounter == 0 && value != null) {
+          final T toClose = value;
+          value = null;
+          toClose.close();
+        }
+      }
+    }
+
+    /**
+     * Returns the shared value.
+     *
+     * @throws IllegalStateException if this lease has already been closed
+     * @throws NullPointerException if the container holds no value despite a valid lease
+     */
+    T deref() {
+      synchronized (lock) {
+        Preconditions.checkState(valid, "Lease is no longer valid");
+        return Preconditions.checkNotNull(
+            value,
+            "Value is null while there are still valid leases. " + "This should not happen.");
+      }
+    }
+  }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/spec/PlaygroundEgressSpec.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/spec/PlaygroundEgressSpec.java
new file mode 100644
index 0000000..b8368da
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/spec/PlaygroundEgressSpec.java
@@ -0,0 +1,59 @@
+package org.apache.flink.statefun.playground.internal.io.spec;
+
+import java.util.Collections;
+import java.util.Set;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import org.apache.flink.statefun.playground.internal.io.Constants;
+import org.apache.flink.statefun.sdk.EgressType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+/**
+ * Egress spec for the playground egress, deserialized through its {@link Builder} from the
+ * {@code port} and {@code topics} properties.
+ */
+@JsonDeserialize(builder = PlaygroundEgressSpec.Builder.class)
+public class PlaygroundEgressSpec implements EgressSpec<TypedValue> {
+
+  private final int port;
+
+  /** Never {@code null}; always an unmodifiable view. */
+  private final Set<String> topics;
+
+  private PlaygroundEgressSpec(int port, Set<String> topics) {
+    this.port = port;
+    // A spec without a "topics" property reaches this point as null. Normalise it once so that
+    // getTopics() never fails with a bare NullPointerException.
+    this.topics =
+        topics == null
+            ? Collections.<String>emptySet()
+            : Collections.unmodifiableSet(topics);
+  }
+
+  @Override
+  public EgressIdentifier<TypedValue> id() {
+    return Constants.EGRESS_IDENTIFIER;
+  }
+
+  @Override
+  public EgressType type() {
+    return Constants.EGRESS_TYPE;
+  }
+
+  /** Returns the configured port. */
+  public int getPort() {
+    return port;
+  }
+
+  /**
+   * Returns the configured topics.
+   *
+   * @return an unmodifiable set; empty when no topics were configured
+   */
+  public Set<String> getTopics() {
+    return topics;
+  }
+
+  /** Jackson builder that collects the spec properties and creates the immutable spec. */
+  @JsonPOJOBuilder
+  public static final class Builder {
+    private final int port;
+    private final Set<String> topics;
+
+    @JsonCreator
+    private Builder(@JsonProperty("port") int port, @JsonProperty("topics") Set<String> topics) {
+      this.port = port;
+      this.topics = topics;
+    }
+
+    /** Creates the spec from the collected properties. */
+    public PlaygroundEgressSpec build() {
+      return new PlaygroundEgressSpec(port, topics);
+    }
+  }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/spec/PlaygroundIngressSpec.java
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/spec/PlaygroundIngressSpec.java
new file mode 100644
index 0000000..d5d7d92
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/spec/PlaygroundIngressSpec.java
@@ -0,0 +1,49 @@
+package org.apache.flink.statefun.playground.internal.io.spec;
+
+import com.google.protobuf.Message;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import org.apache.flink.statefun.playground.internal.io.Constants;
+import org.apache.flink.statefun.sdk.IngressType;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+
+/**
+ * Ingress spec for the playground ingress, deserialized through its {@link Builder} from the
+ * {@code port} property.
+ */
+@JsonDeserialize(builder = PlaygroundIngressSpec.Builder.class)
+public class PlaygroundIngressSpec implements IngressSpec<Message> {
+
+  private final int port;
+
+  private PlaygroundIngressSpec(int port) {
+    this.port = port;
+  }
+
+  @Override
+  public IngressIdentifier<Message> id() {
+    return Constants.INGRESS_IDENTIFIER;
+  }
+
+  @Override
+  public IngressType type() {
+    return Constants.INGRESS_TYPE;
+  }
+
+  /** Returns the configured port. */
+  public int getPort() {
+    return this.port;
+  }
+
+  /** Jackson builder that captures the {@code port} property and creates the immutable spec. */
+  @JsonPOJOBuilder
+  public static final class Builder {
+    private final int port;
+
+    @JsonCreator
+    private Builder(@JsonProperty("port") int port) {
+      this.port = port;
+    }
+
+    /** Creates the spec from the captured port. */
+    public PlaygroundIngressSpec build() {
+      return new PlaygroundIngressSpec(this.port);
+    }
+  }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/test/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypointITCase.java
b/playground-internal/statefun-playground-entrypoint/src/test/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypointITCase.java
new file mode 100644
index 0000000..5cfe016
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/test/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypointITCase.java
@@ -0,0 +1,13 @@
+package org.apache.flink.statefun.playground.internal.entrypoint;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/** Manual harness that launches {@link LocalEnvironmentEntrypoint} with the test module. */
+public class LocalEnvironmentEntrypointITCase {
+
+  /** Starts the entrypoint against {@code module.yaml} from the classpath; disabled by default. */
+  @Disabled("Test for manual verification")
+  @Test
+  public void testLocalEnvironmentEntrypoint() throws Exception {
+    final String[] arguments = {"--module", "classpath:module.yaml"};
+
+    LocalEnvironmentEntrypoint.main(arguments);
+  }
+}
diff --git
a/playground-internal/statefun-playground-entrypoint/src/test/resources/module.yaml
b/playground-internal/statefun-playground-entrypoint/src/test/resources/module.yaml
new file mode 100644
index 0000000..466be42
--- /dev/null
+++
b/playground-internal/statefun-playground-entrypoint/src/test/resources/module.yaml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Module definition loaded by LocalEnvironmentEntrypointITCase via "classpath:module.yaml".
+# It consists of three YAML documents: a function endpoint, the playground ingress and the
+# playground egress.
+
+# HTTP endpoint for all functions matching greeter.fns/*.
+# NOTE(review): presumably a greeter function server must be listening on localhost:1108 for
+# the manual test to do anything useful - confirm.
+kind: io.statefun.endpoints.v2/http
+spec:
+ functions: greeter.fns/*
+ urlPathTemplate: http://localhost:1108/
+ transport:
+ type: io.statefun.transports.v1/async
+---
+# Playground ingress. Per the commit description, messages are ingested with
+#   curl -X PUT -H "Content-Type: application/vnd.<TYPE_URL>" -d '<DATA>' \
+#     localhost:8090/<NAMESPACE>/<FUNCTION>/<ID>
+kind: io.statefun.playground.v1/ingress
+spec:
+ port: 8090
+---
+# Playground egress. Per the commit description, messages are consumed with
+#   curl -X GET localhost:8091/<TOPIC>
+# where <TOPIC> is one of the topics listed below.
+kind: io.statefun.playground.v1/egress
+spec:
+ port: 8091
+ topics:
+ - greetings