This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 758076b0033bca61185911557041614d7bae0ada Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Thu May 28 14:13:17 2020 +0800 [FLINK-17875] [core] Wire in JsonEntities into JsonModule --- .../statefun/flink/core/jsonmodule/JsonModule.java | 294 ++------------------- .../flink/core/jsonmodule/JsonModuleFactory.java | 46 ---- .../flink/core/jsonmodule/JsonServiceLoader.java | 31 ++- .../flink/core/jsonmodule/JsonModuleTest.java | 10 +- 4 files changed, 34 insertions(+), 347 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java index bd96211..cedeb97 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java @@ -18,303 +18,41 @@ package org.apache.flink.statefun.flink.core.jsonmodule; import static java.lang.String.format; -import static java.util.stream.Collectors.groupingBy; -import static java.util.stream.Collectors.toMap; -import static org.apache.flink.statefun.flink.core.common.Maps.transformValues; -import com.google.protobuf.Any; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URI; import java.net.URL; -import java.time.Duration; +import java.util.Arrays; import java.util.List; -import java.util.Locale; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collector; -import java.util.stream.StreamSupport; -import javax.annotation.Nullable; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.flink.statefun.flink.common.ResourceLocator; -import org.apache.flink.statefun.flink.common.json.NamespaceNamePair; -import org.apache.flink.statefun.flink.common.json.Selectors; -import org.apache.flink.statefun.flink.common.protobuf.ProtobufDescriptorMap; -import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider; -import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec; -import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider; -import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec; -import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind; -import org.apache.flink.statefun.flink.core.jsonmodule.Pointers.Functions; -import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter; -import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter; -import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes; -import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes; -import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec; -import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec; -import org.apache.flink.statefun.sdk.EgressType; -import org.apache.flink.statefun.sdk.FunctionType; -import org.apache.flink.statefun.sdk.IngressType; -import org.apache.flink.statefun.sdk.StatefulFunctionProvider; -import org.apache.flink.statefun.sdk.io.EgressIdentifier; -import org.apache.flink.statefun.sdk.io.IngressIdentifier; -import org.apache.flink.statefun.sdk.io.Router; import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; -import org.apache.flink.util.TimeUtils; final class JsonModule implements StatefulFunctionModule { - private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1); - private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000; - private final JsonNode spec; + + /** Entities in the JSON moduleSpecNode that should be parsed and bound to the module. */ + private static final List<JsonEntity> ENTITIES = + Arrays.asList( + new FunctionJsonEntity(), + new IngressJsonEntity(), + new RouterJsonEntity(), + new EgressJsonEntity()); + + private final JsonNode moduleSpecNode; + private final FormatVersion formatVersion; private final URL moduleUrl; - public JsonModule(JsonNode spec, URL moduleUrl) { - this.spec = Objects.requireNonNull(spec); + public JsonModule(JsonNode moduleSpecNode, FormatVersion formatVersion, URL moduleUrl) { + this.moduleSpecNode = Objects.requireNonNull(moduleSpecNode); + this.formatVersion = Objects.requireNonNull(formatVersion); this.moduleUrl = Objects.requireNonNull(moduleUrl); } public void configure(Map<String, String> conf, Binder binder) { try { - configureFunctions(binder, Selectors.listAt(spec, Pointers.FUNCTIONS_POINTER)); - configureRouters(binder, Selectors.listAt(spec, Pointers.ROUTERS_POINTER)); - configureIngress(binder, Selectors.listAt(spec, Pointers.INGRESSES_POINTER)); - configureEgress(binder, Selectors.listAt(spec, Pointers.EGRESSES_POINTER)); + ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion)); } catch (Throwable t) { throw new ModuleConfigurationException( format("Error while parsing module at %s", moduleUrl), t); } } - - private void configureFunctions(Binder binder, Iterable<? extends JsonNode> functions) { - final Map<Kind, Map<FunctionType, FunctionSpec>> definedFunctions = - StreamSupport.stream(functions.spliterator(), false) - .map(JsonModule::parseFunctionSpec) - .collect(groupingBy(FunctionSpec::kind, groupByFunctionType())); - - for (Entry<Kind, Map<FunctionType, FunctionSpec>> entry : definedFunctions.entrySet()) { - StatefulFunctionProvider provider = functionProvider(entry.getKey(), entry.getValue()); - Set<FunctionType> functionTypes = entry.getValue().keySet(); - for (FunctionType type : functionTypes) { - binder.bindFunctionProvider(type, provider); - } - } - } - - private static StatefulFunctionProvider functionProvider( - Kind kind, Map<FunctionType, FunctionSpec> definedFunctions) { - switch (kind) { - case HTTP: - return new HttpFunctionProvider( - transformValues(definedFunctions, HttpFunctionSpec.class::cast)); - case GRPC: - return new GrpcFunctionProvider( - transformValues(definedFunctions, GrpcFunctionSpec.class::cast)); - default: - throw new IllegalStateException("Unexpected value: " + kind); - } - } - - private void configureRouters(Binder binder, Iterable<? extends JsonNode> routerNodes) { - for (JsonNode router : routerNodes) { - // currently the only type of router supported in a module.yaml, is a protobuf dynamicMessage - // router once we will introduce further router types we should refactor this to be more - // dynamic. - requireProtobufRouterType(router); - - IngressIdentifier<Message> id = targetRouterIngress(router); - binder.bindIngressRouter(id, dynamicRouter(router)); - } - } - - private void configureIngress(Binder binder, Iterable<? extends JsonNode> ingressNode) { - for (JsonNode ingress : ingressNode) { - IngressIdentifier<Message> id = ingressId(ingress); - IngressType type = ingressType(ingress); - - JsonIngressSpec<Message> ingressSpec = new JsonIngressSpec<>(type, id, ingress); - binder.bindIngress(ingressSpec); - - if (isAutoRoutableIngress(type)) { - binder.bindIngressRouter(id, new AutoRoutableProtobufRouter()); - } - } - } - - private void configureEgress(Binder binder, Iterable<? extends JsonNode> egressNode) { - for (JsonNode egress : egressNode) { - EgressIdentifier<Any> id = egressId(egress); - EgressType type = egressType(egress); - - JsonEgressSpec<Any> egressSpec = new JsonEgressSpec<>(type, id, egress); - binder.bindEgress(egressSpec); - } - } - - // ---------------------------------------------------------------------------------------------------------- - // Ingresses - // ---------------------------------------------------------------------------------------------------------- - private static IngressType ingressType(JsonNode spec) { - String typeString = Selectors.textAt(spec, Pointers.Ingress.META_TYPE); - NamespaceNamePair nn = NamespaceNamePair.from(typeString); - return new IngressType(nn.namespace(), nn.name()); - } - - private static IngressIdentifier<Message> ingressId(JsonNode ingress) { - String ingressId = Selectors.textAt(ingress, Pointers.Ingress.META_ID); - NamespaceNamePair nn = NamespaceNamePair.from(ingressId); - return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name()); - } - - private static boolean isAutoRoutableIngress(IngressType ingressType) { - return ingressType.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE) - || ingressType.equals(PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE); - } - - // ---------------------------------------------------------------------------------------------------------- - // Egresses - // ---------------------------------------------------------------------------------------------------------- - private static EgressType egressType(JsonNode spec) { - String typeString = Selectors.textAt(spec, Pointers.Egress.META_TYPE); - NamespaceNamePair nn = NamespaceNamePair.from(typeString); - return new EgressType(nn.namespace(), nn.name()); - } - - private static EgressIdentifier<Any> egressId(JsonNode spec) { - String egressId = Selectors.textAt(spec, Pointers.Egress.META_ID); - NamespaceNamePair nn = NamespaceNamePair.from(egressId); - return new EgressIdentifier<>(nn.namespace(), nn.name(), Any.class); - } - - // ---------------------------------------------------------------------------------------------------------- - // Routers - // ---------------------------------------------------------------------------------------------------------- - private static Router<Message> dynamicRouter(JsonNode router) { - String addressTemplate = Selectors.textAt(router, Pointers.Routers.SPEC_TARGET); - String descriptorSetPath = Selectors.textAt(router, Pointers.Routers.SPEC_DESCRIPTOR); - String messageType = Selectors.textAt(router, Pointers.Routers.SPEC_MESSAGE_TYPE); - - ProtobufDescriptorMap descriptorPath = protobufDescriptorMap(descriptorSetPath); - Optional<Descriptors.GenericDescriptor> maybeDescriptor = - descriptorPath.getDescriptorByName(messageType); - if (!maybeDescriptor.isPresent()) { - throw new IllegalStateException( - "Error while processing a router definition. Unable to locate a message " - + messageType - + " in a descriptor set " - + descriptorSetPath); - } - return ProtobufRouter.forAddressTemplate( - (Descriptors.Descriptor) maybeDescriptor.get(), addressTemplate); - } - - private static ProtobufDescriptorMap protobufDescriptorMap(String descriptorSetPath) { - try { - URL url = ResourceLocator.findNamedResource(descriptorSetPath); - if (url == null) { - throw new IllegalArgumentException( - "Unable to locate a Protobuf descriptor set at " + descriptorSetPath); - } - return ProtobufDescriptorMap.from(url); - } catch (IOException e) { - throw new IllegalStateException( - "Error while processing a router definition. Unable to read the descriptor set at " - + descriptorSetPath, - e); - } - } - - private static IngressIdentifier<Message> targetRouterIngress(JsonNode routerNode) { - String targetIngress = Selectors.textAt(routerNode, Pointers.Routers.SPEC_INGRESS); - NamespaceNamePair nn = NamespaceNamePair.from(targetIngress); - return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name()); - } - - private static void requireProtobufRouterType(JsonNode routerNode) { - String routerType = Selectors.textAt(routerNode, Pointers.Routers.META_TYPE); - if (!routerType.equalsIgnoreCase("org.apache.flink.statefun.sdk/protobuf-router")) { - throw new IllegalStateException("Invalid router type " + routerType); - } - } - - // ---------------------------------------------------------------------------------------------------------- - // Functions - // ---------------------------------------------------------------------------------------------------------- - - private static FunctionSpec parseFunctionSpec(JsonNode functionNode) { - String functionKind = Selectors.textAt(functionNode, Pointers.Functions.META_KIND); - FunctionSpec.Kind kind = Kind.valueOf(functionKind.toUpperCase(Locale.getDefault())); - FunctionType functionType = functionType(functionNode); - switch (kind) { - case HTTP: - return new HttpFunctionSpec( - functionType, - functionUri(functionNode), - functionStates(functionNode), - maxRequestDuration(functionNode), - maxNumBatchRequests(functionNode)); - case GRPC: - return new GrpcFunctionSpec(functionType, functionAddress(functionNode)); - default: - throw new IllegalArgumentException("Unrecognized function kind " + functionKind); - } - } - - private static int maxNumBatchRequests(JsonNode functionNode) { - return Selectors.optionalIntegerAt(functionNode, Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS) - .orElse(DEFAULT_MAX_NUM_BATCH_REQUESTS); - } - - private static Duration maxRequestDuration(JsonNode functionNode) { - return Selectors.optionalTextAt(functionNode, Functions.FUNCTION_TIMEOUT) - .map(TimeUtils::parseDuration) - .orElse(DEFAULT_HTTP_TIMEOUT); - } - - private static List<String> functionStates(JsonNode functionNode) { - return Selectors.textListAt(functionNode, Pointers.Functions.FUNCTION_STATES); - } - - private static FunctionType functionType(JsonNode functionNode) { - String namespaceName = Selectors.textAt(functionNode, Pointers.Functions.META_TYPE); - NamespaceNamePair nn = NamespaceNamePair.from(namespaceName); - return new FunctionType(nn.namespace(), nn.name()); - } - - private static InetSocketAddress functionAddress(JsonNode functionNode) { - String host = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_HOSTNAME); - int port = Selectors.integerAt(functionNode, Pointers.Functions.FUNCTION_PORT); - return new InetSocketAddress(host, port); - } - - private static URI functionUri(JsonNode functionNode) { - String uri = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_ENDPOINT); - URI typedUri = URI.create(uri); - @Nullable String scheme = typedUri.getScheme(); - if (scheme == null) { - throw new IllegalArgumentException( - "Missing scheme in function endpoint " - + uri - + "; an http or https scheme must be provided."); - } - if (scheme.equalsIgnoreCase("http") - || scheme.equalsIgnoreCase("https") - || scheme.equalsIgnoreCase("http+unix") - || scheme.equalsIgnoreCase("https+unix")) { - return typedUri; - } - throw new IllegalArgumentException( - "Missing scheme in function endpoint " - + uri - + "; an http or https or http+unix or https+unix scheme must be provided."); - } - - private static Collector<FunctionSpec, ?, Map<FunctionType, FunctionSpec>> groupByFunctionType() { - return toMap(FunctionSpec::functionType, Function.identity()); - } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleFactory.java deleted file mode 100644 index 3616bcd..0000000 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.statefun.flink.core.jsonmodule; - -import java.net.URL; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.flink.statefun.flink.common.json.Selectors; -import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; - -final class JsonModuleFactory { - - private JsonModuleFactory() {} - - static StatefulFunctionModule create(JsonNode root, URL moduleUrl) { - final FormatVersion formatVersion = formatVersion(root); - final JsonNode spec = root.at(Pointers.MODULE_SPEC); - - switch (formatVersion) { - case v1_0: - return new JsonModule(spec, moduleUrl); - default: - throw new IllegalArgumentException("Unrecognized format version: " + formatVersion); - } - } - - private static FormatVersion formatVersion(JsonNode root) { - String versionStr = Selectors.textAt(root, Pointers.FORMAT_VERSION); - return FormatVersion.fromString(versionStr); - } -} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java index 0ac7712..84e3eba 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java @@ -26,6 +26,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import org.apache.flink.statefun.flink.common.ResourceLocator; +import org.apache.flink.statefun.flink.common.json.Selectors; import org.apache.flink.statefun.flink.core.spi.Constants; import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; @@ -45,8 +46,9 @@ public final class JsonServiceLoader { @VisibleForTesting static StatefulFunctionModule fromUrl(ObjectMapper mapper, URL moduleUrl) { try { - JsonNode root = readAndValidateModuleTree(mapper, moduleUrl); - return JsonModuleFactory.create(root, moduleUrl); + final JsonNode root = readAndValidateModuleTree(mapper, moduleUrl); + return new JsonModule( + requireValidModuleSpecNode(moduleUrl, root), requireValidFormatVersion(root), moduleUrl); } catch (Throwable t) { throw new RuntimeException("Failed loading a module at " + moduleUrl, t); } @@ -55,21 +57,13 @@ public final class JsonServiceLoader { /** * Read a {@code StatefulFunction} module definition. * - * <p>A valid resource module definition has to contain the following sections: - * - * <ul> - * <li>meta - contains the metadata associated with this module, such as its type. - * <li>spec - a specification of the module. i.e. the definied functions, routers etc'. - * </ul> - * - * <p>If any of these sections are missing, this would be considered an invalid module definition, - * in addition a type is a mandatory field of a module spec. + * <p>A valid resource module definition has to contain the metadata associated with this module, + * such as its type. */ private static JsonNode readAndValidateModuleTree(ObjectMapper mapper, URL moduleYamlFile) throws IOException { JsonNode root = mapper.readTree(moduleYamlFile); validateMeta(moduleYamlFile, root); - validateSpec(moduleYamlFile, root); return root; } @@ -87,10 +81,19 @@ public final class JsonServiceLoader { } } - private static void validateSpec(URL moduleYamlFile, JsonNode root) { - if (root.at(Pointers.MODULE_SPEC).isMissingNode()) { + private static JsonNode requireValidModuleSpecNode(URL moduleYamlFile, JsonNode root) { + final JsonNode moduleSpecNode = root.at(Pointers.MODULE_SPEC); + + if (moduleSpecNode.isMissingNode()) { throw new IllegalStateException("A module without a spec at " + moduleYamlFile); } + + return moduleSpecNode; + } + + private static FormatVersion requireValidFormatVersion(JsonNode root) { + final String formatVersion = Selectors.textAt(root, Pointers.FORMAT_VERSION); + return FormatVersion.fromString(formatVersion); } @VisibleForTesting diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java index f1f3aa1..28ee1fb 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java @@ -22,10 +22,8 @@ import static org.junit.Assert.assertThat; import com.google.protobuf.Any; import com.google.protobuf.Message; -import java.io.IOException; import java.net.URL; import java.util.Collections; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse; import org.apache.flink.statefun.flink.core.message.MessageFactoryType; @@ -98,13 +96,7 @@ public class JsonModuleTest { URL moduleUrl = JsonModuleTest.class.getClassLoader().getResource(path); assertThat(moduleUrl, not(nullValue())); ObjectMapper mapper = JsonServiceLoader.mapper(); - final JsonNode json; - try { - json = mapper.readTree(moduleUrl); - } catch (IOException e) { - throw new RuntimeException(e); - } - return JsonModuleFactory.create(json, moduleUrl); + return JsonServiceLoader.fromUrl(mapper, moduleUrl); } private static StatefulFunctionsUniverse emptyUniverse() {
