igalshilman commented on a change in pull request #306:
URL: https://github.com/apache/flink-statefun/pull/306#discussion_r818262446
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplySpec.java
##########
@@ -76,14 +92,30 @@ public NettyRequestReplySpec(
ofNullable(connectTimeout),
() -> DEFAULT_CONNECT_TIMEOUT);
this.pooledConnectionTTL =
- ofNullable(pooledConnectionTTL).orElseGet(() ->
DEFAULT_POOLED_CONNECTION_TTL);
+ ofNullable(pooledConnectionTTL).orElse(DEFAULT_POOLED_CONNECTION_TTL);
this.connectionPoolMaxSize =
ofNullable(connectionPoolMaxSize).orElse(DEFAULT_CONNECTION_POOL_MAX_SIZE);
this.maxRequestOrResponseSizeInBytes =
ofNullable(maxRequestOrResponseSizeInBytes)
.orElse(DEFAULT_MAX_REQUEST_OR_RESPONSE_SIZE_IN_BYTES);
}
+ public Optional<String> getTrustedCaCertsOptional() {
Review comment:
I think that the `Optional` suffix is redundant, since it is clear from
the return type.
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
##########
@@ -208,4 +225,77 @@ private void releaseChannel0(Channel channel) {
}
channel.close().addListener(ignored -> pool.release(channel));
}
+
+ private static SslContext getSslContext(NettyRequestReplySpec spec) {
+ Optional<String> maybeTrustCaCerts = spec.getTrustedCaCertsOptional();
+ Optional<String> maybeClientCerts = spec.getClientCertsOptional();
+ Optional<String> maybeClientKey = spec.getClientKeyOptional();
+ Optional<String> maybeKeyPassword = spec.getClientKeyPasswordOptional();
+
+ if (maybeClientCerts.isPresent() && !maybeClientKey.isPresent()) {
+ throw new IllegalStateException(
+ "You provided a client cert, but not a client key. Cannot
continue.");
+ }
+ if (maybeClientKey.isPresent() && !maybeClientCerts.isPresent()) {
Review comment:
This seems to me to be a duplicate of the condition above.
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
##########
@@ -208,4 +225,77 @@ private void releaseChannel0(Channel channel) {
}
channel.close().addListener(ignored -> pool.release(channel));
}
+
+ private static SslContext getSslContext(NettyRequestReplySpec spec) {
+ Optional<String> maybeTrustCaCerts = spec.getTrustedCaCertsOptional();
+ Optional<String> maybeClientCerts = spec.getClientCertsOptional();
+ Optional<String> maybeClientKey = spec.getClientKeyOptional();
+ Optional<String> maybeKeyPassword = spec.getClientKeyPasswordOptional();
+
+ if (maybeClientCerts.isPresent() && !maybeClientKey.isPresent()) {
+ throw new IllegalStateException(
+ "You provided a client cert, but not a client key. Cannot
continue.");
+ }
+ if (maybeClientKey.isPresent() && !maybeClientCerts.isPresent()) {
+ throw new IllegalStateException(
+ "You provided a client key, but not a client cert. Cannot
continue.");
+ }
+
+ Optional<InputStream> maybeTrustCaCertsInputStream =
+ maybeTrustCaCerts.map(
+ trustedCaCertsLocation ->
+ openStreamIfExistsOrThrow(
+
ResourceLocator.findNamedResource(trustedCaCertsLocation)));
+
+ Optional<InputStream> maybeCertInputStream =
+ maybeClientCerts.map(
+ clientCertLocation ->
+
openStreamIfExistsOrThrow(ResourceLocator.findNamedResource(clientCertLocation)));
+
+ Optional<InputStream> maybeKeyInputStream =
+ maybeClientKey.map(
+ clientKeyLocation ->
+
openStreamIfExistsOrThrow(ResourceLocator.findNamedResource(clientKeyLocation)));
+
+ SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
+ maybeTrustCaCertsInputStream.ifPresent(sslContextBuilder::trustManager);
+ maybeCertInputStream.ifPresent(
+ certInputStream ->
+ maybeKeyInputStream.ifPresent(
Review comment:
What do you think about strongly indicating that `maybeKeyInputStream`
is mandatory, by calling `get` on it (and suppressing the warning)?
##########
File path:
statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/java/CommandInterpreterAppServer.java
##########
@@ -20,33 +20,120 @@
import static
org.apache.flink.statefun.e2e.smoke.java.Constants.CMD_INTERPRETER_FN;
-import io.undertow.Undertow;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.*;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.*;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.ClientAuth;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslProvider;
import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
import org.apache.flink.statefun.sdk.java.StatefulFunctions;
-import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
public class CommandInterpreterAppServer {
Review comment:
wow, well done!
##########
File path:
statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml
##########
@@ -16,5 +16,11 @@
kind: io.statefun.endpoints.v2/http
spec:
functions: statefun.smoke.e2e/command-interpreter-fn
- urlPathTemplate: http://remote-function-host:8000
- maxNumBatchRequests: 10000
\ No newline at end of file
+ urlPathTemplate: https://remote-function-host:8000
+ maxNumBatchRequests: 10000
+ transport:
+ type: io.statefun.transports.v1/async
+ trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem
+ client_cert:
file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.crt
+ client_key:
file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.key.p8
+ client_key_password: test
Review comment:
While I understand the need for a `client_key_password` to be present,
it is a bit problematic, as it puts sensitive information into these yaml files.
How about we add an additional `client_key_password_file` property, and give
people the option to reference it from, say, a mounted k8s secret?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]