This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch branch-1.17.x in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 3c69a143934fc82667a086f60fad85ca40d2ecf6 Author: Zoltan Chovan <[email protected]> AuthorDate: Wed May 31 13:50:38 2023 +0000 [java] allow adding CA certs to the client's trust chain With this patch, the Java client doesn't implicitly trust the TLS certificate of a Kudu server it's negotiating an RPC connection with when using its JWT. There is also an update on the Negotiator class to fix various JWT-related issues during RPC connection negotiation. To implement that, this patch introduces the following new methods: * [Async]KuduClient.trustedCertificates(List<ByteString> certs) to add the specified CA certificates (in DER format) as the trusted ones for the client (the call replaces all the previously set certificates if invoked again) * KuduTestHarness.getClusterCACertDer() to acquire the IPKI CA certificate of the test mini-cluster via call to MiniKuduCluster.getCACertDer() * MiniKuduCluster.getMasterWebServerAddressesAsString() which returns the addresses of Master's webserver as a comma separated string, similar to MiniKuduCluster.getMasterAddressesAsString() In addition, this changelist also contains tests to cover the newly introduced functionality. Change-Id: I281fd59da0d24a6119b99e1da8096f381b414a2b Reviewed-on: http://gerrit.cloudera.org:8080/20017 Tested-by: Kudu Jenkins Reviewed-by: Attila Bukor <[email protected]> (cherry picked from commit a0c0c44b91a795022eb46fd450d05e76acf3d718) Reviewed-on: http://gerrit.cloudera.org:8080/20047 Reviewed-by: Yingchun Lai <[email protected]> --- .../org/apache/kudu/client/AsyncKuduClient.java | 27 ++- .../java/org/apache/kudu/client/KuduClient.java | 14 +- .../java/org/apache/kudu/client/Negotiator.java | 59 ++++- .../org/apache/kudu/client/SecurityContext.java | 9 +- .../org/apache/kudu/client/TestKuduClient.java | 14 ++ .../org/apache/kudu/client/TestNegotiation.java | 245 ++++++++++++++++++++- .../java/org/apache/kudu/test/KuduTestHarness.java | 11 + .../apache/kudu/test/cluster/MiniKuduCluster.java | 63 +++++- .../org/apache/kudu/test/TestMiniKuduCluster.java | 60 +++-- 9 files changed, 455 insertions(+), 47 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index 68aa1b2fc..b9ada6de4 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -75,6 +75,7 @@ import org.slf4j.LoggerFactory; import org.apache.kudu.Common; import org.apache.kudu.Schema; +import org.apache.kudu.client.Client.AuthenticationCredentialsPB; import org.apache.kudu.master.Master; import org.apache.kudu.master.Master.GetTableLocationsResponsePB; import org.apache.kudu.master.Master.TSInfoPB; @@ -1178,6 +1179,18 @@ public class AsyncKuduClient implements AutoCloseable { securityContext.importAuthenticationCredentials(authnData); } + /** + * Mark the given CA certificates (in DER format) as the trusted ones for the + * client. The provided list of certificates replaces any previously set ones. + * + * @param certificates list of certificates to trust (in DER format) + * @throws CertificateException if any of the specified certificates were invalid + */ + @InterfaceStability.Unstable + public void trustedCertificates(List<ByteString> certificates) throws CertificateException { + securityContext.trustCertificates(certificates); + } + /** * Set JWT (JSON Web Token) to authenticate the client to a server. * <p> @@ -1190,10 +1203,13 @@ public class AsyncKuduClient implements AutoCloseable { */ @InterfaceStability.Unstable public void jwt(String jwt) { - Token.JwtRawPB jwtPB = Token.JwtRawPB.newBuilder() - .setJwtData(ByteString.copyFromUtf8(jwt)) - .build(); - securityContext.setJsonWebToken(jwtPB); + AuthenticationCredentialsPB credentials = + AuthenticationCredentialsPB.newBuilder() + .setJwt(Token.JwtRawPB.newBuilder() + .setJwtData(ByteString.copyFromUtf8(jwt)) + .build()) + .build(); + securityContext.importAuthenticationCredentials(credentials.toByteArray()); } /** @@ -1945,7 +1961,8 @@ public class AsyncKuduClient implements AutoCloseable { securityContext.setAuthenticationToken(resp.getConnectResponse().getAuthnToken()); } List<ByteString> caCerts = resp.getConnectResponse().getCaCertDerList(); - if (!caCerts.isEmpty()) { + if (!caCerts.isEmpty() && (securityContext.getJsonWebToken() == null || + !securityContext.getJsonWebToken().hasJwtData())) { try { securityContext.trustCertificates(caCerts); } catch (CertificateException e) { diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java index ed2f90e30..6d8e7f82b 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java @@ -17,6 +17,7 @@ package org.apache.kudu.client; +import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; @@ -31,7 +32,6 @@ import org.slf4j.LoggerFactory; import org.apache.kudu.Schema; import org.apache.kudu.master.Master.TableIdentifierPB; -import org.apache.kudu.security.Token; /** * A synchronous and thread-safe client for Kudu. @@ -468,6 +468,18 @@ public class KuduClient implements AutoCloseable { asyncClient.importAuthenticationCredentials(authnData); } + /** + * Mark the given CA certificates (in DER format) as the trusted ones for the + * client. The provided list of certificates replaces any previously set ones. + * + * @param certificates list of certificates to trust (in DER format) + * @throws CertificateException if any of the specified certificates were invalid + */ + @InterfaceStability.Unstable + public void trustedCertificates(List<ByteString> certificates) throws CertificateException { + asyncClient.trustedCertificates(certificates); + } + /** * Set JWT (JSON Web Token) to authenticate the client to a server. * <p> diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java index f17dfce4e..1099a1528 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java @@ -162,7 +162,8 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { INITIAL, AWAIT_NEGOTIATE, AWAIT_TLS_HANDSHAKE, - AWAIT_TOKEN_EXCHANGE, + AWAIT_AUTHN_TOKEN_EXCHANGE, + AWAIT_JWT_EXCHANGE, AWAIT_SASL, FINISHED } @@ -175,8 +176,17 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { * The authentication token we'll try to connect with, maybe null. * This is fetched from {@link #securityContext} in the constructor to * ensure that it doesn't change over the course of a negotiation attempt. + * An authentication token is used as secondary credentials. */ private final SignedTokenPB authnToken; + /** + * A JSON Web Token (JWT) to authenticate this client/actor to the server + * that we'll try to connect with. Similar to {@link #authnToken}, this token + * may be null, and it's fetched from {@link #securityContext} as well. + * Cannot change over the course of an RPC connection negotiation attempt. + * + * @note unlike {@link #authnToken}, {@link #jsonWebToken} is used as primary credentials + */ private final JwtRawPB jsonWebToken; private enum AuthnTokenNotUsedReason { @@ -254,6 +264,7 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { this.requireAuthentication = requireAuthentication; this.requireEncryption = requireEncryption; this.encryptLoopback = encryptLoopback; + SignedTokenPB token = securityContext.getAuthenticationToken(); if (token != null) { if (ignoreAuthnToken) { @@ -269,9 +280,13 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { this.authnToken = null; this.authnTokenNotUsedReason = AuthnTokenNotUsedReason.NONE_AVAILABLE; } + JwtRawPB jwt = securityContext.getJsonWebToken(); - // TODO(zchovan): also guard this on the presence of certs. - this.jsonWebToken = jwt; + if (jwt != null && securityContext.hasTrustedCerts()) { + this.jsonWebToken = jwt; + } else { + this.jsonWebToken = null; + } } public void sendHello(ChannelHandlerContext ctx) { @@ -304,6 +319,15 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { AuthenticationTypePB.Token.getDefaultInstance()); } + // We may also have a JSON Web token, but it can be sent to the server + // only if we can verify the server's authenticity and the channel between + // this client and the server is confidential, i.e. it's protected by + // authenticated TLS. + if (jsonWebToken != null) { + builder.addAuthnTypesBuilder().setJwt( + AuthenticationTypePB.Jwt.getDefaultInstance()); + } + // We currently don't support client-certificate authentication from the // Java client. @@ -345,8 +369,11 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { case AWAIT_SASL: handleSaslMessage(ctx, response); break; - case AWAIT_TOKEN_EXCHANGE: - handleTokenExchangeResponse(ctx, response); + case AWAIT_AUTHN_TOKEN_EXCHANGE: + handleAuthnTokenExchangeResponse(ctx, response); + break; + case AWAIT_JWT_EXCHANGE: + handleJwtExchangeResponse(ctx, response); break; case AWAIT_TLS_HANDSHAKE: handleTlsMessage(ctx, response); @@ -595,6 +622,7 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { engine = securityContext.createSSLEngineTrustAll(); break; case TOKEN: + case JWT: engine = securityContext.createSSLEngine(); break; default: @@ -765,7 +793,7 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { } private void sendTokenExchange(ChannelHandlerContext ctx) { - // We must not send a token unless we have successfully finished + // We must not send authn token unless we have successfully finished // authenticating via TLS. Preconditions.checkNotNull(authnToken); Preconditions.checkNotNull(sslHandshakeFuture); @@ -774,26 +802,39 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder() .setStep(NegotiateStep.TOKEN_EXCHANGE) .setAuthnToken(authnToken); - state = State.AWAIT_TOKEN_EXCHANGE; + state = State.AWAIT_AUTHN_TOKEN_EXCHANGE; sendSaslMessage(ctx, builder.build()); } private void sendJwtExchange(ChannelHandlerContext ctx) { + // We must not send JWT unless we have successfully finished + // authenticating via TLS. Preconditions.checkNotNull(jsonWebToken); Preconditions.checkNotNull(sslHandshakeFuture); Preconditions.checkState(sslHandshakeFuture.isSuccess()); + RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder() .setStep(NegotiateStep.JWT_EXCHANGE) .setJwtRaw(jsonWebToken); + state = State.AWAIT_JWT_EXCHANGE; sendSaslMessage(ctx, builder.build()); } - private void handleTokenExchangeResponse(ChannelHandlerContext ctx, NegotiatePB response) + private void handleAuthnTokenExchangeResponse(ChannelHandlerContext ctx, NegotiatePB response) throws SaslException { Preconditions.checkArgument(response.getStep() == NegotiateStep.TOKEN_EXCHANGE, "expected TOKEN_EXCHANGE, got step: {}", response.getStep()); - // The token response doesn't have any actual data in it, so we can just move on. + // The authn token response doesn't have any actual data in it, so we can just move on. + finish(ctx); + } + + private void handleJwtExchangeResponse(ChannelHandlerContext ctx, NegotiatePB response) + throws SaslException { + Preconditions.checkArgument(response.getStep() == NegotiateStep.JWT_EXCHANGE, + "expected JWT_EXCHANGE, got step: {}", response.getStep()); + + // The JWT response doesn't have any actual data in it, so we can just move on. finish(ctx); } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java index b02c020a4..cc2c8733d 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java @@ -326,11 +326,16 @@ class SecurityContext { if (pb.hasAuthnToken()) { authnToken = pb.getAuthnToken(); } - trustCertificates(pb.getCaCertDersList()); + + // only trust ca certificates automatically if they were acquired with mutual trust of + // identities + if (!pb.hasJwt()) { + trustCertificates(pb.getCaCertDersList()); + } if (pb.hasJwt()) { // Don't overwrite the JWT in the context if it's already set. - if (!jsonWebToken.hasJwtData() || + if (jsonWebToken == null || !jsonWebToken.hasJwtData() || (jsonWebToken.hasJwtData() && jsonWebToken.getJwtData().isEmpty())) { jsonWebToken = pb.getJwt(); } diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java index 917d5aed2..a3138a33a 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java @@ -42,6 +42,7 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; @@ -50,8 +51,10 @@ import java.io.Closeable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.math.BigDecimal; +import java.security.cert.CertificateException; import java.sql.Date; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -66,6 +69,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; import com.stumbleupon.async.Deferred; import org.junit.Before; import org.junit.Rule; @@ -1911,4 +1915,14 @@ public class TestKuduClient { AsyncKuduSession.injectLatencyBufferFlushCb(false); } } + + @Test(timeout = 50000) + public void testImportInvalidCert() throws Exception { + // An empty certificate to import. + byte[] caCert = new byte[0]; + CertificateException e = assertThrows(CertificateException.class, () -> { + client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert))); + }); + assertTrue(e.getMessage().contains("Could not parse certificate")); + } } diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiation.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiation.java index f0b35c7e2..89d2728eb 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiation.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiation.java @@ -17,10 +17,17 @@ package org.apache.kudu.client; +import static junit.framework.TestCase.assertFalse; import static junit.framework.TestCase.assertTrue; +import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; import java.io.Closeable; +import java.util.Arrays; +import com.google.protobuf.ByteString; import org.junit.Rule; import org.junit.Test; @@ -29,6 +36,7 @@ import org.apache.kudu.test.KuduTestHarness; import org.apache.kudu.test.KuduTestHarness.MasterServerConfig; import org.apache.kudu.test.cluster.FakeDNS; import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder; +import org.apache.kudu.test.junit.AssertHelpers.BooleanExpression; public class TestNegotiation { private static final MiniKuduClusterBuilder clusterBuilder = @@ -62,4 +70,239 @@ public class TestNegotiation { assertTrue(cla.getAppendedText(), cla.getAppendedText().contains("Client requested to use mechanism: PLAIN")); } -} \ No newline at end of file + + /** + * When JWT is enabled on the server, a client with a valid JWT should be + * able to connect using the provided JSON Web Token to authenticate to Kudu + * servers (Kudu master in this particular case). + * + * In other words, when Kudu client has JWT and trusts the server's TLS + * certificate, the client and the server should negotiate a connection using + * the JSON Web Token provided by the client. + */ + @Test + @MasterServerConfig(flags = { + "--enable-jwt-token-auth", + "--rpc-trace-negotiation", + }) + public void testJwtAuthnWithTrustedCert() throws Exception { + FakeDNS.getInstance().install(); + CapturingLogAppender cla = new CapturingLogAppender(); + + // The test harness might have the client already connected to the test + // cluster. + harness.resetClients(); + KuduClient client = harness.getClient(); + String jwt = harness.createJwtFor("account-id", "kudu", true); + assertNotNull(jwt); + client.jwt(jwt); + + waitForClusterCACert(); + final byte[] caCert = harness.getClusterCACertDer(); + assertNotEquals(0, caCert.length); + client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert))); + + try (Closeable c = cla.attach()) { + // A simple call to make sure the client has connected to the cluster. + // Success here assumes that the RPC connection to the Kudu server + // has been successfully negotiated. + assertFalse(client.tableExists("nonexistent")); + } + + // Make sure the parties used JWT authn mechanism to negotiate the connection. + assertTrue(cla.getAppendedText(), + cla.getAppendedText().contains("Negotiated authn=JWT")); + } + + @Test + @MasterServerConfig(flags = { + "--enable-jwt-token-auth", + "--rpc-authentication=required", + "--rpc-negotiation-timeout-ms=1500", + "--rpc-trace-negotiation", + }) + public void testJwtAuthnWithoutTrustedCert() throws Exception { + FakeDNS.getInstance().install(); + CapturingLogAppender cla = new CapturingLogAppender(); + + harness.kdestroy(); + harness.resetClients(); + + // Create a special client with short timeout for RPCs. This is a bit funny, + // but due to the way how ConnectToMaster is implemented in the Java client, + // there isn't a simple way to stop the client to negotiate a connection + // again and again, unless the overall RPC times out. A connection closure + // upon negotiation failure is being interpreted as NetworkError, and that's + // a recoverable exception, so the operation is retried again and again. + // + // For faster test runs, the RPC timeout is set lower than the RPC connection + // negotiation timeout, while the latter is set lower than its default value + // (see the MasterServerConfig for the test). However, to prevent test + // flakiness, it's necessary to have at least connection negotiation attempt + // before the RPC times out. + AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder( + harness.getMasterAddressesAsString()) + .defaultAdminOperationTimeoutMs(1000) + .defaultOperationTimeoutMs(1000) + .build(); + KuduClient client = asyncClient.syncClient(); + + // Provide the client with a valid JWT. + String jwt = harness.createJwtFor("account-id", "kudu", true); + assertNotNull(jwt); + client.jwt(jwt); + + try (Closeable c = cla.attach()) { + // It doesn't matter what method to call here: ConnectToMaster should not + // succeed, so the corresponding RPC won't be invoked anyway. + assertFalse(client.tableExists("nonexistent")); + fail(); + } catch (NonRecoverableException ex) { + // Java client reports SERVICE_UNAVAILABLE in this case. + // + // TODO(aserbin): is this a bug? should it be fixed? + assertTrue(ex.getStatus().isServiceUnavailable()); + } + + // Make sure the parties aren't using JWT authn mechanism to negotiate the + // connection since the client shouldn't be willing to send its JWT to a + // non-authenticated Kudu server. As of now, the parties are using the SASL + // authn mechanism in current implementation, but that's not an invariant + // to enforce, so it's not asserted here. + assertFalse(cla.getAppendedText(), cla.getAppendedText().contains( + "Negotiated authn=JWT")); + assertTrue(cla.getAppendedText(), cla.getAppendedText().contains( + "server requires authentication, but client does not have Kerberos credentials (tgt).")); + assertTrue(cla.getAppendedText(), cla.getAppendedText().contains( + "Authentication tokens were not used because no token is available]")); + } + + /** + * Try to authenticate with a valid JWT by mismatched account/principal name. + * An RPC connection to the server will be established successfully, but + * the client will fail to invoke the ConnectToMaster RPC because of + * NotAuthorized error from the coarse-grain authz subsystem. + */ + @Test + @MasterServerConfig(flags = { + "--enable-jwt-token-auth", + "--rpc-trace-negotiation", + }) + public void testValidJwtButWrongSubject() throws Exception { + FakeDNS.getInstance().install(); + CapturingLogAppender cla = new CapturingLogAppender(); + + // The test harness might have the client already connected to the test + // cluster. + harness.resetClients(); + KuduClient client = harness.getClient(); + String jwt = harness.createJwtFor("account-id", "interloper", true); + assertNotNull(jwt); + client.jwt(jwt); + + waitForClusterCACert(); + final byte[] caCert = harness.getClusterCACertDer(); + assertNotEquals(0, caCert.length); + client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert))); + + try (Closeable c = cla.attach()) { + // It doesn't matter what method to call here: ConnectToMaster should not + // succeed, so the corresponding RPC won't be invoked anyway. + client.tableExists("nonexistent"); + fail(); + } catch (NonRecoverableException ex) { + // That's a bit funny, but Java client reports SERVICE_UNAVAILABLE in this + // case when failing to call a remote method due to NotAuthorized error + // code returned by Kudu master. + // + // TODO(aserbin): is this a bug? should it be fixed? + assertTrue(ex.getStatus().isServiceUnavailable()); + assertTrue(ex.getMessage().contains( + "Not authorized: unauthorized access to method: ConnectToMaster")); + } + + // Make sure the parties used JWT authn mechanism to successfully negotiate + // the connection, even if the coarse-grained authz check rejected a remote + // call of one of the API methods. + assertTrue(cla.getAppendedText(), + cla.getAppendedText().contains("Negotiated authn=JWT")); + } + + /** + * Try to authenticate with an invalid JWT. The connection negotiation + * should fail because the server should not be able to verify the invalid JWT + * that the client provided. + */ + @Test + @MasterServerConfig(flags = { + "--enable-jwt-token-auth", + "--rpc-negotiation-timeout-ms=1500", + "--rpc-trace-negotiation", + }) + public void testInvalidJwt() throws Exception { + FakeDNS.getInstance().install(); + CapturingLogAppender cla = new CapturingLogAppender(); + + // Create a special client with short timeout for RPCs. This is a bit funny, + // but due to the way how ConnectToMaster is implemented in the Java client, + // there isn't a simple way to stop the client to negotiate a connection + // again and again, unless the overall RPC times out. + // + // For faster test runs, the RPC timeout is set lower than the RPC connection + // negotiation timeout, while the latter is set lower than its default value + // (see the MasterServerConfig for the test). However, to prevent test + // flakiness, it's necessary to have at least connection negotiation attempt + // before the RPC times out. + // + // TODO(aserbin): fix ConnectToMaster and stop negotiation attempts upon receiving NotAuthorized + AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder( + harness.getMasterAddressesAsString()) + .defaultAdminOperationTimeoutMs(1000) + .defaultOperationTimeoutMs(1000) + .build(); + KuduClient client = asyncClient.syncClient(); + + String jwt = harness.createJwtFor("account-id", "kudu", false); + assertNotNull(jwt); + client.jwt(jwt); + + waitForClusterCACert(); + final byte[] caCert = harness.getClusterCACertDer(); + assertNotEquals(0, caCert.length); + client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert))); + + try (Closeable c = cla.attach()) { + // It doesn't matter what method to call here: ConnectToMaster should not + // succeed, so the corresponding RPC won't be invoked anyway. + client.tableExists("nonexistent"); + fail(); + } catch (NonRecoverableException ex) { + assertTrue(ex.getStatus().isTimedOut()); + } + + assertTrue(cla.getAppendedText(),cla.getAppendedText().contains( + "Negotiated authn=JWT")); + assertTrue(cla.getAppendedText(), cla.getAppendedText().contains( + "Negotiation complete: Not authorized: Server connection negotiation failed")); + assertTrue(cla.getAppendedText(), cla.getAppendedText().contains( + "FATAL_INVALID_JWT: Not authorized: JWT verification failed: failed to verify signature")); + assertTrue(cla.getAppendedText(), cla.getAppendedText().contains( + "Unable to connect to master")); + assertTrue(cla.getAppendedText(), cla.getAppendedText().contains( + "connection closed")); + } + + private void waitForClusterCACert() throws Exception { + // It may take some time for the catalog manager to initialize + // and have IPKI CA certificate ready. + assertEventuallyTrue( + "valid cluster IPKI CA certificate captured", + new BooleanExpression() { + @Override + public boolean get() throws Exception { + return harness.getClusterCACertDer().length != 0; + } + }, + 10000/*timeoutMillis*/); + } +} diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java index 30ea8a365..47b6656f0 100644 --- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java +++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java @@ -20,10 +20,14 @@ package org.apache.kudu.test; import static org.junit.Assert.fail; import java.io.IOException; +import java.io.InputStream; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; import java.util.List; import java.util.Random; @@ -491,6 +495,13 @@ public class KuduTestHarness extends ExternalResource { return miniCluster.createJwtFor(accountId, subject, isValid); } + /** + * @return cluster's CA certificate in DER format or null if catalog manager isn't ready + */ + public byte[] getClusterCACertDer() throws IOException { + return miniCluster.getCACertDer(); + } + /** * An annotation that can be added to each test method to * define additional master server flags to be used when diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java index 31bcdc6f9..ce51491b2 100644 --- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java +++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java @@ -26,6 +26,10 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; import java.nio.file.Paths; import java.security.Security; import java.util.ArrayList; @@ -34,7 +38,9 @@ import java.util.Map; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.yetus.audience.InterfaceAudience; @@ -96,6 +102,7 @@ public final class MiniKuduCluster implements AutoCloseable { DaemonIdentifierPB id; boolean isRunning; boolean isPaused; + String webServerAddress; } // Map of master addresses to daemon information. @@ -113,7 +120,6 @@ public final class MiniKuduCluster implements AutoCloseable { private final ImmutableList<String> locationInfo; private final String clusterRoot; private final String principal; - private final boolean enableClientJwt; private MiniKdcOptionsPB kdcOptionsPb; private final Common.HmsMode hmsMode; @@ -129,7 +135,6 @@ public final class MiniKuduCluster implements AutoCloseable { String clusterRoot, Common.HmsMode hmsMode, String principal, - boolean enableClientJwt, MiniOidcOptionsPB oidcOptionsPb) { this.enableKerberos = enableKerberos; this.numMasters = numMasters; @@ -140,7 +145,6 @@ public final class MiniKuduCluster implements AutoCloseable { this.kdcOptionsPb = kdcOptionsPb; this.principal = principal; this.hmsMode = hmsMode; - this.enableClientJwt = enableClientJwt; this.oidcOptionsPb = oidcOptionsPb; if (clusterRoot == null) { @@ -290,6 +294,8 @@ public final class MiniKuduCluster implements AutoCloseable { d.id = info.getId(); d.isRunning = true; d.isPaused = false; + d.webServerAddress = String.join(":", info.getBoundHttpAddress().getHost(), + Integer.toString(info.getBoundHttpAddress().getPort())); masterServers.put(ProtobufHelper.hostAndPortFromPB(info.getBoundRpcAddress()), d); } resp = sendRequestToCluster( @@ -301,17 +307,31 @@ public final class MiniKuduCluster implements AutoCloseable { d.id = info.getId(); d.isRunning = true; d.isPaused = false; + d.webServerAddress = String.join(":", info.getBoundHttpAddress().getHost(), + Integer.toString(info.getBoundHttpAddress().getPort())); tabletServers.put(ProtobufHelper.hostAndPortFromPB(info.getBoundRpcAddress()), d); } } /** - * @return comma-separated list of master server addresses + * @return a comma-separated list of RPC addresses of all masters in the cluster */ public String getMasterAddressesAsString() { return Joiner.on(',').join(masterServers.keySet()); } + /** + * @return a comma-separated list of webserver addresses of all masters in the cluster + */ + public String getMasterWebServerAddressesAsString() { + List<String> addresses = new ArrayList<String>(); + masterServers.forEach((hp, daemonInfo) -> { + addresses.add(daemonInfo.webServerAddress); + }); + + return Joiner.on(',').join(addresses); + } + /** * @return the list of master servers */ @@ -683,6 +703,33 @@ public final class MiniKuduCluster implements AutoCloseable { return clusterRoot; } + /** + * @return cluster's CA certificate in DER format or an empty array + */ + public byte[] getCACertDer() throws IOException { + String masterHttpAddr = Iterables.get(Splitter.on(',') + .split(getMasterWebServerAddressesAsString()), 0); + URL url = new URL("http://" + masterHttpAddr + "/ipki-ca-cert-der"); + HttpURLConnection connection = (HttpURLConnection)url.openConnection(); + connection.setRequestMethod("GET"); + connection.connect(); + + if (connection.getResponseCode() != 200) { + connection.disconnect(); + return new byte[0]; + } + + InputStream urlData = connection.getInputStream(); + int contentSize = connection.getContentLength(); + byte[] data = new byte[contentSize]; + int numBytesRead = urlData.read(data); + if (numBytesRead != contentSize) { + connection.disconnect(); + return new byte[0]; + } + return data; + } + /** * Helper runnable that receives stderr and logs it along with the process' identifier. */ @@ -725,7 +772,6 @@ public final class MiniKuduCluster implements AutoCloseable { private final List<String> locationInfo = new ArrayList<>(); private String clusterRoot = null; private String principal = "kudu"; - private boolean enableClientJwt = false; private MiniKdcOptionsPB.Builder kdcOptionsPb = MiniKdcOptionsPB.newBuilder(); private MiniOidcOptionsPB.Builder oidcOptionsPb = MiniOidcOptionsPB.newBuilder(); @@ -750,11 +796,6 @@ public final class MiniKuduCluster implements AutoCloseable { return this; } - public MiniKuduClusterBuilder enableClientJwt() { - enableClientJwt = true; - return this; - } - public MiniKuduClusterBuilder enableHiveMetastoreIntegration() { hmsMode = Common.HmsMode.ENABLE_METASTORE_INTEGRATION; return this; @@ -836,7 +877,7 @@ public final class MiniKuduCluster implements AutoCloseable { new MiniKuduCluster(enableKerberos, numMasterServers, numTabletServers, extraTabletServerFlags, extraMasterServerFlags, locationInfo, - kdcOptionsPb.build(), clusterRoot, hmsMode, principal, enableClientJwt, + kdcOptionsPb.build(), clusterRoot, hmsMode, principal, oidcOptionsPb.build()); try { cluster.start(); diff --git a/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java index c31efb399..db11df73a 100644 --- a/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java +++ b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java @@ -17,7 +17,9 @@ package org.apache.kudu.test; +import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -25,25 +27,21 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.net.Socket; +import java.util.Arrays; import com.google.protobuf.ByteString; import org.junit.Rule; import org.junit.Test; -import org.apache.kudu.client.AsyncKuduClient; -import org.apache.kudu.client.Client.AuthenticationCredentialsPB; import org.apache.kudu.client.HostAndPort; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduClient.KuduClientBuilder; import org.apache.kudu.client.ListTablesResponse; import org.apache.kudu.client.TimeoutTracker; -import org.apache.kudu.security.Token.JwtRawPB; import org.apache.kudu.test.cluster.FakeDNS; import org.apache.kudu.test.cluster.MiniKuduCluster; -import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder; +import org.apache.kudu.test.junit.AssertHelpers.BooleanExpression; import org.apache.kudu.test.junit.RetryRule; -import org.apache.kudu.tools.Tool.CreateClusterRequestPB.JwksOptionsPB; -import org.apache.kudu.tools.Tool.CreateClusterRequestPB.MiniOidcOptionsPB; public class TestMiniKuduCluster { @@ -118,19 +116,32 @@ public class TestMiniKuduCluster { } @Test(timeout = 50000) - public void testJwt() throws Exception { - try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder() - .numMasterServers(NUM_MASTERS) - .numTabletServers(0) - .enableClientJwt() - .addJwks("account-id", true) - .build(); - KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build()) { - String jwt = cluster.createJwtFor("account-id", "subject", true); + public void testJwtAuthn() throws Exception { + try (MiniKuduCluster cluster = createClusterForJwtTest(); + KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build()) { + // It may take some time for the catalog manager to initialize + // and have IPKI CA certificate ready. + assertEventuallyTrue( + "valid cluster IPKI CA certificate captured", + new BooleanExpression() { + @Override + public boolean get() throws Exception { + return cluster.getCACertDer().length != 0; + } + }, + 10000/*timeoutMillis*/); + byte[] caCert = cluster.getCACertDer(); + assertNotEquals(0, caCert.length); + + String jwt = cluster.createJwtFor("account-id", "kudu", true); assertNotNull(jwt); - client.jwt(jwt); - client.getTablesList(); + client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert))); + + // A simple call to make sure the client can connect to the cluster. + // That assumes an RPC connection to the master has been successfully + // negotiated. + assertTrue(client.getTablesList().getTablesList().isEmpty()); } } @@ -167,7 +178,7 @@ public class TestMiniKuduCluster { * @param testIsOpen true if we should want it to be open, false if we want it closed */ private static void testHostPort(HostAndPort hp, - boolean testIsOpen) throws InterruptedException { + boolean testIsOpen) throws InterruptedException { TimeoutTracker tracker = new TimeoutTracker(); while (tracker.getElapsedMillis() < SLEEP_TIME_MS) { try { @@ -185,4 +196,17 @@ public class TestMiniKuduCluster { } fail("HostAndPort " + hp + " is still " + (testIsOpen ? "closed " : "open")); } + + private MiniKuduCluster createClusterForJwtTest() throws IOException { + return new MiniKuduCluster.MiniKuduClusterBuilder() + .numMasterServers(NUM_MASTERS) + .numTabletServers(0) + .addMasterServerFlag("--enable-jwt-token-auth") + .addMasterServerFlag("--rpc-trace-negotiation") + .addMasterServerFlag("--rpc-authentication=required") + .addMasterServerFlag("--rpc-encryption=required") + .enableKerberos() + .addJwks("account-id", true) + .build(); + } }
