This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git


The following commit(s) were added to refs/heads/release-3.2 by this push:
     new 1ba46ba  [FLINK-26155] Add Playground ingress/egress that allows to 
ingest and consume messages via curl
1ba46ba is described below

commit 1ba46baa2c645bc70883c5f435fee96a6b576230
Author: Till Rohrmann <[email protected]>
AuthorDate: Sat Feb 12 17:48:50 2022 +0100

    [FLINK-26155] Add Playground ingress/egress that allows to ingest and 
consume messages via curl
    
    The PlaygroundIngress and PlaygroundIngress spin up a web server that 
allows to ingest and consume
    messages that are kept in memory via curl.
    
    The way to ingest new messages is by using curl:
    
    curl -X PUT -H "Content-Type: application/vnd.<TYPE_URL>" -d '<DATA>' 
localhost:8090/<NAMESPACE>/<FUNCTION>/<ID>
    
    Messages can be consumed from the egress via curl:
    
    curl -X GET localhost:8091/<TOPIC>
    
    In order to use the ingress it needs to be configured in the module.yaml 
via:
    
    kind: io.statefun.playground.v1/ingress
    spec:
      port: 8090
    
    In order to use the egress it needs to be configured in the module.yaml via:
    
    kind: io.statefun.playground.v1/egress
    spec:
      port: 8091
      topics:
        - greetings
    
    This closes #25.
---
 ...pache.flink.statefun.extensions.ExtensionModule |  10 ++
 .../statefun-playground-entrypoint/pom.xml         |  45 ++++++++
 .../entrypoint/LocalEnvironmentEntrypoint.java     |   3 +-
 .../statefun/playground/internal/io/Constants.java |  29 +++++
 .../playground/internal/io/binders/Utils.java      |  34 ++++++
 .../internal/io/binders/egress/v1/Module.java      |  13 +++
 .../binders/egress/v1/PlaygroundEgressBinder.java  |  24 ++++
 .../internal/io/binders/ingress/v1/Module.java     |  13 +++
 .../ingress/v1/PlaygroundIngressBinder.java        |  27 +++++
 .../playground/internal/io/flink/EgressRecord.java |  21 ++++
 .../internal/io/flink/EgressWebServer.java         |  85 ++++++++++++++
 .../internal/io/flink/IngressWebServer.java        | 122 +++++++++++++++++++++
 .../internal/io/flink/ParseException.java          |   7 ++
 .../internal/io/flink/PlaygroundEgress.java        |  66 +++++++++++
 .../internal/io/flink/PlaygroundFlinkIoModule.java |  15 +++
 .../internal/io/flink/PlaygroundIngress.java       |  50 +++++++++
 .../internal/io/flink/PlaygroundSinkProvider.java  |  22 ++++
 .../io/flink/PlaygroundSourceProvider.java         |  23 ++++
 .../internal/io/flink/RefCountedContainer.java     |  68 ++++++++++++
 .../internal/io/spec/PlaygroundEgressSpec.java     |  59 ++++++++++
 .../internal/io/spec/PlaygroundIngressSpec.java    |  49 +++++++++
 .../LocalEnvironmentEntrypointITCase.java          |  13 +++
 .../src/test/resources/module.yaml                 |  31 ++++++
 23 files changed, 828 insertions(+), 1 deletion(-)

diff --git 
a/playground-internal/statefun-playground-entrypoint/META-INF/services/org.apache.flink.statefun.extensions.ExtensionModule
 
