This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 8482d949d81cb044fc4e91b6e0f169fa342cc8b3
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Mon Nov 30 15:05:00 2020 +0800

    [FLINK-20334] [sdk] Support binding function providers per namespace
---
 .../flink/core/StatefulFunctionsUniverse.java      |  21 +++-
 .../core/StatefulFunctionsUniverseValidator.java   |   3 +-
 .../core/functions/PredefinedFunctionLoader.java   |  25 +++--
 .../statefun/flink/core/functions/Reductions.java  |   2 +
 .../functions/PredefinedFunctionLoaderTest.java    | 118 +++++++++++++++++++++
 .../statefun/sdk/FunctionTypeNamespaceMatcher.java |  64 +++++++++++
 .../statefun/sdk/spi/StatefulFunctionModule.java   |  17 ++-
 7 files changed, 239 insertions(+), 11 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java
index bbf6661..9c39977 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java
@@ -30,6 +30,7 @@ import org.apache.flink.statefun.flink.io.spi.SinkProvider;
 import org.apache.flink.statefun.flink.io.spi.SourceProvider;
 import org.apache.flink.statefun.sdk.EgressType;
 import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
 import org.apache.flink.statefun.sdk.IngressType;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
@@ -45,7 +46,9 @@ public final class StatefulFunctionsUniverse
   private final Map<IngressIdentifier<?>, IngressSpec<?>> ingress = new HashMap<>();
   private final Map<EgressIdentifier<?>, EgressSpec<?>> egress = new HashMap<>();
   private final Map<IngressIdentifier<?>, List<Router<?>>> routers = new HashMap<>();
-  private final Map<FunctionType, StatefulFunctionProvider> functions = new HashMap<>();
+  private final Map<FunctionType, StatefulFunctionProvider> specificFunctionProviders =
+      new HashMap<>();
+  private final Map<String, StatefulFunctionProvider> namespaceFunctionProviders = new HashMap<>();
   private final Map<IngressType, SourceProvider> sources = new HashMap<>();
   private final Map<EgressType, SinkProvider> sinks = new HashMap<>();
 
@@ -85,7 +88,15 @@ public final class StatefulFunctionsUniverse
   public void bindFunctionProvider(FunctionType functionType, StatefulFunctionProvider provider) {
     Objects.requireNonNull(functionType);
     Objects.requireNonNull(provider);
-    putAndThrowIfPresent(functions, functionType, provider);
+    putAndThrowIfPresent(specificFunctionProviders, functionType, provider);
+  }
+
+  @Override
+  public void bindFunctionProvider(
+      FunctionTypeNamespaceMatcher namespaceMatcher, StatefulFunctionProvider provider) {
+    Objects.requireNonNull(namespaceMatcher);
+    Objects.requireNonNull(provider);
+    putAndThrowIfPresent(namespaceFunctionProviders, namespaceMatcher.targetNamespace(), provider);
   }
 
   @Override
