This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new a0c0c44b9 [java] allow adding CA certs to the client's trust chain
a0c0c44b9 is described below

commit a0c0c44b91a795022eb46fd450d05e76acf3d718
Author: Zoltan Chovan <[email protected]>
AuthorDate: Wed May 31 13:50:38 2023 +0000

    [java] allow adding CA certs to the client's trust chain
    
    With this patch, the Java client doesn't implicitly trust the TLS
    certificate of a Kudu server it's negotiating an RPC connection with
    when using its JWT. There is also an update on the Negotiator class
    to fix various JWT-related issues during RPC connection negotiation.
    
    To implement that, this patch introduces the following new methods:
    
      * [Async]KuduClient.trustedCertificates(List<ByteString> certs)
        to add the specified CA certificates (in DER format) as the
        trusted ones for the client (the call replaces all the previously
        set certificates if invoked again)
    
      * KuduTestHarness.getClusterCACertDer() to acquire the IPKI CA
        certificate of the test mini-cluster via call to
        MiniKuduCluster.getCACertDer()
    
      * MiniKuduCluster.getMasterWebServerAddressesAsString() which
        returns the addresses of Master's webserver as a comma separated
        string, similar to MiniKuduCluster.getMasterAddressesAsString()
    
    In addition, this changelist also contains tests to cover the newly
    introduced functionality.
    
    Change-Id: I281fd59da0d24a6119b99e1da8096f381b414a2b
    Reviewed-on: http://gerrit.cloudera.org:8080/20017
    Tested-by: Kudu Jenkins
    Reviewed-by: Attila Bukor <[email protected]>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    |  27 ++-
 .../java/org/apache/kudu/client/KuduClient.java    |  14 +-
 .../java/org/apache/kudu/client/Negotiator.java    |  59 ++++-
 .../org/apache/kudu/client/SecurityContext.java    |   9 +-
 .../org/apache/kudu/client/TestKuduClient.java     |  14 ++
 .../org/apache/kudu/client/TestNegotiation.java    | 245 ++++++++++++++++++++-
 .../java/org/apache/kudu/test/KuduTestHarness.java |  11 +
 .../apache/kudu/test/cluster/MiniKuduCluster.java  |  63 +++++-
 .../org/apache/kudu/test/TestMiniKuduCluster.java  |  60 +++--
 9 files changed, 455 insertions(+), 47 deletions(-)

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 68aa1b2fc..b9ada6de4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -75,6 +75,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.Common;
 import org.apache.kudu.Schema;
+import org.apache.kudu.client.Client.AuthenticationCredentialsPB;
 import org.apache.kudu.master.Master;
 import org.apache.kudu.master.Master.GetTableLocationsResponsePB;
 import org.apache.kudu.master.Master.TSInfoPB;
@@ -1178,6 +1179,18 @@ public class AsyncKuduClient implements AutoCloseable {
     securityContext.importAuthenticationCredentials(authnData);
   }
 
