This is an automated email from the ASF dual-hosted git repository. abukor pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit d65d75fb6473e1ce1db108758289456a71101690 Author: Attila Bukor <[email protected]> AuthorDate: Tue Jul 27 15:01:31 2021 +0200 [java] KUDU-1921 Add ability to require authn/encryption to Java client Change-Id: Ic951b2090a4933eca70dc53b6f93cdcff5a74929 Reviewed-on: http://gerrit.cloudera.org:8080/17732 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> --- .../org/apache/kudu/client/AsyncKuduClient.java | 50 +++++++++++++++++- .../java/org/apache/kudu/client/Connection.java | 17 +++++- .../org/apache/kudu/client/ConnectionCache.java | 19 ++++++- .../java/org/apache/kudu/client/KuduClient.java | 32 ++++++++++++ .../java/org/apache/kudu/client/Negotiator.java | 29 ++++++++-- .../org/apache/kudu/client/TestNegotiator.java | 3 +- .../java/org/apache/kudu/client/TestSecurity.java | 61 ++++++++++++++++++++++ 7 files changed, 201 insertions(+), 10 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index cfa03cd..390bdd3 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -425,7 +425,9 @@ public class AsyncKuduClient implements AutoCloseable { this.requestTracker = new RequestTracker(clientId); this.securityContext = new SecurityContext(); - this.connectionCache = new ConnectionCache(securityContext, bootstrap, b.saslProtocolName); + this.connectionCache = new ConnectionCache(securityContext, bootstrap, b.saslProtocolName, + b.requireAuthentication, !b.encryptionPolicy.equals(EncryptionPolicy.OPTIONAL), + b.encryptionPolicy.equals(EncryptionPolicy.REQUIRED)); this.tokenReacquirer = new AuthnTokenReacquirer(this); this.authzTokenCache = new AuthzTokenCache(this); } @@ -2726,6 +2728,18 @@ public class AsyncKuduClient implements AutoCloseable { } } + enum EncryptionPolicy { + // Optional, it uses encrypted 
connection if the server supports it, + // but it can connect to insecure servers too. + OPTIONAL, + // Only connects to remote servers that support encryption, fails + // otherwise. It can connect to insecure servers only locally. + REQUIRED_REMOTE, + // Only connects to servers that support encryption, including those + // on the loopback interface; fails otherwise. + REQUIRED, + } + /** * Builder class to use in order to connect to Kudu. * All the parameters beyond those in the constructors are optional. @@ -2746,6 +2760,8 @@ public class AsyncKuduClient implements AutoCloseable { private int workerCount = DEFAULT_WORKER_COUNT; private boolean statisticsDisabled = false; private String saslProtocolName = "kudu"; + private boolean requireAuthentication = false; + private EncryptionPolicy encryptionPolicy = EncryptionPolicy.OPTIONAL; /** * Creates a new builder for a client that will connect to the specified masters. @@ -2870,6 +2886,38 @@ public class AsyncKuduClient implements AutoCloseable { } /** + * Require authentication for the connection to a remote server. + * + * If it's set to true, the client will require mutual authentication between + * the server and the client. If the server doesn't support authentication, + * or it's disabled, the client will fail to connect. + */ + public AsyncKuduClientBuilder requireAuthentication(boolean requireAuthentication) { + this.requireAuthentication = requireAuthentication; + return this; + } + + /** + * Require encryption for the connection to a remote server. + * + * If it's set to REQUIRED_REMOTE or REQUIRED, the client will + * require encrypting the traffic between the server and the client. + * If the server doesn't support encryption, or if it's disabled, the + * client will fail to connect. + * + * Loopback connections are encrypted only if 'encryption_policy' is + * set to REQUIRED, or if it's required by the server. 
+ * + * The default value is OPTIONAL, which allows connecting to servers without + * encryption as well, but it will still attempt to use it if the server + * supports it. + */ + public AsyncKuduClientBuilder encryptionPolicy(EncryptionPolicy encryptionPolicy) { + this.encryptionPolicy = encryptionPolicy; + return this; + } + + /** * Creates the client bootstrap for Netty. The user can specify the executor, but * if they don't, we'll use a simple thread pool. */ diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java index 74445ef..0cc7900 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java @@ -110,6 +110,12 @@ class Connection extends SimpleChannelInboundHandler<Object> { private final String saslProtocolName; + private final boolean requireAuthentication; + + private final boolean requireEncryption; + + private final boolean encryptLoopback; + /** The underlying Netty's socket channel. 
*/ private SocketChannel channel; @@ -187,7 +193,10 @@ class Connection extends SimpleChannelInboundHandler<Object> { SecurityContext securityContext, Bootstrap bootstrap, CredentialsPolicy credentialsPolicy, - String saslProtocolName) { + String saslProtocolName, + boolean requireAuthentication, + boolean requireEncryption, + boolean encryptLoopback) { this.serverInfo = serverInfo; this.securityContext = securityContext; this.saslProtocolName = saslProtocolName; @@ -195,6 +204,9 @@ class Connection extends SimpleChannelInboundHandler<Object> { this.credentialsPolicy = credentialsPolicy; this.bootstrap = bootstrap.clone(); this.bootstrap.handler(new ConnectionChannelInitializer()); + this.requireAuthentication = requireAuthentication; + this.requireEncryption = requireEncryption; + this.encryptLoopback = encryptLoopback; } /** {@inheritDoc} */ @@ -213,7 +225,8 @@ class Connection extends SimpleChannelInboundHandler<Object> { } ctx.writeAndFlush(Unpooled.wrappedBuffer(CONNECTION_HEADER), ctx.voidPromise()); Negotiator negotiator = new Negotiator(serverInfo.getAndCanonicalizeHostname(), securityContext, - (credentialsPolicy == CredentialsPolicy.PRIMARY_CREDENTIALS), saslProtocolName); + (credentialsPolicy == CredentialsPolicy.PRIMARY_CREDENTIALS), saslProtocolName, + requireAuthentication, requireEncryption, encryptLoopback); ctx.pipeline().addBefore(ctx.name(), "negotiation", negotiator); negotiator.sendHello(ctx); } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java index 705f962..706bc98 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java @@ -58,6 +58,12 @@ class ConnectionCache { private final String saslProtocolName; + private boolean requireAuthentication; + + private boolean requireEncryption; + + private boolean encryptLoopback; + 
/** * Container mapping server IP/port into the established connection from the client to the * server. It may be up to two connections per server: one established with secondary @@ -70,10 +76,16 @@ class ConnectionCache { /** Create a new empty ConnectionCache given the specified parameters. */ ConnectionCache(SecurityContext securityContext, Bootstrap bootstrap, - String saslProtocolName) { + String saslProtocolName, + boolean requireAuthentication, + boolean requireEncryption, + boolean encryptLoopback) { this.securityContext = securityContext; this.bootstrap = bootstrap; this.saslProtocolName = saslProtocolName; + this.requireAuthentication = requireAuthentication; + this.requireEncryption = requireEncryption; + this.encryptLoopback = encryptLoopback; } /** @@ -127,7 +139,10 @@ class ConnectionCache { securityContext, bootstrap, credentialsPolicy, - saslProtocolName); + saslProtocolName, + requireAuthentication, + requireEncryption, + encryptLoopback); connections.add(result); // There can be at most 2 connections to the same destination: one with primary and another // with secondary credentials. diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java index ad518a6..3cc1d31 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java @@ -623,6 +623,38 @@ public class KuduClient implements AutoCloseable { } /** + * Require authentication for the connection to a remote server. + * + * If it's set to true, the client will require mutual authentication between + * the server and the client. If the server doesn't support authentication, + * or it's disabled, the client will fail to connect. 
+ */ + public KuduClientBuilder requireAuthentication(boolean requireAuthentication) { + clientBuilder.requireAuthentication(requireAuthentication); + return this; + } + + /** + * Require encryption for the connection to a remote server. + * + * If it's set to REQUIRED_REMOTE or REQUIRED, the client will + * require encrypting the traffic between the server and the client. + * If the server doesn't support encryption, or if it's disabled, the + * client will fail to connect. + * + * Loopback connections are encrypted only if 'encryption_policy' is + * set to REQUIRED, or if it's required by the server. + * + * The default value is OPTIONAL, which allows connecting to servers without + * encryption as well, but it will still attempt to use it if the server + * supports it. + */ + public KuduClientBuilder encryptionPolicy(AsyncKuduClient.EncryptionPolicy encryptionPolicy) { + clientBuilder.encryptionPolicy(encryptionPolicy); + return this; + } + + /** + * Creates a new client that connects to the masters. * Doesn't block and won't throw an exception if the masters don't exist. 
* @return a new asynchronous Kudu client diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java index 234984b..a9c511d 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java @@ -222,7 +222,13 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { private Certificate peerCert; - private String saslProtocolName; + private final String saslProtocolName; + + private final boolean requireAuthentication; + + private final boolean requireEncryption; + + private final boolean encryptLoopback; @InterfaceAudience.LimitedPrivate("Test") boolean overrideLoopbackForTests; @@ -230,10 +236,16 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { public Negotiator(String remoteHostname, SecurityContext securityContext, boolean ignoreAuthnToken, - String saslProtocolName) { + String saslProtocolName, + boolean requireAuthentication, + boolean requireEncryption, + boolean encryptLoopback) { this.remoteHostname = remoteHostname; this.securityContext = securityContext; this.saslProtocolName = saslProtocolName; + this.requireAuthentication = requireAuthentication; + this.requireEncryption = requireEncryption; + this.encryptLoopback = encryptLoopback; SignedTokenPB token = securityContext.getAuthenticationToken(); if (token != null) { if (ignoreAuthnToken) { @@ -264,7 +276,7 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { for (RpcHeader.RpcFeatureFlag flag : SUPPORTED_RPC_FEATURES) { builder.addSupportedFeatures(flag); } - if (isLoopbackConnection(ctx.channel())) { + if (isLoopbackConnection(ctx.channel()) && !encryptLoopback) { builder.addSupportedFeatures(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY); } @@ -371,6 +383,10 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { 
serverFeatures = getFeatureFlags(response); // If the server supports TLS, we will always speak TLS to it. final boolean negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS); + if (!negotiatedTls && requireEncryption) { + throw new NonRecoverableException(Status.NotAuthorized( + "server does not support required TLS encryption")); + } // Check the negotiated authentication type sent by the server. chosenAuthnType = chooseAuthenticationType(response); @@ -473,6 +489,11 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { if (chosenMech != null) { LOG.debug("SASL mechanism {} chosen for peer {}", chosenMech.name(), remoteHostname); + if (chosenMech.equals(SaslMechanism.PLAIN) && requireAuthentication) { + String message = "client requires authentication, " + + "but server does not have Kerberos enabled"; + throw new NonRecoverableException(Status.NotAuthorized(message)); + } return; } @@ -658,7 +679,7 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> { // Don't wrap the TLS socket if we are using TLS for authentication only. 
boolean isAuthOnly = serverFeatures.contains(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY) && - isLoopbackConnection(ctx.channel()); + isLoopbackConnection(ctx.channel()) && !encryptLoopback; if (!isAuthOnly) { ctx.pipeline().addFirst("tls", handler); } diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java index 0da8368..5734795 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java @@ -117,7 +117,8 @@ public class TestNegotiator { } private void startNegotiation(boolean fakeLoopback) { - Negotiator negotiator = new Negotiator("127.0.0.1", secContext, false, "kudu"); + Negotiator negotiator = new Negotiator("127.0.0.1", secContext, false, "kudu", + false, false, false); negotiator.overrideLoopbackForTests = fakeLoopback; embedder = new EmbeddedChannel(negotiator); negotiator.sendHello(embedder.pipeline().firstContext()); diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java index 7d88d34..cbe84d4 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java @@ -524,4 +524,65 @@ public class TestSecurity { getBasicCreateTableOptions())); } + @Test(timeout = 60000) + public void testKuduRequireAuthenticationInsecureCluster() throws Exception { + try { + KuduClient client = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString()) + .requireAuthentication(true) + .build(); + client.createTable("TestSecurity-authentication-required-1", + getBasicSchema(), getBasicCreateTableOptions()); + Assert.fail("client shouldn't be able to connect to the cluster."); + } catch (NonRecoverableException e) { + Assert.assertThat(e.getMessage(), 
CoreMatchers.containsString( + "client requires authentication, but server does not have Kerberos enabled" + )); + } + } + + @Test(timeout = 60000) + @KuduTestHarness.MasterServerConfig(flags = {"--rpc_encryption=disabled", + "--rpc_authentication=disabled"}) + @KuduTestHarness.TabletServerConfig(flags = {"--rpc_encryption=disabled", + "--rpc_authentication=disabled"}) + public void testKuduRequireEncryptionInsecureCluster() throws Exception { + try { + KuduClient client = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString()) + .encryptionPolicy(AsyncKuduClient.EncryptionPolicy.REQUIRED_REMOTE) + .build(); + client.createTable("TestSecurity-encryption-required-1", + getBasicSchema(), getBasicCreateTableOptions()); + Assert.fail("client shouldn't be able to connect to the cluster."); + } catch (NonRecoverableException e) { + Assert.assertThat(e.getMessage(), CoreMatchers.containsString( + "server does not support required TLS encryption" + )); + } + } + + @Test + @KuduTestHarness.EnableKerberos + public void testKuduRequireAuthenticationAndEncryptionSecureCluster() throws KuduException { + KuduClient client = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString()) + .requireAuthentication(true) + .encryptionPolicy(AsyncKuduClient.EncryptionPolicy.REQUIRED) + .build(); + KuduTable table = client.createTable("TestSecurity-authentication-required-1", + getBasicSchema(), getBasicCreateTableOptions()); + Assert.assertNotNull(table); + } + + @Test + @KuduTestHarness.MasterServerConfig(flags = {"--rpc_encryption=disabled", + "--rpc_authentication=disabled"}) + @KuduTestHarness.TabletServerConfig(flags = {"--rpc_encryption=disabled", + "--rpc_authentication=disabled"}) + public void testKuduOptionalEncryption() throws KuduException { + KuduClient client = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString()) + .encryptionPolicy(AsyncKuduClient.EncryptionPolicy.OPTIONAL) + .build(); + KuduTable table = 
client.createTable("testSecurity-encryption-optional-1", + getBasicSchema(), getBasicCreateTableOptions()); + Assert.assertNotNull(table); + } }