@@ -114,7 +125,11 @@ public final class StatefulFunctionsUniverse
   }
 
   public Map<FunctionType, StatefulFunctionProvider> functions() {
-    return functions;
+    return specificFunctionProviders;
+  }
+
+  public Map<String, StatefulFunctionProvider> namespaceFunctions() {
+    return namespaceFunctionProviders;
   }
 
   public Map<IngressType, SourceProvider> sources() {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverseValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverseValidator.java
index a149401..8076da8 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverseValidator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverseValidator.java
@@ -30,7 +30,8 @@ final class StatefulFunctionsUniverseValidator {
     if (statefulFunctionsUniverse.routers().isEmpty()) {
       throw new IllegalStateException("There are no routers defined.");
     }
-    if (statefulFunctionsUniverse.functions().isEmpty()) {
+    if (statefulFunctionsUniverse.functions().isEmpty()
+        && statefulFunctionsUniverse.namespaceFunctions().isEmpty()) {
       throw new IllegalStateException("There are no function providers defined.");
     }
   }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/PredefinedFunctionLoader.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/PredefinedFunctionLoader.java
index 0c94ddd..4ddcbd3 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/PredefinedFunctionLoader.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/PredefinedFunctionLoader.java
@@ -29,21 +29,22 @@ import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
 /** An {@link FunctionLoader} that has a predefined set of {@link StatefulFunctionProvider}s. */
 final class PredefinedFunctionLoader implements FunctionLoader {
   private final Map<FunctionType, StatefulFunctionProvider> functionProviders;
+  private final Map<String, StatefulFunctionProvider> namespaceFunctionProviders;
 
   @Inject
   PredefinedFunctionLoader(
-      @Label("function-providers") Map<FunctionType, StatefulFunctionProvider> functionProviders) {
+      @Label("function-providers") Map<FunctionType, StatefulFunctionProvider> functionProviders,
+      @Label("namespace-function-providers")
+          Map<String, StatefulFunctionProvider> namespaceFunctionProviders) {
     this.functionProviders = Objects.requireNonNull(functionProviders);
+    this.namespaceFunctionProviders = Objects.requireNonNull(namespaceFunctionProviders);
   }
 
   @Override
   public StatefulFunction load(FunctionType functionType) {
     Objects.requireNonNull(functionType);
-    StatefulFunctionProvider provider = functionProviders.get(functionType);
-    if (provider == null) {
-      throw new IllegalArgumentException("Unknown provider for type " + functionType);
-    }
-    StatefulFunction statefulFunction = load(provider, functionType);
+    final StatefulFunctionProvider provider = getFunctionProviderOrThrow(functionType);
+    final StatefulFunction statefulFunction = load(provider, functionType);
     if (statefulFunction == null) {
       throw new IllegalStateException(
           "A provider for a type " + functionType + " has produced a NULL function");
@@ -51,6 +52,18 @@ final class PredefinedFunctionLoader implements FunctionLoader {
     return statefulFunction;
   }
 
+  private StatefulFunctionProvider getFunctionProviderOrThrow(FunctionType functionType) {
+    StatefulFunctionProvider provider = functionProviders.get(functionType);
+    if (provider != null) {
+      return provider;
+    }
+    provider = namespaceFunctionProviders.get(functionType.namespace());
+    if (provider != null) {
+      return provider;
+    }
+    throw new IllegalArgumentException("Cannot find a provider for type " + functionType);
+  }
+
   private static StatefulFunction load(
       StatefulFunctionProvider provider, FunctionType functionType) {
     try (SetContextClassLoader ignored = new SetContextClassLoader(provider)) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
index 4a90a20..55b521f 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
@@ -73,6 +73,8 @@ final class Reductions {
 
     container.add("function-providers", Map.class, statefulFunctionsUniverse.functions());
     container.add(
+        "namespace-function-providers", Map.class, statefulFunctionsUniverse.namespaceFunctions());
+    container.add(
         "function-repository", FunctionRepository.class, StatefulFunctionRepository.class);
     container.addAlias(
         "function-metrics-repository",
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/PredefinedFunctionLoaderTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/PredefinedFunctionLoaderTest.java
new file mode 100644
index 0000000..0c12b49
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/PredefinedFunctionLoaderTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.functions;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsNull.notNullValue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.statefun.sdk.*;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.junit.Test;
+
+public class PredefinedFunctionLoaderTest {
+
+  private static final FunctionType TEST_TYPE = new FunctionType("namespace", "name");
+
+  @Test
+  public void exampleUsage() {
+    PredefinedFunctionLoader loader =
+        new PredefinedFunctionLoader(specificFunctionProviders(), Collections.emptyMap());
+
+    StatefulFunction function = loader.load(TEST_TYPE);
+
+    assertThat(function, notNullValue());
+  }
+
+  @Test
+  public void withOnlyPerNamespaceFunctionProviders() {
+    PredefinedFunctionLoader loader =
+        new PredefinedFunctionLoader(Collections.emptyMap(), perNamespaceFunctionProviders());
+
+    StatefulFunction function = loader.load(TEST_TYPE);
+
+    assertThat(function, notNullValue());
+  }
+
+  @Test
+  public void specificFunctionProvidersHigherPrecedence() {
+    PredefinedFunctionLoader loader =
+        new PredefinedFunctionLoader(specificFunctionProviders(), perNamespaceFunctionProviders());
+
+    StatefulFunction function = loader.load(TEST_TYPE);
+
+    assertThat(function, instanceOf(StatefulFunctionA.class));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void nullLoadedFunctions() {
+    PredefinedFunctionLoader loader =
+        new PredefinedFunctionLoader(specificFunctionProviders(), Collections.emptyMap());
+
+    loader.load(new FunctionType("doesn't", "exist"));
+  }
+
+  private static Map<FunctionType, StatefulFunctionProvider> specificFunctionProviders() {
+    final Map<FunctionType, StatefulFunctionProvider> providers = new HashMap<>();
+    providers.put(
+        new FunctionType(TEST_TYPE.namespace(), TEST_TYPE.name()), new SpecificFunctionProvider());
+    return providers;
+  }
+
+  private static Map<String, StatefulFunctionProvider> perNamespaceFunctionProviders() {
+    final Map<String, StatefulFunctionProvider> providers = new HashMap<>();
+    providers.put(TEST_TYPE.namespace(), new PerNamespaceFunctionProvider());
+    return providers;
+  }
+
+  private static class SpecificFunctionProvider implements StatefulFunctionProvider {
+    @Override
+    public org.apache.flink.statefun.sdk.StatefulFunction functionOfType(FunctionType type) {
+      if (type.equals(TEST_TYPE)) {
+        return new StatefulFunctionA();
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class PerNamespaceFunctionProvider implements StatefulFunctionProvider {
+    @Override
+    public org.apache.flink.statefun.sdk.StatefulFunction functionOfType(FunctionType type) {
+      if (type.equals(TEST_TYPE)) {
+        return new StatefulFunctionB();
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class StatefulFunctionA implements StatefulFunction {
+    @Override
+    public void invoke(Context context, Object input) {}
+  }
+
+  private static class StatefulFunctionB implements StatefulFunction {
+    @Override
+    public void invoke(Context context, Object input) {}
+  }
+}
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java
new file mode 100644
index 0000000..d4faf27
--- /dev/null
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk;
+
+import java.util.Objects;
+
+public final class FunctionTypeNamespaceMatcher {
+
+  private final String targetNamespace;
+
+  public static FunctionTypeNamespaceMatcher targetNamespace(String namespace) {
+    return new FunctionTypeNamespaceMatcher(namespace);
+  }
+
+  private FunctionTypeNamespaceMatcher(String targetNamespace) {
+    this.targetNamespace = Objects.requireNonNull(targetNamespace);
+  }
+
+  public String targetNamespace() {
+    return targetNamespace;
+  }
+
+  public boolean matches(FunctionType functionType) {
+    return targetNamespace.equals(functionType.namespace());
+  }
+
+  @Override
+  public int hashCode() {
+    return targetNamespace.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    FunctionTypeNamespaceMatcher other = (FunctionTypeNamespaceMatcher) obj;
+    return targetNamespace.equals(other.targetNamespace);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("FunctionTypeNamespaceMatcher(%s)", targetNamespace);
+  }
+}
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/spi/StatefulFunctionModule.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/spi/StatefulFunctionModule.java
index 44efefd..d5b6b71 100644
--- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/spi/StatefulFunctionModule.java
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/spi/StatefulFunctionModule.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.sdk.spi;
 
 import java.util.Map;
 import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
 import org.apache.flink.statefun.sdk.StatefulFunction;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
 import org.apache.flink.statefun.sdk.io.EgressSpec;
@@ -94,7 +95,8 @@ public interface StatefulFunctionModule {
     <T> void bindEgress(EgressSpec<T> spec);
 
     /**
-     * Binds a {@link StatefulFunctionProvider} to the Stateful Functions application.
+     * Binds a {@link StatefulFunctionProvider} to the Stateful Functions application for a specific
+     * {@link FunctionType}.
      *
      * @param functionType the type of functions that the {@link StatefulFunctionProvider} provides.
      * @param provider the provider to bind.
@@ -102,6 +104,19 @@ public interface StatefulFunctionModule {
     void bindFunctionProvider(FunctionType functionType, StatefulFunctionProvider provider);
 
     /**
+     * Binds a {@link StatefulFunctionProvider} to the Stateful Functions application for all
+     * functions under the specified namespace. If a provider was bound for a specific function type
+     * using {@link #bindFunctionProvider(FunctionType, StatefulFunctionProvider)}, that provider
+     * would be used instead.
+     *
+     * @param namespaceMatcher matcher for the target namespace of functions that the {@link
+     *     StatefulFunctionProvider} provides.
+     * @param provider the provider to bind.
+     */
+    void bindFunctionProvider(
+        FunctionTypeNamespaceMatcher namespaceMatcher, StatefulFunctionProvider provider);
+
+    /**
      * Binds a {@link Router} for a given ingress to the Stateful Functions application.
      *
      * @param id the id of the ingress to bind the router to.

Reply via email to