This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit d1744eaa888a530edf102396675dfa4377489560 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Fri Dec 4 12:24:14 2020 +0800 [FLINK-20335] [core] Remove support for module YAML versions 1.0 / 2.0 This closes #184. --- .../statefun/examples/datastream/Example.java | 1 - .../core/httpfn/HttpFunctionEndpointSpec.java | 23 +- .../flink/core/httpfn/HttpFunctionProvider.java | 53 +++-- .../flink/core/httpfn/HttpFunctionSpec.java | 194 --------------- .../statefun/flink/core/httpfn/StateSpec.java | 48 ---- .../core/httpfn/TemplatedHttpFunctionProvider.java | 115 --------- .../flink/core/jsonmodule/FormatVersion.java | 9 + .../jsonmodule/FunctionEndpointJsonEntity.java | 42 ++-- .../core/jsonmodule/FunctionEndpointSpec.java | 71 +++++- .../flink/core/jsonmodule/FunctionJsonEntity.java | 265 --------------------- .../statefun/flink/core/jsonmodule/JsonModule.java | 13 +- .../flink/core/jsonmodule/JsonServiceLoader.java | 11 +- .../reqreply/PersistedRemoteFunctionValues.java | 26 +- .../flink/core/reqreply/RequestReplyFunction.java | 16 +- .../flink/core/jsonmodule/JsonModuleTest.java | 31 +-- .../flink/core/jsonmodule/JsonModuleV3Test.java | 114 --------- .../PersistedRemoteFunctionValuesTest.java | 18 +- .../core/reqreply/RequestReplyFunctionTest.java | 18 +- .../datastream/RequestReplyFunctionBuilder.java | 37 +-- .../SerializableHttpFunctionProvider.java | 9 +- .../StatefulFunctionDataStreamBuilder.java | 8 +- .../statefun/sdk/FunctionTypeNamespaceMatcher.java | 5 +- 22 files changed, 214 insertions(+), 913 deletions(-) diff --git a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java index 76ffde5..aa825b3 100644 --- a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java +++ b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java @@ -79,7 +79,6 @@ public class Example { .withRequestReplyRemoteFunction( requestReplyFunctionBuilder( REMOTE_GREET, URI.create("http://localhost:5000/statefun")) - .withPersistedState("seen_count") .withMaxRequestDuration(Duration.ofSeconds(15)) .withMaxNumBatchRequests(500)) .withEgressId(GREETINGS) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java index d5731d1..85df004 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java @@ -17,14 +17,14 @@ */ package org.apache.flink.statefun.flink.core.httpfn; +import java.io.Serializable; import java.time.Duration; import java.util.Objects; import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec; -import org.apache.flink.statefun.sdk.FunctionType; -import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher; -import org.apache.flink.types.Either; -public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec { +public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec, Serializable { + + private static final long serialVersionUID = 1; private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1); private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = Duration.ofSeconds(10); @@ -32,7 +32,7 @@ public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec { private static final Duration DEFAULT_HTTP_WRITE_TIMEOUT = Duration.ofSeconds(10); private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000; - private final Either<FunctionType, FunctionTypeNamespaceMatcher> target; + private final Target target; private final UrlPathTemplate urlPathTemplate; private final Duration maxRequestDuration; @@ -41,13 +41,12 @@ public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec { private final Duration writeTimeout; private final int maxNumBatchRequests; - public static Builder builder( - Either<FunctionType, FunctionTypeNamespaceMatcher> target, UrlPathTemplate urlPathTemplate) { + public static Builder builder(Target target, UrlPathTemplate urlPathTemplate) { return new Builder(target, urlPathTemplate); } private HttpFunctionEndpointSpec( - Either<FunctionType, FunctionTypeNamespaceMatcher> target, + Target target, UrlPathTemplate urlPathTemplate, Duration maxRequestDuration, Duration connectTimeout, @@ -64,7 +63,7 @@ public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec { } @Override - public Either<FunctionType, FunctionTypeNamespaceMatcher> target() { + public Target target() { return target; } @@ -100,7 +99,7 @@ public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec { public static final class Builder { - private final Either<FunctionType, FunctionTypeNamespaceMatcher> target; + private final Target target; private final UrlPathTemplate urlPathTemplate; private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT; @@ -109,9 +108,7 @@ public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec { private Duration writeTimeout = DEFAULT_HTTP_WRITE_TIMEOUT; private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS; - private Builder( - Either<FunctionType, FunctionTypeNamespaceMatcher> target, - UrlPathTemplate urlPathTemplate) { + private Builder(Target target, UrlPathTemplate urlPathTemplate) { this.target = Objects.requireNonNull(target); this.urlPathTemplate = Objects.requireNonNull(urlPathTemplate); } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java index 67c2c68..5445500 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java @@ -15,53 +15,64 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.statefun.flink.core.httpfn; import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUnixSocketBridge.configureUnixDomainSocket; +import java.net.URI; import java.util.Map; +import java.util.Objects; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; import org.apache.flink.statefun.flink.core.common.ManagingResources; -import org.apache.flink.statefun.flink.core.reqreply.PersistedRemoteFunctionValues; import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient; import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction; import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.StatefulFunction; import org.apache.flink.statefun.sdk.StatefulFunctionProvider; @NotThreadSafe -public class HttpFunctionProvider implements StatefulFunctionProvider, ManagingResources { - private final Map<FunctionType, HttpFunctionSpec> supportedTypes; +public final class HttpFunctionProvider implements StatefulFunctionProvider, ManagingResources { + + private final Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs; + private final Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs; /** lazily initialized by {code buildHttpClient} */ @Nullable private OkHttpClient sharedClient; private volatile boolean shutdown; - public HttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) { - this.supportedTypes = supportedTypes; + public HttpFunctionProvider( + Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs, + Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs) { + this.specificTypeEndpointSpecs = Objects.requireNonNull(specificTypeEndpointSpecs); + this.perNamespaceEndpointSpecs = Objects.requireNonNull(perNamespaceEndpointSpecs); } @Override - public RequestReplyFunction functionOfType(FunctionType type) { - HttpFunctionSpec spec = supportedTypes.get(type); - if (spec == null) { - throw new IllegalArgumentException("Unsupported type " + type); - } + public StatefulFunction functionOfType(FunctionType functionType) { + final HttpFunctionEndpointSpec endpointsSpec = getEndpointsSpecOrThrow(functionType); return new RequestReplyFunction( - new PersistedRemoteFunctionValues(spec.states()), - spec.maxNumBatchRequests(), - buildHttpClient(spec)); + endpointsSpec.maxNumBatchRequests(), buildHttpClient(endpointsSpec, functionType)); } - public HttpFunctionSpec getFunctionSpec(FunctionType type) { - return supportedTypes.get(type); + private HttpFunctionEndpointSpec getEndpointsSpecOrThrow(FunctionType functionType) { + HttpFunctionEndpointSpec endpointSpec = specificTypeEndpointSpecs.get(functionType); + if (endpointSpec != null) { + return endpointSpec; + } + endpointSpec = perNamespaceEndpointSpecs.get(functionType.namespace()); + if (endpointSpec != null) { + return endpointSpec; + } + + throw new IllegalStateException("Unknown type: " + functionType); } - private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) { + private RequestReplyClient buildHttpClient( + HttpFunctionEndpointSpec spec, FunctionType functionType) { if (sharedClient == null) { sharedClient = OkHttpUtils.newClient(); } @@ -71,9 +82,11 @@ public class HttpFunctionProvider implements StatefulFunctionProvider, ManagingR clientBuilder.readTimeout(spec.readTimeout()); clientBuilder.writeTimeout(spec.writeTimeout()); + URI endpointUrl = spec.urlPathTemplate().apply(functionType); + final HttpUrl url; - if (UnixDomainHttpEndpoint.validate(spec.endpoint())) { - UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(spec.endpoint()); + if (UnixDomainHttpEndpoint.validate(endpointUrl)) { + UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(endpointUrl); url = new HttpUrl.Builder() @@ -84,7 +97,7 @@ public class HttpFunctionProvider implements StatefulFunctionProvider, ManagingR configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile); } else { - url = HttpUrl.get(spec.endpoint()); + url = HttpUrl.get(endpointUrl); } return new HttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown); } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java deleted file mode 100644 index 42a7abb..0000000 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.statefun.flink.core.httpfn; - -import java.io.Serializable; -import java.net.URI; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec; -import org.apache.flink.statefun.sdk.FunctionType; - -public final class HttpFunctionSpec implements FunctionSpec, Serializable { - - private static final long serialVersionUID = 1; - - private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1); - private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = Duration.ofSeconds(10); - private static final Duration DEFAULT_HTTP_READ_TIMEOUT = Duration.ofSeconds(10); - private static final Duration DEFAULT_HTTP_WRITE_TIMEOUT = Duration.ofSeconds(10); - private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000; - - private final FunctionType functionType; - private final URI endpoint; - private final List<StateSpec> states; - private final Duration maxRequestDuration; - private final Duration connectTimeout; - private final Duration readTimeout; - private final Duration writeTimeout; - private final int maxNumBatchRequests; - - private HttpFunctionSpec( - FunctionType functionType, - URI endpoint, - List<StateSpec> states, - Duration maxRequestDuration, - Duration connectTimeout, - Duration readTimeout, - Duration writeTimeout, - int maxNumBatchRequests) { - this.functionType = Objects.requireNonNull(functionType); - this.endpoint = Objects.requireNonNull(endpoint); - this.states = Objects.requireNonNull(states); - this.maxRequestDuration = Objects.requireNonNull(maxRequestDuration); - this.connectTimeout = Objects.requireNonNull(connectTimeout); - this.readTimeout = Objects.requireNonNull(readTimeout); - this.writeTimeout = Objects.requireNonNull(writeTimeout); - this.maxNumBatchRequests = maxNumBatchRequests; - } - - public static Builder builder(FunctionType functionType, URI endpoint) { - return new Builder(functionType, endpoint); - } - - @Override - public FunctionType functionType() { - return functionType; - } - - @Override - public Kind kind() { - return Kind.HTTP; - } - - public URI endpoint() { - return endpoint; - } - - public List<StateSpec> states() { - return states; - } - - public Duration maxRequestDuration() { - return maxRequestDuration; - } - - public Duration connectTimeout() { - return connectTimeout; - } - - public Duration readTimeout() { - return readTimeout; - } - - public Duration writeTimeout() { - return writeTimeout; - } - - public int maxNumBatchRequests() { - return maxNumBatchRequests; - } - - public static final class Builder { - - private final FunctionType functionType; - private final URI endpoint; - - private final List<StateSpec> states = new ArrayList<>(); - private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT; - private Duration connectTimeout = DEFAULT_HTTP_CONNECT_TIMEOUT; - private Duration readTimeout = DEFAULT_HTTP_READ_TIMEOUT; - private Duration writeTimeout = DEFAULT_HTTP_WRITE_TIMEOUT; - private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS; - - private Builder(FunctionType functionType, URI endpoint) { - this.functionType = Objects.requireNonNull(functionType); - this.endpoint = Objects.requireNonNull(endpoint); - } - - public Builder withState(StateSpec stateSpec) { - this.states.add(stateSpec); - return this; - } - - public Builder withMaxRequestDuration(Duration duration) { - this.maxRequestDuration = requireNonZeroDuration(duration); - return this; - } - - public Builder withConnectTimeoutDuration(Duration duration) { - this.connectTimeout = requireNonZeroDuration(duration); - return this; - } - - public Builder withReadTimeoutDuration(Duration duration) { - this.readTimeout = requireNonZeroDuration(duration); - return this; - } - - public Builder withWriteTimeoutDuration(Duration duration) { - this.writeTimeout = requireNonZeroDuration(duration); - return this; - } - - public Builder withMaxNumBatchRequests(int maxNumBatchRequests) { - this.maxNumBatchRequests = maxNumBatchRequests; - return this; - } - - public HttpFunctionSpec build() { - validateTimeouts(); - - return new HttpFunctionSpec( - functionType, - endpoint, - states, - maxRequestDuration, - connectTimeout, - readTimeout, - writeTimeout, - maxNumBatchRequests); - } - - private Duration requireNonZeroDuration(Duration duration) { - Objects.requireNonNull(duration); - if (duration.equals(Duration.ZERO)) { - throw new IllegalArgumentException("Timeout durations must be larger than 0."); - } - - return duration; - } - - private void validateTimeouts() { - if (connectTimeout.compareTo(maxRequestDuration) > 0) { - throw new IllegalArgumentException( - "Connect timeout cannot be larger than request timeout."); - } - - if (readTimeout.compareTo(maxRequestDuration) > 0) { - throw new IllegalArgumentException("Read timeout cannot be larger than request timeout."); - } - - if (writeTimeout.compareTo(maxRequestDuration) > 0) { - throw new IllegalArgumentException("Write timeout cannot be larger than request timeout."); - } - } - } -} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java deleted file mode 100644 index 8bb3c84..0000000 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.statefun.flink.core.httpfn; - -import java.io.Serializable; -import java.util.Objects; -import org.apache.flink.statefun.sdk.state.Expiration; - -public final class StateSpec implements Serializable { - - private static final long serialVersionUID = 1; - - private final String name; - private final Expiration ttlExpiration; - - public StateSpec(String name) { - this(name, Expiration.none()); - } - - public StateSpec(String name, Expiration ttlExpiration) { - this.name = Objects.requireNonNull(name); - this.ttlExpiration = Objects.requireNonNull(ttlExpiration); - } - - public String name() { - return name; - } - - public Expiration ttlExpiration() { - return ttlExpiration; - } -} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java deleted file mode 100644 index ae4d7f4..0000000 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.statefun.flink.core.httpfn; - -import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUnixSocketBridge.configureUnixDomainSocket; - -import java.net.URI; -import java.util.Collections; -import java.util.Map; -import java.util.Objects; -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; -import okhttp3.HttpUrl; -import okhttp3.OkHttpClient; -import org.apache.flink.statefun.flink.core.common.ManagingResources; -import org.apache.flink.statefun.flink.core.reqreply.PersistedRemoteFunctionValues; -import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient; -import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction; -import org.apache.flink.statefun.sdk.FunctionType; -import org.apache.flink.statefun.sdk.StatefulFunction; -import org.apache.flink.statefun.sdk.StatefulFunctionProvider; - -@NotThreadSafe -public final class TemplatedHttpFunctionProvider - implements StatefulFunctionProvider, ManagingResources { - - private final Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs; - private final Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs; - - /** lazily initialized by {code buildHttpClient} */ - @Nullable private OkHttpClient sharedClient; - - private volatile boolean shutdown; - - public TemplatedHttpFunctionProvider( - Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs, - Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs) { - this.specificTypeEndpointSpecs = Objects.requireNonNull(specificTypeEndpointSpecs); - this.perNamespaceEndpointSpecs = Objects.requireNonNull(perNamespaceEndpointSpecs); - } - - @Override - public StatefulFunction functionOfType(FunctionType functionType) { - final HttpFunctionEndpointSpec endpointsSpec = getEndpointsSpecOrThrow(functionType); - return new RequestReplyFunction( - new PersistedRemoteFunctionValues(Collections.emptyList()), - endpointsSpec.maxNumBatchRequests(), - buildHttpClient(endpointsSpec, functionType)); - } - - private HttpFunctionEndpointSpec getEndpointsSpecOrThrow(FunctionType functionType) { - HttpFunctionEndpointSpec endpointSpec = specificTypeEndpointSpecs.get(functionType); - if (endpointSpec != null) { - return endpointSpec; - } - endpointSpec = perNamespaceEndpointSpecs.get(functionType.namespace()); - if (endpointSpec != null) { - return endpointSpec; - } - - throw new IllegalStateException("Unknown type: " + functionType); - } - - private RequestReplyClient buildHttpClient( - HttpFunctionEndpointSpec spec, FunctionType functionType) { - if (sharedClient == null) { - sharedClient = OkHttpUtils.newClient(); - } - OkHttpClient.Builder clientBuilder = sharedClient.newBuilder(); - clientBuilder.callTimeout(spec.maxRequestDuration()); - clientBuilder.connectTimeout(spec.connectTimeout()); - clientBuilder.readTimeout(spec.readTimeout()); - clientBuilder.writeTimeout(spec.writeTimeout()); - - URI endpointUrl = spec.urlPathTemplate().apply(functionType); - - final HttpUrl url; - if (UnixDomainHttpEndpoint.validate(endpointUrl)) { - UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(endpointUrl); - - url = - new HttpUrl.Builder() - .scheme("http") - .host("unused") - .addPathSegment(endpoint.pathSegment) - .build(); - - configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile); - } else { - url = HttpUrl.get(endpointUrl); - } - return new HttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown); - } - - @Override - public void shutdown() { - shutdown = true; - OkHttpUtils.closeSilently(sharedClient); - } -} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java index 87f4ec4..580e6c7 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java @@ -19,8 +19,17 @@ package org.apache.flink.statefun.flink.core.jsonmodule; enum FormatVersion { + // ============================================================ + // EOL versions + // ============================================================ + v1_0("1.0"), v2_0("2.0"), + + // ============================================================ + // Supported versions + // ============================================================ + v3_0("3.0"); private String versionStr; diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java index fac1bbe..e88f439 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java @@ -32,12 +32,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.statefun.flink.common.json.Selectors; import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec; -import org.apache.flink.statefun.flink.core.httpfn.TemplatedHttpFunctionProvider; +import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider; import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher; import org.apache.flink.statefun.sdk.StatefulFunctionProvider; import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; -import org.apache.flink.types.Either; import org.apache.flink.util.TimeUtils; public final class FunctionEndpointJsonEntity implements JsonEntity { @@ -82,17 +81,18 @@ public final class FunctionEndpointJsonEntity implements JsonEntity { for (Map.Entry<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>> entry : parseFunctionEndpointSpecs(functionEndpointsSpecNodes).entrySet()) { final Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs = new HashMap<>(); - final Map<String, FunctionEndpointSpec> perNamespaceEndpointSpecs = new HashMap<>(); + final Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs = + new HashMap<>(); entry .getValue() .forEach( spec -> { - Either<FunctionType, FunctionTypeNamespaceMatcher> target = spec.target(); - if (target.isLeft()) { - specificTypeEndpointSpecs.put(target.left(), spec); + FunctionEndpointSpec.Target target = spec.target(); + if (target.isSpecificFunctionType()) { + specificTypeEndpointSpecs.put(target.asSpecificFunctionType(), spec); } else { - perNamespaceEndpointSpecs.put(target.right().targetNamespace(), spec); + perNamespaceEndpointSpecs.put(target.asNamespace(), spec); } }); @@ -103,10 +103,7 @@ public final class FunctionEndpointJsonEntity implements JsonEntity { .forEach(specificType -> binder.bindFunctionProvider(specificType, provider)); perNamespaceEndpointSpecs .keySet() - .forEach( - namespace -> - binder.bindFunctionProvider( - FunctionTypeNamespaceMatcher.targetNamespace(namespace), provider)); + .forEach(namespace -> binder.bindFunctionProvider(namespace, provider)); } } @@ -157,15 +154,14 @@ public final class FunctionEndpointJsonEntity implements JsonEntity { return FunctionEndpointSpec.Kind.valueOf(endpointKind.toUpperCase(Locale.getDefault())); } - private static Either<FunctionType, FunctionTypeNamespaceMatcher> target( - JsonNode functionEndpointSpecNode) { + private static FunctionEndpointSpec.Target target(JsonNode functionEndpointSpecNode) { JsonNode targetNode = functionEndpointSpecNode.at(SpecPointers.TYPENAME); String namespace = Selectors.textAt(targetNode, TypenamePointers.NAMESPACE); Optional<String> functionName = Selectors.optionalTextAt(targetNode, TypenamePointers.FUNCTION_NAME); return (functionName.isPresent()) - ? Either.Left(new FunctionType(namespace, functionName.get())) - : Either.Right(FunctionTypeNamespaceMatcher.targetNamespace(namespace)); + ? FunctionEndpointSpec.Target.functionType(new FunctionType(namespace, functionName.get())) + : FunctionEndpointSpec.Target.namespace(namespace); } private static FunctionEndpointSpec.UrlPathTemplate urlPathTemplate( @@ -186,11 +182,12 @@ public final class FunctionEndpointJsonEntity implements JsonEntity { private static StatefulFunctionProvider functionProvider( FunctionEndpointSpec.Kind kind, Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs, - Map<String, FunctionEndpointSpec> perNamespaceEndpointSpecs) { + Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs) { switch (kind) { case HTTP: - return new TemplatedHttpFunctionProvider( - castValues(specificTypeEndpointSpecs), castValues(perNamespaceEndpointSpecs)); + return new HttpFunctionProvider( + castValues(specificTypeEndpointSpecs), + castValues(namespaceAsKey(perNamespaceEndpointSpecs))); case GRPC: throw new UnsupportedOperationException("GRPC endpoints are not supported yet."); default: @@ -203,4 +200,13 @@ public final class FunctionEndpointJsonEntity implements JsonEntity { Map<K, FunctionEndpointSpec> toCast) { return new HashMap(toCast); } + + private static Map<String, FunctionEndpointSpec> namespaceAsKey( + Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs) { + final Map<String, FunctionEndpointSpec> converted = + new HashMap<>(perNamespaceEndpointSpecs.size()); + perNamespaceEndpointSpecs.forEach( + (namespaceMatcher, spec) -> converted.put(namespaceMatcher.targetNamespace(), spec)); + return converted; + } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java index 940db87..9af6345 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java @@ -17,15 +17,15 @@ */ package org.apache.flink.statefun.flink.core.jsonmodule; +import java.io.Serializable; import java.net.URI; import java.util.Objects; import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher; -import org.apache.flink.types.Either; public interface FunctionEndpointSpec { - Either<FunctionType, FunctionTypeNamespaceMatcher> target(); + Target target(); Kind kind(); @@ -36,7 +36,72 @@ public interface FunctionEndpointSpec { GRPC } - class UrlPathTemplate { + abstract class Target implements Serializable { + + public static Target namespace(String namespace) { + return new NamespaceTarget(FunctionTypeNamespaceMatcher.targetNamespace(namespace)); + } + + public static Target functionType(FunctionType functionType) { + return new FunctionTypeTarget(functionType); + } + + public boolean isSpecificFunctionType() { + return this.getClass() == FunctionTypeTarget.class; + } + + public boolean isNamespace() { + return this.getClass() == NamespaceTarget.class; + } + + public abstract FunctionTypeNamespaceMatcher asNamespace(); + + public abstract FunctionType asSpecificFunctionType(); + + private static class NamespaceTarget extends Target { + private static final long serialVersionUID = 1; + + private final FunctionTypeNamespaceMatcher namespaceMatcher; + + private NamespaceTarget(FunctionTypeNamespaceMatcher namespaceMatcher) { + this.namespaceMatcher = Objects.requireNonNull(namespaceMatcher); + } + + @Override + public FunctionTypeNamespaceMatcher asNamespace() { + return namespaceMatcher; + } + + @Override + public FunctionType asSpecificFunctionType() { + throw new IllegalStateException("This target is not a specific function type"); + } + } + + private static class FunctionTypeTarget extends Target { + private static final long serialVersionUID = 1; + + private final FunctionType functionType; + + private FunctionTypeTarget(FunctionType functionType) { + this.functionType = Objects.requireNonNull(functionType); + } + + @Override + public FunctionTypeNamespaceMatcher asNamespace() { + throw new IllegalStateException("This target is not a namespace."); + } + + @Override + public FunctionType asSpecificFunctionType() { + return functionType; + } + } + } + + class UrlPathTemplate implements Serializable { + private static final long serialVersionUID = 1; + private static final String FUNCTION_NAME_HOLDER = "{typename.function}"; private final String template; diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java deleted file mode 100644 index 2b9c866..0000000 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.statefun.flink.core.jsonmodule; - -import static java.util.stream.Collectors.groupingBy; -import static java.util.stream.Collectors.toMap; -import static org.apache.flink.statefun.flink.core.common.Maps.transformValues; -import static org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind; - -import java.net.InetSocketAddress; -import java.net.URI; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; -import java.util.OptionalInt; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collector; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; -import javax.annotation.Nullable; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.flink.statefun.flink.common.json.NamespaceNamePair; -import org.apache.flink.statefun.flink.common.json.Selectors; -import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider; -import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec; -import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider; -import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec; -import org.apache.flink.statefun.flink.core.httpfn.StateSpec; -import org.apache.flink.statefun.sdk.FunctionType; -import org.apache.flink.statefun.sdk.StatefulFunctionProvider; -import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder; -import org.apache.flink.statefun.sdk.state.Expiration; -import org.apache.flink.util.TimeUtils; - -final class FunctionJsonEntity implements JsonEntity { - - private static final JsonPointer FUNCTION_SPECS_POINTER = JsonPointer.compile("/functions"); - - private static final class MetaPointers { - private static final JsonPointer KIND = JsonPointer.compile("/function/meta/kind"); - private static final JsonPointer TYPE = JsonPointer.compile("/function/meta/type"); - } - - private static final class SpecPointers { - private static final JsonPointer HOSTNAME = JsonPointer.compile("/function/spec/host"); - private static final JsonPointer ENDPOINT = JsonPointer.compile("/function/spec/endpoint"); - private static final JsonPointer PORT = JsonPointer.compile("/function/spec/port"); - private static final JsonPointer STATES = JsonPointer.compile("/function/spec/states"); - - private static final JsonPointer TIMEOUT = JsonPointer.compile("/function/spec/timeout"); - private static final JsonPointer CONNECT_TIMEOUT = - JsonPointer.compile("/function/spec/connectTimeout"); - private static final JsonPointer READ_TIMEOUT = - JsonPointer.compile("/function/spec/readTimeout"); - private static final JsonPointer WRITE_TIMEOUT = - JsonPointer.compile("/function/spec/writeTimeout"); - - private static final JsonPointer MAX_NUM_BATCH_REQUESTS = - JsonPointer.compile("/function/spec/maxNumBatchRequests"); - } - - private static final class StateSpecPointers { - private static final JsonPointer NAME = JsonPointer.compile("/name"); - private static final JsonPointer EXPIRE_DURATION = JsonPointer.compile("/expireAfter"); - private static final JsonPointer EXPIRE_MODE = JsonPointer.compile("/expireMode"); - } - - @Override - public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) { - final Iterable<? extends JsonNode> functionSpecNodes = functionSpecNodes(moduleSpecRootNode); - - for (Map.Entry<Kind, Map<FunctionType, FunctionSpec>> entry : - parse(functionSpecNodes, formatVersion).entrySet()) { - StatefulFunctionProvider provider = functionProvider(entry.getKey(), entry.getValue()); - Set<FunctionType> functionTypes = entry.getValue().keySet(); - for (FunctionType type : functionTypes) { - binder.bindFunctionProvider(type, provider); - } - } - } - - private Map<Kind, Map<FunctionType, FunctionSpec>> parse( - Iterable<? extends JsonNode> functionSpecNodes, FormatVersion formatVersion) { - return StreamSupport.stream(functionSpecNodes.spliterator(), false) - .map(functionSpecNode -> parseFunctionSpec(functionSpecNode, formatVersion)) - .collect(groupingBy(FunctionSpec::kind, groupByFunctionType())); - } - - private static Iterable<? extends JsonNode> functionSpecNodes(JsonNode moduleSpecRootNode) { - return Selectors.listAt(moduleSpecRootNode, FUNCTION_SPECS_POINTER); - } - - private static FunctionSpec parseFunctionSpec( - JsonNode functionNode, FormatVersion formatVersion) { - String functionKind = Selectors.textAt(functionNode, MetaPointers.KIND); - FunctionSpec.Kind kind = - FunctionSpec.Kind.valueOf(functionKind.toUpperCase(Locale.getDefault())); - FunctionType functionType = functionType(functionNode); - switch (kind) { - case HTTP: - final HttpFunctionSpec.Builder specBuilder = - HttpFunctionSpec.builder(functionType, functionUri(functionNode)); - - final Function<JsonNode, List<StateSpec>> stateSpecParser = - functionStateParserOf(formatVersion); - for (StateSpec state : stateSpecParser.apply(functionNode)) { - specBuilder.withState(state); - } - optionalMaxNumBatchRequests(functionNode).ifPresent(specBuilder::withMaxNumBatchRequests); - optionalTimeoutDuration(functionNode, SpecPointers.TIMEOUT) - .ifPresent(specBuilder::withMaxRequestDuration); - optionalTimeoutDuration(functionNode, SpecPointers.CONNECT_TIMEOUT) - .ifPresent(specBuilder::withConnectTimeoutDuration); - optionalTimeoutDuration(functionNode, SpecPointers.READ_TIMEOUT) - .ifPresent(specBuilder::withReadTimeoutDuration); - optionalTimeoutDuration(functionNode, SpecPointers.WRITE_TIMEOUT) - .ifPresent(specBuilder::withWriteTimeoutDuration); - - return specBuilder.build(); - case GRPC: - return new GrpcFunctionSpec(functionType, functionAddress(functionNode)); - default: - throw new IllegalArgumentException("Unrecognized function kind " + functionKind); - } - } - - private static Function<JsonNode, List<StateSpec>> functionStateParserOf( - FormatVersion formatVersion) { - switch (formatVersion) { - case v1_0: - return FunctionJsonEntity::functionStateSpecParserV1; - case v2_0: - return FunctionJsonEntity::functionStateSpecParserV2; - default: - throw new IllegalStateException("Unrecognized format version: " + formatVersion); - } - } - - private static List<StateSpec> functionStateSpecParserV1(JsonNode functionNode) { - final List<String> stateNames = Selectors.textListAt(functionNode, SpecPointers.STATES); - return stateNames.stream().map(StateSpec::new).collect(Collectors.toList()); - } - - private static List<StateSpec> functionStateSpecParserV2(JsonNode functionNode) { - final Iterable<? extends JsonNode> stateSpecNodes = - Selectors.listAt(functionNode, SpecPointers.STATES); - final List<StateSpec> stateSpecs = new ArrayList<>(); - - stateSpecNodes.forEach( - stateSpecNode -> { - final String name = Selectors.textAt(stateSpecNode, StateSpecPointers.NAME); - final Expiration expiration = stateTtlExpiration(stateSpecNode); - stateSpecs.add(new StateSpec(name, expiration)); - }); - return stateSpecs; - } - - private static OptionalInt optionalMaxNumBatchRequests(JsonNode functionNode) { - return Selectors.optionalIntegerAt(functionNode, SpecPointers.MAX_NUM_BATCH_REQUESTS); - } - - private static Optional<Duration> optionalTimeoutDuration( - JsonNode functionNode, JsonPointer timeoutPointer) { - return Selectors.optionalTextAt(functionNode, timeoutPointer).map(TimeUtils::parseDuration); - } - - private static Expiration stateTtlExpiration(JsonNode stateSpecNode) { - final Optional<Duration> duration = - Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_DURATION) - .map(TimeUtils::parseDuration); - - if (!duration.isPresent()) { - return Expiration.none(); - } - - final Optional<String> mode = - Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_MODE); - if (!mode.isPresent()) { - return Expiration.expireAfterReadingOrWriting(duration.get()); - } - - switch (mode.get()) { - case "after-invoke": - return Expiration.expireAfterReadingOrWriting(duration.get()); - case "after-write": - return Expiration.expireAfterWriting(duration.get()); - default: - throw new IllegalArgumentException( - "Invalid state ttl expire mode; must be one of [after-invoke, after-write]."); - } - } - - private static FunctionType functionType(JsonNode functionNode) { - String namespaceName = Selectors.textAt(functionNode, MetaPointers.TYPE); - NamespaceNamePair nn = NamespaceNamePair.from(namespaceName); - return new FunctionType(nn.namespace(), nn.name()); - } - - private static InetSocketAddress functionAddress(JsonNode functionNode) { - String host = Selectors.textAt(functionNode, SpecPointers.HOSTNAME); - int port = Selectors.integerAt(functionNode, SpecPointers.PORT); - return new InetSocketAddress(host, port); - } - - private static URI functionUri(JsonNode functionNode) { - String uri = Selectors.textAt(functionNode, SpecPointers.ENDPOINT); - URI typedUri = URI.create(uri); - @Nullable String scheme = typedUri.getScheme(); - if (scheme == null) { - throw new IllegalArgumentException( - "Missing scheme in function endpoint " - + uri - + "; an http or https scheme must be provided."); - } - if (scheme.equalsIgnoreCase("http") - || scheme.equalsIgnoreCase("https") - || scheme.equalsIgnoreCase("http+unix") - || scheme.equalsIgnoreCase("https+unix")) { - return typedUri; - } - throw new IllegalArgumentException( - "Missing scheme in function endpoint " - + uri - + "; an http or https or http+unix or https+unix scheme must be provided."); - } - - private static Collector<FunctionSpec, ?, Map<FunctionType, FunctionSpec>> groupByFunctionType() { - return toMap(FunctionSpec::functionType, Function.identity()); - } - - private static StatefulFunctionProvider functionProvider( - Kind kind, Map<FunctionType, FunctionSpec> definedFunctions) { - switch (kind) { - case HTTP: - return new HttpFunctionProvider( - transformValues(definedFunctions, HttpFunctionSpec.class::cast)); - case GRPC: - return new GrpcFunctionProvider( - transformValues(definedFunctions, GrpcFunctionSpec.class::cast)); - default: - throw new IllegalStateException("Unexpected value: " + kind); - } - } -} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java index 658bb9b..eb26c44 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java @@ -32,13 +32,6 @@ final class JsonModule implements StatefulFunctionModule { /** Entities in the JSON moduleSpecNode that should be parsed and bound to the module. */ private static final List<JsonEntity> ENTITIES = Arrays.asList( - new FunctionJsonEntity(), - new IngressJsonEntity(), - new RouterJsonEntity(), - new EgressJsonEntity()); - - private static final List<JsonEntity> V3_ENTITIES = - Arrays.asList( new FunctionEndpointJsonEntity(), new IngressJsonEntity(), new RouterJsonEntity(), @@ -56,11 +49,7 @@ final class JsonModule implements StatefulFunctionModule { public void configure(Map<String, String> conf, Binder binder) { try { - if (formatVersion == FormatVersion.v3_0) { - V3_ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion)); - } else { - ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion)); - } + ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion)); } catch (Throwable t) { throw new ModuleConfigurationException( format("Error while parsing module at %s", moduleUrl), t); diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java index a507540..d5c7e77 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java @@ -97,8 +97,15 @@ public final class JsonServiceLoader { } private static FormatVersion requireValidFormatVersion(JsonNode root) { - final String formatVersion = Selectors.textAt(root, FORMAT_VERSION); - return FormatVersion.fromString(formatVersion); + final String formatVersionStr = Selectors.textAt(root, FORMAT_VERSION); + final FormatVersion formatVersion = FormatVersion.fromString(formatVersionStr); + if (formatVersion.compareTo(FormatVersion.v3_0) < 0) { + throw new IllegalArgumentException( + "Only format versions higher than or equal to 3.0 is supported. Was version " + + formatVersion + + "."); + } + return formatVersion; } @VisibleForTesting diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java index 857bcb6..6553a1a 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java @@ -23,8 +23,6 @@ import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import org.apache.flink.statefun.flink.core.httpfn.StateSpec; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.ExpirationSpec; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec; @@ -39,20 +37,7 @@ public final class PersistedRemoteFunctionValues { @Persisted private final PersistedStateRegistry stateRegistry = new PersistedStateRegistry(); - private final Map<String, PersistedValue<byte[]>> managedStates; - - /** - * @deprecated {@link PersistedRemoteFunctionValues} should no longer be instantiated with eagerly - * declared state specs. State can now be dynamically registered with {@link - * #registerStates(List)}. This constructor will be removed once old module specification - * formats, which supports eager state declarations, are removed. - */ - @Deprecated - public PersistedRemoteFunctionValues(List<StateSpec> stateSpecs) { - Objects.requireNonNull(stateSpecs); - this.managedStates = new HashMap<>(stateSpecs.size()); - stateSpecs.forEach(this::createAndRegisterEagerValueState); - } + private final Map<String, PersistedValue<byte[]>> managedStates = new HashMap<>(); void attachStateValues(InvocationBatchRequest.Builder batchBuilder) { for (Map.Entry<String, PersistedValue<byte[]>> managedStateEntry : managedStates.entrySet()) { @@ -134,15 +119,6 @@ public final class PersistedRemoteFunctionValues { } } - private void createAndRegisterEagerValueState(StateSpec stateSpec) { - final String stateName = stateSpec.name(); - - final PersistedValue<byte[]> stateValue = - PersistedValue.of(stateName, byte[].class, stateSpec.ttlExpiration()); - stateRegistry.registerValue(stateValue); - managedStates.put(stateName, stateValue); - } - private PersistedValue<byte[]> getStateHandleOrThrow(String stateName) { final PersistedValue<byte[]> handle = managedStates.get(stateName); if (handle == null) { diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java index 01ee950..b2054f2 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java @@ -25,6 +25,7 @@ import com.google.protobuf.Any; import java.time.Duration; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.statefun.flink.core.backpressure.InternalContext; import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction; @@ -70,13 +71,16 @@ public final class RequestReplyFunction implements StatefulFunction { @Persisted private final PersistedRemoteFunctionValues managedStates; - public RequestReplyFunction( - PersistedRemoteFunctionValues managedStates, - int maxNumBatchRequests, - RequestReplyClient client) { - this.managedStates = Objects.requireNonNull(managedStates); - this.client = Objects.requireNonNull(client); + public RequestReplyFunction(int maxNumBatchRequests, RequestReplyClient client) { + this(new PersistedRemoteFunctionValues(), maxNumBatchRequests, client); + } + + @VisibleForTesting + RequestReplyFunction( + PersistedRemoteFunctionValues states, int maxNumBatchRequests, RequestReplyClient client) { + this.managedStates = Objects.requireNonNull(states); this.maxNumBatchRequests = maxNumBatchRequests; + this.client = Objects.requireNonNull(client); } @Override diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java index 3001684..92014a9 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java @@ -17,14 +17,16 @@ */ package org.apache.flink.statefun.flink.core.jsonmodule; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import com.google.protobuf.Any; import com.google.protobuf.Message; import java.net.URL; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse; @@ -35,24 +37,10 @@ import org.apache.flink.statefun.sdk.io.EgressIdentifier; import org.apache.flink.statefun.sdk.io.IngressIdentifier; import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -@RunWith(Parameterized.class) public class JsonModuleTest { - @Parameterized.Parameters(name = "Format version = {0}, module path = \"{1}\"") - public static Collection<?> modules() { - return Arrays.asList( - new Object[] {FormatVersion.v1_0, "module-v1_0/module.yaml"}, - new Object[] {FormatVersion.v2_0, "module-v2_0/module.yaml"}); - } - - private final String modulePath; - - public JsonModuleTest(FormatVersion ignored, String modulePath) { - this.modulePath = modulePath; - } + private static final String modulePath = "module-v3_0/module.yaml"; @Test public void exampleUsage() { @@ -71,9 +59,10 @@ public class JsonModuleTest { assertThat( universe.functions(), allOf( - hasKey(new FunctionType("com.example", "hello")), - hasKey(new FunctionType("com.foo", "world")), - hasKey(new FunctionType("com.bar", "world")))); + hasKey(new FunctionType("com.foo.bar", "specific_function")), + hasKey(new FunctionType("com.other.namespace", "hello")))); + + assertThat(universe.namespaceFunctions(), hasKey("com.foo.bar")); } @Test diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java deleted file mode 100644 index fcbb885..0000000 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.statefun.flink.core.jsonmodule; - -import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.hasKey; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; - -import com.google.protobuf.Any; -import com.google.protobuf.Message; -import java.net.URL; -import java.util.Collections; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse; -import org.apache.flink.statefun.flink.core.message.MessageFactoryKey; -import org.apache.flink.statefun.flink.core.message.MessageFactoryType; -import org.apache.flink.statefun.sdk.FunctionType; -import org.apache.flink.statefun.sdk.io.EgressIdentifier; -import org.apache.flink.statefun.sdk.io.IngressIdentifier; -import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; -import org.junit.Test; - -public class JsonModuleV3Test { - - private static final String modulePath = "module-v3_0/module.yaml"; - - @Test - public void exampleUsage() { - StatefulFunctionModule module = fromPath(modulePath); - - assertThat(module, notNullValue()); - } - - @Test - public void testFunctions() { - StatefulFunctionModule module = fromPath(modulePath); - - StatefulFunctionsUniverse universe = emptyUniverse(); - module.configure(Collections.emptyMap(), universe); - - assertThat( - universe.functions(), - allOf( - hasKey(new FunctionType("com.foo.bar", "specific_function")), - hasKey(new FunctionType("com.other.namespace", "hello")))); - - assertThat(universe.namespaceFunctions(), hasKey("com.foo.bar")); - } - - @Test - public void testRouters() { - StatefulFunctionModule module = fromPath(modulePath); - - StatefulFunctionsUniverse universe = emptyUniverse(); - module.configure(Collections.emptyMap(), universe); - - assertThat( - universe.routers(), - hasKey(new IngressIdentifier<>(Message.class, "com.mycomp.igal", "names"))); - } - - @Test - public void testIngresses() { - StatefulFunctionModule module = fromPath(modulePath); - - StatefulFunctionsUniverse universe = emptyUniverse(); - module.configure(Collections.emptyMap(), universe); - - assertThat( - universe.ingress(), - hasKey(new IngressIdentifier<>(Message.class, "com.mycomp.igal", "names"))); - } - - @Test - public void testEgresses() { - StatefulFunctionModule module = fromPath(modulePath); - - StatefulFunctionsUniverse universe = emptyUniverse(); - module.configure(Collections.emptyMap(), universe); - - assertThat( - universe.egress(), hasKey(new EgressIdentifier<>("com.mycomp.foo", "bar", Any.class))); - } - - private static StatefulFunctionModule fromPath(String path) { - URL moduleUrl = JsonModuleTest.class.getClassLoader().getResource(path); - assertThat(moduleUrl, not(nullValue())); - ObjectMapper mapper = JsonServiceLoader.mapper(); - return JsonServiceLoader.fromUrl(mapper, moduleUrl); - } - - private static StatefulFunctionsUniverse emptyUniverse() { - return new StatefulFunctionsUniverse( - MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null)); - } -} diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java index f633fc2..48dadc1 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java @@ -35,8 +35,7 @@ public class PersistedRemoteFunctionValuesTest { @Test public void exampleUsage() { - final PersistedRemoteFunctionValues values = - new PersistedRemoteFunctionValues(Collections.emptyList()); + final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues(); // --- register persisted states values.registerStates( @@ -63,8 +62,7 @@ public class PersistedRemoteFunctionValuesTest { @Test public void zeroRegisteredStates() { - final PersistedRemoteFunctionValues values = - new PersistedRemoteFunctionValues(Collections.emptyList()); + final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues(); final InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder(); values.attachStateValues(builder); @@ -74,8 +72,7 @@ public class PersistedRemoteFunctionValuesTest { @Test(expected = IllegalStateException.class) public void updatingNonRegisteredStateShouldThrow() { - final PersistedRemoteFunctionValues values = - new PersistedRemoteFunctionValues(Collections.emptyList()); + final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues(); values.updateStateValues( Collections.singletonList( @@ -85,8 +82,7 @@ public class PersistedRemoteFunctionValuesTest { @Test public void registeredStateWithEmptyValueShouldBeAttached() { - final PersistedRemoteFunctionValues values = - new PersistedRemoteFunctionValues(Collections.emptyList()); + final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues(); values.registerStates(Collections.singletonList(protocolPersistedValueSpec("state"))); @@ -99,8 +95,7 @@ public class PersistedRemoteFunctionValuesTest { @Test public void registeredStateWithDeletedValueShouldBeAttached() { - final PersistedRemoteFunctionValues values = - new PersistedRemoteFunctionValues(Collections.emptyList()); + final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues(); values.registerStates(Collections.singletonList(protocolPersistedValueSpec("state"))); @@ -120,8 +115,7 @@ public class PersistedRemoteFunctionValuesTest { @Test public void duplicateRegistrationsHasNoEffect() { - final PersistedRemoteFunctionValues values = - new PersistedRemoteFunctionValues(Collections.emptyList()); + final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues(); values.registerStates(Collections.singletonList(protocolPersistedValueSpec("state"))); values.updateStateValues( diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java index c4eb85a..d545281 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java @@ -40,7 +40,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.flink.statefun.flink.core.TestUtils; import org.apache.flink.statefun.flink.core.backpressure.InternalContext; -import org.apache.flink.statefun.flink.core.httpfn.StateSpec; import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics; import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction; @@ -66,11 +65,9 @@ public class RequestReplyFunctionTest { private final FakeClient client = new FakeClient(); private final FakeContext context = new FakeContext(); - private final PersistedRemoteFunctionValues states = - new PersistedRemoteFunctionValues(Collections.singletonList(new StateSpec("session"))); private final RequestReplyFunction functionUnderTest = - new RequestReplyFunction(states, 10, client); + new RequestReplyFunction(testInitialRegisteredState("session"), 10, client); @Test public void example() { @@ -116,7 +113,7 @@ public class RequestReplyFunctionTest { @Test public void reachingABatchLimitTriggersBackpressure() { - RequestReplyFunction functionUnderTest = new RequestReplyFunction(states, 2, client); + RequestReplyFunction functionUnderTest = new RequestReplyFunction(2, client); // send one message functionUnderTest.invoke(context, Any.getDefaultInstance()); @@ -132,7 +129,7 @@ public class RequestReplyFunctionTest { @Test public void returnedMessageReleaseBackpressure() { - RequestReplyFunction functionUnderTest = new RequestReplyFunction(states, 2, client); + RequestReplyFunction functionUnderTest = new RequestReplyFunction(2, client); // the following invocations should cause backpressure functionUnderTest.invoke(context, Any.getDefaultInstance()); @@ -273,6 +270,15 @@ public class RequestReplyFunctionTest { assertThat(context.functionTypeMetrics().numBacklog, is(0)); } + private static PersistedRemoteFunctionValues testInitialRegisteredState( + String existingStateName) { + final PersistedRemoteFunctionValues states = new PersistedRemoteFunctionValues(); + states.registerStates( + Collections.singletonList( + PersistedValueSpec.newBuilder().setStateName(existingStateName).build())); + return states; + } + private static AsyncOperationResult<Object, FromFunction> successfulAsyncOperation() { return new AsyncOperationResult<>( new Object(), Status.SUCCESS, FromFunction.getDefaultInstance(), null); diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java index 58c382c..d24636b 100644 --- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java +++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java @@ -21,10 +21,10 @@ package org.apache.flink.statefun.flink.datastream; import java.net.URI; import java.time.Duration; import org.apache.flink.annotation.Internal; -import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec; -import org.apache.flink.statefun.flink.core.httpfn.StateSpec; +import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec; +import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec.Target; +import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec.UrlPathTemplate; import org.apache.flink.statefun.sdk.FunctionType; -import org.apache.flink.statefun.sdk.state.Expiration; /** A Builder for RequestReply remote function type. */ public class RequestReplyFunctionBuilder { @@ -41,33 +41,12 @@ public class RequestReplyFunctionBuilder { return new RequestReplyFunctionBuilder(functionType, endpoint); } - private final HttpFunctionSpec.Builder builder; + private final HttpFunctionEndpointSpec.Builder builder; private RequestReplyFunctionBuilder(FunctionType functionType, URI endpoint) { - this.builder = HttpFunctionSpec.builder(functionType, endpoint); - } - - /** - * Declares a remote function state. - * - * @param name the name of the state to be used remotely. - * @return this builder. - */ - public RequestReplyFunctionBuilder withPersistedState(String name) { - builder.withState(new StateSpec(name, Expiration.none())); - return this; - } - - /** - * Declares a remote function state, with expiration. - * - * @param name the name of the state to be used remotely. - * @param ttlExpiration the expiration mode for which this state might be deleted. - * @return this builder. - */ - public RequestReplyFunctionBuilder withExpiringState(String name, Expiration ttlExpiration) { - builder.withState(new StateSpec(name, ttlExpiration)); - return this; + this.builder = + HttpFunctionEndpointSpec.builder( + Target.functionType(functionType), new UrlPathTemplate(endpoint.toASCIIString())); } /** @@ -127,7 +106,7 @@ public class RequestReplyFunctionBuilder { } @Internal - HttpFunctionSpec spec() { + HttpFunctionEndpointSpec spec() { return builder.build(); } } diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java index 28ac564..eed278c 100644 --- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java +++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java @@ -18,13 +18,14 @@ package org.apache.flink.statefun.flink.datastream; +import java.util.Collections; import java.util.Map; import java.util.Objects; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.apache.flink.annotation.Internal; +import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec; import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider; -import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec; import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.StatefulFunction; @@ -34,17 +35,17 @@ final class SerializableHttpFunctionProvider implements SerializableStatefulFunc private static final long serialVersionUID = 1; - private final Map<FunctionType, HttpFunctionSpec> supportedTypes; + private final Map<FunctionType, HttpFunctionEndpointSpec> supportedTypes; private transient @Nullable HttpFunctionProvider delegate; - SerializableHttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) { + SerializableHttpFunctionProvider(Map<FunctionType, HttpFunctionEndpointSpec> supportedTypes) { this.supportedTypes = Objects.requireNonNull(supportedTypes); } @Override public StatefulFunction functionOfType(FunctionType type) { if (delegate == null) { - delegate = new HttpFunctionProvider(supportedTypes); + delegate = new HttpFunctionProvider(supportedTypes, Collections.emptyMap()); } return delegate.functionOfType(type); } diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java index 39bc028..58dbb92 100644 --- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java +++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java @@ -29,7 +29,7 @@ import javax.annotation.Nullable; import org.apache.flink.shaded.guava18.com.google.common.base.Optional; import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig; import org.apache.flink.statefun.flink.core.feedback.FeedbackKey; -import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec; +import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec; import org.apache.flink.statefun.flink.core.message.Message; import org.apache.flink.statefun.flink.core.message.RoutableMessage; import org.apache.flink.statefun.flink.core.translation.EmbeddedTranslator; @@ -60,7 +60,7 @@ public final class StatefulFunctionDataStreamBuilder { private final List<DataStream<RoutableMessage>> definedIngresses = new ArrayList<>(); private final Map<FunctionType, SerializableStatefulFunctionProvider> functionProviders = new HashMap<>(); - private final Map<FunctionType, HttpFunctionSpec> requestReplyFunctions = new HashMap<>(); + private final Map<FunctionType, HttpFunctionEndpointSpec> requestReplyFunctions = new HashMap<>(); private final Set<EgressIdentifier<?>> egressesIds = new LinkedHashSet<>(); @Nullable private StatefulFunctionsConfig config; @@ -102,8 +102,8 @@ public final class StatefulFunctionDataStreamBuilder { public StatefulFunctionDataStreamBuilder withRequestReplyRemoteFunction( RequestReplyFunctionBuilder builder) { Objects.requireNonNull(builder); - HttpFunctionSpec spec = builder.spec(); - putAndThrowIfPresent(requestReplyFunctions, spec.functionType(), spec); + HttpFunctionEndpointSpec spec = builder.spec(); + putAndThrowIfPresent(requestReplyFunctions, spec.target().asSpecificFunctionType(), spec); return this; } diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java index d4faf27..ea3cded 100644 --- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java +++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java @@ -18,9 +18,12 @@ package org.apache.flink.statefun.sdk; +import java.io.Serializable; import java.util.Objects; -public final class FunctionTypeNamespaceMatcher { +public final class FunctionTypeNamespaceMatcher implements Serializable { + + private static final long serialVersionUID = 1; private final String targetNamespace;
