This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit ee051dd25d88d9da38bc33f91788997975fa5e8e Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Mon Nov 30 15:20:42 2020 +0800 [FLINK-20334] [core] Implement V3 module YAML with function endpoint templating This closes #182. --- .../src/test/resources/remote-module/module.yaml | 18 +- .../statefun/module.yaml | 15 +- .../statefun-python-greeter-example/module.yaml | 15 +- .../statefun-python-k8s-example/module.yaml | 13 +- ...tionSpec.java => HttpFunctionEndpointSpec.java} | 92 ++++----- .../flink/core/httpfn/HttpFunctionProvider.java | 2 +- .../flink/core/httpfn/HttpFunctionSpec.java | 5 - ...der.java => TemplatedHttpFunctionProvider.java} | 56 ++++-- .../flink/core/httpfn/UnixDomainHttpEndpoint.java | 8 + .../flink/core/jsonmodule/FormatVersion.java | 5 +- .../jsonmodule/FunctionEndpointJsonEntity.java | 206 +++++++++++++++++++++ ...ormatVersion.java => FunctionEndpointSpec.java} | 44 +++-- .../statefun/flink/core/jsonmodule/JsonModule.java | 13 +- .../core/httpfn/UnixDomainHttpEndpointTest.java | 11 ++ .../flink/core/jsonmodule/JsonModuleV3Test.java | 114 ++++++++++++ .../src/test/resources/module-v3_0/module.yaml | 85 +++++++++ 16 files changed, 572 insertions(+), 130 deletions(-) diff --git a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml index d9a80e2..1925210 100644 --- a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml +++ b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml @@ -13,26 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -version: "1.0" +version: "3.0" module: meta: type: remote spec: - functions: - - function: + endpoints: + - endpoint: meta: kind: http - type: org.apache.flink.statefun.e2e.remote/counter spec: - endpoint: http://remote-function:8000/service - maxNumBatchRequests: 10000 - - function: - meta: - kind: http - type: org.apache.flink.statefun.e2e.remote/forward-function - spec: - endpoint: http://remote-function:8000/service + typename: + namespace: org.apache.flink.statefun.e2e.remote + urlPathTemplate: http://remote-function:8000/service maxNumBatchRequests: 10000 ingresses: - ingress: diff --git a/statefun-examples/statefun-async-python-example/statefun/module.yaml b/statefun-examples/statefun-async-python-example/statefun/module.yaml index 762e464..f4b197d 100644 --- a/statefun-examples/statefun-async-python-example/statefun/module.yaml +++ b/statefun-examples/statefun-async-python-example/statefun/module.yaml @@ -12,20 +12,23 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -version: "1.0" +version: "3.0" module: meta: type: remote spec: - functions: - - function: + endpoints: + - endpoint: meta: kind: http - type: example/greeter spec: - endpoint: http://python-worker:8000/statefun + typename: + namespace: example + type: greeter + urlPathTemplate: http://python-worker:8000/statefun maxNumBatchRequests: 500 - timeout: 2min + timeouts: + call: 2min ingresses: - ingress: meta: diff --git a/statefun-examples/statefun-python-greeter-example/module.yaml b/statefun-examples/statefun-python-greeter-example/module.yaml index 762e464..f4b197d 100644 --- a/statefun-examples/statefun-python-greeter-example/module.yaml +++ b/statefun-examples/statefun-python-greeter-example/module.yaml @@ -12,20 +12,23 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -version: "1.0" +version: "3.0" module: meta: type: remote spec: - functions: - - function: + endpoints: + - endpoint: meta: kind: http - type: example/greeter spec: - endpoint: http://python-worker:8000/statefun + typename: + namespace: example + type: greeter + urlPathTemplate: http://python-worker:8000/statefun maxNumBatchRequests: 500 - timeout: 2min + timeouts: + call: 2min ingresses: - ingress: meta: diff --git a/statefun-examples/statefun-python-k8s-example/module.yaml b/statefun-examples/statefun-python-k8s-example/module.yaml index cb59fd5..813ee1f 100644 --- a/statefun-examples/statefun-python-k8s-example/module.yaml +++ b/statefun-examples/statefun-python-k8s-example/module.yaml @@ -12,20 +12,23 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -version: "1.0" +version: "3.0" module: meta: type: remote spec: - functions: - - function: + endpoints: + - endpoint: meta: kind: http - type: k8s-demo/greeter spec: + typename: + namespace: k8s-demo + type: greeter endpoint: http://statefun-python:8000/statefun maxNumBatchRequests: 500 - timeout: 2min + timeouts: + call: 2min ingresses: - ingress: meta: diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java similarity index 68% copy from statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java copy to statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java index 3a8f653..d5731d1 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java @@ -17,18 +17,14 @@ */ package org.apache.flink.statefun.flink.core.httpfn; -import java.io.Serializable; -import java.net.URI; import java.time.Duration; -import java.util.ArrayList; -import java.util.List; import java.util.Objects; -import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec; +import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec; import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher; +import org.apache.flink.types.Either; -public final class HttpFunctionSpec implements FunctionSpec, Serializable { - - private static final long serialVersionUID = 1; +public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec { private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1); private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = Duration.ofSeconds(10); @@ -36,41 +32,40 @@ public final class HttpFunctionSpec implements FunctionSpec, Serializable { private static final Duration DEFAULT_HTTP_WRITE_TIMEOUT = Duration.ofSeconds(10); private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000; - private final FunctionType functionType; - private final URI endpoint; - private final List<StateSpec> states; + private final Either<FunctionType, FunctionTypeNamespaceMatcher> target; + private final UrlPathTemplate urlPathTemplate; + private final Duration maxRequestDuration; private final Duration connectTimeout; private final Duration readTimeout; private final Duration writeTimeout; private final int maxNumBatchRequests; - private HttpFunctionSpec( - FunctionType functionType, - URI endpoint, - List<StateSpec> states, + public static Builder builder( + Either<FunctionType, FunctionTypeNamespaceMatcher> target, UrlPathTemplate urlPathTemplate) { + return new Builder(target, urlPathTemplate); + } + + private HttpFunctionEndpointSpec( + Either<FunctionType, FunctionTypeNamespaceMatcher> target, + UrlPathTemplate urlPathTemplate, Duration maxRequestDuration, Duration connectTimeout, Duration readTimeout, Duration writeTimeout, int maxNumBatchRequests) { - this.functionType = Objects.requireNonNull(functionType); - this.endpoint = Objects.requireNonNull(endpoint); - this.states = Objects.requireNonNull(states); - this.maxRequestDuration = Objects.requireNonNull(maxRequestDuration); - this.connectTimeout = Objects.requireNonNull(connectTimeout); - this.readTimeout = Objects.requireNonNull(readTimeout); - this.writeTimeout = Objects.requireNonNull(writeTimeout); + this.target = target; + this.urlPathTemplate = urlPathTemplate; + this.maxRequestDuration = maxRequestDuration; + this.connectTimeout = connectTimeout; + this.readTimeout = readTimeout; + this.writeTimeout = writeTimeout; this.maxNumBatchRequests = maxNumBatchRequests; } - public static Builder builder(FunctionType functionType, URI endpoint) { - return new Builder(functionType, endpoint); - } - @Override - public FunctionType functionType() { - return functionType; + public Either<FunctionType, FunctionTypeNamespaceMatcher> target() { + return target; } @Override @@ -78,17 +73,9 @@ public final class HttpFunctionSpec implements FunctionSpec, Serializable { return Kind.HTTP; } - public URI endpoint() { - return endpoint; - } - - public boolean isUnixDomainSocket() { - String scheme = endpoint.getScheme(); - return "http+unix".equalsIgnoreCase(scheme) || "https+unix".equalsIgnoreCase(scheme); - } - - public List<StateSpec> states() { - return states; + @Override + public UrlPathTemplate urlPathTemplate() { + return urlPathTemplate; } public Duration maxRequestDuration() { @@ -113,24 +100,20 @@ public final class HttpFunctionSpec implements FunctionSpec, Serializable { public static final class Builder { - private final FunctionType functionType; - private final URI endpoint; + private final Either<FunctionType, FunctionTypeNamespaceMatcher> target; + private final UrlPathTemplate urlPathTemplate; - private final List<StateSpec> states = new ArrayList<>(); private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT; private Duration connectTimeout = DEFAULT_HTTP_CONNECT_TIMEOUT; private Duration readTimeout = DEFAULT_HTTP_READ_TIMEOUT; private Duration writeTimeout = DEFAULT_HTTP_WRITE_TIMEOUT; private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS; - private Builder(FunctionType functionType, URI endpoint) { - this.functionType = Objects.requireNonNull(functionType); - this.endpoint = Objects.requireNonNull(endpoint); - } - - public Builder withState(StateSpec stateSpec) { - this.states.add(stateSpec); - return this; + private Builder( + Either<FunctionType, FunctionTypeNamespaceMatcher> target, + UrlPathTemplate urlPathTemplate) { + this.target = Objects.requireNonNull(target); + this.urlPathTemplate = Objects.requireNonNull(urlPathTemplate); } public Builder withMaxRequestDuration(Duration duration) { @@ -158,13 +141,12 @@ public final class HttpFunctionSpec implements FunctionSpec, Serializable { return this; } - public HttpFunctionSpec build() { + public HttpFunctionEndpointSpec build() { validateTimeouts(); - return new HttpFunctionSpec( - functionType, - endpoint, - states, + return new HttpFunctionEndpointSpec( + target, + urlPathTemplate, maxRequestDuration, connectTimeout, readTimeout, diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java index e1ef27e..67c2c68 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java @@ -72,7 +72,7 @@ public class HttpFunctionProvider implements StatefulFunctionProvider, ManagingR clientBuilder.writeTimeout(spec.writeTimeout()); final HttpUrl url; - if (spec.isUnixDomainSocket()) { + if (UnixDomainHttpEndpoint.validate(spec.endpoint())) { UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(spec.endpoint()); url = diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java index 3a8f653..42a7abb 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java @@ -82,11 +82,6 @@ public final class HttpFunctionSpec implements FunctionSpec, Serializable { return endpoint; } - public boolean isUnixDomainSocket() { - String scheme = endpoint.getScheme(); - return "http+unix".equalsIgnoreCase(scheme) || "https+unix".equalsIgnoreCase(scheme); - } - public List<StateSpec> states() { return states; } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java similarity index 59% copy from statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java copy to statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java index e1ef27e..ae4d7f4 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java @@ -15,12 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.statefun.flink.core.httpfn; import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUnixSocketBridge.configureUnixDomainSocket; +import java.net.URI; +import java.util.Collections; import java.util.Map; +import java.util.Objects; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import okhttp3.HttpUrl; @@ -30,38 +32,52 @@ import org.apache.flink.statefun.flink.core.reqreply.PersistedRemoteFunctionValu import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient; import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction; import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.StatefulFunction; import org.apache.flink.statefun.sdk.StatefulFunctionProvider; @NotThreadSafe -public class HttpFunctionProvider implements StatefulFunctionProvider, ManagingResources { - private final Map<FunctionType, HttpFunctionSpec> supportedTypes; +public final class TemplatedHttpFunctionProvider + implements StatefulFunctionProvider, ManagingResources { + + private final Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs; + private final Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs; /** lazily initialized by {code buildHttpClient} */ @Nullable private OkHttpClient sharedClient; private volatile boolean shutdown; - public HttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) { - this.supportedTypes = supportedTypes; + public TemplatedHttpFunctionProvider( + Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs, + Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs) { + this.specificTypeEndpointSpecs = Objects.requireNonNull(specificTypeEndpointSpecs); + this.perNamespaceEndpointSpecs = Objects.requireNonNull(perNamespaceEndpointSpecs); } @Override - public RequestReplyFunction functionOfType(FunctionType type) { - HttpFunctionSpec spec = supportedTypes.get(type); - if (spec == null) { - throw new IllegalArgumentException("Unsupported type " + type); - } + public StatefulFunction functionOfType(FunctionType functionType) { + final HttpFunctionEndpointSpec endpointsSpec = getEndpointsSpecOrThrow(functionType); return new RequestReplyFunction( - new PersistedRemoteFunctionValues(spec.states()), - spec.maxNumBatchRequests(), - buildHttpClient(spec)); + new PersistedRemoteFunctionValues(Collections.emptyList()), + endpointsSpec.maxNumBatchRequests(), + buildHttpClient(endpointsSpec, functionType)); } - public HttpFunctionSpec getFunctionSpec(FunctionType type) { - return supportedTypes.get(type); + private HttpFunctionEndpointSpec getEndpointsSpecOrThrow(FunctionType functionType) { + HttpFunctionEndpointSpec endpointSpec = specificTypeEndpointSpecs.get(functionType); + if (endpointSpec != null) { + return endpointSpec; + } + endpointSpec = perNamespaceEndpointSpecs.get(functionType.namespace()); + if (endpointSpec != null) { + return endpointSpec; + } + + throw new IllegalStateException("Unknown type: " + functionType); } - private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) { + private RequestReplyClient buildHttpClient( + HttpFunctionEndpointSpec spec, FunctionType functionType) { if (sharedClient == null) { sharedClient = OkHttpUtils.newClient(); } @@ -71,9 +87,11 @@ public class HttpFunctionProvider implements StatefulFunctionProvider, ManagingR clientBuilder.readTimeout(spec.readTimeout()); clientBuilder.writeTimeout(spec.writeTimeout()); + URI endpointUrl = spec.urlPathTemplate().apply(functionType); + final HttpUrl url; - if (spec.isUnixDomainSocket()) { - UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(spec.endpoint()); + if (UnixDomainHttpEndpoint.validate(endpointUrl)) { + UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(endpointUrl); url = new HttpUrl.Builder() @@ -84,7 +102,7 @@ public class HttpFunctionProvider implements StatefulFunctionProvider, ManagingR configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile); } else { - url = HttpUrl.get(spec.endpoint()); + url = HttpUrl.get(endpointUrl); } return new HttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown); } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpoint.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpoint.java index c628fdb..01c49d5 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpoint.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpoint.java @@ -23,12 +23,20 @@ import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Objects; +import org.apache.flink.util.Preconditions; /** Represents a Unix domain file path and an http endpoint */ final class UnixDomainHttpEndpoint { + /** Checks whether or not an endpoint is using UNIX domain sockets. */ + static boolean validate(URI endpoint) { + String scheme = endpoint.getScheme(); + return "http+unix".equalsIgnoreCase(scheme) || "https+unix".equalsIgnoreCase(scheme); + } + /** Parses a URI of the form {@code http+unix://<file system path>.sock/<http endpoint>}. */ static UnixDomainHttpEndpoint parseFrom(URI endpoint) { + Preconditions.checkArgument(validate(endpoint)); final Path path = Paths.get(endpoint.getPath()); final int sockPathIndex = indexOfSockFile(path); final String filePath = "/" + path.subpath(0, sockPathIndex + 1).toString(); diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java index debb28c..87f4ec4 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java @@ -20,7 +20,8 @@ package org.apache.flink.statefun.flink.core.jsonmodule; enum FormatVersion { v1_0("1.0"), - v2_0("2.0"); + v2_0("2.0"), + v3_0("3.0"); private String versionStr; @@ -39,6 +40,8 @@ enum FormatVersion { return v1_0; case "2.0": return v2_0; + case "3.0": + return v3_0; default: throw new IllegalArgumentException("Unrecognized format version: " + versionStr); } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java new file mode 100644 index 0000000..fac1bbe --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.flink.core.jsonmodule; + +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toList; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.stream.StreamSupport; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.statefun.flink.common.json.Selectors; +import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec; +import org.apache.flink.statefun.flink.core.httpfn.TemplatedHttpFunctionProvider; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher; +import org.apache.flink.statefun.sdk.StatefulFunctionProvider; +import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; +import org.apache.flink.types.Either; +import org.apache.flink.util.TimeUtils; + +public final class FunctionEndpointJsonEntity implements JsonEntity { + + private static final JsonPointer FUNCTION_ENDPOINTS_POINTER = JsonPointer.compile("/endpoints"); + + private static final class MetaPointers { + private static final JsonPointer KIND = JsonPointer.compile("/endpoint/meta/kind"); + } + + private static final class SpecPointers { + private static final JsonPointer TYPENAME = JsonPointer.compile("/endpoint/spec/typename"); + private static final JsonPointer URL_PATH_TEMPLATE = + JsonPointer.compile("/endpoint/spec/urlPathTemplate"); + private static final JsonPointer TIMEOUTS = JsonPointer.compile("/endpoint/spec/timeouts"); + private static final JsonPointer MAX_NUM_BATCH_REQUESTS = + JsonPointer.compile("/endpoint/spec/maxNumBatchRequests"); + } + + private static final class TypenamePointers { + private static final JsonPointer NAMESPACE = JsonPointer.compile("/namespace"); + private static final JsonPointer FUNCTION_NAME = JsonPointer.compile("/type"); + } + + private static final class TimeoutPointers { + private static final JsonPointer CALL = JsonPointer.compile("/call"); + private static final JsonPointer CONNECT = JsonPointer.compile("/connect"); + private static final JsonPointer READ = JsonPointer.compile("/read"); + private static final JsonPointer WRITE = JsonPointer.compile("/write"); + } + + @Override + public void bind( + StatefulFunctionModule.Binder binder, JsonNode moduleSpecNode, FormatVersion formatVersion) { + if (formatVersion != FormatVersion.v3_0) { + throw new IllegalArgumentException("endpoints is only supported with format version 3.0."); + } + + final Iterable<? extends JsonNode> functionEndpointsSpecNodes = + functionEndpointSpecNodes(moduleSpecNode); + + for (Map.Entry<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>> entry : + parseFunctionEndpointSpecs(functionEndpointsSpecNodes).entrySet()) { + final Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs = new HashMap<>(); + final Map<String, FunctionEndpointSpec> perNamespaceEndpointSpecs = new HashMap<>(); + + entry + .getValue() + .forEach( + spec -> { + Either<FunctionType, FunctionTypeNamespaceMatcher> target = spec.target(); + if (target.isLeft()) { + specificTypeEndpointSpecs.put(target.left(), spec); + } else { + perNamespaceEndpointSpecs.put(target.right().targetNamespace(), spec); + } + }); + + StatefulFunctionProvider provider = + functionProvider(entry.getKey(), specificTypeEndpointSpecs, perNamespaceEndpointSpecs); + specificTypeEndpointSpecs + .keySet() + .forEach(specificType -> binder.bindFunctionProvider(specificType, provider)); + perNamespaceEndpointSpecs + .keySet() + .forEach( + namespace -> + binder.bindFunctionProvider( + FunctionTypeNamespaceMatcher.targetNamespace(namespace), provider)); + } + } + + private static Iterable<? extends JsonNode> functionEndpointSpecNodes( + JsonNode moduleSpecRootNode) { + return Selectors.listAt(moduleSpecRootNode, FUNCTION_ENDPOINTS_POINTER); + } + + private static Map<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>> + parseFunctionEndpointSpecs(Iterable<? extends JsonNode> functionEndpointsSpecNodes) { + return StreamSupport.stream(functionEndpointsSpecNodes.spliterator(), false) + .map(FunctionEndpointJsonEntity::parseFunctionEndpointsSpec) + .collect(groupingBy(FunctionEndpointSpec::kind, toList())); + } + + private static FunctionEndpointSpec parseFunctionEndpointsSpec( + JsonNode functionEndpointSpecNode) { + FunctionEndpointSpec.Kind kind = endpointKind(functionEndpointSpecNode); + + switch (kind) { + case HTTP: + final HttpFunctionEndpointSpec.Builder specBuilder = + HttpFunctionEndpointSpec.builder( + target(functionEndpointSpecNode), urlPathTemplate(functionEndpointSpecNode)); + + JsonNode timeoutsNode = functionEndpointSpecNode.at(SpecPointers.TIMEOUTS); + optionalMaxNumBatchRequests(functionEndpointSpecNode) + .ifPresent(specBuilder::withMaxNumBatchRequests); + optionalTimeoutDuration(timeoutsNode, TimeoutPointers.CALL) + .ifPresent(specBuilder::withMaxRequestDuration); + optionalTimeoutDuration(timeoutsNode, TimeoutPointers.CONNECT) + .ifPresent(specBuilder::withConnectTimeoutDuration); + optionalTimeoutDuration(timeoutsNode, TimeoutPointers.READ) + .ifPresent(specBuilder::withReadTimeoutDuration); + optionalTimeoutDuration(timeoutsNode, TimeoutPointers.WRITE) + .ifPresent(specBuilder::withWriteTimeoutDuration); + + return specBuilder.build(); + case GRPC: + throw new UnsupportedOperationException("GRPC endpoints are not supported yet."); + default: + throw new IllegalArgumentException("Unrecognized function endpoint kind " + kind); + } + } + + private static FunctionEndpointSpec.Kind endpointKind(JsonNode functionEndpointSpecNode) { + String endpointKind = Selectors.textAt(functionEndpointSpecNode, MetaPointers.KIND); + return FunctionEndpointSpec.Kind.valueOf(endpointKind.toUpperCase(Locale.getDefault())); + } + + private static Either<FunctionType, FunctionTypeNamespaceMatcher> target( + JsonNode functionEndpointSpecNode) { + JsonNode targetNode = functionEndpointSpecNode.at(SpecPointers.TYPENAME); + String namespace = Selectors.textAt(targetNode, TypenamePointers.NAMESPACE); + Optional<String> functionName = + Selectors.optionalTextAt(targetNode, TypenamePointers.FUNCTION_NAME); + return (functionName.isPresent()) + ? Either.Left(new FunctionType(namespace, functionName.get())) + : Either.Right(FunctionTypeNamespaceMatcher.targetNamespace(namespace)); + } + + private static FunctionEndpointSpec.UrlPathTemplate urlPathTemplate( + JsonNode functionEndpointSpecNode) { + String template = Selectors.textAt(functionEndpointSpecNode, SpecPointers.URL_PATH_TEMPLATE); + return new FunctionEndpointSpec.UrlPathTemplate(template); + } + + private static OptionalInt optionalMaxNumBatchRequests(JsonNode functionNode) { + return Selectors.optionalIntegerAt(functionNode, SpecPointers.MAX_NUM_BATCH_REQUESTS); + } + + private static Optional<Duration> optionalTimeoutDuration( + JsonNode node, JsonPointer timeoutPointer) { + return Selectors.optionalTextAt(node, timeoutPointer).map(TimeUtils::parseDuration); + } + + private static StatefulFunctionProvider functionProvider( + FunctionEndpointSpec.Kind kind, + Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs, + Map<String, FunctionEndpointSpec> perNamespaceEndpointSpecs) { + switch (kind) { + case HTTP: + return new TemplatedHttpFunctionProvider( + castValues(specificTypeEndpointSpecs), castValues(perNamespaceEndpointSpecs)); + case GRPC: + throw new UnsupportedOperationException("GRPC endpoints are not supported yet."); + default: + throw new IllegalStateException("Unexpected kind: " + kind); + } + } + + @SuppressWarnings("unchecked") + private static <K, NV extends FunctionEndpointSpec> Map<K, NV> castValues( + Map<K, FunctionEndpointSpec> toCast) { + return new HashMap(toCast); + } +} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java similarity index 53% copy from statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java copy to statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java index debb28c..940db87 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java @@ -15,32 +15,38 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.statefun.flink.core.jsonmodule; -enum FormatVersion { - v1_0("1.0"), - v2_0("2.0"); +import java.net.URI; +import java.util.Objects; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher; +import org.apache.flink.types.Either; - private String versionStr; +public interface FunctionEndpointSpec { - FormatVersion(String versionStr) { - this.versionStr = versionStr; - } + Either<FunctionType, FunctionTypeNamespaceMatcher> target(); + + Kind kind(); + + UrlPathTemplate urlPathTemplate(); - @Override - public String toString() { - return versionStr; + enum Kind { + HTTP, + GRPC } - static FormatVersion fromString(String versionStr) { - switch (versionStr) { - case "1.0": - return v1_0; - case "2.0": - return v2_0; - default: - throw new IllegalArgumentException("Unrecognized format version: " + versionStr); + class UrlPathTemplate { + private static final String FUNCTION_NAME_HOLDER = "{typename.function}"; + + private final String template; + + public UrlPathTemplate(String template) { + this.template = Objects.requireNonNull(template); + } + + public URI apply(FunctionType functionType) { + return URI.create(template.replace(FUNCTION_NAME_HOLDER, functionType.name())); } } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java index cedeb97..658bb9b 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java @@ -37,6 +37,13 @@ final class JsonModule implements StatefulFunctionModule { new RouterJsonEntity(), new EgressJsonEntity()); + private static final List<JsonEntity> V3_ENTITIES = + Arrays.asList( + new FunctionEndpointJsonEntity(), + new IngressJsonEntity(), + new RouterJsonEntity(), + new EgressJsonEntity()); + private final JsonNode moduleSpecNode; private final FormatVersion formatVersion; private final URL moduleUrl; @@ -49,7 +56,11 @@ final class JsonModule implements StatefulFunctionModule { public void configure(Map<String, String> conf, Binder binder) { try { - ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion)); + if (formatVersion == FormatVersion.v3_0) { + V3_ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion)); + } else { + ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion)); + } } catch (Throwable t) { throw new ModuleConfigurationException( format("Error while parsing module at %s", moduleUrl), t); diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpointTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpointTest.java index f96ac3e..127775a 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpointTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpointTest.java @@ -1,6 +1,7 @@ package org.apache.flink.statefun.flink.core.httpfn; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import java.net.URI; import org.junit.Test; @@ -29,4 +30,14 @@ public class UnixDomainHttpEndpointTest { public void missingSockFile() { UnixDomainHttpEndpoint.parseFrom(URI.create("http+unix:///some/path/hello")); } + + @Test + public void validateUdsEndpoint() { + assertFalse(UnixDomainHttpEndpoint.validate(URI.create("http:///bar.foo.com/some/path"))); + } + + @Test(expected = IllegalArgumentException.class) + public void parseNonUdsEndpoint() { + UnixDomainHttpEndpoint.parseFrom(URI.create("http:///bar.foo.com/some/path")); + } } diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java new file mode 100644 index 0000000..fcbb885 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.flink.core.jsonmodule; + +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; + +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import java.net.URL; +import java.util.Collections; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse; +import org.apache.flink.statefun.flink.core.message.MessageFactoryKey; +import org.apache.flink.statefun.flink.core.message.MessageFactoryType; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.io.EgressIdentifier; +import org.apache.flink.statefun.sdk.io.IngressIdentifier; +import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; +import org.junit.Test; + +public class JsonModuleV3Test { + + private static final String modulePath = "module-v3_0/module.yaml"; + + @Test + public void exampleUsage() { + StatefulFunctionModule module = fromPath(modulePath); + + assertThat(module, notNullValue()); + } + + @Test + public void testFunctions() { + StatefulFunctionModule module = fromPath(modulePath); + + StatefulFunctionsUniverse universe = emptyUniverse(); + module.configure(Collections.emptyMap(), universe); + + assertThat( + universe.functions(), + allOf( + hasKey(new FunctionType("com.foo.bar", "specific_function")), + hasKey(new FunctionType("com.other.namespace", "hello")))); + + assertThat(universe.namespaceFunctions(), hasKey("com.foo.bar")); + } + + @Test + public void testRouters() { + StatefulFunctionModule module = fromPath(modulePath); + + StatefulFunctionsUniverse universe = emptyUniverse(); + module.configure(Collections.emptyMap(), universe); + + assertThat( + universe.routers(), + hasKey(new IngressIdentifier<>(Message.class, "com.mycomp.igal", "names"))); + } + + @Test + public void testIngresses() { + StatefulFunctionModule module = fromPath(modulePath); + + StatefulFunctionsUniverse universe = emptyUniverse(); + module.configure(Collections.emptyMap(), universe); + + assertThat( + universe.ingress(), + hasKey(new IngressIdentifier<>(Message.class, "com.mycomp.igal", "names"))); + } + + @Test + public void testEgresses() { + StatefulFunctionModule module = fromPath(modulePath); + + StatefulFunctionsUniverse universe = emptyUniverse(); + module.configure(Collections.emptyMap(), universe); + + assertThat( + universe.egress(), hasKey(new EgressIdentifier<>("com.mycomp.foo", "bar", Any.class))); + } + + private static StatefulFunctionModule fromPath(String path) { + URL moduleUrl = JsonModuleTest.class.getClassLoader().getResource(path); + assertThat(moduleUrl, not(nullValue())); + ObjectMapper mapper = JsonServiceLoader.mapper(); + return JsonServiceLoader.fromUrl(mapper, moduleUrl); + } + + private static StatefulFunctionsUniverse emptyUniverse() { + return new StatefulFunctionsUniverse( + MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null)); + } +} diff --git a/statefun-flink/statefun-flink-core/src/test/resources/module-v3_0/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/module-v3_0/module.yaml new file mode 100644 index 0000000..f7d3d7c --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/test/resources/module-v3_0/module.yaml @@ -0,0 +1,85 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: "3.0" + +module: + meta: + type: remote + spec: + endpoints: + - endpoint: + meta: + kind: http + spec: + typename: + namespace: com.foo.bar + urlPathTemplate: http://bar.foo.com:8080/functions/{typename.function} + timeouts: + call: 1minutes + connect: 10seconds + read: 10second + write: 10seconds + maxNumBatchRequests: 10000 + - endpoint: + meta: + kind: http + spec: + typename: + namespace: com.foo.bar + type: specific_function + urlPathTemplate: http://bar.foo.com:8080/functions/abc + - endpoint: + meta: + kind: http + spec: + typename: + namespace: com.other.namespace + type: hello + urlPathTemplate: http://namespace.other.com:8080/hello + routers: + - router: + meta: + type: org.apache.flink.statefun.sdk/protobuf-router + spec: + ingress: com.mycomp.igal/names + target: "com.example/hello/{{$.name}}" + messageType: org.apache.flink.test.SimpleMessage + descriptorSet: classpath:test.desc + ingresses: + - ingress: + meta: + type: statefun.kafka.io/protobuf-ingress + id: com.mycomp.igal/names + spec: + address: kafka-broker:9092 + topics: + - names + properties: + - consumer.group: greeter + messageType: org.apache.flink.test.SimpleMessage + descriptorSet: classpath:test.desc + egresses: + - egress: + meta: + type: statefun.kafka.io/generic-egress + id: com.mycomp.foo/bar + spec: + address: kafka-broker:9092 + deliverySemantic: + type: exactly-once + transactionTimeoutMillis: 100000 + properties: + - foo.config: bar \ No newline at end of file