b/playground-internal/statefun-playground-entrypoint/META-INF/services/org.apache.flink.statefun.extensions.ExtensionModule
new file mode 100644
index 0000000..0966f54
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/META-INF/services/org.apache.flink.statefun.extensions.ExtensionModule
@@ -0,0 +1,10 @@
+org.apache.flink.statefun.playground.internal.io.binders.egress.v1.Module
+org.apache.flink.statefun.playground.internal.io.binders.ingress.v1.Module
+org.apache.flink.statefun.flink.core.httpfn.TransportClientsModule
+org.apache.flink.statefun.flink.core.httpfn.binders.v1.Module
+org.apache.flink.statefun.flink.core.httpfn.binders.v2.Module
+org.apache.flink.statefun.flink.core.nettyclient.NettyTransportModule
+org.apache.flink.statefun.flink.io.kafka.binders.egress.v1.Module
+org.apache.flink.statefun.flink.io.kafka.binders.ingress.v1.Module
+org.apache.flink.statefun.flink.io.kinesis.binders.egress.v1.Module
+org.apache.flink.statefun.flink.io.kinesis.binders.ingress.v1.Module
diff --git a/playground-internal/statefun-playground-entrypoint/pom.xml 
b/playground-internal/statefun-playground-entrypoint/pom.xml
index 77cd8ca..b7ccd8d 100644
--- a/playground-internal/statefun-playground-entrypoint/pom.xml
+++ b/playground-internal/statefun-playground-entrypoint/pom.xml
@@ -31,6 +31,7 @@ under the License.
         <slf4j.version>1.7.35</slf4j.version>
         <maven.compiler.source>11</maven.compiler.source>
         <maven.compiler.target>11</maven.compiler.target>
+        <auto-service.version>1.0-rc6</auto-service.version>
     </properties>
 
     <dependencies>
