tzulitai commented on a change in pull request #243:
URL: https://github.com/apache/flink-statefun/pull/243#discussion_r667689216
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
##########
@@ -146,6 +170,42 @@ private static FunctionEndpointSpec parseFunctionEndpointsSpec(
}
}
+ private static void configureHttpTransport(
+ HttpFunctionEndpointSpec.Builder endpointSpecBuilder,
+ ObjectNode transportSpecNode,
+ ExtensionResolver extensionResolver) {
+ final Optional<RequestReplyClientFactory> transportClientFactory =
+ Selectors.optionalTextAt(transportSpecNode, TransportPointers.CLIENT_FACTORY_TYPE)
+ .map(TypeName::parseFrom)
+ .map(
+ extensionType ->
+ extensionResolver.resolveExtension(
+ extensionType, RequestReplyClientFactory.class));
+ transportClientFactory.ifPresent(endpointSpecBuilder::withTransportClientFactory);
+
+ // retain everything except "type" field, and use that directly as the transport client
+ // properties
+ transportSpecNode.remove("type");
Review comment:
It can be argued that removing the `type` field isn't strictly necessary.
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
##########
@@ -49,10 +50,17 @@ public JsonModule(JsonNode moduleSpecNode, FormatVersion formatVersion, URL modu
public void configure(Map<String, String> conf, Binder binder) {
try {
- ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion));
+ ENTITIES.forEach(
+ jsonEntity ->
+ jsonEntity.bind(binder, getExtensionResolver(binder), moduleSpecNode, formatVersion));
} catch (Throwable t) {
throw new ModuleConfigurationException(
format("Error while parsing module at %s", moduleUrl), t);
}
}
+
+ // TODO expose ExtensionResolver properly once we have more usages
+ private static ExtensionResolver getExtensionResolver(Binder binder) {
+ return (ExtensionResolver) binder;
Review comment:
Note this TODO.
I considered adding a `getExtensionResolver()` method to the
`StatefulFunctionModule.Binder` interface, but wasn't sure whether it is the
right move given how few usages we have so far.
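For illustration, here is a minimal sketch of what exposing the resolver on the binder could look like, instead of relying on the cast. The `Binder` and `ExtensionResolver` interfaces below are simplified stand-ins, not the real StateFun types (the real `resolveExtension` takes a `TypeName` rather than a `String`), and `SimpleBinder` is a hypothetical name:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the real resolver; the String type name is an assumption.
interface ExtensionResolver {
  <T> T resolveExtension(String typeName, Class<T> extensionClass);
}

// Simplified stand-in for StatefulFunctionModule.Binder.
interface Binder {
  // Hypothetical accessor that would replace the `(ExtensionResolver) binder` cast.
  ExtensionResolver getExtensionResolver();
}

// Hypothetical binder implementation that also acts as the resolver.
final class SimpleBinder implements Binder, ExtensionResolver {
  private final Map<String, Object> extensions = new HashMap<>();

  void bindExtension(String typeName, Object extension) {
    extensions.put(typeName, extension);
  }

  @Override
  public ExtensionResolver getExtensionResolver() {
    return this;
  }

  @Override
  public <T> T resolveExtension(String typeName, Class<T> extensionClass) {
    Object extension = extensions.get(typeName);
    if (extension == null) {
      throw new IllegalStateException("No extension bound for type " + typeName);
    }
    // Throws ClassCastException if the bound extension has a different type.
    return extensionClass.cast(extension);
  }
}
```

With an accessor like this, callers such as `JsonModule` would no longer need to know that the binder implementation happens to be an `ExtensionResolver`.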
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
##########
@@ -71,40 +68,35 @@ private HttpFunctionEndpointSpec getEndpointsSpecOrThrow(FunctionType functionTy
throw new IllegalStateException("Unknown type: " + functionType);
}
- private RequestReplyClient buildHttpClient(
- HttpFunctionEndpointSpec spec, FunctionType functionType) {
- if (sharedClient == null) {
- sharedClient = OkHttpUtils.newClient();
- }
- OkHttpClient.Builder clientBuilder = sharedClient.newBuilder();
- clientBuilder.callTimeout(spec.maxRequestDuration());
- clientBuilder.connectTimeout(spec.connectTimeout());
- clientBuilder.readTimeout(spec.readTimeout());
- clientBuilder.writeTimeout(spec.writeTimeout());
-
- URI endpointUrl = spec.urlPathTemplate().apply(functionType);
+ private static RequestReplyClient buildTransportClientFromSpec(
+ URI endpointUrl, HttpFunctionEndpointSpec endpointsSpec) {
+ final RequestReplyClientFactory factory = endpointsSpec.transportClientFactory();
+ final ObjectNode properties = endpointsSpec.transportClientProperties();
- final HttpUrl url;
- if (UnixDomainHttpEndpoint.validate(endpointUrl)) {
- UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(endpointUrl);
-
- url =
- new HttpUrl.Builder()
- .scheme("http")
- .host("unused")
- .addPathSegment(endpoint.pathSegment)
- .build();
-
- configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile);
+ if (Thread.currentThread().getContextClassLoader() == factory.getClass().getClassLoader()) {
+ // in this case, we're using one of our own shipped transport client factory
+ return factory.createTransportClient(properties, endpointUrl);
} else {
- url = HttpUrl.get(endpointUrl);
+ try (SetContextClassLoader ignored = new SetContextClassLoader(factory)) {
+ return new ContextSafeRequestReplyClient(
+ factory.createTransportClient(properties, endpointUrl));
+ }
}
- return new HttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown);
}
@Override
public void shutdown() {
- shutdown = true;
- OkHttpUtils.closeSilently(sharedClient);
+ specificTypeEndpointSpecs
+ .values()
+ .forEach(spec -> shutdownTransportClientFactory(spec.transportClientFactory()));
+ perNamespaceEndpointSpecs
+ .values()
+ .forEach(spec -> shutdownTransportClientFactory(spec.transportClientFactory()));
+ }
+
+ private static void shutdownTransportClientFactory(RequestReplyClientFactory factory) {
+ try (SetContextClassLoader ignored = new SetContextClassLoader(factory)) {
Review comment:
As you can see, switching classloaders is currently a concern handled by
the `HttpFunctionProvider`. This is rather error-prone and will only get more
fragile as we include more and more extensions in the future.
It may be worth moving this responsibility elsewhere, e.g. the
`ExtensionResolver` could always return a classloader-safe, already
decorated instance of each extension. If that's the case, we can clean up a lot
of these classloader checks in the `HttpFunctionProvider`, which is essentially
user code to the runtime.
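To make the idea concrete, here is a rough sketch of a generic decorator that a resolver could apply before handing out an extension. It is only an illustration under assumptions: `ClassLoaderSafe` and `decorate` are hypothetical names, it uses a JDK dynamic proxy rather than the `SetContextClassLoader` / `ContextSafeRequestReplyClient` classes from this PR, and it only works for extensions exposed through an interface:

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

// Hypothetical helper, not part of this PR.
final class ClassLoaderSafe {
  private ClassLoaderSafe() {}

  // Wraps `delegate` so that every call made through `extensionInterface` runs with
  // the delegate's own classloader set as the thread context classloader.
  static <T> T decorate(Class<T> extensionInterface, T delegate) {
    final ClassLoader extensionClassLoader = delegate.getClass().getClassLoader();
    Object proxy =
        Proxy.newProxyInstance(
            extensionClassLoader,
            new Class<?>[] {extensionInterface},
            (unused, method, args) -> {
              final Thread thread = Thread.currentThread();
              final ClassLoader original = thread.getContextClassLoader();
              thread.setContextClassLoader(extensionClassLoader);
              try {
                return method.invoke(delegate, args);
              } catch (InvocationTargetException e) {
                // Rethrow the extension's own exception instead of the reflective wrapper.
                throw e.getCause();
              } finally {
                // Always restore the caller's context classloader.
                thread.setContextClassLoader(original);
              }
            });
    return extensionInterface.cast(proxy);
  }
}
```

If the resolver returned instances wrapped like this, callers such as `HttpFunctionProvider` could invoke `createTransportClient` or a shutdown hook directly, without comparing classloaders or opening a `SetContextClassLoader` scope themselves. Clients created by a factory would still need the same wrapping, since the proxy only covers calls on the decorated object.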