+  /**
+   * Mark the given CA certificates (in DER format) as the trusted ones for the
+   * client. The provided list of certificates replaces any previously set 
ones.
+   *
+   * @param certificates list of certificates to trust (in DER format)
+   * @throws CertificateException if any of the specified certificates were 
invalid
+   */
+  @InterfaceStability.Unstable
+  public void trustedCertificates(List<ByteString> certificates) throws 
CertificateException {
+    securityContext.trustCertificates(certificates);
+  }
+
   /**
    * Set JWT (JSON Web Token) to authenticate the client to a server.
    * <p>
@@ -1190,10 +1203,13 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   @InterfaceStability.Unstable
   public void jwt(String jwt) {
-    Token.JwtRawPB jwtPB = Token.JwtRawPB.newBuilder()
-        .setJwtData(ByteString.copyFromUtf8(jwt))
-        .build();
-    securityContext.setJsonWebToken(jwtPB);
+    AuthenticationCredentialsPB credentials =
+        AuthenticationCredentialsPB.newBuilder()
+          .setJwt(Token.JwtRawPB.newBuilder()
+                  .setJwtData(ByteString.copyFromUtf8(jwt))
+                  .build())
+          .build();
+    securityContext.importAuthenticationCredentials(credentials.toByteArray());
   }
 
   /**
@@ -1945,7 +1961,8 @@ public class AsyncKuduClient implements AutoCloseable {
                 
securityContext.setAuthenticationToken(resp.getConnectResponse().getAuthnToken());
               }
               List<ByteString> caCerts = 
resp.getConnectResponse().getCaCertDerList();
-              if (!caCerts.isEmpty()) {
+              if (!caCerts.isEmpty() && (securityContext.getJsonWebToken() == 
null ||
+                                         
!securityContext.getJsonWebToken().hasJwtData())) {
                 try {
                   securityContext.trustCertificates(caCerts);
                 } catch (CertificateException e) {
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index ed2f90e30..6d8e7f82b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.client;
 
+import java.security.cert.CertificateException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executor;
@@ -31,7 +32,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.Schema;
 import org.apache.kudu.master.Master.TableIdentifierPB;
-import org.apache.kudu.security.Token;
 
 /**
  * A synchronous and thread-safe client for Kudu.
@@ -468,6 +468,18 @@ public class KuduClient implements AutoCloseable {
     asyncClient.importAuthenticationCredentials(authnData);
   }
 
+  /**
+   * Mark the given CA certificates (in DER format) as the trusted ones for the
+   * client. The provided list of certificates replaces any previously set 
ones.
+   *
+   * @param certificates list of certificates to trust (in DER format)
+   * @throws CertificateException if any of the specified certificates were 
invalid
+   */
+  @InterfaceStability.Unstable
+  public void trustedCertificates(List<ByteString> certificates) throws 
CertificateException {
+    asyncClient.trustedCertificates(certificates);
+  }
+
   /**
    * Set JWT (JSON Web Token) to authenticate the client to a server.
    * <p>
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index f17dfce4e..1099a1528 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -162,7 +162,8 @@ public class Negotiator extends 
SimpleChannelInboundHandler<CallResponse> {
     INITIAL,
     AWAIT_NEGOTIATE,
     AWAIT_TLS_HANDSHAKE,
-    AWAIT_TOKEN_EXCHANGE,
+    AWAIT_AUTHN_TOKEN_EXCHANGE,
+    AWAIT_JWT_EXCHANGE,
     AWAIT_SASL,
     FINISHED
   }
@@ -175,8 +176,17 @@ public class Negotiator extends 
SimpleChannelInboundHandler<CallResponse> {
    * The authentication token we'll try to connect with, maybe null.
    * This is fetched from {@link #securityContext} in the constructor to
    * ensure that it doesn't change over the course of a negotiation attempt.
+   * An authentication token is used as secondary credentials.
    */
   private final SignedTokenPB authnToken;
