This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit ac07854eaad3fbc5eb89502f16c518fbce4aab77 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Thu May 28 12:28:12 2020 +0800 [FLINK-17875] [core] Introduce JsonEntity and implementations A JsonEntity represents a section within a JsonModule that should be parsed into application entity specs (of functions, routers, ingresses, egresses, etc.) and bind to the module. --- .../flink/core/jsonmodule/EgressJsonEntity.java | 57 +++++++ .../flink/core/jsonmodule/FunctionJsonEntity.java | 166 +++++++++++++++++++++ .../flink/core/jsonmodule/IngressJsonEntity.java | 68 +++++++++ .../statefun/flink/core/jsonmodule/JsonEntity.java | 39 +++++ .../flink/core/jsonmodule/RouterJsonEntity.java | 106 +++++++++++++ 5 files changed, 436 insertions(+) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java new file mode 100644 index 0000000..766d936 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.core.jsonmodule; + +import static org.apache.flink.statefun.flink.core.jsonmodule.Pointers.EGRESSES_POINTER; + +import com.google.protobuf.Any; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.statefun.flink.common.json.NamespaceNamePair; +import org.apache.flink.statefun.flink.common.json.Selectors; +import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec; +import org.apache.flink.statefun.sdk.EgressType; +import org.apache.flink.statefun.sdk.io.EgressIdentifier; +import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder; + +final class EgressJsonEntity implements JsonEntity { + + @Override + public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) { + final Iterable<? extends JsonNode> egressNodes = + Selectors.listAt(moduleSpecRootNode, EGRESSES_POINTER); + + egressNodes.forEach( + egressNode -> { + binder.bindEgress( + new JsonEgressSpec<>(egressType(egressNode), egressId(egressNode), egressNode)); + }); + } + + private static EgressType egressType(JsonNode spec) { + String typeString = Selectors.textAt(spec, Pointers.Egress.META_TYPE); + NamespaceNamePair nn = NamespaceNamePair.from(typeString); + return new EgressType(nn.namespace(), nn.name()); + } + + private static EgressIdentifier<Any> egressId(JsonNode spec) { + String egressId = Selectors.textAt(spec, Pointers.Egress.META_ID); + NamespaceNamePair nn = NamespaceNamePair.from(egressId); + return new EgressIdentifier<>(nn.namespace(), nn.name(), Any.class); + } +} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java new file mode 100644 index 
0000000..25f1981 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.core.jsonmodule; + +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toMap; +import static org.apache.flink.statefun.flink.core.common.Maps.transformValues; +import static org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.time.Duration; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collector; +import java.util.stream.StreamSupport; +import javax.annotation.Nullable; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.statefun.flink.common.json.NamespaceNamePair; +import org.apache.flink.statefun.flink.common.json.Selectors; +import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider; +import 
org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec; +import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider; +import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.StatefulFunctionProvider; +import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder; +import org.apache.flink.util.TimeUtils; + +final class FunctionJsonEntity implements JsonEntity { + + private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1); + private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000; + + @Override + public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) { + final Iterable<? extends JsonNode> functionSpecNodes = functionSpecNodes(moduleSpecRootNode); + + for (Map.Entry<Kind, Map<FunctionType, FunctionSpec>> entry : + parse(functionSpecNodes).entrySet()) { + StatefulFunctionProvider provider = functionProvider(entry.getKey(), entry.getValue()); + Set<FunctionType> functionTypes = entry.getValue().keySet(); + for (FunctionType type : functionTypes) { + binder.bindFunctionProvider(type, provider); + } + } + } + + private Map<Kind, Map<FunctionType, FunctionSpec>> parse( + Iterable<? extends JsonNode> functionSpecNodes) { + return StreamSupport.stream(functionSpecNodes.spliterator(), false) + .map(FunctionJsonEntity::parseFunctionSpec) + .collect(groupingBy(FunctionSpec::kind, groupByFunctionType())); + } + + private static Iterable<? 
extends JsonNode> functionSpecNodes(JsonNode moduleSpecRootNode) { + return Selectors.listAt(moduleSpecRootNode, Pointers.FUNCTIONS_POINTER); + } + + private static FunctionSpec parseFunctionSpec(JsonNode functionNode) { + String functionKind = Selectors.textAt(functionNode, Pointers.Functions.META_KIND); + FunctionSpec.Kind kind = + FunctionSpec.Kind.valueOf(functionKind.toUpperCase(Locale.getDefault())); + FunctionType functionType = functionType(functionNode); + switch (kind) { + case HTTP: + return new HttpFunctionSpec( + functionType, + functionUri(functionNode), + functionStates(functionNode), + maxRequestDuration(functionNode), + maxNumBatchRequests(functionNode)); + case GRPC: + return new GrpcFunctionSpec(functionType, functionAddress(functionNode)); + default: + throw new IllegalArgumentException("Unrecognized function kind " + functionKind); + } + } + + private static List<String> functionStates(JsonNode functionNode) { + return Selectors.textListAt(functionNode, Pointers.Functions.FUNCTION_STATES); + } + + private static int maxNumBatchRequests(JsonNode functionNode) { + return Selectors.optionalIntegerAt( + functionNode, Pointers.Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS) + .orElse(DEFAULT_MAX_NUM_BATCH_REQUESTS); + } + + private static Duration maxRequestDuration(JsonNode functionNode) { + return Selectors.optionalTextAt(functionNode, Pointers.Functions.FUNCTION_TIMEOUT) + .map(TimeUtils::parseDuration) + .orElse(DEFAULT_HTTP_TIMEOUT); + } + + private static FunctionType functionType(JsonNode functionNode) { + String namespaceName = Selectors.textAt(functionNode, Pointers.Functions.META_TYPE); + NamespaceNamePair nn = NamespaceNamePair.from(namespaceName); + return new FunctionType(nn.namespace(), nn.name()); + } + + private static InetSocketAddress functionAddress(JsonNode functionNode) { + String host = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_HOSTNAME); + int port = Selectors.integerAt(functionNode, 
Pointers.Functions.FUNCTION_PORT); + return new InetSocketAddress(host, port); + } + + private static URI functionUri(JsonNode functionNode) { + String uri = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_ENDPOINT); + URI typedUri = URI.create(uri); + @Nullable String scheme = typedUri.getScheme(); + if (scheme == null) { + throw new IllegalArgumentException( + "Missing scheme in function endpoint " + + uri + + "; an http or https scheme must be provided."); + } + if (scheme.equalsIgnoreCase("http") + || scheme.equalsIgnoreCase("https") + || scheme.equalsIgnoreCase("http+unix") + || scheme.equalsIgnoreCase("https+unix")) { + return typedUri; + } + throw new IllegalArgumentException( + "Missing scheme in function endpoint " + + uri + + "; an http or https or http+unix or https+unix scheme must be provided."); + } + + private static Collector<FunctionSpec, ?, Map<FunctionType, FunctionSpec>> groupByFunctionType() { + return toMap(FunctionSpec::functionType, Function.identity()); + } + + private static StatefulFunctionProvider functionProvider( + Kind kind, Map<FunctionType, FunctionSpec> definedFunctions) { + switch (kind) { + case HTTP: + return new HttpFunctionProvider( + transformValues(definedFunctions, HttpFunctionSpec.class::cast)); + case GRPC: + return new GrpcFunctionProvider( + transformValues(definedFunctions, GrpcFunctionSpec.class::cast)); + default: + throw new IllegalStateException("Unexpected value: " + kind); + } + } +} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java new file mode 100644 index 0000000..d30675c --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.core.jsonmodule; + +import com.google.protobuf.Message; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.statefun.flink.common.json.NamespaceNamePair; +import org.apache.flink.statefun.flink.common.json.Selectors; +import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter; +import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes; +import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes; +import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec; +import org.apache.flink.statefun.sdk.IngressType; +import org.apache.flink.statefun.sdk.io.IngressIdentifier; +import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder; + +final class IngressJsonEntity implements JsonEntity { + + @Override + public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) { + final Iterable<? 
extends JsonNode> ingressNodes = + Selectors.listAt(moduleSpecRootNode, Pointers.INGRESSES_POINTER); + + ingressNodes.forEach( + ingressNode -> { + final IngressIdentifier<Message> id = ingressId(ingressNode); + final IngressType type = ingressType(ingressNode); + + binder.bindIngress(new JsonIngressSpec<>(type, id, ingressNode)); + if (isAutoRoutableIngress(type)) { + binder.bindIngressRouter(id, new AutoRoutableProtobufRouter()); + } + }); + } + + private static IngressType ingressType(JsonNode spec) { + String typeString = Selectors.textAt(spec, Pointers.Ingress.META_TYPE); + NamespaceNamePair nn = NamespaceNamePair.from(typeString); + return new IngressType(nn.namespace(), nn.name()); + } + + private static IngressIdentifier<Message> ingressId(JsonNode ingress) { + String ingressId = Selectors.textAt(ingress, Pointers.Ingress.META_ID); + NamespaceNamePair nn = NamespaceNamePair.from(ingressId); + return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name()); + } + + private static boolean isAutoRoutableIngress(IngressType ingressType) { + return ingressType.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE) + || ingressType.equals(PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE); + } +} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java new file mode 100644 index 0000000..9d40307 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.jsonmodule;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+
+/**
+ * A {@link JsonEntity} represents a section within a {@link JsonModule} that should be parsed into
+ * application entity specs (of functions, routers, ingresses, egresses, etc.) and bound to the
+ * module.
+ */
+interface JsonEntity {
+
+  /**
+   * Parses the section of the module spec that this entity is responsible for, and binds the
+   * resulting specs to the module.
+   *
+   * @param binder used to bind the parsed specs to the module.
+   * @param moduleSpecNode the root node of the module spec; implementations select their own
+   *     section from it.
+   * @param formatVersion the format version of the module spec.
+   */
+  void bind(Binder binder, JsonNode moduleSpecNode, FormatVersion formatVersion);
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
new file mode 100644
index 0000000..ee0ad47
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.core.jsonmodule; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import java.io.IOException; +import java.net.URL; +import java.util.Optional; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.statefun.flink.common.ResourceLocator; +import org.apache.flink.statefun.flink.common.json.NamespaceNamePair; +import org.apache.flink.statefun.flink.common.json.Selectors; +import org.apache.flink.statefun.flink.common.protobuf.ProtobufDescriptorMap; +import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter; +import org.apache.flink.statefun.sdk.io.IngressIdentifier; +import org.apache.flink.statefun.sdk.io.Router; +import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder; + +final class RouterJsonEntity implements JsonEntity { + + @Override + public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) { + final Iterable<? 
extends JsonNode> routerNodes = + Selectors.listAt(moduleSpecRootNode, Pointers.ROUTERS_POINTER); + + routerNodes.forEach( + routerNode -> { + // currently the only type of router supported in a module.yaml, is a protobuf + // dynamicMessage + // router once we will introduce further router types we should refactor this to be more + // dynamic. + requireProtobufRouterType(routerNode); + + binder.bindIngressRouter(targetRouterIngress(routerNode), dynamicRouter(routerNode)); + }); + } + + // ---------------------------------------------------------------------------------------------------------- + // Routers + // ---------------------------------------------------------------------------------------------------------- + + private static Router<Message> dynamicRouter(JsonNode router) { + String addressTemplate = Selectors.textAt(router, Pointers.Routers.SPEC_TARGET); + String descriptorSetPath = Selectors.textAt(router, Pointers.Routers.SPEC_DESCRIPTOR); + String messageType = Selectors.textAt(router, Pointers.Routers.SPEC_MESSAGE_TYPE); + + ProtobufDescriptorMap descriptorPath = protobufDescriptorMap(descriptorSetPath); + Optional<Descriptors.GenericDescriptor> maybeDescriptor = + descriptorPath.getDescriptorByName(messageType); + if (!maybeDescriptor.isPresent()) { + throw new IllegalStateException( + "Error while processing a router definition. 
Unable to locate a message " + + messageType + + " in a descriptor set " + + descriptorSetPath); + } + return ProtobufRouter.forAddressTemplate( + (Descriptors.Descriptor) maybeDescriptor.get(), addressTemplate); + } + + private static ProtobufDescriptorMap protobufDescriptorMap(String descriptorSetPath) { + try { + URL url = ResourceLocator.findNamedResource(descriptorSetPath); + if (url == null) { + throw new IllegalArgumentException( + "Unable to locate a Protobuf descriptor set at " + descriptorSetPath); + } + return ProtobufDescriptorMap.from(url); + } catch (IOException e) { + throw new IllegalStateException( + "Error while processing a router definition. Unable to read the descriptor set at " + + descriptorSetPath, + e); + } + } + + private static IngressIdentifier<Message> targetRouterIngress(JsonNode routerNode) { + String targetIngress = Selectors.textAt(routerNode, Pointers.Routers.SPEC_INGRESS); + NamespaceNamePair nn = NamespaceNamePair.from(targetIngress); + return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name()); + } + + private static void requireProtobufRouterType(JsonNode routerNode) { + String routerType = Selectors.textAt(routerNode, Pointers.Routers.META_TYPE); + if (!routerType.equalsIgnoreCase("org.apache.flink.statefun.sdk/protobuf-router")) { + throw new IllegalStateException("Invalid router type " + routerType); + } + } +}
