igalshilman commented on a change in pull request #306:
URL: https://github.com/apache/flink-statefun/pull/306#discussion_r818262446



##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplySpec.java
##########
@@ -76,14 +92,30 @@ public NettyRequestReplySpec(
             ofNullable(connectTimeout),
             () -> DEFAULT_CONNECT_TIMEOUT);
     this.pooledConnectionTTL =
-        ofNullable(pooledConnectionTTL).orElseGet(() -> 
DEFAULT_POOLED_CONNECTION_TTL);
+        ofNullable(pooledConnectionTTL).orElse(DEFAULT_POOLED_CONNECTION_TTL);
     this.connectionPoolMaxSize =
         
ofNullable(connectionPoolMaxSize).orElse(DEFAULT_CONNECTION_POOL_MAX_SIZE);
     this.maxRequestOrResponseSizeInBytes =
         ofNullable(maxRequestOrResponseSizeInBytes)
             .orElse(DEFAULT_MAX_REQUEST_OR_RESPONSE_SIZE_IN_BYTES);
   }
 
+  public Optional<String> getTrustedCaCertsOptional() {

Review comment:
       I think that the `Optional` suffix is redundant, since it is clear from 
the return type.

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
##########
@@ -208,4 +225,77 @@ private void releaseChannel0(Channel channel) {
     }
     channel.close().addListener(ignored -> pool.release(channel));
   }
+
+  private static SslContext getSslContext(NettyRequestReplySpec spec) {
+    Optional<String> maybeTrustCaCerts = spec.getTrustedCaCertsOptional();
+    Optional<String> maybeClientCerts = spec.getClientCertsOptional();
+    Optional<String> maybeClientKey = spec.getClientKeyOptional();
+    Optional<String> maybeKeyPassword = spec.getClientKeyPasswordOptional();
+
+    if (maybeClientCerts.isPresent() && !maybeClientKey.isPresent()) {
+      throw new IllegalStateException(
+          "You provided a client cert, but not a client key. Cannot 
continue.");
+    }
+    if (maybeClientKey.isPresent() && !maybeClientCerts.isPresent()) {

Review comment:
       This seems to me as a duplicate condition as the condition above.

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
##########
@@ -208,4 +225,77 @@ private void releaseChannel0(Channel channel) {
     }
     channel.close().addListener(ignored -> pool.release(channel));
   }
+
+  private static SslContext getSslContext(NettyRequestReplySpec spec) {
+    Optional<String> maybeTrustCaCerts = spec.getTrustedCaCertsOptional();
+    Optional<String> maybeClientCerts = spec.getClientCertsOptional();
+    Optional<String> maybeClientKey = spec.getClientKeyOptional();
+    Optional<String> maybeKeyPassword = spec.getClientKeyPasswordOptional();
+
+    if (maybeClientCerts.isPresent() && !maybeClientKey.isPresent()) {
+      throw new IllegalStateException(
+          "You provided a client cert, but not a client key. Cannot 
continue.");
+    }
+    if (maybeClientKey.isPresent() && !maybeClientCerts.isPresent()) {
+      throw new IllegalStateException(
+          "You provided a client key, but not a client cert. Cannot 
continue.");
+    }
+
+    Optional<InputStream> maybeTrustCaCertsInputStream =
+        maybeTrustCaCerts.map(
+            trustedCaCertsLocation ->
+                openStreamIfExistsOrThrow(
+                    
ResourceLocator.findNamedResource(trustedCaCertsLocation)));
+
+    Optional<InputStream> maybeCertInputStream =
+        maybeClientCerts.map(
+            clientCertLocation ->
+                
openStreamIfExistsOrThrow(ResourceLocator.findNamedResource(clientCertLocation)));
+
+    Optional<InputStream> maybeKeyInputStream =
+        maybeClientKey.map(
+            clientKeyLocation ->
+                
openStreamIfExistsOrThrow(ResourceLocator.findNamedResource(clientKeyLocation)));
+
+    SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
+    maybeTrustCaCertsInputStream.ifPresent(sslContextBuilder::trustManager);
+    maybeCertInputStream.ifPresent(
+        certInputStream ->
+            maybeKeyInputStream.ifPresent(

Review comment:
       What do you think about strongly indicating that `maybeKeyInputStream` 
is mandatory, by calling `get` on It (and suppress a warning)?

##########
File path: 
statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/java/CommandInterpreterAppServer.java
##########
@@ -20,33 +20,120 @@
 
 import static 
org.apache.flink.statefun.e2e.smoke.java.Constants.CMD_INTERPRETER_FN;
 
-import io.undertow.Undertow;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.*;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.*;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.ClientAuth;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslProvider;
 import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
 import org.apache.flink.statefun.sdk.java.StatefulFunctions;
-import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
 
 public class CommandInterpreterAppServer {

Review comment:
       wow, well done! 

##########
File path: 
statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml
##########
@@ -16,5 +16,11 @@
 kind: io.statefun.endpoints.v2/http
 spec:
   functions: statefun.smoke.e2e/command-interpreter-fn
-  urlPathTemplate: http://remote-function-host:8000
-  maxNumBatchRequests: 10000
\ No newline at end of file
+  urlPathTemplate: https://remote-function-host:8000
+  maxNumBatchRequests: 10000
+  transport:
+    type: io.statefun.transports.v1/async
+    trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem
+    client_cert: 
file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.crt
+    client_key: 
file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.key.p8
+    client_key_password: test

Review comment:
       While I understand the need for a `client_key_password` to be present, 
it is a bit problematic, as it puts sensitive information into these yaml files.
   How about we add an additional `client_key_password_file` property, and let 
people the option to reference that from a, say, k8s secret mounted?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to