tzulitai commented on a change in pull request #110:
URL: https://github.com/apache/flink-statefun/pull/110#discussion_r425563507
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
##########
@@ -41,12 +45,32 @@ public RequestReplyFunction functionOfType(FunctionType type) {
if (spec == null) {
throw new IllegalArgumentException("Unsupported type " + type);
}
- // specific client reuses the same the connection pool and thread pool
- // as the sharedClient.
- OkHttpClient specificClient =
- sharedClient.newBuilder().callTimeout(spec.maxRequestDuration()).build();
- RequestReplyClient httpClient =
- new HttpRequestReplyClient(HttpUrl.get(spec.endpoint()), specificClient);
- return new RequestReplyFunction(spec.states(), spec.maxNumBatchRequests(), httpClient);
+ return new RequestReplyFunction(
+ spec.states(), spec.maxNumBatchRequests(), buildHttpClient(spec));
+ }
+
+ private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
+ // We need to build a UDS HTTP client
+ if (spec.unixDomainSocket() != null) {
+ OkHttpClient specificClient =
+ sharedClient
+ .newBuilder()
+ .socketFactory(new AFUNIXSocketFactory.FactoryArg(spec.unixDomainSocket()))
Review comment:
Is there a reason that you chose not to implement a custom UDS socket
factory like in the example at
https://github.com/square/okhttp/tree/master/samples/unixdomainsockets/src/main/java/okhttp3/unixdomainsockets?
I'm asking only because we should try to have the `statefun-flink-core`
dependencies as slim as possible, as it directly influences the size of our
base distributions. It might also not be too big of a deal if the
`junixsocket-core` library is small.
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
##########
@@ -41,12 +45,32 @@ public RequestReplyFunction functionOfType(FunctionType type) {
if (spec == null) {
throw new IllegalArgumentException("Unsupported type " + type);
}
- // specific client reuses the same the connection pool and thread pool
- // as the sharedClient.
- OkHttpClient specificClient =
- sharedClient.newBuilder().callTimeout(spec.maxRequestDuration()).build();
- RequestReplyClient httpClient =
- new HttpRequestReplyClient(HttpUrl.get(spec.endpoint()), specificClient);
- return new RequestReplyFunction(spec.states(), spec.maxNumBatchRequests(), httpClient);
+ return new RequestReplyFunction(
+ spec.states(), spec.maxNumBatchRequests(), buildHttpClient(spec));
+ }
+
+ private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
+ // We need to build a UDS HTTP client
+ if (spec.unixDomainSocket() != null) {
+ OkHttpClient specificClient =
+ sharedClient
+ .newBuilder()
+ .socketFactory(new AFUNIXSocketFactory.FactoryArg(spec.unixDomainSocket()))
+ // Enable HTTP/2 if available (uses H2 upgrade),
+ // otherwise fallback to HTTP/1.1
+ .protocols(Collections.singletonList(Protocol.HTTP_2))
+ .callTimeout(spec.maxRequestDuration())
+ .build();
+
+ return new HttpRequestReplyClient(
+ // Only the path matters!
+ HttpUrl.get(URI.create(spec.endpoint().getPath())), specificClient);
Review comment:
For easier readability: maybe it makes sense to add a getter method to the
spec class that just returns the path string without the scheme.
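
A minimal sketch of what such a getter could look like, assuming the spec keeps its endpoint as a `java.net.URI`. The class name `EndpointSpecSketch` and the method name `endpointPath()` are hypothetical and not taken from the PR; the real `HttpFunctionSpec` has more fields and may want a different name.

```java
import java.net.URI;
import java.util.Objects;

/** Hypothetical stand-in for the spec class; only the endpoint is modelled here. */
final class EndpointSpecSketch {
  private final URI endpoint;

  EndpointSpecSketch(URI endpoint) {
    this.endpoint = Objects.requireNonNull(endpoint, "endpoint");
  }

  /** The full endpoint, including scheme and authority. */
  URI endpoint() {
    return endpoint;
  }

  /** Only the path component of the endpoint, without scheme or authority. */
  String endpointPath() {
    return endpoint.getPath();
  }

  public static void main(String[] args) {
    EndpointSpecSketch spec =
        new EndpointSpecSketch(URI.create("http://localhost:8000/statefun"));
    System.out.println(spec.endpointPath());
  }
}
```

With something along these lines, the call site could read `spec.endpointPath()` instead of `spec.endpoint().getPath()`, which would also make the "only the path matters" comment largely self-explanatory.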