@@ -47,6 +48,12 @@ under the License.
             <version>${statefun.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-flink-io</artifactId>
+            <version>${statefun.version}</version>
+        </dependency>
+
         <!-- Flink dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -74,6 +81,18 @@ under the License.
             <version>${flink.version}</version>
         </dependency>
 
+        <!-- Misc -->
+        <dependency>
+            <groupId>com.google.auto.service</groupId>
+            <artifactId>auto-service-annotations</artifactId>
+            <version>${auto-service.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.undertow</groupId>
+            <artifactId>undertow-core</artifactId>
+            <version>2.2.16.Final</version>
+        </dependency>
+
         <!-- Logging -->
         <dependency>
             <groupId>org.slf4j</groupId>
@@ -85,10 +104,36 @@ under the License.
             <artifactId>slf4j-log4j12</artifactId>
             <version>${slf4j.version}</version>
         </dependency>
+
+        <!-- Testing -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>5.8.1</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
+            <!-- Java compiler -->
+            <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <version>3.8.1</version>
+            <configuration>
+                <source>11</source>
+                <target>11</target>
+                <annotationProcessorPaths>
+                    <path>
+                        <groupId>com.google.auto.service</groupId>
+                        <artifactId>auto-service</artifactId>
+                        <version>${auto-service.version}</version>
+                    </path>
+                </annotationProcessorPaths>
+            </configuration>
+            </plugin>
+
             <!-- Build a fat executable jar -->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypoint.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypoint.java
index 8503cf2..0e4805f 100644
--- 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypoint.java
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypoint.java
@@ -74,7 +74,8 @@ public final class LocalEnvironmentEntrypoint {
 
         if (splits.length != 2) {
           throw new IllegalArgumentException(
-              String.format("The '--%s' value must have the form 'key=value'", 
CONFIGURATION_OPTION));
+              String.format(
+                  "The '--%s' value must have the form 'key=value'", 
CONFIGURATION_OPTION));
         }
 
         final String key = splits[0];
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/Constants.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/Constants.java
new file mode 100644
index 0000000..6334280
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/Constants.java
@@ -0,0 +1,29 @@
+package org.apache.flink.statefun.playground.internal.io;
+
+import com.google.protobuf.Message;
+import org.apache.flink.statefun.sdk.EgressType;
+import org.apache.flink.statefun.sdk.IngressType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+public class Constants {
+  private static final String NAMESPACE = "io.statefun.playground";
+  private static final String EGRESS = "egress";
+  private static final String INGRESS = "ingress";
+
+  public static final EgressType EGRESS_TYPE = new EgressType(NAMESPACE, 
EGRESS);
+  public static final IngressType INGRESS_TYPE = new IngressType(NAMESPACE, 
INGRESS);
+  public static final EgressIdentifier<TypedValue> EGRESS_IDENTIFIER =
+      new EgressIdentifier<>(NAMESPACE, EGRESS, TypedValue.class);
+  public static final IngressIdentifier<Message> INGRESS_IDENTIFIER =
+      new IngressIdentifier<>(Message.class, NAMESPACE, INGRESS);
+
+  public static final String DEFAULT_INGRESS_TYPE = "io.statefun.types/string";
+  public static final String STATEFUN_CONTENT_TYPE_PREFIX = "application/vnd.";
+  public static final String PLAYGROUND_EGRESS_RECORD = 
"io.statefun.playground/EgressRecord";
+
+  private Constants() {
+    throw new UnsupportedOperationException("Should not be instantiated.");
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/Utils.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/Utils.java
new file mode 100644
index 0000000..6bf54c2
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/Utils.java
@@ -0,0 +1,34 @@
+package org.apache.flink.statefun.playground.internal.io.binders;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.extensions.ComponentJsonObject;
+import org.apache.flink.statefun.flink.common.json.StateFunObjectMapper;
+import org.apache.flink.statefun.sdk.TypeName;
+
+public class Utils {
+  private static final ObjectMapper OBJECT_MAPPER = 
StateFunObjectMapper.create();
+
+  private Utils() {
+    throw new UnsupportedOperationException("Should not be instantiated.");
+  }
+
+  public static void validateComponent(ComponentJsonObject component, TypeName 
expected) {
+    final TypeName typeName = component.binderTypename();
+
+    if (!typeName.equals(expected)) {
+      throw new IllegalArgumentException(
+          String.format("Binder handles types %s but was called for type %s.", 
expected, typeName));
+    }
+  }
+
+  public static <T> T parseJson(JsonNode jsonNode, Class<T> clazz) {
+    try {
+      return OBJECT_MAPPER.treeToValue(jsonNode, clazz);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(
+          String.format("Could not parse the %s from %s.", 
clazz.getSimpleName(), jsonNode));
+    }
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/egress/v1/Module.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/egress/v1/Module.java
new file mode 100644
index 0000000..ffec0ef
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/egress/v1/Module.java
@@ -0,0 +1,13 @@
+package org.apache.flink.statefun.playground.internal.io.binders.egress.v1;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.extensions.ExtensionModule;
+
+@AutoService(ExtensionModule.class)
+public final class Module implements ExtensionModule {
+  @Override
+  public void configure(Map<String, String> globalConfigurations, Binder 
binder) {
+    binder.bindExtension(PlaygroundEgressBinder.KIND_TYPE, 
PlaygroundEgressBinder.INSTANCE);
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/egress/v1/PlaygroundEgressBinder.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/egress/v1/PlaygroundEgressBinder.java
new file mode 100644
index 0000000..ae30a8a
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/egress/v1/PlaygroundEgressBinder.java
@@ -0,0 +1,24 @@
+package org.apache.flink.statefun.playground.internal.io.binders.egress.v1;
+
+import org.apache.flink.statefun.extensions.ComponentBinder;
+import org.apache.flink.statefun.extensions.ComponentJsonObject;
+import org.apache.flink.statefun.playground.internal.io.binders.Utils;
+import 
org.apache.flink.statefun.playground.internal.io.spec.PlaygroundEgressSpec;
+import org.apache.flink.statefun.sdk.TypeName;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+public class PlaygroundEgressBinder implements ComponentBinder {
+
+  static final PlaygroundEgressBinder INSTANCE = new PlaygroundEgressBinder();
+  static final TypeName KIND_TYPE = 
TypeName.parseFrom("io.statefun.playground.v1/egress");
+
+  @Override
+  public void bind(
+      ComponentJsonObject component, StatefulFunctionModule.Binder 
remoteModuleBinder) {
+    Utils.validateComponent(component, KIND_TYPE);
+    final PlaygroundEgressSpec playgroundEgressSpec =
+        Utils.parseJson(component.specJsonNode(), PlaygroundEgressSpec.class);
+
+    remoteModuleBinder.bindEgress(playgroundEgressSpec);
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/ingress/v1/Module.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/ingress/v1/Module.java
new file mode 100644
index 0000000..e83db60
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/ingress/v1/Module.java
@@ -0,0 +1,13 @@
+package org.apache.flink.statefun.playground.internal.io.binders.ingress.v1;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.extensions.ExtensionModule;
+
+@AutoService(ExtensionModule.class)
+public final class Module implements ExtensionModule {
+  @Override
+  public void configure(Map<String, String> globalConfigurations, Binder 
binder) {
+    binder.bindExtension(PlaygroundIngressBinder.KIND_TYPE, 
PlaygroundIngressBinder.INSTANCE);
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/ingress/v1/PlaygroundIngressBinder.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/ingress/v1/PlaygroundIngressBinder.java
new file mode 100644
index 0000000..8539a64
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/binders/ingress/v1/PlaygroundIngressBinder.java
@@ -0,0 +1,27 @@
+package org.apache.flink.statefun.playground.internal.io.binders.ingress.v1;
+
+import org.apache.flink.statefun.extensions.ComponentBinder;
+import org.apache.flink.statefun.extensions.ComponentJsonObject;
+import org.apache.flink.statefun.flink.io.common.AutoRoutableProtobufRouter;
+import org.apache.flink.statefun.playground.internal.io.binders.Utils;
+import 
org.apache.flink.statefun.playground.internal.io.spec.PlaygroundIngressSpec;
+import org.apache.flink.statefun.sdk.TypeName;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+public class PlaygroundIngressBinder implements ComponentBinder {
+
+  static final PlaygroundIngressBinder INSTANCE = new 
PlaygroundIngressBinder();
+  static final TypeName KIND_TYPE = 
TypeName.parseFrom("io.statefun.playground.v1/ingress");
+
+  @Override
+  public void bind(
+      ComponentJsonObject component, StatefulFunctionModule.Binder 
remoteModuleBinder) {
+    Utils.validateComponent(component, KIND_TYPE);
+    final PlaygroundIngressSpec playgroundIngressSpec =
+        Utils.parseJson(component.specJsonNode(), PlaygroundIngressSpec.class);
+
+    remoteModuleBinder.bindIngress(playgroundIngressSpec);
+    remoteModuleBinder.bindIngressRouter(
+        playgroundIngressSpec.id(), new AutoRoutableProtobufRouter());
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/EgressRecord.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/EgressRecord.java
new file mode 100644
index 0000000..e53d2f2
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/EgressRecord.java
@@ -0,0 +1,21 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+public class EgressRecord {
+    @JsonProperty("topic")
+    private String topic;
+
+    @JsonProperty("payload")
+    private String payload;
+
+    public EgressRecord() {}
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public String getPayload() {
+        return payload;
+    }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/EgressWebServer.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/EgressWebServer.java
new file mode 100644
index 0000000..dc29fa9
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/EgressWebServer.java
@@ -0,0 +1,85 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import com.google.protobuf.ByteString;
+import io.undertow.Undertow;
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.Headers;
+import io.undertow.util.StatusCodes;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class EgressWebServer implements AutoCloseable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(EgressWebServer.class);
+
+  private final ConcurrentMap<String, BlockingQueue<ByteString>> queues;
+  private final Undertow server;
+
+  EgressWebServer(int port, Set<String> topics) {
+    this.queues = createQueues(topics);
+    this.server =
+        Undertow.builder()
+            .addHttpListener(port, "0.0.0.0")
+            .setHandler(new EgressHttpHandler(queues))
+            .build();
+
+    server.start();
+  }
+
+  private static ConcurrentMap<String, BlockingQueue<ByteString>> 
createQueues(Set<String> topics) {
+    return topics.stream()
+        .collect(
+            Collectors.toConcurrentMap(
+                Function.identity(), ignored -> new ArrayBlockingQueue<>(1 << 
20)));
+  }
+
+  @Override
+  public void close() {
+    server.stop();
+  }
+
+  public void offer(String topic, ByteString message) {
+    if (!Preconditions.checkNotNull(queues.get(topic)).offer(message)) {
+      LOG.info(
+          "Dropping message {} for topic {} because the queue is currently 
full.", message, topic);
+    }
+  }
+
+  private static final class EgressHttpHandler implements HttpHandler {
+    private final Map<String, BlockingQueue<ByteString>> queues;
+
+    private EgressHttpHandler(Map<String, BlockingQueue<ByteString>> queues) {
+      this.queues = queues;
+    }
+
+    @Override
+    public void handleRequest(HttpServerExchange exchange) {
+      final String topic = exchange.getRelativePath().substring(1);
+
+      final BlockingQueue<ByteString> queue = queues.get(topic);
+
+      if (queue != null) {
+        final ByteString message = queue.poll();
+
+        if (message != null) {
+          exchange.getResponseHeaders().put(Headers.STATUS, StatusCodes.OK);
+          exchange.getResponseSender().send(message.asReadOnlyByteBuffer());
+        } else {
+          exchange.getResponseHeaders().put(Headers.STATUS, 
StatusCodes.NOT_FOUND);
+        }
+      } else {
+        exchange.getResponseHeaders().put(Headers.STATUS, 
StatusCodes.METHOD_NOT_ALLOWED);
+      }
+
+      exchange.endExchange();
+    }
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/IngressWebServer.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/IngressWebServer.java
new file mode 100644
index 0000000..dc932b1
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/IngressWebServer.java
@@ -0,0 +1,122 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import com.google.protobuf.ByteString;
+import io.undertow.Undertow;
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.HeaderMap;
+import io.undertow.util.HeaderValues;
+import io.undertow.util.Headers;
+import io.undertow.util.StatusCodes;
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
+import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
+import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
+import org.apache.flink.statefun.playground.internal.io.Constants;
+
+class IngressWebServer {
+  private final Undertow server;
+
+  IngressWebServer(int port, BlockingQueue<AutoRoutable> messageQueue) {
+    this.server =
+        Undertow.builder()
+            .addHttpListener(port, "0.0.0.0")
+            .setHandler(new IngressHttpHandler(messageQueue))
+            .build();
+
+    server.start();
+  }
+
+  void stop() {
+    server.stop();
+  }
+
+  private static final class IngressHttpHandler implements HttpHandler {
+    private final BlockingQueue<AutoRoutable> messageQueue;
+
+    private IngressHttpHandler(BlockingQueue<AutoRoutable> messageQueue) {
+      this.messageQueue = messageQueue;
+    }
+
+    @Override
+    public void handleRequest(HttpServerExchange httpServerExchange) {
+      
httpServerExchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
+    }
+
+    private void onRequestBody(HttpServerExchange exchange, byte[] payload) {
+      exchange.dispatch();
+
+      try {
+        final Address address = parseAddress(exchange.getRelativePath());
+        final String typeUrl = parseTypeUrl(exchange.getRequestHeaders());
+
+        final RoutingConfig routingConfig = createRoutingConfig(address, 
typeUrl);
+
+        final AutoRoutable autoRoutable = createAutoRoutable(payload, address, 
routingConfig);
+
+        messageQueue.put(autoRoutable);
+        exchange.getResponseHeaders().put(Headers.STATUS, StatusCodes.OK);
+      } catch (ParseException | InterruptedException e) {
+        e.printStackTrace(System.out);
+        exchange.getResponseHeaders().put(Headers.STATUS, 
StatusCodes.INTERNAL_SERVER_ERROR);
+      }
+
+      exchange.endExchange();
+    }
+
+    private AutoRoutable createAutoRoutable(
+        byte[] payload, Address address, RoutingConfig routingConfig) {
+      return AutoRoutable.newBuilder()
+          .setId(address.key)
+          .setConfig(routingConfig)
+          .setPayloadBytes(ByteString.copyFrom(payload))
+          .build();
+    }
+
+    private RoutingConfig createRoutingConfig(Address address, String typeUrl) 
{
+      return RoutingConfig.newBuilder()
+          .setTypeUrl(typeUrl)
+          .addAllTargetFunctionTypes(
+              Collections.singleton(
+                  TargetFunctionType.newBuilder()
+                      .setNamespace(address.namespace)
+                      .setType(address.functionType)
+                      .build()))
+          .build();
+    }
+
+    private String parseTypeUrl(HeaderMap requestHeaders) {
+      final HeaderValues headerValues = 
requestHeaders.get(Headers.CONTENT_TYPE);
+
+      return headerValues.stream()
+          .filter(value -> 
value.startsWith(Constants.STATEFUN_CONTENT_TYPE_PREFIX))
+          .findFirst()
+          .map(type -> 
type.substring(Constants.STATEFUN_CONTENT_TYPE_PREFIX.length()))
+          .orElse(Constants.DEFAULT_INGRESS_TYPE);
+    }
+
+    private Address parseAddress(String relativePath) throws ParseException {
+      final String[] split = relativePath.substring(1).split("/");
+
+      if (split.length != 3) {
+        throw new ParseException(
+            "Invalid URL. Please specify 
'/namespace/function_type/function_id");
+      }
+
+      return new Address(split[0], split[1], split[2]);
+    }
+  }
+
+  private static final class Address {
+    final String namespace;
+    final String functionType;
+    final String key;
+
+    private Address(String namespace, String functionType, String key) {
+      this.namespace = namespace;
+      this.functionType = functionType;
+      this.key = key;
+    }
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/ParseException.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/ParseException.java
new file mode 100644
index 0000000..1666a6f
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/ParseException.java
@@ -0,0 +1,7 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+class ParseException extends Exception {
+  ParseException(String message) {
+    super(message);
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundEgress.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundEgress.java
new file mode 100644
index 0000000..d98ff8c
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundEgress.java
@@ -0,0 +1,66 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.statefun.playground.internal.io.Constants;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.ByteString;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+class PlaygroundEgress<T> extends RichSinkFunction<T> {
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+  private static final RefCountedContainer<EgressWebServer> container = new 
RefCountedContainer<>();
+
+  private final int port;
+  private final Set<String> topics;
+
+  private transient RefCountedContainer<EgressWebServer>.Lease handle;
+
+  PlaygroundEgress(int port, Set<String> topics) {
+    this.port = port;
+    this.topics = new HashSet<>(topics);
+    this.handle = null;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    this.handle = container.getOrCreate(() -> new EgressWebServer(port, 
topics));
+  }
+
+  @Override
+  public void invoke(T value, Context context) throws Exception {
+    final EgressRecord egressRecord = asEgressRecord(value);
+
+    final String topic = egressRecord.getTopic();
+
+    if (!topics.contains(topic)) {
+      throw new IllegalArgumentException(
+          String.format("Message was targeted to unknown topic %s.", topic));
+    }
+
+    handle.deref().offer(topic, 
ByteString.copyFromUtf8(egressRecord.getPayload()));
+  }
+
+  private EgressRecord asEgressRecord(T value) throws IOException {
+    if (value instanceof TypedValue) {
+      final TypedValue typedValue = ((TypedValue) value);
+
+      if (typedValue.getTypename().equals(Constants.PLAYGROUND_EGRESS_RECORD)) 
{
+        return objectMapper.readValue(typedValue.getValue().toByteArray(), 
EgressRecord.class);
+      }
+    }
+    throw new IllegalArgumentException(String.format("Received unexpected 
value %s.", value));
+  }
+
+  @Override
+  public void finish() throws Exception {
+    super.finish();
+    handle.close();
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundFlinkIoModule.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundFlinkIoModule.java
new file mode 100644
index 0000000..6c3357b
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundFlinkIoModule.java
@@ -0,0 +1,15 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.flink.io.spi.FlinkIoModule;
+import org.apache.flink.statefun.playground.internal.io.Constants;
+
+@AutoService(FlinkIoModule.class)
+public class PlaygroundFlinkIoModule implements FlinkIoModule {
+  @Override
+  public void configure(Map<String, String> globalConfiguration, Binder 
binder) {
+    binder.bindSourceProvider(Constants.INGRESS_TYPE, new 
PlaygroundSourceProvider());
+    binder.bindSinkProvider(Constants.EGRESS_TYPE, new 
PlaygroundSinkProvider());
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundIngress.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundIngress.java
new file mode 100644
index 0000000..e1b2be9
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundIngress.java
@@ -0,0 +1,50 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+class PlaygroundIngress<T> extends RichSourceFunction<T> {
+
+  private final int port;
+  private final BlockingQueue<AutoRoutable> messageQueue;
+  private transient IngressWebServer server;
+
+  private volatile boolean running = true;
+
+  PlaygroundIngress(int port) {
+    this.port = port;
+    this.messageQueue = new ArrayBlockingQueue<>(1 << 20);
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    this.server = new IngressWebServer(port, messageQueue);
+  }
+
+  @Override
+  public void run(SourceContext<T> sourceContext) throws Exception {
+    while (running) {
+      final AutoRoutable message = messageQueue.poll(50L, 
TimeUnit.MILLISECONDS);
+
+      if (message != null) {
+        synchronized (sourceContext.getCheckpointLock()) {
+          sourceContext.collect((T) message);
+        }
+      }
+    }
+
+    if (server != null) {
+      server.stop();
+    }
+  }
+
+  @Override
+  public void cancel() {
+    running = false;
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundSinkProvider.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundSinkProvider.java
new file mode 100644
index 0000000..d40b89b
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundSinkProvider.java
@@ -0,0 +1,22 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import org.apache.flink.statefun.flink.io.spi.SinkProvider;
+import 
org.apache.flink.statefun.playground.internal.io.spec.PlaygroundEgressSpec;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+public class PlaygroundSinkProvider implements SinkProvider {
+  @Override
+  public <T> SinkFunction<T> forSpec(EgressSpec<T> spec) {
+    final PlaygroundEgressSpec egressSpec = asPlaygroundEgressSpec(spec);
+    return new PlaygroundEgress<>(egressSpec.getPort(), 
egressSpec.getTopics());
+  }
+
+  private static <T> PlaygroundEgressSpec asPlaygroundEgressSpec(EgressSpec<T> 
spec) {
+    if (spec instanceof PlaygroundEgressSpec) {
+      return (PlaygroundEgressSpec) spec;
+    }
+
+    throw new IllegalArgumentException(String.format("Unknown egress spec 
%s.", spec));
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundSourceProvider.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundSourceProvider.java
new file mode 100644
index 0000000..1d0f06b
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/PlaygroundSourceProvider.java
@@ -0,0 +1,23 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import org.apache.flink.statefun.flink.io.spi.SourceProvider;
+import 
org.apache.flink.statefun.playground.internal.io.spec.PlaygroundIngressSpec;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+public class PlaygroundSourceProvider implements SourceProvider {
+  @Override
+  public <T> SourceFunction<T> forSpec(IngressSpec<T> spec) {
+    final PlaygroundIngressSpec ingressSpec = asPlaygroundIngressSpec(spec);
+
+    return new PlaygroundIngress<T>(ingressSpec.getPort());
+  }
+
+  private static <T> PlaygroundIngressSpec 
asPlaygroundIngressSpec(IngressSpec<T> spec) {
+    if (spec instanceof PlaygroundIngressSpec) {
+      return (PlaygroundIngressSpec) spec;
+    }
+
+    throw new IllegalArgumentException(String.format("Unknown ingress spec 
%s", spec));
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/RefCountedContainer.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/RefCountedContainer.java
new file mode 100644
index 0000000..433217e
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/RefCountedContainer.java
@@ -0,0 +1,68 @@
+package org.apache.flink.statefun.playground.internal.io.flink;
+
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.flink.util.Preconditions;
+
+final class RefCountedContainer<T extends AutoCloseable> {
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
+  @Nullable
+  private T value;
+
+  @GuardedBy("lock")
+  private int refCounter;
+
+  RefCountedContainer() {
+    this.value = null;
+    this.refCounter = 0;
+  }
+
+  Lease getOrCreate(Supplier<T> supplier) {
+    synchronized (lock) {
+      if (value == null) {
+        value = supplier.get();
+      }
+
+      return new Lease();
+    }
+  }
+
+  class Lease implements AutoCloseable {
+
+    private boolean valid;
+
+    private Lease() {
+      valid = true;
+      synchronized (lock) {
+        refCounter += 1;
+      }
+    }
+
+    @Override
+    public void close() throws Exception {
+      if (valid) {
+        synchronized (lock) {
+          refCounter -= 1;
+
+          if (refCounter == 0 && value != null) {
+            value.close();
+            value = null;
+          }
+        }
+
+        valid = false;
+      }
+    }
+
+    T deref() {
+      Preconditions.checkState(valid, "Lease is no longer valid");
+      synchronized (lock) {
+        return Preconditions.checkNotNull(
+            value, "Value is null while there are still valid leases. This 
should not happen.");
+      }
+    }
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/spec/PlaygroundEgressSpec.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/spec/PlaygroundEgressSpec.java
new file mode 100644
index 0000000..b8368da
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/spec/PlaygroundEgressSpec.java
@@ -0,0 +1,59 @@
+package org.apache.flink.statefun.playground.internal.io.spec;
+
+import java.util.Collections;
+import java.util.Set;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import org.apache.flink.statefun.playground.internal.io.Constants;
+import org.apache.flink.statefun.sdk.EgressType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+@JsonDeserialize(builder = PlaygroundEgressSpec.Builder.class)
+public class PlaygroundEgressSpec implements EgressSpec<TypedValue> {
+
+  private final int port;
+  private final Set<String> topics;
+
+  private PlaygroundEgressSpec(int port, Set<String> topics) {
+    this.port = port;
+    this.topics = topics;
+  }
+
+  @Override
+  public EgressIdentifier<TypedValue> id() {
+    return Constants.EGRESS_IDENTIFIER;
+  }
+
+  @Override
+  public EgressType type() {
+    return Constants.EGRESS_TYPE;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public Set<String> getTopics() {
+    return Collections.unmodifiableSet(topics);
+  }
+
+  @JsonPOJOBuilder
+  public static final class Builder {
+    private final int port;
+    private final Set<String> topics;
+
+    @JsonCreator
+    private Builder(@JsonProperty("port") int port, @JsonProperty("topics") 
Set<String> topics) {
+      this.port = port;
+      this.topics = topics;
+    }
+
+    public PlaygroundEgressSpec build() {
+      return new PlaygroundEgressSpec(port, topics);
+    }
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/spec/PlaygroundIngressSpec.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/spec/PlaygroundIngressSpec.java
new file mode 100644
index 0000000..d5d7d92
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/spec/PlaygroundIngressSpec.java
@@ -0,0 +1,49 @@
+package org.apache.flink.statefun.playground.internal.io.spec;
+
+import com.google.protobuf.Message;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import org.apache.flink.statefun.playground.internal.io.Constants;
+import org.apache.flink.statefun.sdk.IngressType;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+
+@JsonDeserialize(builder = PlaygroundIngressSpec.Builder.class)
+public class PlaygroundIngressSpec implements IngressSpec<Message> {
+
+  private final int port;
+
+  private PlaygroundIngressSpec(int port) {
+    this.port = port;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public IngressIdentifier<Message> id() {
+    return Constants.INGRESS_IDENTIFIER;
+  }
+
+  @Override
+  public IngressType type() {
+    return Constants.INGRESS_TYPE;
+  }
+
+  @JsonPOJOBuilder
+  public static final class Builder {
+    private final int port;
+
+    @JsonCreator
+    private Builder(@JsonProperty("port") int port) {
+      this.port = port;
+    }
+
+    public PlaygroundIngressSpec build() {
+      return new PlaygroundIngressSpec(port);
+    }
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/test/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypointITCase.java
 
b/playground-internal/statefun-playground-entrypoint/src/test/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypointITCase.java
new file mode 100644
index 0000000..5cfe016
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/test/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypointITCase.java
@@ -0,0 +1,13 @@
+package org.apache.flink.statefun.playground.internal.entrypoint;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+public class LocalEnvironmentEntrypointITCase {
+
+  @Disabled("Test for manual verification")
+  @Test
+  public void testLocalEnvironmentEntrypoint() throws Exception {
+    LocalEnvironmentEntrypoint.main(new String[] {"--module", 
"classpath:module.yaml"});
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/test/resources/module.yaml
 
b/playground-internal/statefun-playground-entrypoint/src/test/resources/module.yaml
new file mode 100644
index 0000000..466be42
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/test/resources/module.yaml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kind: io.statefun.endpoints.v2/http
+spec:
+  functions: greeter.fns/*
+  urlPathTemplate: http://localhost:1108/
+  transport:
+    type: io.statefun.transports.v1/async
+---
+kind: io.statefun.playground.v1/ingress
+spec:
+  port: 8090
+---
+kind: io.statefun.playground.v1/egress
+spec:
+  port: 8091
+  topics:
+    - greetings

Reply via email to