+  /**
+   * A JSON Web Token (JWT) to authenticate this client/actor to the server
+   * that we'll try to connect with. Similar to {@link #authnToken}, this token
+   * may be null, and it's fetched from {@link #securityContext} as well.
+   * Cannot change over the course of an RPC connection negotiation attempt.
+   *
+   * @note unlike {@link #authnToken}, {@link #jsonWebToken} is used as 
primary credentials
+   */
   private final JwtRawPB jsonWebToken;
 
   private enum AuthnTokenNotUsedReason {
@@ -254,6 +264,7 @@ public class Negotiator extends 
SimpleChannelInboundHandler<CallResponse> {
     this.requireAuthentication = requireAuthentication;
     this.requireEncryption = requireEncryption;
     this.encryptLoopback = encryptLoopback;
+
     SignedTokenPB token = securityContext.getAuthenticationToken();
     if (token != null) {
       if (ignoreAuthnToken) {
@@ -269,9 +280,13 @@ public class Negotiator extends 
SimpleChannelInboundHandler<CallResponse> {
       this.authnToken = null;
       this.authnTokenNotUsedReason = AuthnTokenNotUsedReason.NONE_AVAILABLE;
     }
+
     JwtRawPB jwt = securityContext.getJsonWebToken();
-    // TODO(zchovan): also guard this on the presence of certs.
-    this.jsonWebToken = jwt;
+    if (jwt != null && securityContext.hasTrustedCerts()) {
+      this.jsonWebToken = jwt;
+    } else {
+      this.jsonWebToken = null;
+    }
   }
 
   public void sendHello(ChannelHandlerContext ctx) {
@@ -304,6 +319,15 @@ public class Negotiator extends 
SimpleChannelInboundHandler<CallResponse> {
           AuthenticationTypePB.Token.getDefaultInstance());
     }
 
+    // We may also have a JSON Web token, but it can be sent to the server
+    // only if we can verify the server's authenticity and the channel between
+    // this client and the server is confidential, i.e. it's protected by
+    // authenticated TLS.
+    if (jsonWebToken != null) {
+      builder.addAuthnTypesBuilder().setJwt(
+          AuthenticationTypePB.Jwt.getDefaultInstance());
+    }
+
     // We currently don't support client-certificate authentication from the
     // Java client.
 
@@ -345,8 +369,11 @@ public class Negotiator extends 
SimpleChannelInboundHandler<CallResponse> {
       case AWAIT_SASL:
         handleSaslMessage(ctx, response);
         break;
-      case AWAIT_TOKEN_EXCHANGE:
-        handleTokenExchangeResponse(ctx, response);
+      case AWAIT_AUTHN_TOKEN_EXCHANGE:
+        handleAuthnTokenExchangeResponse(ctx, response);
+        break;
+      case AWAIT_JWT_EXCHANGE:
+        handleJwtExchangeResponse(ctx, response);
         break;
       case AWAIT_TLS_HANDSHAKE:
         handleTlsMessage(ctx, response);
@@ -595,6 +622,7 @@ public class Negotiator extends 
SimpleChannelInboundHandler<CallResponse> {
         engine = securityContext.createSSLEngineTrustAll();
         break;
       case TOKEN:
+      case JWT:
         engine = securityContext.createSSLEngine();
         break;
       default:
@@ -765,7 +793,7 @@ public class Negotiator extends 
SimpleChannelInboundHandler<CallResponse> {
   }
 
   private void sendTokenExchange(ChannelHandlerContext ctx) {
-    // We must not send a token unless we have successfully finished
+    // We must not send authn token unless we have successfully finished
     // authenticating via TLS.
     Preconditions.checkNotNull(authnToken);
     Preconditions.checkNotNull(sslHandshakeFuture);
@@ -774,26 +802,39 @@ public class Negotiator extends 
SimpleChannelInboundHandler<CallResponse> {
     RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder()
         .setStep(NegotiateStep.TOKEN_EXCHANGE)
         .setAuthnToken(authnToken);
-    state = State.AWAIT_TOKEN_EXCHANGE;
+    state = State.AWAIT_AUTHN_TOKEN_EXCHANGE;
     sendSaslMessage(ctx, builder.build());
   }
 
   private void sendJwtExchange(ChannelHandlerContext ctx) {
+    // We must not send JWT unless we have successfully finished
+    // authenticating via TLS.
     Preconditions.checkNotNull(jsonWebToken);
     Preconditions.checkNotNull(sslHandshakeFuture);
     Preconditions.checkState(sslHandshakeFuture.isSuccess());
+
     RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder()
         .setStep(NegotiateStep.JWT_EXCHANGE)
         .setJwtRaw(jsonWebToken);
+    state = State.AWAIT_JWT_EXCHANGE;
     sendSaslMessage(ctx, builder.build());
   }
 
-  private void handleTokenExchangeResponse(ChannelHandlerContext ctx, 
NegotiatePB response)
+  private void handleAuthnTokenExchangeResponse(ChannelHandlerContext ctx, 
NegotiatePB response)
       throws SaslException {
     Preconditions.checkArgument(response.getStep() == 
NegotiateStep.TOKEN_EXCHANGE,
         "expected TOKEN_EXCHANGE, got step: {}", response.getStep());
 
-    // The token response doesn't have any actual data in it, so we can just 
move on.
+    // The authn token response doesn't have any actual data in it, so we can 
just move on.
+    finish(ctx);
+  }
+
+  private void handleJwtExchangeResponse(ChannelHandlerContext ctx, 
NegotiatePB response)
+      throws SaslException {
+    Preconditions.checkArgument(response.getStep() == 
NegotiateStep.JWT_EXCHANGE,
+        "expected JWT_EXCHANGE, got step: {}", response.getStep());
+
+    // The JWT response doesn't have any actual data in it, so we can just 
move on.
     finish(ctx);
   }
 
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
index b02c020a4..cc2c8733d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
@@ -326,11 +326,16 @@ class SecurityContext {
       if (pb.hasAuthnToken()) {
         authnToken = pb.getAuthnToken();
       }
-      trustCertificates(pb.getCaCertDersList());
+
+      // only trust ca certificates automatically if they were acquired with 
mutual trust of
+      // identities
+      if (!pb.hasJwt()) {
+        trustCertificates(pb.getCaCertDersList());
+      }
 
       if (pb.hasJwt()) {
         // Don't overwrite the JWT in the context if it's already set.
-        if (!jsonWebToken.hasJwtData() ||
+        if (jsonWebToken == null || !jsonWebToken.hasJwtData() ||
             (jsonWebToken.hasJwtData() && 
jsonWebToken.getJwtData().isEmpty())) {
           jsonWebToken = pb.getJwt();
         }
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 917d5aed2..a3138a33a 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -42,6 +42,7 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
@@ -50,8 +51,10 @@ import java.io.Closeable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.math.BigDecimal;
+import java.security.cert.CertificateException;
 import java.sql.Date;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -66,6 +69,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 import com.stumbleupon.async.Deferred;
 import org.junit.Before;
 import org.junit.Rule;
@@ -1911,4 +1915,14 @@ public class TestKuduClient {
       AsyncKuduSession.injectLatencyBufferFlushCb(false);
     }
   }
+
+  @Test(timeout = 50000)
+  public void testImportInvalidCert() throws Exception {
+    // An empty certificate to import.
+    byte[] caCert = new byte[0];
+    CertificateException e = assertThrows(CertificateException.class, () -> {
+      client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert)));
+    });
+    assertTrue(e.getMessage().contains("Could not parse certificate"));
+  }
 }
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiation.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiation.java
index f0b35c7e2..89d2728eb 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiation.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiation.java
@@ -17,10 +17,17 @@
 
 package org.apache.kudu.client;
 
+import static junit.framework.TestCase.assertFalse;
 import static junit.framework.TestCase.assertTrue;
+import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 import java.io.Closeable;
+import java.util.Arrays;
 
+import com.google.protobuf.ByteString;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -29,6 +36,7 @@ import org.apache.kudu.test.KuduTestHarness;
 import org.apache.kudu.test.KuduTestHarness.MasterServerConfig;
 import org.apache.kudu.test.cluster.FakeDNS;
 import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder;
+import org.apache.kudu.test.junit.AssertHelpers.BooleanExpression;
 
 public class TestNegotiation {
   private static final MiniKuduClusterBuilder clusterBuilder =
@@ -62,4 +70,239 @@ public class TestNegotiation {
     assertTrue(cla.getAppendedText(),
                cla.getAppendedText().contains("Client requested to use 
mechanism: PLAIN"));
   }
-}
\ No newline at end of file
+
+  /**
+   * When JWT is enabled on the server, a client with a valid JWT should be
+   * able to connect using the provided JSON Web Token to authenticate to Kudu
+   * servers (Kudu master in this particular case).
+   *
+   * In other words, when Kudu client has JWT and trusts the server's TLS
+   * certificate, the client and the server should negotiate a connection using
+   * the JSON Web Token provided by the client.
+   */
+  @Test
+  @MasterServerConfig(flags = {
+      "--enable-jwt-token-auth",
+      "--rpc-trace-negotiation",
+  })
+  public void testJwtAuthnWithTrustedCert() throws Exception {
+    FakeDNS.getInstance().install();
+    CapturingLogAppender cla = new CapturingLogAppender();
+
+    // The test harness might have the client already connected to the test
+    // cluster.
+    harness.resetClients();
+    KuduClient client = harness.getClient();
+    String jwt = harness.createJwtFor("account-id", "kudu", true);
+    assertNotNull(jwt);
+    client.jwt(jwt);
+
+    waitForClusterCACert();
+    final byte[] caCert = harness.getClusterCACertDer();
+    assertNotEquals(0, caCert.length);
+    client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert)));
+
+    try (Closeable c = cla.attach()) {
+      // A simple call to make sure the client has connected to the cluster.
+      // Success here assumes that the RPC connection to the Kudu server
+      // has been successfully negotiated.
+      assertFalse(client.tableExists("nonexistent"));
+    }
+
+    // Make sure the parties used JWT authn mechanism to negotiate the 
connection.
+    assertTrue(cla.getAppendedText(),
+               cla.getAppendedText().contains("Negotiated authn=JWT"));
+  }
+
+  @Test
+  @MasterServerConfig(flags = {
+      "--enable-jwt-token-auth",
+      "--rpc-authentication=required",
+      "--rpc-negotiation-timeout-ms=1500",
+      "--rpc-trace-negotiation",
+  })
+  public void testJwtAuthnWithoutTrustedCert() throws Exception {
+    FakeDNS.getInstance().install();
+    CapturingLogAppender cla = new CapturingLogAppender();
+
+    harness.kdestroy();
+    harness.resetClients();
+
+    // Create a special client with short timeout for RPCs. This is a bit 
funny,
+    // but due to the way how ConnectToMaster is implemented in the Java 
client,
+    // there isn't a simple way to stop the client from negotiating a connection
+    // again and again, unless the overall RPC times out. A connection closure
+    // upon negotiation failure is being interpreted as NetworkError, and 
that's
+    // a recoverable exception, so the operation is retried again and again.
+    //
+    // For faster test runs, the RPC timeout is set lower than the RPC 
connection
+    // negotiation timeout, while the latter is set lower than its default 
value
+    // (see the MasterServerConfig for the test). However, to prevent test
+    // flakiness, it's necessary to have at least one connection negotiation 
attempt
+    // before the RPC times out.
+    AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(
+        harness.getMasterAddressesAsString())
+        .defaultAdminOperationTimeoutMs(1000)
+        .defaultOperationTimeoutMs(1000)
+        .build();
+    KuduClient client = asyncClient.syncClient();
+
+    // Provide the client with a valid JWT.
+    String jwt = harness.createJwtFor("account-id", "kudu", true);
+    assertNotNull(jwt);
+    client.jwt(jwt);
+
+    try (Closeable c = cla.attach()) {
+      // It doesn't matter what method to call here: ConnectToMaster should not
+      // succeed, so the corresponding RPC won't be invoked anyway.
+      assertFalse(client.tableExists("nonexistent"));
+      fail();
+    } catch (NonRecoverableException ex) {
+      // Java client reports SERVICE_UNAVAILABLE in this case.
+      //
+      // TODO(aserbin): is this a bug? should it be fixed?
+      assertTrue(ex.getStatus().isServiceUnavailable());
+    }
+
+    // Make sure the parties aren't using JWT authn mechanism to negotiate the
+    // connection since the client shouldn't be willing to send its JWT to a
+    // non-authenticated Kudu server. As of now, the parties are using the SASL
+    // authn mechanism in current implementation, but that's not an invariant
+    // to enforce, so it's not asserted here.
+    assertFalse(cla.getAppendedText(), cla.getAppendedText().contains(
+        "Negotiated authn=JWT"));
+    assertTrue(cla.getAppendedText(), cla.getAppendedText().contains(
+        "server requires authentication, but client does not have Kerberos 
credentials (tgt)."));
+    assertTrue(cla.getAppendedText(), cla.getAppendedText().contains(
+        "Authentication tokens were not used because no token is available]"));
+  }
+
+  /**
+   * Try to authenticate with a valid JWT but mismatched account/principal name.
+   * An RPC connection to the server will be established successfully, but
+   * the client will fail to invoke the ConnectToMaster RPC because of
+   * NotAuthorized error from the coarse-grain authz subsystem.
+   */
+  @Test
+  @MasterServerConfig(flags = {
+      "--enable-jwt-token-auth",
+      "--rpc-trace-negotiation",
+  })
+  public void testValidJwtButWrongSubject() throws Exception {
+    FakeDNS.getInstance().install();
+    CapturingLogAppender cla = new CapturingLogAppender();
+
+    // The test harness might have the client already connected to the test
+    // cluster.
+    harness.resetClients();
+    KuduClient client = harness.getClient();
+    String jwt = harness.createJwtFor("account-id", "interloper", true);
+    assertNotNull(jwt);
+    client.jwt(jwt);
+
+    waitForClusterCACert();
+    final byte[] caCert = harness.getClusterCACertDer();
+    assertNotEquals(0, caCert.length);
+    client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert)));
+
+    try (Closeable c = cla.attach()) {
+      // It doesn't matter what method to call here: ConnectToMaster should not
+      // succeed, so the corresponding RPC won't be invoked anyway.
+      client.tableExists("nonexistent");
+      fail();
+    } catch (NonRecoverableException ex) {
+      // That's a bit funny, but Java client reports SERVICE_UNAVAILABLE in 
this
+      // case when failing to call a remote method due to NotAuthorized error
+      // code returned by Kudu master.
+      //
+      // TODO(aserbin): is this a bug? should it be fixed?
+      assertTrue(ex.getStatus().isServiceUnavailable());
+      assertTrue(ex.getMessage().contains(
+          "Not authorized: unauthorized access to method: ConnectToMaster"));
+    }
+
+    // Make sure the parties used JWT authn mechanism to successfully negotiate
+    // the connection, even if the coarse-grained authz check rejected a remote
+    // call of one of the API methods.
+    assertTrue(cla.getAppendedText(),
+        cla.getAppendedText().contains("Negotiated authn=JWT"));
+  }
+
+  /**
+   * Try to authenticate with an invalid JWT. The connection negotiation
+   * should fail because the server should not be able to verify the invalid 
JWT
+   * that the client provided.
+   */
+  @Test
+  @MasterServerConfig(flags = {
+      "--enable-jwt-token-auth",
+      "--rpc-negotiation-timeout-ms=1500",
+      "--rpc-trace-negotiation",
+  })
+  public void testInvalidJwt() throws Exception {
+    FakeDNS.getInstance().install();
+    CapturingLogAppender cla = new CapturingLogAppender();
+
+    // Create a special client with short timeout for RPCs. This is a bit 
funny,
+    // but due to the way how ConnectToMaster is implemented in the Java 
client,
+    // there isn't a simple way to stop the client from negotiating a connection
+    // again and again, unless the overall RPC times out.
+    //
+    // For faster test runs, the RPC timeout is set lower than the RPC 
connection
+    // negotiation timeout, while the latter is set lower than its default 
value
+    // (see the MasterServerConfig for the test). However, to prevent test
+    // flakiness, it's necessary to have at least one connection negotiation 
attempt
+    // before the RPC times out.
+    //
+    // TODO(aserbin): fix ConnectToMaster and stop negotiation attempts upon 
receiving NotAuthorized
+    AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(
+        harness.getMasterAddressesAsString())
+        .defaultAdminOperationTimeoutMs(1000)
+        .defaultOperationTimeoutMs(1000)
+        .build();
+    KuduClient client = asyncClient.syncClient();
+
+    String jwt = harness.createJwtFor("account-id", "kudu", false);
+    assertNotNull(jwt);
+    client.jwt(jwt);
+
+    waitForClusterCACert();
+    final byte[] caCert = harness.getClusterCACertDer();
+    assertNotEquals(0, caCert.length);
+    client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert)));
+
+    try (Closeable c = cla.attach()) {
+      // It doesn't matter what method to call here: ConnectToMaster should not
+      // succeed, so the corresponding RPC won't be invoked anyway.
+      client.tableExists("nonexistent");
+      fail();
+    } catch (NonRecoverableException ex) {
+      assertTrue(ex.getStatus().isTimedOut());
+    }
+
+    assertTrue(cla.getAppendedText(),cla.getAppendedText().contains(
+        "Negotiated authn=JWT"));
+    assertTrue(cla.getAppendedText(), cla.getAppendedText().contains(
+        "Negotiation complete: Not authorized: Server connection negotiation 
failed"));
+    assertTrue(cla.getAppendedText(), cla.getAppendedText().contains(
+        "FATAL_INVALID_JWT: Not authorized: JWT verification failed: failed to 
verify signature"));
+    assertTrue(cla.getAppendedText(), cla.getAppendedText().contains(
+        "Unable to connect to master"));
+    assertTrue(cla.getAppendedText(), cla.getAppendedText().contains(
+        "connection closed"));
+  }
+
+  private void waitForClusterCACert() throws Exception {
+    // It may take some time for the catalog manager to initialize
+    // and have IPKI CA certificate ready.
+    assertEventuallyTrue(
+        "valid cluster IPKI CA certificate captured",
+        new BooleanExpression() {
+          @Override
+          public boolean get() throws Exception {
+            return harness.getClusterCACertDer().length != 0;
+          }
+        },
+        10000/*timeoutMillis*/);
+  }
+}
diff --git 
a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java 
b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
index 30ea8a365..47b6656f0 100644
--- 
a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
+++ 
b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
@@ -20,10 +20,14 @@ package org.apache.kudu.test;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
 import java.util.List;
 import java.util.Random;
 
@@ -491,6 +495,13 @@ public class KuduTestHarness extends ExternalResource {
     return miniCluster.createJwtFor(accountId, subject, isValid);
   }
 
+  /**
+   * @return cluster's CA certificate in DER format or an empty array if catalog manager 
isn't ready
+   */
+  public byte[] getClusterCACertDer() throws IOException {
+    return miniCluster.getCACertDer();
+  }
+
   /**
    * An annotation that can be added to each test method to
    * define additional master server flags to be used when
diff --git 
a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
 
b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
index 31bcdc6f9..ce51491b2 100644
--- 
a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
+++ 
b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
@@ -26,6 +26,10 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
 import java.nio.file.Paths;
 import java.security.Security;
 import java.util.ArrayList;
@@ -34,7 +38,9 @@ import java.util.Map;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -96,6 +102,7 @@ public final class MiniKuduCluster implements AutoCloseable {
     DaemonIdentifierPB id;
     boolean isRunning;
     boolean isPaused;
+    String webServerAddress;
   }
 
   // Map of master addresses to daemon information.
@@ -113,7 +120,6 @@ public final class MiniKuduCluster implements AutoCloseable 
{
   private final ImmutableList<String> locationInfo;
   private final String clusterRoot;
   private final String principal;
-  private final boolean enableClientJwt;
 
   private MiniKdcOptionsPB kdcOptionsPb;
   private final Common.HmsMode hmsMode;
@@ -129,7 +135,6 @@ public final class MiniKuduCluster implements AutoCloseable 
{
       String clusterRoot,
       Common.HmsMode hmsMode,
       String principal,
-      boolean enableClientJwt,
       MiniOidcOptionsPB oidcOptionsPb) {
     this.enableKerberos = enableKerberos;
     this.numMasters = numMasters;
@@ -140,7 +145,6 @@ public final class MiniKuduCluster implements AutoCloseable 
{
     this.kdcOptionsPb = kdcOptionsPb;
     this.principal = principal;
     this.hmsMode = hmsMode;
-    this.enableClientJwt = enableClientJwt;
     this.oidcOptionsPb = oidcOptionsPb;
 
     if (clusterRoot == null) {
@@ -290,6 +294,8 @@ public final class MiniKuduCluster implements AutoCloseable {
       d.id = info.getId();
       d.isRunning = true;
       d.isPaused = false;
+      d.webServerAddress = String.join(":", info.getBoundHttpAddress().getHost(),
+                                       Integer.toString(info.getBoundHttpAddress().getPort()));
       masterServers.put(ProtobufHelper.hostAndPortFromPB(info.getBoundRpcAddress()), d);
     }
     resp = sendRequestToCluster(
@@ -301,17 +307,31 @@ public final class MiniKuduCluster implements AutoCloseable {
       d.id = info.getId();
       d.isRunning = true;
       d.isPaused = false;
+      d.webServerAddress = String.join(":", info.getBoundHttpAddress().getHost(),
+                                       Integer.toString(info.getBoundHttpAddress().getPort()));
       tabletServers.put(ProtobufHelper.hostAndPortFromPB(info.getBoundRpcAddress()), d);
     }
   }
 
   /**
-   * @return comma-separated list of master server addresses
+   * @return a comma-separated list of RPC addresses of all masters in the cluster
    */
   public String getMasterAddressesAsString() {
     return Joiner.on(',').join(masterServers.keySet());
   }
 
+  /**
+   * @return a comma-separated list of webserver addresses of all masters in the cluster
+   */
+  public String getMasterWebServerAddressesAsString() {
+    List<String> addresses = new ArrayList<String>();
+    masterServers.forEach((hp, daemonInfo) -> {
+      addresses.add(daemonInfo.webServerAddress);
+    });
+
+    return Joiner.on(',').join(addresses);
+  }
+
   /**
    * @return the list of master servers
    */
@@ -683,6 +703,33 @@ public final class MiniKuduCluster implements AutoCloseable {
     return clusterRoot;
   }
 
+  /**
+   * @return cluster's CA certificate in DER format or an empty array
+   */
+  public byte[] getCACertDer() throws IOException {
+    String masterHttpAddr = Iterables.get(Splitter.on(',')
+                                     .split(getMasterWebServerAddressesAsString()), 0);
+    URL url = new URL("http://" + masterHttpAddr + "/ipki-ca-cert-der");
+    HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+    connection.setRequestMethod("GET");
+    connection.connect();
+
+    if (connection.getResponseCode() != 200) {
+      connection.disconnect();
+      return new byte[0];
+    }
+
+    InputStream urlData = connection.getInputStream();
+    int contentSize = connection.getContentLength();
+    byte[] data = new byte[contentSize];
+    int numBytesRead = urlData.read(data);
+    if (numBytesRead != contentSize) {
+      connection.disconnect();
+      return new byte[0];
+    }
+    return data;
+  }
+
   /**
    * Helper runnable that receives stderr and logs it along with the process' identifier.
    */
@@ -725,7 +772,6 @@ public final class MiniKuduCluster implements AutoCloseable {
     private final List<String> locationInfo = new ArrayList<>();
     private String clusterRoot = null;
     private String principal = "kudu";
-    private boolean enableClientJwt = false;
 
     private MiniKdcOptionsPB.Builder kdcOptionsPb = MiniKdcOptionsPB.newBuilder();
     private MiniOidcOptionsPB.Builder oidcOptionsPb = MiniOidcOptionsPB.newBuilder();
@@ -750,11 +796,6 @@ public final class MiniKuduCluster implements AutoCloseable {
       return this;
     }
 
-    public MiniKuduClusterBuilder enableClientJwt() {
-      enableClientJwt = true;
-      return this;
-    }
-
     public MiniKuduClusterBuilder enableHiveMetastoreIntegration() {
       hmsMode = Common.HmsMode.ENABLE_METASTORE_INTEGRATION;
       return this;
@@ -836,7 +877,7 @@ public final class MiniKuduCluster implements AutoCloseable {
           new MiniKuduCluster(enableKerberos,
               numMasterServers, numTabletServers,
               extraTabletServerFlags, extraMasterServerFlags, locationInfo,
-              kdcOptionsPb.build(), clusterRoot, hmsMode, principal, enableClientJwt,
+              kdcOptionsPb.build(), clusterRoot, hmsMode, principal,
               oidcOptionsPb.build());
       try {
         cluster.start();
diff --git a/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
index c31efb399..db11df73a 100644
--- a/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
+++ b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
@@ -17,7 +17,9 @@
 
 package org.apache.kudu.test;
 
+import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -25,25 +27,21 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.net.Socket;
+import java.util.Arrays;
 
 import com.google.protobuf.ByteString;
 import org.junit.Rule;
 import org.junit.Test;
 
-import org.apache.kudu.client.AsyncKuduClient;
-import org.apache.kudu.client.Client.AuthenticationCredentialsPB;
 import org.apache.kudu.client.HostAndPort;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduClient.KuduClientBuilder;
 import org.apache.kudu.client.ListTablesResponse;
 import org.apache.kudu.client.TimeoutTracker;
-import org.apache.kudu.security.Token.JwtRawPB;
 import org.apache.kudu.test.cluster.FakeDNS;
 import org.apache.kudu.test.cluster.MiniKuduCluster;
-import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder;
+import org.apache.kudu.test.junit.AssertHelpers.BooleanExpression;
 import org.apache.kudu.test.junit.RetryRule;
-import org.apache.kudu.tools.Tool.CreateClusterRequestPB.JwksOptionsPB;
-import org.apache.kudu.tools.Tool.CreateClusterRequestPB.MiniOidcOptionsPB;
 
 public class TestMiniKuduCluster {
 
@@ -118,19 +116,32 @@ public class TestMiniKuduCluster {
   }
 
   @Test(timeout = 50000)
-  public void testJwt() throws Exception {
-    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
-                                                     .numMasterServers(NUM_MASTERS)
-                                                     .numTabletServers(0)
-                                                     .enableClientJwt()
-                                                     .addJwks("account-id", true)
-                                                     .build();
-        KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build()) {
-      String jwt = cluster.createJwtFor("account-id", "subject", true);
+  public void testJwtAuthn() throws Exception {
+    try (MiniKuduCluster cluster = createClusterForJwtTest();
+         KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build()) {
+      // It may take some time for the catalog manager to initialize
+      // and have IPKI CA certificate ready.
+      assertEventuallyTrue(
+          "valid cluster IPKI CA certificate captured",
+          new BooleanExpression() {
+            @Override
+            public boolean get() throws Exception {
+              return cluster.getCACertDer().length != 0;
+            }
+          },
+          10000/*timeoutMillis*/);
+      byte[] caCert = cluster.getCACertDer();
+      assertNotEquals(0, caCert.length);
+
+      String jwt = cluster.createJwtFor("account-id", "kudu", true);
       assertNotNull(jwt);
-
       client.jwt(jwt);
-      client.getTablesList();
+      client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert)));
+
+      // A simple call to make sure the client can connect to the cluster.
+      // That assumes an RPC connection to the master has been successfully
+      // negotiated.
+      assertTrue(client.getTablesList().getTablesList().isEmpty());
     }
   }
 
@@ -167,7 +178,7 @@ public class TestMiniKuduCluster {
    * @param testIsOpen true if we should want it to be open, false if we want it closed
    */
   private static void testHostPort(HostAndPort hp,
-                                   boolean testIsOpen) throws InterruptedException {
+      boolean testIsOpen) throws InterruptedException {
     TimeoutTracker tracker = new TimeoutTracker();
     while (tracker.getElapsedMillis() < SLEEP_TIME_MS) {
       try {
@@ -185,4 +196,17 @@ public class TestMiniKuduCluster {
     }
     fail("HostAndPort " + hp + " is still " + (testIsOpen ? "closed " : "open"));
   }
+
+  private MiniKuduCluster createClusterForJwtTest() throws IOException {
+    return new MiniKuduCluster.MiniKuduClusterBuilder()
+        .numMasterServers(NUM_MASTERS)
+        .numTabletServers(0)
+        .addMasterServerFlag("--enable-jwt-token-auth")
+        .addMasterServerFlag("--rpc-trace-negotiation")
+        .addMasterServerFlag("--rpc-authentication=required")
+        .addMasterServerFlag("--rpc-encryption=required")
+        .enableKerberos()
+        .addJwks("account-id", true)
+        .build();
+  }
 }


Reply via email to