This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit ee051dd25d88d9da38bc33f91788997975fa5e8e
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Mon Nov 30 15:20:42 2020 +0800

    [FLINK-20334] [core] Implement V3 module YAML with function endpoint templating
    
    This closes #182.
---
 .../src/test/resources/remote-module/module.yaml   |  18 +-
 .../statefun/module.yaml                           |  15 +-
 .../statefun-python-greeter-example/module.yaml    |  15 +-
 .../statefun-python-k8s-example/module.yaml        |  13 +-
 ...tionSpec.java => HttpFunctionEndpointSpec.java} |  92 ++++-----
 .../flink/core/httpfn/HttpFunctionProvider.java    |   2 +-
 .../flink/core/httpfn/HttpFunctionSpec.java        |   5 -
 ...der.java => TemplatedHttpFunctionProvider.java} |  56 ++++--
 .../flink/core/httpfn/UnixDomainHttpEndpoint.java  |   8 +
 .../flink/core/jsonmodule/FormatVersion.java       |   5 +-
 .../jsonmodule/FunctionEndpointJsonEntity.java     | 206 +++++++++++++++++++++
 ...ormatVersion.java => FunctionEndpointSpec.java} |  44 +++--
 .../statefun/flink/core/jsonmodule/JsonModule.java |  13 +-
 .../core/httpfn/UnixDomainHttpEndpointTest.java    |  11 ++
 .../flink/core/jsonmodule/JsonModuleV3Test.java    | 114 ++++++++++++
 .../src/test/resources/module-v3_0/module.yaml     |  85 +++++++++
 16 files changed, 572 insertions(+), 130 deletions(-)

diff --git 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
 
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
index d9a80e2..1925210 100644
--- 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
+++ 
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
@@ -13,26 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-version: "1.0"
+version: "3.0"
 
 module:
   meta:
     type: remote
   spec:
-    functions:
-      - function:
+    endpoints:
+      - endpoint:
           meta:
             kind: http
-            type: org.apache.flink.statefun.e2e.remote/counter
           spec:
-            endpoint: http://remote-function:8000/service
-            maxNumBatchRequests: 10000
-      - function:
-          meta:
-            kind: http
-            type: org.apache.flink.statefun.e2e.remote/forward-function
-          spec:
-            endpoint: http://remote-function:8000/service
+            typename:
+              namespace: org.apache.flink.statefun.e2e.remote
+            urlPathTemplate: http://remote-function:8000/service
             maxNumBatchRequests: 10000
     ingresses:
       - ingress:
diff --git 
a/statefun-examples/statefun-async-python-example/statefun/module.yaml 
b/statefun-examples/statefun-async-python-example/statefun/module.yaml
index 762e464..f4b197d 100644
--- a/statefun-examples/statefun-async-python-example/statefun/module.yaml
+++ b/statefun-examples/statefun-async-python-example/statefun/module.yaml
@@ -12,20 +12,23 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-version: "1.0"
+version: "3.0"
 module:
   meta:
     type: remote
   spec:
-    functions:
-      - function:
+    endpoints:
+      - endpoint:
           meta:
             kind: http
-            type: example/greeter
           spec:
-            endpoint: http://python-worker:8000/statefun
+            typename:
+              namespace: example
+              type: greeter
+            urlPathTemplate: http://python-worker:8000/statefun
             maxNumBatchRequests: 500
-            timeout: 2min
+            timeouts:
+              call: 2min
     ingresses:
       - ingress:
           meta:
diff --git a/statefun-examples/statefun-python-greeter-example/module.yaml 
b/statefun-examples/statefun-python-greeter-example/module.yaml
index 762e464..f4b197d 100644
--- a/statefun-examples/statefun-python-greeter-example/module.yaml
+++ b/statefun-examples/statefun-python-greeter-example/module.yaml
@@ -12,20 +12,23 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-version: "1.0"
+version: "3.0"
 module:
   meta:
     type: remote
   spec:
-    functions:
-      - function:
+    endpoints:
+      - endpoint:
           meta:
             kind: http
-            type: example/greeter
           spec:
-            endpoint: http://python-worker:8000/statefun
+            typename:
+              namespace: example
+              type: greeter
+            urlPathTemplate: http://python-worker:8000/statefun
             maxNumBatchRequests: 500
-            timeout: 2min
+            timeouts:
+              call: 2min
     ingresses:
       - ingress:
           meta:
diff --git a/statefun-examples/statefun-python-k8s-example/module.yaml 
b/statefun-examples/statefun-python-k8s-example/module.yaml
index cb59fd5..813ee1f 100644
--- a/statefun-examples/statefun-python-k8s-example/module.yaml
+++ b/statefun-examples/statefun-python-k8s-example/module.yaml
@@ -12,20 +12,23 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-version: "1.0"
+version: "3.0"
 module:
   meta:
     type: remote
   spec:
