This is an automated email from the ASF dual-hosted git repository.
abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new a0c0c44b9 [java] allow adding CA certs to the client's trust chain
a0c0c44b9 is described below
commit a0c0c44b91a795022eb46fd450d05e76acf3d718
Author: Zoltan Chovan <[email protected]>
AuthorDate: Wed May 31 13:50:38 2023 +0000
[java] allow adding CA certs to the client's trust chain
With this patch, the Java client doesn't implicitly trust the TLS
certificate of a Kudu server it's negotiating an RPC connection with
when using its JWT. There is also an update on the Negotiator class
to fix various JWT-related issues during RPC connection negotiation.
To implement that, this patch introduces the following new methods:
* [Async]KuduClient.trustedCertificates(List<ByteString> certs)
to add the specified CA certificates (in DER format) as the
trusted ones for the client (the call replaces all the previously
set certificates if invoked again)
* KuduTestHarness.getClusterCACertDer() to acquire the IPKI CA
certificate of the test mini-cluster via call to
MiniKuduCluster.getCACertDer()
* MiniKuduCluster.getMasterWebServerAddressesAsString() which
returns the addresses of Master's webserver as a comma separated
string, similar to MiniKuduCluster.getMasterAddressesAsString()
In addition, this changelist also contains tests to cover the newly
introduced functionality.
Change-Id: I281fd59da0d24a6119b99e1da8096f381b414a2b
Reviewed-on: http://gerrit.cloudera.org:8080/20017
Tested-by: Kudu Jenkins
Reviewed-by: Attila Bukor <[email protected]>
---
.../org/apache/kudu/client/AsyncKuduClient.java | 27 ++-
.../java/org/apache/kudu/client/KuduClient.java | 14 +-
.../java/org/apache/kudu/client/Negotiator.java | 59 ++++-
.../org/apache/kudu/client/SecurityContext.java | 9 +-
.../org/apache/kudu/client/TestKuduClient.java | 14 ++
.../org/apache/kudu/client/TestNegotiation.java | 245 ++++++++++++++++++++-
.../java/org/apache/kudu/test/KuduTestHarness.java | 11 +
.../apache/kudu/test/cluster/MiniKuduCluster.java | 63 +++++-
.../org/apache/kudu/test/TestMiniKuduCluster.java | 60 +++--
9 files changed, 455 insertions(+), 47 deletions(-)
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 68aa1b2fc..b9ada6de4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -75,6 +75,7 @@ import org.slf4j.LoggerFactory;
import org.apache.kudu.Common;
import org.apache.kudu.Schema;
+import org.apache.kudu.client.Client.AuthenticationCredentialsPB;
import org.apache.kudu.master.Master;
import org.apache.kudu.master.Master.GetTableLocationsResponsePB;
import org.apache.kudu.master.Master.TSInfoPB;
@@ -1178,6 +1179,18 @@ public class AsyncKuduClient implements AutoCloseable {
securityContext.importAuthenticationCredentials(authnData);
}
+ /**
+ * Mark the given CA certificates (in DER format) as the trusted ones for the
+ * client. The provided list of certificates replaces any previously set
ones.
+ *
+ * @param certificates list of certificates to trust (in DER format)
+ * @throws CertificateException if any of the specified certificates were
invalid
+ */
+ @InterfaceStability.Unstable
+ public void trustedCertificates(List<ByteString> certificates) throws
CertificateException {
+ securityContext.trustCertificates(certificates);
+ }
+
/**
* Set JWT (JSON Web Token) to authenticate the client to a server.
* <p>
@@ -1190,10 +1203,13 @@ public class AsyncKuduClient implements AutoCloseable {
*/
@InterfaceStability.Unstable
public void jwt(String jwt) {
- Token.JwtRawPB jwtPB = Token.JwtRawPB.newBuilder()
- .setJwtData(ByteString.copyFromUtf8(jwt))
- .build();
- securityContext.setJsonWebToken(jwtPB);
+ AuthenticationCredentialsPB credentials =
+ AuthenticationCredentialsPB.newBuilder()
+ .setJwt(Token.JwtRawPB.newBuilder()
+ .setJwtData(ByteString.copyFromUtf8(jwt))
+ .build())
+ .build();
+ securityContext.importAuthenticationCredentials(credentials.toByteArray());
}
/**
@@ -1945,7 +1961,8 @@ public class AsyncKuduClient implements AutoCloseable {
securityContext.setAuthenticationToken(resp.getConnectResponse().getAuthnToken());
}
List<ByteString> caCerts =
resp.getConnectResponse().getCaCertDerList();
- if (!caCerts.isEmpty()) {
+ if (!caCerts.isEmpty() && (securityContext.getJsonWebToken() ==
null ||
+
!securityContext.getJsonWebToken().hasJwtData())) {
try {
securityContext.trustCertificates(caCerts);
} catch (CertificateException e) {
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index ed2f90e30..6d8e7f82b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -17,6 +17,7 @@
package org.apache.kudu.client;
+import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
@@ -31,7 +32,6 @@ import org.slf4j.LoggerFactory;
import org.apache.kudu.Schema;
import org.apache.kudu.master.Master.TableIdentifierPB;
-import org.apache.kudu.security.Token;
/**
* A synchronous and thread-safe client for Kudu.
@@ -468,6 +468,18 @@ public class KuduClient implements AutoCloseable {
asyncClient.importAuthenticationCredentials(authnData);
}
+ /**
+ * Mark the given CA certificates (in DER format) as the trusted ones for the
+ * client. The provided list of certificates replaces any previously set
ones.
+ *
+ * @param certificates list of certificates to trust (in DER format)
+ * @throws CertificateException if any of the specified certificates were
invalid
+ */
+ @InterfaceStability.Unstable
+ public void trustedCertificates(List<ByteString> certificates) throws
CertificateException {
+ asyncClient.trustedCertificates(certificates);
+ }
+
/**
* Set JWT (JSON Web Token) to authenticate the client to a server.
* <p>
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index f17dfce4e..1099a1528 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -162,7 +162,8 @@ public class Negotiator extends
SimpleChannelInboundHandler<CallResponse> {
INITIAL,
AWAIT_NEGOTIATE,
AWAIT_TLS_HANDSHAKE,
- AWAIT_TOKEN_EXCHANGE,
+ AWAIT_AUTHN_TOKEN_EXCHANGE,
+ AWAIT_JWT_EXCHANGE,
AWAIT_SASL,
FINISHED
}
@@ -175,8 +176,17 @@ public class Negotiator extends
SimpleChannelInboundHandler<CallResponse> {
* The authentication token we'll try to connect with, maybe null.
* This is fetched from {@link #securityContext} in the constructor to
* ensure that it doesn't change over the course of a negotiation attempt.
+ * An authentication token is used as secondary credentials.
*/
private final SignedTokenPB authnToken;
+ /**
+ * A JSON Web Token (JWT) to authenticate this client/actor to the server
+ * that we'll try to connect with. Similar to {@link #authnToken}, this token
+ * may be null, and it's fetched from {@link #securityContext} as well.
+ * Cannot change over the course of an RPC connection negotiation attempt.
+ *
+ * @note unlike {@link #authnToken}, {@link #jsonWebToken} is used as
primary credentials
+ */
private final JwtRawPB jsonWebToken;
private enum AuthnTokenNotUsedReason {
@@ -254,6 +264,7 @@ public class Negotiator extends
SimpleChannelInboundHandler<CallResponse> {
this.requireAuthentication = requireAuthentication;
this.requireEncryption = requireEncryption;
this.encryptLoopback = encryptLoopback;
+
SignedTokenPB token = securityContext.getAuthenticationToken();
if (token != null) {
if (ignoreAuthnToken) {
@@ -269,9 +280,13 @@ public class Negotiator extends
SimpleChannelInboundHandler<CallResponse> {
this.authnToken = null;
this.authnTokenNotUsedReason = AuthnTokenNotUsedReason.NONE_AVAILABLE;
}
+
JwtRawPB jwt = securityContext.getJsonWebToken();
- // TODO(zchovan): also guard this on the presence of certs.
- this.jsonWebToken = jwt;
+ if (jwt != null && securityContext.hasTrustedCerts()) {
+ this.jsonWebToken = jwt;
+ } else {
+ this.jsonWebToken = null;
+ }
}
public void sendHello(ChannelHandlerContext ctx) {
@@ -304,6 +319,15 @@ public class Negotiator extends
SimpleChannelInboundHandler<CallResponse> {
AuthenticationTypePB.Token.getDefaultInstance());
}
+ // We may also have a JSON Web token, but it can be sent to the server
+ // only if we can verify the server's authenticity and the channel between
+ // this client and the server is confidential, i.e. it's protected by
+ // authenticated TLS.
+ if (jsonWebToken != null) {
+ builder.addAuthnTypesBuilder().setJwt(
+ AuthenticationTypePB.Jwt.getDefaultInstance());
+ }
+
// We currently don't support client-certificate authentication from the
// Java client.
@@ -345,8 +369,11 @@ public class Negotiator extends
SimpleChannelInboundHandler<CallResponse> {
case AWAIT_SASL:
handleSaslMessage(ctx, response);
break;
- case AWAIT_TOKEN_EXCHANGE:
- handleTokenExchangeResponse(ctx, response);
+ case AWAIT_AUTHN_TOKEN_EXCHANGE:
+ handleAuthnTokenExchangeResponse(ctx, response);
+ break;
+ case AWAIT_JWT_EXCHANGE:
+ handleJwtExchangeResponse(ctx, response);
break;
case AWAIT_TLS_HANDSHAKE:
handleTlsMessage(ctx, response);
@@ -595,6 +622,7 @@ public class Negotiator extends
SimpleChannelInboundHandler<CallResponse> {
engine = securityContext.createSSLEngineTrustAll();
break;
case TOKEN:
+ case JWT:
engine = securityContext.createSSLEngine();
break;
default:
@@ -765,7 +793,7 @@ public class Negotiator extends
SimpleChannelInboundHandler<CallResponse> {
}
private void sendTokenExchange(ChannelHandlerContext ctx) {
- // We must not send a token unless we have successfully finished
+ // We must not send authn token unless we have successfully finished
// authenticating via TLS.
Preconditions.checkNotNull(authnToken);
Preconditions.checkNotNull(sslHandshakeFuture);
@@ -774,26 +802,39 @@ public class Negotiator extends
SimpleChannelInboundHandler<CallResponse> {
RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder()
.setStep(NegotiateStep.TOKEN_EXCHANGE)
.setAuthnToken(authnToken);
- state = State.AWAIT_TOKEN_EXCHANGE;
+ state = State.AWAIT_AUTHN_TOKEN_EXCHANGE;
sendSaslMessage(ctx, builder.build());
}
private void sendJwtExchange(ChannelHandlerContext ctx) {
+ // We must not send JWT unless we have successfully finished
+ // authenticating via TLS.
Preconditions.checkNotNull(jsonWebToken);
Preconditions.checkNotNull(sslHandshakeFuture);
Preconditions.checkState(sslHandshakeFuture.isSuccess());
+
RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder()
.setStep(NegotiateStep.JWT_EXCHANGE)
.setJwtRaw(jsonWebToken);
+ state = State.AWAIT_JWT_EXCHANGE;
sendSaslMessage(ctx, builder.build());
}
- private void handleTokenExchangeResponse(ChannelHandlerContext ctx,
NegotiatePB response)
+ private void handleAuthnTokenExchangeResponse(ChannelHandlerContext ctx,
NegotiatePB response)
throws SaslException {
Preconditions.checkArgument(response.getStep() ==
NegotiateStep.TOKEN_EXCHANGE,
"expected TOKEN_EXCHANGE, got step: {}", response.getStep());
- // The token response doesn't have any actual data in it, so we can just
move on.
+ // The authn token response doesn't have any actual data in it, so we can
just move on.
+ finish(ctx);
+ }
+
+ private void handleJwtExchangeResponse(ChannelHandlerContext ctx,
NegotiatePB response)
+ throws SaslException {
+ Preconditions.checkArgument(response.getStep() ==
NegotiateStep.JWT_EXCHANGE,
+ "expected JWT_EXCHANGE, got step: {}", response.getStep());
+
+ // The JWT response doesn't have any actual data in it, so we can just
move on.
finish(ctx);
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
index b02c020a4..cc2c8733d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
@@ -326,11 +326,16 @@ class SecurityContext {
if (pb.hasAuthnToken()) {
authnToken = pb.getAuthnToken();
}
- trustCertificates(pb.getCaCertDersList());
+
+ // only trust ca certificates automatically if they were acquired with
mutual trust of
+ // identities
+ if (!pb.hasJwt()) {
+ trustCertificates(pb.getCaCertDersList());
+ }
if (pb.hasJwt()) {
// Don't overwrite the JWT in the context if it's already set.
- if (!jsonWebToken.hasJwtData() ||
+ if (jsonWebToken == null || !jsonWebToken.hasJwtData() ||
(jsonWebToken.hasJwtData() &&
jsonWebToken.getJwtData().isEmpty())) {
jsonWebToken = pb.getJwt();
}
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 917d5aed2..a3138a33a 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -42,6 +42,7 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
@@ -50,8 +51,10 @@ import java.io.Closeable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
+import java.security.cert.CertificateException;
import java.sql.Date;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -66,6 +69,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
import com.stumbleupon.async.Deferred;
import org.junit.Before;
import org.junit.Rule;
@@ -1911,4 +1915,14 @@ public class TestKuduClient {
AsyncKuduSession.injectLatencyBufferFlushCb(false);
}
}
+
+ @Test(timeout = 50000)
+ public void testImportInvalidCert() throws Exception {
+ // An empty certificate to import.
+ byte[] caCert = new byte[0];
+ CertificateException e = assertThrows(CertificateException.class, () -> {
+ client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert)));
+ });
+ assertTrue(e.getMessage().contains("Could not parse certificate"));
+ }
}
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiation.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiation.java
index f0b35c7e2..89d2728eb 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiation.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiation.java
@@ -17,10 +17,17 @@
package org.apache.kudu.client;
+import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertTrue;
+import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
import java.io.Closeable;
+import java.util.Arrays;
+import com.google.protobuf.ByteString;
import org.junit.Rule;
import org.junit.Test;
@@ -29,6 +36,7 @@ import org.apache.kudu.test.KuduTestHarness;
import org.apache.kudu.test.KuduTestHarness.MasterServerConfig;
import org.apache.kudu.test.cluster.FakeDNS;
import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder;
+import org.apache.kudu.test.junit.AssertHelpers.BooleanExpression;
public class TestNegotiation {
private static final MiniKuduClusterBuilder clusterBuilder =
@@ -62,4 +70,239 @@ public class TestNegotiation {
assertTrue(cla.getAppendedText(),
cla.getAppendedText().contains("Client requested to use
mechanism: PLAIN"));
}
-}
\ No newline at end of file
+
+ /**
+ * When JWT is enabled on the server, a client with a valid JWT should be
+ * able to connect using the provided JSON Web Token to authenticate to Kudu
+ * servers (Kudu master in this particular case).
+ *
+ * In other words, when Kudu client has JWT and trusts the server's TLS
+ * certificate, the client and the server should negotiate a connection using
+ * the JSON Web Token provided by the client.
+ */
+ @Test
+ @MasterServerConfig(flags = {
+ "--enable-jwt-token-auth",
+ "--rpc-trace-negotiation",
+ })
+ public void testJwtAuthnWithTrustedCert() throws Exception {
+ FakeDNS.getInstance().install();
+ CapturingLogAppender cla = new CapturingLogAppender();
+
+ // The test harness might have the client already connected to the test
+ // cluster.
+ harness.resetClients();
+ KuduClient client = harness.getClient();
+ String jwt = harness.createJwtFor("account-id", "kudu", true);
+ assertNotNull(jwt);
+ client.jwt(jwt);
+
+ waitForClusterCACert();
+ final byte[] caCert = harness.getClusterCACertDer();
+ assertNotEquals(0, caCert.length);
+ client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert)));
+
+ try (Closeable c = cla.attach()) {
+ // A simple call to make sure the client has connected to the cluster.
+ // Success here assumes that the RPC connection to the Kudu server
+ // has been successfully negotiated.
+ assertFalse(client.tableExists("nonexistent"));
+ }
+
+ // Make sure the parties used JWT authn mechanism to negotiate the
connection.
+ assertTrue(cla.getAppendedText(),
+ cla.getAppendedText().contains("Negotiated authn=JWT"));
+ }
+
+ @Test
+ @MasterServerConfig(flags = {
+ "--enable-jwt-token-auth",
+ "--rpc-authentication=required",
+ "--rpc-negotiation-timeout-ms=1500",
+ "--rpc-trace-negotiation",
+ })
+ public void testJwtAuthnWithoutTrustedCert() throws Exception {
+ FakeDNS.getInstance().install();
+ CapturingLogAppender cla = new CapturingLogAppender();
+
+ harness.kdestroy();
+ harness.resetClients();
+
+ // Create a special client with short timeout for RPCs. This is a bit
funny,
+ // but due to the way how ConnectToMaster is implemented in the Java
client,
+ // there isn't a simple way to stop the client to negotiate a connection
+ // again and again, unless the overall RPC times out. A connection closure
+ // upon negotiation failure is being interpreted as NetworkError, and
that's
+ // a recoverable exception, so the operation is retried again and again.
+ //
+ // For faster test runs, the RPC timeout is set lower than the RPC
connection
+ // negotiation timeout, while the latter is set lower than its default
value
+ // (see the MasterServerConfig for the test). However, to prevent test
+ // flakiness, it's necessary to have at least connection negotiation
attempt
+ // before the RPC times out.
+ AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(
+ harness.getMasterAddressesAsString())
+ .defaultAdminOperationTimeoutMs(1000)
+ .defaultOperationTimeoutMs(1000)
+ .build();
+ KuduClient client = asyncClient.syncClient();
+
+ // Provide the client with a valid JWT.
+ String jwt = harness.createJwtFor("account-id", "kudu", true);
+ assertNotNull(jwt);
+ client.jwt(jwt);
+
+ try (Closeable c = cla.attach()) {
+ // It doesn't matter what method to call here: ConnectToMaster should not
+ // succeed, so the corresponding RPC won't be invoked anyway.
+ assertFalse(client.tableExists("nonexistent"));
+ fail();
+ } catch (NonRecoverableException ex) {
+ // Java client reports SERVICE_UNAVAILABLE in this case.
+ //
+ // TODO(aserbin): is this a bug? should it be fixed?
+ assertTrue(ex.getStatus().isServiceUnavailable());
+ }
+
+ // Make sure the parties aren't using JWT authn mechanism to negotiate the
+ // connection since the client shouldn't be willing to send its JWT to a
+ // non-authenticated Kudu server. As of now, the parties are using the SASL
+ // authn mechanism in current implementation, but that's not an invariant
+ // to enforce, so it's not asserted here.
+ assertFalse(cla.getAppendedText(), cla.getAppendedText().contains(
+ "Negotiated authn=JWT"));
+ assertTrue(cla.getAppendedText(), cla.getAppendedText().contains(
+ "server requires authentication, but client does not have Kerberos
credentials (tgt)."));
+ assertTrue(cla.getAppendedText(), cla.getAppendedText().contains(
+ "Authentication tokens were not used because no token is available]"));
+ }
+
+ /**
+ * Try to authenticate with a valid JWT by mismatched account/principal name.
+ * An RPC connection to the server will be established successfully, but
+ * the client will fail to invoke the ConnectToMaster RPC because of
+ * NotAuthorized error from the coarse-grain authz subsystem.
+ */
+ @Test
+ @MasterServerConfig(flags = {
+ "--enable-jwt-token-auth",
+ "--rpc-trace-negotiation",
+ })
+ public void testValidJwtButWrongSubject() throws Exception {
+ FakeDNS.getInstance().install();
+ CapturingLogAppender cla = new CapturingLogAppender();
+
+ // The test harness might have the client already connected to the test
+ // cluster.
+ harness.resetClients();
+ KuduClient client = harness.getClient();
+ String jwt = harness.createJwtFor("account-id", "interloper", true);
+ assertNotNull(jwt);
+ client.jwt(jwt);
+
+ waitForClusterCACert();
+ final byte[] caCert = harness.getClusterCACertDer();
+ assertNotEquals(0, caCert.length);
+ client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert)));
+
+ try (Closeable c = cla.attach()) {
+ // It doesn't matter what method to call here: ConnectToMaster should not
+ // succeed, so the corresponding RPC won't be invoked anyway.
+ client.tableExists("nonexistent");
+ fail();
+ } catch (NonRecoverableException ex) {
+ // That's a bit funny, but Java client reports SERVICE_UNAVAILABLE in
this
+ // case when failing to call a remote method due to NotAuthorized error
+ // code returned by Kudu master.
+ //
+ // TODO(aserbin): is this a bug? should it be fixed?
+ assertTrue(ex.getStatus().isServiceUnavailable());
+ assertTrue(ex.getMessage().contains(
+ "Not authorized: unauthorized access to method: ConnectToMaster"));
+ }
+
+ // Make sure the parties used JWT authn mechanism to successfully negotiate
+ // the connection, even if the coarse-grained authz check rejected a remote
+ // call of one of the API methods.
+ assertTrue(cla.getAppendedText(),
+ cla.getAppendedText().contains("Negotiated authn=JWT"));
+ }
+
+ /**
+ * Try to authenticate with an invalid JWT. The connection negotiation
+ * should fail because the server should not be able to verify the invalid
JWT
+ * that the client provided.
+ */
+ @Test
+ @MasterServerConfig(flags = {
+ "--enable-jwt-token-auth",
+ "--rpc-negotiation-timeout-ms=1500",
+ "--rpc-trace-negotiation",
+ })
+ public void testInvalidJwt() throws Exception {
+ FakeDNS.getInstance().install();
+ CapturingLogAppender cla = new CapturingLogAppender();
+
+ // Create a special client with short timeout for RPCs. This is a bit
funny,
+ // but due to the way how ConnectToMaster is implemented in the Java
client,
+ // there isn't a simple way to stop the client to negotiate a connection
+ // again and again, unless the overall RPC times out.
+ //
+ // For faster test runs, the RPC timeout is set lower than the RPC
connection
+ // negotiation timeout, while the latter is set lower than its default
value
+ // (see the MasterServerConfig for the test). However, to prevent test
+ // flakiness, it's necessary to have at least connection negotiation
attempt
+ // before the RPC times out.
+ //
+ // TODO(aserbin): fix ConnectToMaster and stop negotiation attempts upon
receiving NotAuthorized
+ AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(
+ harness.getMasterAddressesAsString())
+ .defaultAdminOperationTimeoutMs(1000)
+ .defaultOperationTimeoutMs(1000)
+ .build();
+ KuduClient client = asyncClient.syncClient();
+
+ String jwt = harness.createJwtFor("account-id", "kudu", false);
+ assertNotNull(jwt);
+ client.jwt(jwt);
+
+ waitForClusterCACert();
+ final byte[] caCert = harness.getClusterCACertDer();
+ assertNotEquals(0, caCert.length);
+ client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert)));
+
+ try (Closeable c = cla.attach()) {
+ // It doesn't matter what method to call here: ConnectToMaster should not
+ // succeed, so the corresponding RPC won't be invoked anyway.
+ client.tableExists("nonexistent");
+ fail();
+ } catch (NonRecoverableException ex) {
+ assertTrue(ex.getStatus().isTimedOut());
+ }
+
+ assertTrue(cla.getAppendedText(),cla.getAppendedText().contains(
+ "Negotiated authn=JWT"));
+ assertTrue(cla.getAppendedText(), cla.getAppendedText().contains(
+ "Negotiation complete: Not authorized: Server connection negotiation
failed"));
+ assertTrue(cla.getAppendedText(), cla.getAppendedText().contains(
+ "FATAL_INVALID_JWT: Not authorized: JWT verification failed: failed to
verify signature"));
+ assertTrue(cla.getAppendedText(), cla.getAppendedText().contains(
+ "Unable to connect to master"));
+ assertTrue(cla.getAppendedText(), cla.getAppendedText().contains(
+ "connection closed"));
+ }
+
+ private void waitForClusterCACert() throws Exception {
+ // It may take some time for the catalog manager to initialize
+ // and have IPKI CA certificate ready.
+ assertEventuallyTrue(
+ "valid cluster IPKI CA certificate captured",
+ new BooleanExpression() {
+ @Override
+ public boolean get() throws Exception {
+ return harness.getClusterCACertDer().length != 0;
+ }
+ },
+ 10000/*timeoutMillis*/);
+ }
+}
diff --git
a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
index 30ea8a365..47b6656f0 100644
---
a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
+++
b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
@@ -20,10 +20,14 @@ package org.apache.kudu.test;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.io.InputStream;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
import java.util.List;
import java.util.Random;
@@ -491,6 +495,13 @@ public class KuduTestHarness extends ExternalResource {
return miniCluster.createJwtFor(accountId, subject, isValid);
}
+ /**
+ * @return cluster's CA certificate in DER format or null if catalog manager
isn't ready
+ */
+ public byte[] getClusterCACertDer() throws IOException {
+ return miniCluster.getCACertDer();
+ }
+
/**
* An annotation that can be added to each test method to
* define additional master server flags to be used when
diff --git
a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
index 31bcdc6f9..ce51491b2 100644
---
a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
+++
b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
@@ -26,6 +26,10 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
import java.nio.file.Paths;
import java.security.Security;
import java.util.ArrayList;
@@ -34,7 +38,9 @@ import java.util.Map;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.yetus.audience.InterfaceAudience;
@@ -96,6 +102,7 @@ public final class MiniKuduCluster implements AutoCloseable {
DaemonIdentifierPB id;
boolean isRunning;
boolean isPaused;
+ String webServerAddress;
}
// Map of master addresses to daemon information.
@@ -113,7 +120,6 @@ public final class MiniKuduCluster implements AutoCloseable
{
private final ImmutableList<String> locationInfo;
private final String clusterRoot;
private final String principal;
- private final boolean enableClientJwt;
private MiniKdcOptionsPB kdcOptionsPb;
private final Common.HmsMode hmsMode;
@@ -129,7 +135,6 @@ public final class MiniKuduCluster implements AutoCloseable
{
String clusterRoot,
Common.HmsMode hmsMode,
String principal,
- boolean enableClientJwt,
MiniOidcOptionsPB oidcOptionsPb) {
this.enableKerberos = enableKerberos;
this.numMasters = numMasters;
@@ -140,7 +145,6 @@ public final class MiniKuduCluster implements AutoCloseable
{
this.kdcOptionsPb = kdcOptionsPb;
this.principal = principal;
this.hmsMode = hmsMode;
- this.enableClientJwt = enableClientJwt;
this.oidcOptionsPb = oidcOptionsPb;
if (clusterRoot == null) {
@@ -290,6 +294,8 @@ public final class MiniKuduCluster implements AutoCloseable
{
d.id = info.getId();
d.isRunning = true;
d.isPaused = false;
+ d.webServerAddress = String.join(":",
info.getBoundHttpAddress().getHost(),
+
Integer.toString(info.getBoundHttpAddress().getPort()));
masterServers.put(ProtobufHelper.hostAndPortFromPB(info.getBoundRpcAddress()),
d);
}
resp = sendRequestToCluster(
@@ -301,17 +307,31 @@ public final class MiniKuduCluster implements
AutoCloseable {
d.id = info.getId();
d.isRunning = true;
d.isPaused = false;
+ d.webServerAddress = String.join(":",
info.getBoundHttpAddress().getHost(),
+
Integer.toString(info.getBoundHttpAddress().getPort()));
tabletServers.put(ProtobufHelper.hostAndPortFromPB(info.getBoundRpcAddress()),
d);
}
}
/**
- * @return comma-separated list of master server addresses
+ * @return a comma-separated list of RPC addresses of all masters in the
cluster
*/
public String getMasterAddressesAsString() {
return Joiner.on(',').join(masterServers.keySet());
}
+ /**
+ * @return a comma-separated list of webserver addresses of all masters in
the cluster
+ */
+ public String getMasterWebServerAddressesAsString() {
+ List<String> addresses = new ArrayList<String>();
+ masterServers.forEach((hp, daemonInfo) -> {
+ addresses.add(daemonInfo.webServerAddress);
+ });
+
+ return Joiner.on(',').join(addresses);
+ }
+
/**
* @return the list of master servers
*/
@@ -683,6 +703,33 @@ public final class MiniKuduCluster implements
AutoCloseable {
return clusterRoot;
}
+ /**
+ * @return cluster's CA certificate in DER format or an empty array
+ */
+ public byte[] getCACertDer() throws IOException {
+ String masterHttpAddr = Iterables.get(Splitter.on(',')
+
.split(getMasterWebServerAddressesAsString()), 0);
+ URL url = new URL("http://" + masterHttpAddr + "/ipki-ca-cert-der");
+ HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+ connection.setRequestMethod("GET");
+ connection.connect();
+
+ if (connection.getResponseCode() != 200) {
+ connection.disconnect();
+ return new byte[0];
+ }
+
+ InputStream urlData = connection.getInputStream();
+ int contentSize = connection.getContentLength();
+ byte[] data = new byte[contentSize];
+ int numBytesRead = urlData.read(data);
+ if (numBytesRead != contentSize) {
+ connection.disconnect();
+ return new byte[0];
+ }
+ return data;
+ }
+
/**
* Helper runnable that receives stderr and logs it along with the process'
identifier.
*/
@@ -725,7 +772,6 @@ public final class MiniKuduCluster implements AutoCloseable
{
private final List<String> locationInfo = new ArrayList<>();
private String clusterRoot = null;
private String principal = "kudu";
- private boolean enableClientJwt = false;
private MiniKdcOptionsPB.Builder kdcOptionsPb =
MiniKdcOptionsPB.newBuilder();
private MiniOidcOptionsPB.Builder oidcOptionsPb =
MiniOidcOptionsPB.newBuilder();
@@ -750,11 +796,6 @@ public final class MiniKuduCluster implements
AutoCloseable {
return this;
}
- public MiniKuduClusterBuilder enableClientJwt() {
- enableClientJwt = true;
- return this;
- }
-
public MiniKuduClusterBuilder enableHiveMetastoreIntegration() {
hmsMode = Common.HmsMode.ENABLE_METASTORE_INTEGRATION;
return this;
@@ -836,7 +877,7 @@ public final class MiniKuduCluster implements AutoCloseable
{
new MiniKuduCluster(enableKerberos,
numMasterServers, numTabletServers,
extraTabletServerFlags, extraMasterServerFlags, locationInfo,
- kdcOptionsPb.build(), clusterRoot, hmsMode, principal,
enableClientJwt,
+ kdcOptionsPb.build(), clusterRoot, hmsMode, principal,
oidcOptionsPb.build());
try {
cluster.start();
diff --git
a/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
index c31efb399..db11df73a 100644
---
a/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
+++
b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
@@ -17,7 +17,9 @@
package org.apache.kudu.test;
+import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -25,25 +27,21 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.Socket;
+import java.util.Arrays;
import com.google.protobuf.ByteString;
import org.junit.Rule;
import org.junit.Test;
-import org.apache.kudu.client.AsyncKuduClient;
-import org.apache.kudu.client.Client.AuthenticationCredentialsPB;
import org.apache.kudu.client.HostAndPort;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduClient.KuduClientBuilder;
import org.apache.kudu.client.ListTablesResponse;
import org.apache.kudu.client.TimeoutTracker;
-import org.apache.kudu.security.Token.JwtRawPB;
import org.apache.kudu.test.cluster.FakeDNS;
import org.apache.kudu.test.cluster.MiniKuduCluster;
-import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder;
+import org.apache.kudu.test.junit.AssertHelpers.BooleanExpression;
import org.apache.kudu.test.junit.RetryRule;
-import org.apache.kudu.tools.Tool.CreateClusterRequestPB.JwksOptionsPB;
-import org.apache.kudu.tools.Tool.CreateClusterRequestPB.MiniOidcOptionsPB;
public class TestMiniKuduCluster {
@@ -118,19 +116,32 @@ public class TestMiniKuduCluster {
}
@Test(timeout = 50000)
- public void testJwt() throws Exception {
- try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
-
.numMasterServers(NUM_MASTERS)
- .numTabletServers(0)
- .enableClientJwt()
- .addJwks("account-id",
true)
- .build();
- KuduClient client = new
KuduClientBuilder(cluster.getMasterAddressesAsString()).build()) {
- String jwt = cluster.createJwtFor("account-id", "subject", true);
+ public void testJwtAuthn() throws Exception {
+ try (MiniKuduCluster cluster = createClusterForJwtTest();
+ KuduClient client = new
KuduClientBuilder(cluster.getMasterAddressesAsString()).build()) {
+ // It may take some time for the catalog manager to initialize
+ // and have IPKI CA certificate ready.
+ assertEventuallyTrue(
+ "valid cluster IPKI CA certificate captured",
+ new BooleanExpression() {
+ @Override
+ public boolean get() throws Exception {
+ return cluster.getCACertDer().length != 0;
+ }
+ },
+ 10000/*timeoutMillis*/);
+ byte[] caCert = cluster.getCACertDer();
+ assertNotEquals(0, caCert.length);
+
+ String jwt = cluster.createJwtFor("account-id", "kudu", true);
assertNotNull(jwt);
-
client.jwt(jwt);
- client.getTablesList();
+ client.trustedCertificates(Arrays.asList(ByteString.copyFrom(caCert)));
+
+ // A simple call to make sure the client can connect to the cluster.
+ // That assumes an RPC connection to the master has been successfully
+ // negotiated.
+ assertTrue(client.getTablesList().getTablesList().isEmpty());
}
}
@@ -167,7 +178,7 @@ public class TestMiniKuduCluster {
* @param testIsOpen true if we should want it to be open, false if we want
it closed
*/
private static void testHostPort(HostAndPort hp,
- boolean testIsOpen) throws
InterruptedException {
+ boolean testIsOpen) throws InterruptedException {
TimeoutTracker tracker = new TimeoutTracker();
while (tracker.getElapsedMillis() < SLEEP_TIME_MS) {
try {
@@ -185,4 +196,17 @@ public class TestMiniKuduCluster {
}
fail("HostAndPort " + hp + " is still " + (testIsOpen ? "closed " :
"open"));
}
+
+ private MiniKuduCluster createClusterForJwtTest() throws IOException {
+ return new MiniKuduCluster.MiniKuduClusterBuilder()
+ .numMasterServers(NUM_MASTERS)
+ .numTabletServers(0)
+ .addMasterServerFlag("--enable-jwt-token-auth")
+ .addMasterServerFlag("--rpc-trace-negotiation")
+ .addMasterServerFlag("--rpc-authentication=required")
+ .addMasterServerFlag("--rpc-encryption=required")
+ .enableKerberos()
+ .addJwks("account-id", true)
+ .build();
+ }
}