This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 8482d949d81cb044fc4e91b6e0f169fa342cc8b3 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Mon Nov 30 15:05:00 2020 +0800 [FLINK-20334] [sdk] Support binding function providers per namespace --- .../flink/core/StatefulFunctionsUniverse.java | 21 +++- .../core/StatefulFunctionsUniverseValidator.java | 3 +- .../core/functions/PredefinedFunctionLoader.java | 25 +++-- .../statefun/flink/core/functions/Reductions.java | 2 + .../functions/PredefinedFunctionLoaderTest.java | 118 +++++++++++++++++++++ .../statefun/sdk/FunctionTypeNamespaceMatcher.java | 64 +++++++++++ .../statefun/sdk/spi/StatefulFunctionModule.java | 17 ++- 7 files changed, 239 insertions(+), 11 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java index bbf6661..9c39977 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java @@ -30,6 +30,7 @@ import org.apache.flink.statefun.flink.io.spi.SinkProvider; import org.apache.flink.statefun.flink.io.spi.SourceProvider; import org.apache.flink.statefun.sdk.EgressType; import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher; import org.apache.flink.statefun.sdk.IngressType; import org.apache.flink.statefun.sdk.StatefulFunctionProvider; import org.apache.flink.statefun.sdk.io.EgressIdentifier; @@ -45,7 +46,9 @@ public final class StatefulFunctionsUniverse private final Map<IngressIdentifier<?>, IngressSpec<?>> ingress = new HashMap<>(); private final Map<EgressIdentifier<?>, EgressSpec<?>> egress = new HashMap<>(); private final Map<IngressIdentifier<?>, List<Router<?>>> routers = new HashMap<>(); - private final Map<FunctionType, StatefulFunctionProvider> functions = new HashMap<>(); + private final Map<FunctionType, StatefulFunctionProvider> specificFunctionProviders = + new HashMap<>(); + private final Map<String, StatefulFunctionProvider> namespaceFunctionProviders = new HashMap<>(); private final Map<IngressType, SourceProvider> sources = new HashMap<>(); private final Map<EgressType, SinkProvider> sinks = new HashMap<>(); @@ -85,7 +88,15 @@ public final class StatefulFunctionsUniverse public void bindFunctionProvider(FunctionType functionType, StatefulFunctionProvider provider) { Objects.requireNonNull(functionType); Objects.requireNonNull(provider); - putAndThrowIfPresent(functions, functionType, provider); + putAndThrowIfPresent(specificFunctionProviders, functionType, provider); + } + + @Override + public void bindFunctionProvider( + FunctionTypeNamespaceMatcher namespaceMatcher, StatefulFunctionProvider provider) { + Objects.requireNonNull(namespaceMatcher); + Objects.requireNonNull(provider); + putAndThrowIfPresent(namespaceFunctionProviders, namespaceMatcher.targetNamespace(), provider); } @Override @@ -114,7 +125,11 @@ public final class StatefulFunctionsUniverse } public Map<FunctionType, StatefulFunctionProvider> functions() { - return functions; + return specificFunctionProviders; + } + + public Map<String, StatefulFunctionProvider> namespaceFunctions() { + return namespaceFunctionProviders; } public Map<IngressType, SourceProvider> sources() { diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverseValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverseValidator.java index a149401..8076da8 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverseValidator.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverseValidator.java @@ -30,7 +30,8 @@ final class StatefulFunctionsUniverseValidator { if (statefulFunctionsUniverse.routers().isEmpty()) { throw new IllegalStateException("There are no routers defined."); } - if (statefulFunctionsUniverse.functions().isEmpty()) { + if (statefulFunctionsUniverse.functions().isEmpty() + && statefulFunctionsUniverse.namespaceFunctions().isEmpty()) { throw new IllegalStateException("There are no function providers defined."); } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/PredefinedFunctionLoader.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/PredefinedFunctionLoader.java index 0c94ddd..4ddcbd3 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/PredefinedFunctionLoader.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/PredefinedFunctionLoader.java @@ -29,21 +29,22 @@ import org.apache.flink.statefun.sdk.StatefulFunctionProvider; /** An {@link FunctionLoader} that has a predefined set of {@link StatefulFunctionProvider}s. */ final class PredefinedFunctionLoader implements FunctionLoader { private final Map<FunctionType, StatefulFunctionProvider> functionProviders; + private final Map<String, StatefulFunctionProvider> namespaceFunctionProviders; @Inject PredefinedFunctionLoader( - @Label("function-providers") Map<FunctionType, StatefulFunctionProvider> functionProviders) { + @Label("function-providers") Map<FunctionType, StatefulFunctionProvider> functionProviders, + @Label("namespace-function-providers") + Map<String, StatefulFunctionProvider> namespaceFunctionProviders) { this.functionProviders = Objects.requireNonNull(functionProviders); + this.namespaceFunctionProviders = Objects.requireNonNull(namespaceFunctionProviders); } @Override public StatefulFunction load(FunctionType functionType) { Objects.requireNonNull(functionType); - StatefulFunctionProvider provider = functionProviders.get(functionType); - if (provider == null) { - throw new IllegalArgumentException("Unknown provider for type " + functionType); - } - StatefulFunction statefulFunction = load(provider, functionType); + final StatefulFunctionProvider provider = getFunctionProviderOrThrow(functionType); + final StatefulFunction statefulFunction = load(provider, functionType); if (statefulFunction == null) { throw new IllegalStateException( "A provider for a type " + functionType + " has produced a NULL function"); @@ -51,6 +52,18 @@ final class PredefinedFunctionLoader implements FunctionLoader { return statefulFunction; } + private StatefulFunctionProvider getFunctionProviderOrThrow(FunctionType functionType) { + StatefulFunctionProvider provider = functionProviders.get(functionType); + if (provider != null) { + return provider; + } + provider = namespaceFunctionProviders.get(functionType.namespace()); + if (provider != null) { + return provider; + } + throw new IllegalArgumentException("Cannot find a provider for type " + functionType); + } + private static StatefulFunction load( StatefulFunctionProvider provider, FunctionType functionType) { try (SetContextClassLoader ignored = new SetContextClassLoader(provider)) { diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java index 4a90a20..55b521f 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java @@ -73,6 +73,8 @@ final class Reductions { container.add("function-providers", Map.class, statefulFunctionsUniverse.functions()); container.add( + "namespace-function-providers", Map.class, statefulFunctionsUniverse.namespaceFunctions()); + container.add( "function-repository", FunctionRepository.class, StatefulFunctionRepository.class); container.addAlias( "function-metrics-repository", diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/PredefinedFunctionLoaderTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/PredefinedFunctionLoaderTest.java new file mode 100644 index 0000000..0c12b49 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/PredefinedFunctionLoaderTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.core.functions; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsNull.notNullValue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.flink.statefun.sdk.*; +import org.apache.flink.statefun.sdk.StatefulFunction; +import org.junit.Test; + +public class PredefinedFunctionLoaderTest { + + private static final FunctionType TEST_TYPE = new FunctionType("namespace", "name"); + + @Test + public void exampleUsage() { + PredefinedFunctionLoader loader = + new PredefinedFunctionLoader(specificFunctionProviders(), Collections.emptyMap()); + + StatefulFunction function = loader.load(TEST_TYPE); + + assertThat(function, notNullValue()); + } + + @Test + public void withOnlyPerNamespaceFunctionProviders() { + PredefinedFunctionLoader loader = + new PredefinedFunctionLoader(Collections.emptyMap(), perNamespaceFunctionProviders()); + + StatefulFunction function = loader.load(TEST_TYPE); + + assertThat(function, notNullValue()); + } + + @Test + public void specificFunctionProvidersHigherPrecedence() { + PredefinedFunctionLoader loader = + new PredefinedFunctionLoader(specificFunctionProviders(), perNamespaceFunctionProviders()); + + StatefulFunction function = loader.load(TEST_TYPE); + + assertThat(function, instanceOf(StatefulFunctionA.class)); + } + + @Test(expected = IllegalArgumentException.class) + public void nullLoadedFunctions() { + PredefinedFunctionLoader loader = + new PredefinedFunctionLoader(specificFunctionProviders(), Collections.emptyMap()); + + loader.load(new FunctionType("doesn't", "exist")); + } + + private static Map<FunctionType, StatefulFunctionProvider> specificFunctionProviders() { + final Map<FunctionType, StatefulFunctionProvider> providers = new HashMap<>(); + providers.put( + new FunctionType(TEST_TYPE.namespace(), TEST_TYPE.name()), new SpecificFunctionProvider()); + return providers; + } + + private static Map<String, StatefulFunctionProvider> perNamespaceFunctionProviders() { + final Map<String, StatefulFunctionProvider> providers = new HashMap<>(); + providers.put(TEST_TYPE.namespace(), new PerNamespaceFunctionProvider()); + return providers; + } + + private static class SpecificFunctionProvider implements StatefulFunctionProvider { + @Override + public org.apache.flink.statefun.sdk.StatefulFunction functionOfType(FunctionType type) { + if (type.equals(TEST_TYPE)) { + return new StatefulFunctionA(); + } else { + return null; + } + } + } + + private static class PerNamespaceFunctionProvider implements StatefulFunctionProvider { + @Override + public org.apache.flink.statefun.sdk.StatefulFunction functionOfType(FunctionType type) { + if (type.equals(TEST_TYPE)) { + return new StatefulFunctionB(); + } else { + return null; + } + } + } + + private static class StatefulFunctionA implements StatefulFunction { + @Override + public void invoke(Context context, Object input) {} + } + + private static class StatefulFunctionB implements StatefulFunction { + @Override + public void invoke(Context context, Object input) {} + } +} diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java new file mode 100644 index 0000000..d4faf27 --- /dev/null +++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.sdk; + +import java.util.Objects; + +public final class FunctionTypeNamespaceMatcher { + + private final String targetNamespace; + + public static FunctionTypeNamespaceMatcher targetNamespace(String namespace) { + return new FunctionTypeNamespaceMatcher(namespace); + } + + private FunctionTypeNamespaceMatcher(String targetNamespace) { + this.targetNamespace = Objects.requireNonNull(targetNamespace); + } + + public String targetNamespace() { + return targetNamespace; + } + + public boolean matches(FunctionType functionType) { + return targetNamespace.equals(functionType.namespace()); + } + + @Override + public int hashCode() { + return targetNamespace.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + FunctionTypeNamespaceMatcher other = (FunctionTypeNamespaceMatcher) obj; + return targetNamespace.equals(other.targetNamespace); + } + + @Override + public String toString() { + return String.format("FunctionTypeNamespaceMatcher(%s)", targetNamespace); + } +} diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/spi/StatefulFunctionModule.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/spi/StatefulFunctionModule.java index 44efefd..d5b6b71 100644 --- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/spi/StatefulFunctionModule.java +++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/spi/StatefulFunctionModule.java @@ -19,6 +19,7 @@ package org.apache.flink.statefun.sdk.spi; import java.util.Map; import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher; import org.apache.flink.statefun.sdk.StatefulFunction; import org.apache.flink.statefun.sdk.StatefulFunctionProvider; import org.apache.flink.statefun.sdk.io.EgressSpec; @@ -94,7 +95,8 @@ public interface StatefulFunctionModule { <T> void bindEgress(EgressSpec<T> spec); /** - * Binds a {@link StatefulFunctionProvider} to the Stateful Functions application. + * Binds a {@link StatefulFunctionProvider} to the Stateful Functions application for a specific + * {@link FunctionType}. * * @param functionType the type of functions that the {@link StatefulFunctionProvider} provides. * @param provider the provider to bind. @@ -102,6 +104,19 @@ public interface StatefulFunctionModule { void bindFunctionProvider(FunctionType functionType, StatefulFunctionProvider provider); /** + * Binds a {@link StatefulFunctionProvider} to the Stateful Functions application for all + * functions under the specified namespace. If a provider was bound for a specific function type + * using {@link #bindFunctionProvider(FunctionType, StatefulFunctionProvider)}, that provider + * would be used instead. + * + * @param namespaceMatcher matcher for the target namespace of functions that the {@link + * StatefulFunctionProvider} provides. + * @param provider the provider to bind. + */ + void bindFunctionProvider( + FunctionTypeNamespaceMatcher namespaceMatcher, StatefulFunctionProvider provider); + + /** * Binds a {@link Router} for a given ingress to the Stateful Functions application. * * @param id the id of the ingress to bind the router to.