-    functions:
-      - function:
+    endpoints:
+      - endpoint:
           meta:
             kind: http
-            type: k8s-demo/greeter
           spec:
+            typename:
+              namespace: k8s-demo
+              type: greeter
             endpoint: http://statefun-python:8000/statefun
             maxNumBatchRequests: 500
-            timeout: 2min
+            timeouts:
+              call: 2min
     ingresses:
       - ingress:
           meta:
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
similarity index 68%
copy from 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
copy to 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
index 3a8f653..d5731d1 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
@@ -17,18 +17,14 @@
  */
 package org.apache.flink.statefun.flink.core.httpfn;
 
-import java.io.Serializable;
-import java.net.URI;
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Objects;
-import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec;
+import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec;
 import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
+import org.apache.flink.types.Either;
 
-public final class HttpFunctionSpec implements FunctionSpec, Serializable {
-
-  private static final long serialVersionUID = 1;
+public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec {
 
   private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
   private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = 
Duration.ofSeconds(10);
@@ -36,41 +32,40 @@ public final class HttpFunctionSpec implements 
FunctionSpec, Serializable {
   private static final Duration DEFAULT_HTTP_WRITE_TIMEOUT = 
Duration.ofSeconds(10);
   private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
 
-  private final FunctionType functionType;
-  private final URI endpoint;
-  private final List<StateSpec> states;
+  private final Either<FunctionType, FunctionTypeNamespaceMatcher> target;
+  private final UrlPathTemplate urlPathTemplate;
+
   private final Duration maxRequestDuration;
   private final Duration connectTimeout;
   private final Duration readTimeout;
   private final Duration writeTimeout;
   private final int maxNumBatchRequests;
 
-  private HttpFunctionSpec(
-      FunctionType functionType,
-      URI endpoint,
-      List<StateSpec> states,
+  public static Builder builder(
+      Either<FunctionType, FunctionTypeNamespaceMatcher> target, 
UrlPathTemplate urlPathTemplate) {
+    return new Builder(target, urlPathTemplate);
+  }
+
+  private HttpFunctionEndpointSpec(
+      Either<FunctionType, FunctionTypeNamespaceMatcher> target,
+      UrlPathTemplate urlPathTemplate,
       Duration maxRequestDuration,
       Duration connectTimeout,
       Duration readTimeout,
       Duration writeTimeout,
       int maxNumBatchRequests) {
-    this.functionType = Objects.requireNonNull(functionType);
-    this.endpoint = Objects.requireNonNull(endpoint);
-    this.states = Objects.requireNonNull(states);
-    this.maxRequestDuration = Objects.requireNonNull(maxRequestDuration);
-    this.connectTimeout = Objects.requireNonNull(connectTimeout);
-    this.readTimeout = Objects.requireNonNull(readTimeout);
-    this.writeTimeout = Objects.requireNonNull(writeTimeout);
+    this.target = target;
+    this.urlPathTemplate = urlPathTemplate;
+    this.maxRequestDuration = maxRequestDuration;
+    this.connectTimeout = connectTimeout;
+    this.readTimeout = readTimeout;
+    this.writeTimeout = writeTimeout;
     this.maxNumBatchRequests = maxNumBatchRequests;
   }
 
-  public static Builder builder(FunctionType functionType, URI endpoint) {
-    return new Builder(functionType, endpoint);
-  }
-
   @Override
-  public FunctionType functionType() {
-    return functionType;
+  public Either<FunctionType, FunctionTypeNamespaceMatcher> target() {
+    return target;
   }
 
   @Override
@@ -78,17 +73,9 @@ public final class HttpFunctionSpec implements FunctionSpec, 
Serializable {
     return Kind.HTTP;
   }
 
-  public URI endpoint() {
-    return endpoint;
-  }
-
-  public boolean isUnixDomainSocket() {
-    String scheme = endpoint.getScheme();
-    return "http+unix".equalsIgnoreCase(scheme) || 
"https+unix".equalsIgnoreCase(scheme);
-  }
-
-  public List<StateSpec> states() {
-    return states;
+  @Override
+  public UrlPathTemplate urlPathTemplate() {
+    return urlPathTemplate;
   }
 
   public Duration maxRequestDuration() {
@@ -113,24 +100,20 @@ public final class HttpFunctionSpec implements 
FunctionSpec, Serializable {
 
   public static final class Builder {
 
-    private final FunctionType functionType;
-    private final URI endpoint;
+    private final Either<FunctionType, FunctionTypeNamespaceMatcher> target;
+    private final UrlPathTemplate urlPathTemplate;
 
-    private final List<StateSpec> states = new ArrayList<>();
     private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT;
     private Duration connectTimeout = DEFAULT_HTTP_CONNECT_TIMEOUT;
     private Duration readTimeout = DEFAULT_HTTP_READ_TIMEOUT;
     private Duration writeTimeout = DEFAULT_HTTP_WRITE_TIMEOUT;
     private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS;
 
-    private Builder(FunctionType functionType, URI endpoint) {
-      this.functionType = Objects.requireNonNull(functionType);
-      this.endpoint = Objects.requireNonNull(endpoint);
-    }
-
-    public Builder withState(StateSpec stateSpec) {
-      this.states.add(stateSpec);
-      return this;
+    private Builder(
+        Either<FunctionType, FunctionTypeNamespaceMatcher> target,
+        UrlPathTemplate urlPathTemplate) {
+      this.target = Objects.requireNonNull(target);
+      this.urlPathTemplate = Objects.requireNonNull(urlPathTemplate);
     }
 
     public Builder withMaxRequestDuration(Duration duration) {
@@ -158,13 +141,12 @@ public final class HttpFunctionSpec implements 
FunctionSpec, Serializable {
       return this;
     }
 
-    public HttpFunctionSpec build() {
+    public HttpFunctionEndpointSpec build() {
       validateTimeouts();
 
-      return new HttpFunctionSpec(
-          functionType,
-          endpoint,
-          states,
+      return new HttpFunctionEndpointSpec(
+          target,
+          urlPathTemplate,
           maxRequestDuration,
           connectTimeout,
           readTimeout,
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index e1ef27e..67c2c68 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -72,7 +72,7 @@ public class HttpFunctionProvider implements 
StatefulFunctionProvider, ManagingR
     clientBuilder.writeTimeout(spec.writeTimeout());
 
     final HttpUrl url;
-    if (spec.isUnixDomainSocket()) {
+    if (UnixDomainHttpEndpoint.validate(spec.endpoint())) {
       UnixDomainHttpEndpoint endpoint = 
UnixDomainHttpEndpoint.parseFrom(spec.endpoint());
 
       url =
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
index 3a8f653..42a7abb 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
@@ -82,11 +82,6 @@ public final class HttpFunctionSpec implements FunctionSpec, 
Serializable {
     return endpoint;
   }
 
-  public boolean isUnixDomainSocket() {
-    String scheme = endpoint.getScheme();
-    return "http+unix".equalsIgnoreCase(scheme) || 
"https+unix".equalsIgnoreCase(scheme);
-  }
-
   public List<StateSpec> states() {
     return states;
   }
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java
similarity index 59%
copy from 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
copy to 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java
index e1ef27e..ae4d7f4 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java
@@ -15,12 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.statefun.flink.core.httpfn;
 
 import static 
org.apache.flink.statefun.flink.core.httpfn.OkHttpUnixSocketBridge.configureUnixDomainSocket;
 
+import java.net.URI;
+import java.util.Collections;
 import java.util.Map;
+import java.util.Objects;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import okhttp3.HttpUrl;
@@ -30,38 +32,52 @@ import 
org.apache.flink.statefun.flink.core.reqreply.PersistedRemoteFunctionValu
 import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
 import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction;
 import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
 
 @NotThreadSafe
-public class HttpFunctionProvider implements StatefulFunctionProvider, 
ManagingResources {
-  private final Map<FunctionType, HttpFunctionSpec> supportedTypes;
+public final class TemplatedHttpFunctionProvider
+    implements StatefulFunctionProvider, ManagingResources {
+
+  private final Map<FunctionType, HttpFunctionEndpointSpec> 
specificTypeEndpointSpecs;
+  private final Map<String, HttpFunctionEndpointSpec> 
perNamespaceEndpointSpecs;
 
   /** lazily initialized by {code buildHttpClient} */
   @Nullable private OkHttpClient sharedClient;
 
   private volatile boolean shutdown;
 
-  public HttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> 
supportedTypes) {
-    this.supportedTypes = supportedTypes;
+  public TemplatedHttpFunctionProvider(
+      Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs,
+      Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs) {
+    this.specificTypeEndpointSpecs = 
Objects.requireNonNull(specificTypeEndpointSpecs);
+    this.perNamespaceEndpointSpecs = 
Objects.requireNonNull(perNamespaceEndpointSpecs);
   }
 
   @Override
-  public RequestReplyFunction functionOfType(FunctionType type) {
-    HttpFunctionSpec spec = supportedTypes.get(type);
-    if (spec == null) {
-      throw new IllegalArgumentException("Unsupported type " + type);
-    }
+  public StatefulFunction functionOfType(FunctionType functionType) {
+    final HttpFunctionEndpointSpec endpointsSpec = 
getEndpointsSpecOrThrow(functionType);
     return new RequestReplyFunction(
-        new PersistedRemoteFunctionValues(spec.states()),
-        spec.maxNumBatchRequests(),
-        buildHttpClient(spec));
+        new PersistedRemoteFunctionValues(Collections.emptyList()),
+        endpointsSpec.maxNumBatchRequests(),
+        buildHttpClient(endpointsSpec, functionType));
   }
 
-  public HttpFunctionSpec getFunctionSpec(FunctionType type) {
-    return supportedTypes.get(type);
+  private HttpFunctionEndpointSpec getEndpointsSpecOrThrow(FunctionType 
functionType) {
+    HttpFunctionEndpointSpec endpointSpec = 
specificTypeEndpointSpecs.get(functionType);
+    if (endpointSpec != null) {
+      return endpointSpec;
+    }
+    endpointSpec = perNamespaceEndpointSpecs.get(functionType.namespace());
+    if (endpointSpec != null) {
+      return endpointSpec;
+    }
+
+    throw new IllegalStateException("Unknown type: " + functionType);
   }
 
-  private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
+  private RequestReplyClient buildHttpClient(
+      HttpFunctionEndpointSpec spec, FunctionType functionType) {
     if (sharedClient == null) {
       sharedClient = OkHttpUtils.newClient();
     }
@@ -71,9 +87,11 @@ public class HttpFunctionProvider implements 
StatefulFunctionProvider, ManagingR
     clientBuilder.readTimeout(spec.readTimeout());
     clientBuilder.writeTimeout(spec.writeTimeout());
 
+    URI endpointUrl = spec.urlPathTemplate().apply(functionType);
+
     final HttpUrl url;
-    if (spec.isUnixDomainSocket()) {
-      UnixDomainHttpEndpoint endpoint = 
UnixDomainHttpEndpoint.parseFrom(spec.endpoint());
+    if (UnixDomainHttpEndpoint.validate(endpointUrl)) {
+      UnixDomainHttpEndpoint endpoint = 
UnixDomainHttpEndpoint.parseFrom(endpointUrl);
 
       url =
           new HttpUrl.Builder()
@@ -84,7 +102,7 @@ public class HttpFunctionProvider implements 
StatefulFunctionProvider, ManagingR
 
       configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile);
     } else {
-      url = HttpUrl.get(spec.endpoint());
+      url = HttpUrl.get(endpointUrl);
     }
     return new HttpRequestReplyClient(url, clientBuilder.build(), () -> 
shutdown);
   }
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpoint.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpoint.java
index c628fdb..01c49d5 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpoint.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpoint.java
@@ -23,12 +23,20 @@ import java.net.URI;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Objects;
+import org.apache.flink.util.Preconditions;
 
 /** Represents a Unix domain file path and an http endpoint */
 final class UnixDomainHttpEndpoint {
 
+  /** Checks whether or not an endpoint is using UNIX domain sockets. */
+  static boolean validate(URI endpoint) {
+    String scheme = endpoint.getScheme();
+    return "http+unix".equalsIgnoreCase(scheme) || 
"https+unix".equalsIgnoreCase(scheme);
+  }
+
   /** Parses a URI of the form {@code http+unix://<file system 
path>.sock/<http endpoint>}. */
   static UnixDomainHttpEndpoint parseFrom(URI endpoint) {
+    Preconditions.checkArgument(validate(endpoint));
     final Path path = Paths.get(endpoint.getPath());
     final int sockPathIndex = indexOfSockFile(path);
     final String filePath = "/" + path.subpath(0, sockPathIndex + 
1).toString();
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
index debb28c..87f4ec4 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
@@ -20,7 +20,8 @@ package org.apache.flink.statefun.flink.core.jsonmodule;
 
 enum FormatVersion {
   v1_0("1.0"),
-  v2_0("2.0");
+  v2_0("2.0"),
+  v3_0("3.0");
 
   private String versionStr;
 
@@ -39,6 +40,8 @@ enum FormatVersion {
         return v1_0;
       case "2.0":
         return v2_0;
+      case "3.0":
+        return v3_0;
       default:
         throw new IllegalArgumentException("Unrecognized format version: " + 
versionStr);
     }
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
new file mode 100644
index 0000000..fac1bbe
--- /dev/null
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.stream.StreamSupport;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
+import 
org.apache.flink.statefun.flink.core.httpfn.TemplatedHttpFunctionProvider;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
+import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.TimeUtils;
+
+public final class FunctionEndpointJsonEntity implements JsonEntity {
+
+  private static final JsonPointer FUNCTION_ENDPOINTS_POINTER = 
JsonPointer.compile("/endpoints");
+
+  private static final class MetaPointers {
+    private static final JsonPointer KIND = 
JsonPointer.compile("/endpoint/meta/kind");
+  }
+
+  private static final class SpecPointers {
+    private static final JsonPointer TYPENAME = 
JsonPointer.compile("/endpoint/spec/typename");
+    private static final JsonPointer URL_PATH_TEMPLATE =
+        JsonPointer.compile("/endpoint/spec/urlPathTemplate");
+    private static final JsonPointer TIMEOUTS = 
JsonPointer.compile("/endpoint/spec/timeouts");
+    private static final JsonPointer MAX_NUM_BATCH_REQUESTS =
+        JsonPointer.compile("/endpoint/spec/maxNumBatchRequests");
+  }
+
+  private static final class TypenamePointers {
+    private static final JsonPointer NAMESPACE = 
JsonPointer.compile("/namespace");
+    private static final JsonPointer FUNCTION_NAME = 
JsonPointer.compile("/type");
+  }
+
+  private static final class TimeoutPointers {
+    private static final JsonPointer CALL = JsonPointer.compile("/call");
+    private static final JsonPointer CONNECT = JsonPointer.compile("/connect");
+    private static final JsonPointer READ = JsonPointer.compile("/read");
+    private static final JsonPointer WRITE = JsonPointer.compile("/write");
+  }
+
+  @Override
+  public void bind(
+      StatefulFunctionModule.Binder binder, JsonNode moduleSpecNode, 
FormatVersion formatVersion) {
+    if (formatVersion != FormatVersion.v3_0) {
+      throw new IllegalArgumentException("endpoints is only supported with 
format version 3.0.");
+    }
+
+    final Iterable<? extends JsonNode> functionEndpointsSpecNodes =
+        functionEndpointSpecNodes(moduleSpecNode);
+
+    for (Map.Entry<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>> 
entry :
+        parseFunctionEndpointSpecs(functionEndpointsSpecNodes).entrySet()) {
+      final Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs 
= new HashMap<>();
+      final Map<String, FunctionEndpointSpec> perNamespaceEndpointSpecs = new 
HashMap<>();
+
+      entry
+          .getValue()
+          .forEach(
+              spec -> {
+                Either<FunctionType, FunctionTypeNamespaceMatcher> target = 
spec.target();
+                if (target.isLeft()) {
+                  specificTypeEndpointSpecs.put(target.left(), spec);
+                } else {
+                  
perNamespaceEndpointSpecs.put(target.right().targetNamespace(), spec);
+                }
+              });
+
+      StatefulFunctionProvider provider =
+          functionProvider(entry.getKey(), specificTypeEndpointSpecs, 
perNamespaceEndpointSpecs);
+      specificTypeEndpointSpecs
+          .keySet()
+          .forEach(specificType -> binder.bindFunctionProvider(specificType, 
provider));
+      perNamespaceEndpointSpecs
+          .keySet()
+          .forEach(
+              namespace ->
+                  binder.bindFunctionProvider(
+                      FunctionTypeNamespaceMatcher.targetNamespace(namespace), 
provider));
+    }
+  }
+
+  private static Iterable<? extends JsonNode> functionEndpointSpecNodes(
+      JsonNode moduleSpecRootNode) {
+    return Selectors.listAt(moduleSpecRootNode, FUNCTION_ENDPOINTS_POINTER);
+  }
+
+  private static Map<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>>
+      parseFunctionEndpointSpecs(Iterable<? extends JsonNode> 
functionEndpointsSpecNodes) {
+    return StreamSupport.stream(functionEndpointsSpecNodes.spliterator(), 
false)
+        .map(FunctionEndpointJsonEntity::parseFunctionEndpointsSpec)
+        .collect(groupingBy(FunctionEndpointSpec::kind, toList()));
+  }
+
+  private static FunctionEndpointSpec parseFunctionEndpointsSpec(
+      JsonNode functionEndpointSpecNode) {
+    FunctionEndpointSpec.Kind kind = endpointKind(functionEndpointSpecNode);
+
+    switch (kind) {
+      case HTTP:
+        final HttpFunctionEndpointSpec.Builder specBuilder =
+            HttpFunctionEndpointSpec.builder(
+                target(functionEndpointSpecNode), 
urlPathTemplate(functionEndpointSpecNode));
+
+        JsonNode timeoutsNode = 
functionEndpointSpecNode.at(SpecPointers.TIMEOUTS);
+        optionalMaxNumBatchRequests(functionEndpointSpecNode)
+            .ifPresent(specBuilder::withMaxNumBatchRequests);
+        optionalTimeoutDuration(timeoutsNode, TimeoutPointers.CALL)
+            .ifPresent(specBuilder::withMaxRequestDuration);
+        optionalTimeoutDuration(timeoutsNode, TimeoutPointers.CONNECT)
+            .ifPresent(specBuilder::withConnectTimeoutDuration);
+        optionalTimeoutDuration(timeoutsNode, TimeoutPointers.READ)
+            .ifPresent(specBuilder::withReadTimeoutDuration);
+        optionalTimeoutDuration(timeoutsNode, TimeoutPointers.WRITE)
+            .ifPresent(specBuilder::withWriteTimeoutDuration);
+
+        return specBuilder.build();
+      case GRPC:
+        throw new UnsupportedOperationException("GRPC endpoints are not 
supported yet.");
+      default:
+        throw new IllegalArgumentException("Unrecognized function endpoint 
kind " + kind);
+    }
+  }
+
+  private static FunctionEndpointSpec.Kind endpointKind(JsonNode 
functionEndpointSpecNode) {
+    String endpointKind = Selectors.textAt(functionEndpointSpecNode, 
MetaPointers.KIND);
+    return 
FunctionEndpointSpec.Kind.valueOf(endpointKind.toUpperCase(Locale.getDefault()));
+  }
+
+  private static Either<FunctionType, FunctionTypeNamespaceMatcher> target(
+      JsonNode functionEndpointSpecNode) {
+    JsonNode targetNode = functionEndpointSpecNode.at(SpecPointers.TYPENAME);
+    String namespace = Selectors.textAt(targetNode, 
TypenamePointers.NAMESPACE);
+    Optional<String> functionName =
+        Selectors.optionalTextAt(targetNode, TypenamePointers.FUNCTION_NAME);
+    return (functionName.isPresent())
+        ? Either.Left(new FunctionType(namespace, functionName.get()))
+        : 
Either.Right(FunctionTypeNamespaceMatcher.targetNamespace(namespace));
+  }
+
+  private static FunctionEndpointSpec.UrlPathTemplate urlPathTemplate(
+      JsonNode functionEndpointSpecNode) {
+    String template = Selectors.textAt(functionEndpointSpecNode, 
SpecPointers.URL_PATH_TEMPLATE);
+    return new FunctionEndpointSpec.UrlPathTemplate(template);
+  }
+
+  private static OptionalInt optionalMaxNumBatchRequests(JsonNode 
functionNode) {
+    return Selectors.optionalIntegerAt(functionNode, 
SpecPointers.MAX_NUM_BATCH_REQUESTS);
+  }
+
+  private static Optional<Duration> optionalTimeoutDuration(
+      JsonNode node, JsonPointer timeoutPointer) {
+    return Selectors.optionalTextAt(node, 
timeoutPointer).map(TimeUtils::parseDuration);
+  }
+
+  private static StatefulFunctionProvider functionProvider(
+      FunctionEndpointSpec.Kind kind,
+      Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs,
+      Map<String, FunctionEndpointSpec> perNamespaceEndpointSpecs) {
+    switch (kind) {
+      case HTTP:
+        return new TemplatedHttpFunctionProvider(
+            castValues(specificTypeEndpointSpecs), 
castValues(perNamespaceEndpointSpecs));
+      case GRPC:
+        throw new UnsupportedOperationException("GRPC endpoints are not 
supported yet.");
+      default:
+        throw new IllegalStateException("Unexpected kind: " + kind);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <K, NV extends FunctionEndpointSpec> Map<K, NV> castValues(
+      Map<K, FunctionEndpointSpec> toCast) {
+    return new HashMap(toCast);
+  }
+}
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java
similarity index 53%
copy from 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
copy to 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java
index debb28c..940db87 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java
@@ -15,32 +15,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.statefun.flink.core.jsonmodule;
 
-enum FormatVersion {
-  v1_0("1.0"),
-  v2_0("2.0");
+import java.net.URI;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
+import org.apache.flink.types.Either;
 
-  private String versionStr;
+/**
+ * Describes a function endpoint: what it targets, what kind of endpoint it is, and the URL
+ * template from which the URI for a concrete function type is produced.
+ */
+public interface FunctionEndpointSpec {
 
-  FormatVersion(String versionStr) {
-    this.versionStr = versionStr;
-  }
+  /** The target of this endpoint: either a specific function type or a namespace matcher. */
+  Either<FunctionType, FunctionTypeNamespaceMatcher> target();
+
+  /** The kind of this endpoint, see {@link Kind}. */
+  Kind kind();
+
+  /** The template used to produce the endpoint URI for a given function type. */
+  UrlPathTemplate urlPathTemplate();
 
-  @Override
-  public String toString() {
-    return versionStr;
+  /** The endpoint kinds known to this interface. */
+  enum Kind {
+    HTTP,
+    GRPC
   }
 
-  static FormatVersion fromString(String versionStr) {
-    switch (versionStr) {
-      case "1.0":
-        return v1_0;
-      case "2.0":
-        return v2_0;
-      default:
-        throw new IllegalArgumentException("Unrecognized format version: " + 
versionStr);
+  /** A URL string in which a fixed placeholder stands for the name of a function type. */
+  class UrlPathTemplate {
+    // Placeholder text that apply() replaces with the function type's name.
+    private static final String FUNCTION_NAME_HOLDER = "{typename.function}";
+
+    private final String template;
+
+    /**
+     * @param template the template string; must not be null.
+     * @throws NullPointerException if {@code template} is null.
+     */
+    public UrlPathTemplate(String template) {
+      this.template = Objects.requireNonNull(template);
+    }
+
+    /**
+     * Replaces every occurrence of the placeholder with {@code functionType.name()} and parses
+     * the result as a URI.
+     *
+     * @throws IllegalArgumentException if the resulting string is not a valid URI (as thrown by
+     *     {@link URI#create(String)}).
+     */
+    public URI apply(FunctionType functionType) {
+      return URI.create(template.replace(FUNCTION_NAME_HOLDER, 
functionType.name()));
     }
   }
 }
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index cedeb97..658bb9b 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -37,6 +37,13 @@ final class JsonModule implements StatefulFunctionModule {
           new RouterJsonEntity(),
           new EgressJsonEntity());
 
+  // JSON entities bound by configure() when the module's format version is v3_0.
+  private static final List<JsonEntity> V3_ENTITIES =
+      Arrays.asList(
+          new FunctionEndpointJsonEntity(),
+          new IngressJsonEntity(),
+          new RouterJsonEntity(),
+          new EgressJsonEntity());
+
   private final JsonNode moduleSpecNode;
   private final FormatVersion formatVersion;
   private final URL moduleUrl;
@@ -49,7 +56,11 @@ final class JsonModule implements StatefulFunctionModule {
 
   public void configure(Map<String, String> conf, Binder binder) {
     try {
-      ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, 
formatVersion));
+      if (formatVersion == FormatVersion.v3_0) {
+        V3_ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, 
moduleSpecNode, formatVersion));
+      } else {
+        ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, 
formatVersion));
+      }
     } catch (Throwable t) {
       throw new ModuleConfigurationException(
           format("Error while parsing module at %s", moduleUrl), t);
diff --git 
a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpointTest.java
 
b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpointTest.java
index f96ac3e..127775a 100644
--- 
a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpointTest.java
+++ 
b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpointTest.java
@@ -1,6 +1,7 @@
 package org.apache.flink.statefun.flink.core.httpfn;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 import java.net.URI;
 import org.junit.Test;
@@ -29,4 +30,14 @@ public class UnixDomainHttpEndpointTest {
   public void missingSockFile() {
     
UnixDomainHttpEndpoint.parseFrom(URI.create("http+unix:///some/path/hello"));
   }
+
+  // A plain "http" URI must not be accepted by validate().
+  @Test
+  public void validateUdsEndpoint() {
+    assertFalse(UnixDomainHttpEndpoint.validate(URI.create("http:///bar.foo.com/some/path")));
+  }
+
+  // Parsing a plain "http" URI is expected to be rejected with an IllegalArgumentException.
+  @Test(expected = IllegalArgumentException.class)
+  public void parseNonUdsEndpoint() {
+    UnixDomainHttpEndpoint.parseFrom(URI.create("http:///bar.foo.com/some/path"));
+  }
 }
diff --git 
a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java
 
b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java
new file mode 100644
index 0000000..fcbb885
--- /dev/null
+++ 
b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.Message;
+import java.net.URL;
+import java.util.Collections;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
+import org.apache.flink.statefun.flink.core.message.MessageFactoryKey;
+import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+import org.junit.Test;
+
+/**
+ * Loads the format version 3.0 example module ({@code module-v3_0/module.yaml}) and checks what
+ * it binds into a {@link StatefulFunctionsUniverse}.
+ */
+public class JsonModuleV3Test {
+
+  private static final String modulePath = "module-v3_0/module.yaml";
+
+  @Test
+  public void exampleUsage() {
+    StatefulFunctionModule module = fromPath(modulePath);
+
+    assertThat(module, notNullValue());
+  }
+
+  @Test
+  public void testFunctions() {
+    StatefulFunctionsUniverse universe = configuredUniverse();
+
+    // endpoints declared with both a namespace and a type
+    assertThat(
+        universe.functions(),
+        allOf(
+            hasKey(new FunctionType("com.foo.bar", "specific_function")),
+            hasKey(new FunctionType("com.other.namespace", "hello"))));
+
+    // endpoint declared with a namespace only
+    assertThat(universe.namespaceFunctions(), hasKey("com.foo.bar"));
+  }
+
+  @Test
+  public void testRouters() {
+    StatefulFunctionsUniverse universe = configuredUniverse();
+
+    assertThat(
+        universe.routers(),
+        hasKey(new IngressIdentifier<>(Message.class, "com.mycomp.igal", "names")));
+  }
+
+  @Test
+  public void testIngresses() {
+    StatefulFunctionsUniverse universe = configuredUniverse();
+
+    assertThat(
+        universe.ingress(),
+        hasKey(new IngressIdentifier<>(Message.class, "com.mycomp.igal", "names")));
+  }
+
+  @Test
+  public void testEgresses() {
+    StatefulFunctionsUniverse universe = configuredUniverse();
+
+    assertThat(
+        universe.egress(), hasKey(new EgressIdentifier<>("com.mycomp.foo", "bar", Any.class)));
+  }
+
+  /** Loads the example module and configures it against a fresh, empty universe. */
+  private static StatefulFunctionsUniverse configuredUniverse() {
+    StatefulFunctionModule module = fromPath(modulePath);
+    StatefulFunctionsUniverse universe = emptyUniverse();
+    module.configure(Collections.emptyMap(), universe);
+    return universe;
+  }
+
+  /** Resolves {@code path} on this test's class loader and loads it as a module. */
+  private static StatefulFunctionModule fromPath(String path) {
+    // Use this test class (not JsonModuleTest) so the test does not depend on a sibling test.
+    URL moduleUrl = JsonModuleV3Test.class.getClassLoader().getResource(path);
+    assertThat(moduleUrl, not(nullValue()));
+    ObjectMapper mapper = JsonServiceLoader.mapper();
+    return JsonServiceLoader.fromUrl(mapper, moduleUrl);
+  }
+
+  private static StatefulFunctionsUniverse emptyUniverse() {
+    return new StatefulFunctionsUniverse(
+        MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null));
+  }
+}
diff --git 
a/statefun-flink/statefun-flink-core/src/test/resources/module-v3_0/module.yaml 
b/statefun-flink/statefun-flink-core/src/test/resources/module-v3_0/module.yaml
new file mode 100644
index 0000000..f7d3d7c
--- /dev/null
+++ 
b/statefun-flink/statefun-flink-core/src/test/resources/module-v3_0/module.yaml
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3.0"
+
+module:
+  meta:
+    type: remote
+  spec:
+    endpoints:
+      # Namespace-only typename: {typename.function} in the URL template is
+      # the placeholder that gets replaced by the function's name.
+      - endpoint:
+          meta:
+            kind: http
+          spec:
+            typename:
+              namespace: com.foo.bar
+            urlPathTemplate: http://bar.foo.com:8080/functions/{typename.function}
+            timeouts:
+              call: 1minutes
+              connect: 10seconds
+              read: 10second
+              write: 10seconds
+            maxNumBatchRequests: 10000
+      # Fully specified typename (namespace and type) within the same
+      # namespace as the endpoint above.
+      - endpoint:
+          meta:
+            kind: http
+          spec:
+            typename:
+              namespace: com.foo.bar
+              type: specific_function
+            urlPathTemplate: http://bar.foo.com:8080/functions/abc
+      # Fully specified typename in a different namespace.
+      - endpoint:
+          meta:
+            kind: http
+          spec:
+            typename:
+              namespace: com.other.namespace
+              type: hello
+            urlPathTemplate: http://namespace.other.com:8080/hello
+    routers:
+      - router:
+          meta:
+            type: org.apache.flink.statefun.sdk/protobuf-router
+          spec:
+            ingress: com.mycomp.igal/names
+            target: "com.example/hello/{{$.name}}"
+            messageType: org.apache.flink.test.SimpleMessage
+            descriptorSet: classpath:test.desc
+    ingresses:
+      - ingress:
+          meta:
+            type: statefun.kafka.io/protobuf-ingress
+            id: com.mycomp.igal/names
+          spec:
+            address: kafka-broker:9092
+            topics:
+              - names
+            properties:
+              - consumer.group: greeter
+            messageType: org.apache.flink.test.SimpleMessage
+            descriptorSet: classpath:test.desc
+    egresses:
+      - egress:
+          meta:
+            type: statefun.kafka.io/generic-egress
+            id: com.mycomp.foo/bar
+          spec:
+            address: kafka-broker:9092
+            deliverySemantic:
+              type: exactly-once
+              transactionTimeoutMillis: 100000
+            properties:
+              - foo.config: bar
\ No newline at end of file

Reply via email to