tzulitai commented on a change in pull request #243:
URL: https://github.com/apache/flink-statefun/pull/243#discussion_r667689216



##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
##########
@@ -146,6 +170,42 @@ private static FunctionEndpointSpec 
parseFunctionEndpointsSpec(
     }
   }
 
+  private static void configureHttpTransport(
+      HttpFunctionEndpointSpec.Builder endpointSpecBuilder,
+      ObjectNode transportSpecNode,
+      ExtensionResolver extensionResolver) {
+    final Optional<RequestReplyClientFactory> transportClientFactory =
+        Selectors.optionalTextAt(transportSpecNode, 
TransportPointers.CLIENT_FACTORY_TYPE)
+            .map(TypeName::parseFrom)
+            .map(
+                extensionType ->
+                    extensionResolver.resolveExtension(
+                        extensionType, RequestReplyClientFactory.class));
+    
transportClientFactory.ifPresent(endpointSpecBuilder::withTransportClientFactory);
+
+    // retain everything except "type" field, and use that directly as the 
transport client
+    // properties
+    transportSpecNode.remove("type");

Review comment:
       It can be argued that remove the `type` field isn't absolutely necessary.

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
##########
@@ -49,10 +50,17 @@ public JsonModule(JsonNode moduleSpecNode, FormatVersion 
formatVersion, URL modu
 
   public void configure(Map<String, String> conf, Binder binder) {
     try {
-      ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, 
formatVersion));
+      ENTITIES.forEach(
+          jsonEntity ->
+              jsonEntity.bind(binder, getExtensionResolver(binder), 
moduleSpecNode, formatVersion));
     } catch (Throwable t) {
       throw new ModuleConfigurationException(
           format("Error while parsing module at %s", moduleUrl), t);
     }
   }
+
+  // TODO expose ExtensionResolver properly once we have more usages
+  private static ExtensionResolver getExtensionResolver(Binder binder) {
+    return (ExtensionResolver) binder;

Review comment:
       Notice this TODO.
   
   I have considered add a `getExtensionResolver()` method on the binder 
interface of `StatefulFunctionModule.Binder`, but wasn't sure if it the right 
move given how few usages we have so far.

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
##########
@@ -71,40 +68,35 @@ private HttpFunctionEndpointSpec 
getEndpointsSpecOrThrow(FunctionType functionTy
     throw new IllegalStateException("Unknown type: " + functionType);
   }
 
-  private RequestReplyClient buildHttpClient(
-      HttpFunctionEndpointSpec spec, FunctionType functionType) {
-    if (sharedClient == null) {
-      sharedClient = OkHttpUtils.newClient();
-    }
-    OkHttpClient.Builder clientBuilder = sharedClient.newBuilder();
-    clientBuilder.callTimeout(spec.maxRequestDuration());
-    clientBuilder.connectTimeout(spec.connectTimeout());
-    clientBuilder.readTimeout(spec.readTimeout());
-    clientBuilder.writeTimeout(spec.writeTimeout());
-
-    URI endpointUrl = spec.urlPathTemplate().apply(functionType);
+  private static RequestReplyClient buildTransportClientFromSpec(
+      URI endpointUrl, HttpFunctionEndpointSpec endpointsSpec) {
+    final RequestReplyClientFactory factory = 
endpointsSpec.transportClientFactory();
+    final ObjectNode properties = endpointsSpec.transportClientProperties();
 
-    final HttpUrl url;
-    if (UnixDomainHttpEndpoint.validate(endpointUrl)) {
-      UnixDomainHttpEndpoint endpoint = 
UnixDomainHttpEndpoint.parseFrom(endpointUrl);
-
-      url =
-          new HttpUrl.Builder()
-              .scheme("http")
-              .host("unused")
-              .addPathSegment(endpoint.pathSegment)
-              .build();
-
-      configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile);
+    if (Thread.currentThread().getContextClassLoader() == 
factory.getClass().getClassLoader()) {
+      // in this case, we're using one of our own shipped transport client 
factory
+      return factory.createTransportClient(properties, endpointUrl);
     } else {
-      url = HttpUrl.get(endpointUrl);
+      try (SetContextClassLoader ignored = new SetContextClassLoader(factory)) 
{
+        return new ContextSafeRequestReplyClient(
+            factory.createTransportClient(properties, endpointUrl));
+      }
     }
-    return new HttpRequestReplyClient(url, clientBuilder.build(), () -> 
shutdown);
   }
 
   @Override
   public void shutdown() {
-    shutdown = true;
-    OkHttpUtils.closeSilently(sharedClient);
+    specificTypeEndpointSpecs
+        .values()
+        .forEach(spec -> 
shutdownTransportClientFactory(spec.transportClientFactory()));
+    perNamespaceEndpointSpecs
+        .values()
+        .forEach(spec -> 
shutdownTransportClientFactory(spec.transportClientFactory()));
+  }
+
+  private static void shutdownTransportClientFactory(RequestReplyClientFactory 
factory) {
+    try (SetContextClassLoader ignored = new SetContextClassLoader(factory)) {

Review comment:
       As you can see, switching classloaders is currently a concern handled by 
the `HttpFunctionProvider`, and is rather error prone and fragile as we include 
more and more extensions in the future.
   
   It may be worth considering moving this responsibility elsewhere, e.g. the 
`ExtensionResolver` should perhaps always return a classloader-safe, already 
decorated instance of extensions. If that's the case, we can clean up a lot of 
this classloader checks in the `HttpFunctionProvider`, which is essentially 
user code to the runtime.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to