[java-client]: support for Kerberized RPC
This commit adds initial support for connecting to a Kerberized cluster
with the Java client. The application is required to have an active
login context which contains a subject with Kerberos credentials when
creating the KuduClient to connect to a secured cluster. For example:
```java
// Create a login context ensuring that Kerberos credentials are set, and log in.
LoginContext context = new LoginContext(...);
context.login();
// Create a new Kudu client in the privileged context.
KuduClient client = Subject.doAs(context.getSubject(), new
PrivilegedAction<KuduClient>() {
@Override
public KuduClient run() {
return new KuduClient.KuduClientBuilder(...).build();
}
});
```
Once the KuduClient is created, the login context is no longer
necessary. This commit does not add a configuration option to ensure the
client rejects connecting to an insecure cluster; that may come in a
follow-up commit.
Testing:
One test is included in TestMiniKuduCluster which explicitly enables
Kerberos authentication and tests that the client can connect. I also
manually switched BaseKuduTest to use a Kerberized cluster, and verified
that tests in TestKuduClient and TestKuduTable which didn't create their
own (uncredentialed) clients passed. I'll leave it to a follow-up commit
to figure out how to automate running the full test suite against a
secure cluster.
Change-Id: I5131edb1f2bda443f7980a4aad86362666b3b6f5
Reviewed-on: http://gerrit.cloudera.org:8080/5150
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
Reviewed-by: Todd Lipcon <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/dad80bd5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/dad80bd5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/dad80bd5
Branch: refs/heads/master
Commit: dad80bd504bd7065c93721846df70086345ab68a
Parents: 8d29d10
Author: Dan Burkert <[email protected]>
Authored: Fri Nov 4 11:56:06 2016 -0700
Committer: Dan Burkert <[email protected]>
Committed: Mon Nov 28 19:57:18 2016 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/AsyncKuduClient.java | 21 +++
.../java/org/apache/kudu/client/KuduClient.java | 1 -
.../org/apache/kudu/client/SecureRpcHelper.java | 84 ++++-----
.../org/apache/kudu/client/TabletClient.java | 12 +-
.../java/org/apache/kudu/client/MiniKdc.java | 52 +++---
.../org/apache/kudu/client/MiniKuduCluster.java | 180 +++++++++++++++----
.../org/apache/kudu/client/TestMiniKdc.java | 4 +-
.../apache/kudu/client/TestMiniKuduCluster.java | 104 ++++++-----
.../java/org/apache/kudu/client/TestUtils.java | 26 +++
java/kudu-client/src/test/resources/flags | 2 +-
.../integration-tests/external_mini_cluster.h | 2 +-
src/kudu/rpc/sasl_server.cc | 8 +-
12 files changed, 314 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 9c28458..0de1fb8 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -27,10 +27,13 @@
package org.apache.kudu.client;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.security.AccessControlContext;
+import java.security.AccessController;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -44,6 +47,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import javax.annotation.concurrent.GuardedBy;
+import javax.security.auth.Subject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -180,6 +184,8 @@ public class AsyncKuduClient implements AutoCloseable {
private final RequestTracker requestTracker;
+ private final Subject subject;
+
private volatile boolean closed;
private AsyncKuduClient(AsyncKuduClientBuilder b) {
@@ -195,6 +201,7 @@ public class AsyncKuduClient implements AutoCloseable {
this.timer = b.timer;
String clientId = UUID.randomUUID().toString().replace("-", "");
this.requestTracker = new RequestTracker(clientId);
+ this.subject = b.subject;
this.connectionCache = new ConnectionCache(this);
}
@@ -938,6 +945,17 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
+ * Gets the subject who created the Kudu client.
+ *
+ * The subject contains credentials necessary to authenticate to Kerberized
Kudu clusters.
+ *
+ * @return the subject who created the Kudu client, or null if no login
context was active.
+ */
+ Subject getSubject() {
+ return subject;
+ }
+
+ /**
* Clears {@link #tableLocations} of the table's entries.
*
* This method makes the maps momentarily inconsistent, and should only be
@@ -1623,6 +1641,7 @@ public class AsyncKuduClient implements AutoCloseable {
private int bossCount = DEFAULT_BOSS_COUNT;
private int workerCount = DEFAULT_WORKER_COUNT;
private boolean statisticsDisabled = false;
+ private Subject subject;
/**
* Creates a new builder for a client that will connect to the specified
masters.
@@ -1779,6 +1798,8 @@ public class AsyncKuduClient implements AutoCloseable {
* @return a new asynchronous Kudu client
*/
public AsyncKuduClient build() {
+ AccessControlContext context = AccessController.getContext();
+ subject = Subject.getSubject(context);
return new AsyncKuduClient(this);
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
----------------------------------------------------------------------
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index 067fcc4..141acad 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -410,6 +410,5 @@ public class KuduClient implements AutoCloseable {
AsyncKuduClient client = clientBuilder.build();
return new KuduClient(client);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
----------------------------------------------------------------------
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
index 269b256..34ef397 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
@@ -26,23 +26,23 @@
package org.apache.kudu.client;
+import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
+import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.RealmChoiceCallback;
+import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.protobuf.ByteString;
import com.google.protobuf.ZeroCopyLiteralByteString;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@@ -57,12 +57,17 @@ import org.apache.kudu.rpc.RpcHeader;
@InterfaceAudience.Private
public class SecureRpcHelper {
- public static final Logger LOG = LoggerFactory.getLogger(TabletClient.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(TabletClient.class);
+
+ private static final Map<String, String> SASL_PROPS = ImmutableMap.of();
+ private static final SaslClientCallbackHandler SASL_CALLBACK = new
SaslClientCallbackHandler();
+ private static final String[] MECHS = new String[] { "GSSAPI", "PLAIN" };
+ private static final String[] INSECURE_MECHS = new String[] { "PLAIN" };
+
+ static final String USER_AND_PASSWORD = "java_client";
private final TabletClient client;
private SaslClient saslClient;
- public static final String SASL_DEFAULT_REALM = "default";
- public static final Map<String, String> SASL_PROPS = new TreeMap<>();
private static final int SASL_CALL_ID = -33;
private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
ImmutableSet.of(RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS);
@@ -70,19 +75,22 @@ public class SecureRpcHelper {
private boolean useWrap = false; // no QOP at the moment
private Set<RpcHeader.RpcFeatureFlag> serverFeatures;
- public static final String USER_AND_PASSWORD = "java_client";
-
- public SecureRpcHelper(TabletClient client) {
+ public SecureRpcHelper(final TabletClient client) {
this.client = client;
+
+ Subject subject = client.getSubject();
+ boolean tryKerberos = subject != null &&
+
!subject.getPrincipals(KerberosPrincipal.class).isEmpty();
+ String[] mechanisms = tryKerberos ? MECHS : INSECURE_MECHS;
+
try {
- saslClient = Sasl.createSaslClient(new String[]{"PLAIN"},
- null,
+ saslClient = Sasl.createSaslClient(mechanisms,
null,
- SASL_DEFAULT_REALM,
+ "kudu",
+ client.getServerInfo().getHostname(),
SASL_PROPS,
- new
SaslClientCallbackHandler(USER_AND_PASSWORD,
-
USER_AND_PASSWORD));
- } catch (SaslException e) {
+ SASL_CALLBACK);
+ } catch (Exception e) {
throw new RuntimeException("Could not create the SASL client", e);
}
}
@@ -129,10 +137,10 @@ public class SecureRpcHelper {
handleChallengeResponse(chan, response);
break;
case SUCCESS:
- handleSuccessResponse(chan, response);
+ handleSuccessResponse(chan);
break;
default:
- System.out.println("Wrong sasl state");
+ LOG.error(String.format("Wrong SASL state: %s",
response.getState()));
}
return null;
}
@@ -229,8 +237,7 @@ public class SecureRpcHelper {
private void handleChallengeResponse(Channel chan, RpcHeader.SaslMessagePB
response) throws
SaslException {
- ByteString bs = response.getToken();
- byte[] saslToken = saslClient.evaluateChallenge(bs.toByteArray());
+ byte[] saslToken =
saslClient.evaluateChallenge(response.getToken().toByteArray());
if (saslToken == null) {
throw new IllegalStateException("Not expecting an empty token");
}
@@ -240,49 +247,24 @@ public class SecureRpcHelper {
sendSaslMessage(chan, builder.build());
}
- private void handleSuccessResponse(Channel chan, RpcHeader.SaslMessagePB
response) {
+ private void handleSuccessResponse(Channel chan) {
LOG.debug("nego finished");
negoUnderway = false;
client.sendContext(chan);
}
private static class SaslClientCallbackHandler implements CallbackHandler {
- private final String userName;
- private final char[] userPassword;
-
- public SaslClientCallbackHandler(String user, String password) {
- this.userName = user;
- this.userPassword = password.toCharArray();
- }
-
- public void handle(Callback[] callbacks)
- throws UnsupportedCallbackException {
- NameCallback nc = null;
- PasswordCallback pc = null;
- RealmCallback rc = null;
+ public void handle(Callback[] callbacks) throws
UnsupportedCallbackException {
for (Callback callback : callbacks) {
- if (callback instanceof RealmChoiceCallback) {
- continue;
- } else if (callback instanceof NameCallback) {
- nc = (NameCallback) callback;
+ if (callback instanceof NameCallback) {
+ ((NameCallback) callback).setName(USER_AND_PASSWORD);
} else if (callback instanceof PasswordCallback) {
- pc = (PasswordCallback) callback;
- } else if (callback instanceof RealmCallback) {
- rc = (RealmCallback) callback;
+ ((PasswordCallback)
callback).setPassword(USER_AND_PASSWORD.toCharArray());
} else {
throw new UnsupportedCallbackException(callback,
- "Unrecognized SASL client callback");
+ "Unrecognized SASL client
callback");
}
}
- if (nc != null) {
- nc.setName(userName);
- }
- if (pc != null) {
- pc.setPassword(userPassword);
- }
- if (rc != null) {
- rc.setText(rc.getDefaultText());
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index 8f9e75b..c710b4c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -32,6 +32,7 @@ import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
import com.google.common.annotations.VisibleForTesting;
@@ -844,8 +845,8 @@ public class TabletClient extends
ReplayingDecoder<VoidEnum> {
userBuilder.setRealUser(SecureRpcHelper.USER_AND_PASSWORD);
builder.setDEPRECATEDUserInfo(userBuilder.build());
RpcHeader.ConnectionContextPB pb = builder.build();
- RpcHeader.RequestHeader header = RpcHeader.RequestHeader.newBuilder()
- .setCallId(CONNECTION_CTX_CALL_ID).build();
+ RpcHeader.RequestHeader header =
+
RpcHeader.RequestHeader.newBuilder().setCallId(CONNECTION_CTX_CALL_ID).build();
return KuduRpc.toChannelBuffer(header, pb);
}
@@ -857,6 +858,13 @@ public class TabletClient extends
ReplayingDecoder<VoidEnum> {
return serverInfo;
}
+ /**
+ * @return the subject containing security credentials, or null if no
subject is available.
+ */
+ Subject getSubject() {
+ return kuduClient.getSubject();
+ }
+
public String toString() {
final StringBuilder buf = new StringBuilder(13 + 10 + 6 + 64 + 7 + 32 + 16
+ 1 + 17 + 2 + 1);
buf.append("TabletClient@") // =13
http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
index 6239b25..1e3538c 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
@@ -14,6 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+
package org.apache.kudu.client;
import java.io.BufferedReader;
@@ -24,8 +25,6 @@ import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import javax.annotation.concurrent.NotThreadSafe;
@@ -50,6 +49,9 @@ import org.apache.kudu.annotations.InterfaceAudience;
* to the KDC.
*
* The KDC is managed as an external process, using the krb5 binaries
installed on the system.
+ *
+ * For debugging Kerberos client issues, it can be helpful to add
+ * {@code -Dsun.security.krb5.debug=true} to the JVM properties.
*/
@InterfaceAudience.Private
@NotThreadSafe
@@ -118,7 +120,7 @@ public class MiniKdc implements Closeable {
return new MiniKdc(
new Options("KRBTEST.COM",
Paths.get(TestUtils.getBaseDir(), "krb5kdc-" +
System.currentTimeMillis()),
- TestUtils.findFreePort(PORT_START)));
+ TestUtils.findFreeUdpPort(PORT_START)));
}
/**
@@ -135,12 +137,6 @@ public class MiniKdc implements Closeable {
dataRootDir));
}
- File credentialCacheDir = options.dataRoot.resolve("krb5cc").toFile();
- if (!credentialCacheDir.mkdir()) {
- throw new RuntimeException(String.format("unable to create credential
cache directory: %s",
- credentialCacheDir));
- }
-
createKdcConf();
createKrb5Conf();
@@ -235,7 +231,6 @@ public class MiniKdc implements Closeable {
" kdc = STDERR",
"[libdefaults]",
- " default_ccache_name = " + "DIR:" +
options.dataRoot.resolve("krb5cc"),
" default_realm = " + options.realm,
" dns_lookup_kdc = false",
" dns_lookup_realm = false",
@@ -243,8 +238,16 @@ public class MiniKdc implements Closeable {
" renew_lifetime = 7d",
" ticket_lifetime = 24h",
- // The KDC is configured to only use TCP, so the client should not
prefer UDP.
- " udp_preference_limit = 0",
+ // Disable aes256, since Java does not support it without JCE, see
+ //
https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/jgss-features.html
+ " default_tkt_enctypes = aes128-cts des3-cbc-sha1",
+ " default_tgs_enctypes = aes128-cts des3-cbc-sha1",
+ " permitted_enctypes = aes128-cts des3-cbc-sha1",
+
+ // In miniclusters, we start daemons on local loopback IPs that
+ // have no reverse DNS entries. So, disable reverse DNS.
+ " rdns = false",
+ " ignore_acceptor_hostname = true",
"[realms]",
options.realm + " = {",
@@ -257,8 +260,7 @@ public class MiniKdc implements Closeable {
private void createKdcConf() throws IOException {
List<String> contents = ImmutableList.of(
"[kdcdefaults]",
- " kdc_ports = \"\"",
- " kdc_tcp_ports = " + options.port,
+ " kdc_ports = " + options.port,
"[realms]",
options.realm + " = {",
@@ -312,28 +314,22 @@ public class MiniKdc implements Closeable {
"/usr/sbin" // Linux
);
- private Map<String, String> getEnvVars() {
+ public Map<String, String> getEnvVars() {
return ImmutableMap.of(
"KRB5_CONFIG", options.dataRoot.resolve("krb5.conf").toString(),
"KRB5_KDC_PROFILE", options.dataRoot.resolve("kdc.conf").toString(),
- "KRB5CCNAME", "DIR:" + options.dataRoot.resolve("krb5cc").toString()
+ "KRB5CCNAME", options.dataRoot.resolve("krb5cc").toString()
);
}
private Process startProcessWithKrbEnv(String... argv) throws IOException {
- List<String> args = new ArrayList<>();
- args.add("env");
- for (Map.Entry<String, String> entry : getEnvVars().entrySet()) {
- args.add(String.format("%s=%s", entry.getKey(), entry.getValue()));
- }
- args.addAll(Arrays.asList(argv));
-
- LOG.debug("executing {}: {}", Paths.get(argv[0]).getFileName(),
Joiner.on(' ').join(args));
- return new
ProcessBuilder(args).redirectOutput(ProcessBuilder.Redirect.PIPE)
- .redirectErrorStream(true)
- .redirectInput(ProcessBuilder.Redirect.PIPE)
- .start();
+ ProcessBuilder procBuilder = new ProcessBuilder(argv);
+ procBuilder.environment().putAll(getEnvVars());
+ LOG.debug("executing '{}', env: '{}'",
+ Joiner.on(" ").join(procBuilder.command()),
+ Joiner.on(",
").withKeyValueSeparator("=").join(procBuilder.environment()));
+ return procBuilder.redirectErrorStream(true).start();
}
/**
http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
index 28b1bf7..2ddc5e7 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
@@ -11,19 +11,28 @@
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
+
package org.apache.kudu.client;
import java.io.BufferedReader;
import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.nio.file.Path;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import javax.security.auth.Subject;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -61,7 +70,7 @@ public class MiniKuduCluster implements AutoCloseable {
private final Map<Integer, Process> tserverProcesses = new
ConcurrentHashMap<>();
// Map of ports to process command lines. Never removed from. Used to
restart processes.
- private final Map<Integer, String[]> commandLines = new
ConcurrentHashMap<>();
+ private final Map<Integer, List<String>> commandLines = new
ConcurrentHashMap<>();
private final List<String> pathsToDelete = new ArrayList<>();
private final List<HostAndPort> masterHostPorts = new ArrayList<>();
@@ -73,15 +82,76 @@ public class MiniKuduCluster implements AutoCloseable {
private String masterAddresses;
- private MiniKuduCluster(int numMasters, int numTservers, int
defaultTimeoutMs) throws Exception {
+ private final String bindHost = TestUtils.getUniqueLocalhost();
+ private final Path keytab;
+ private final MiniKdc miniKdc;
+ private final Subject subject;
+
+ private MiniKuduCluster(int numMasters,
+ int numTservers,
+ final int defaultTimeoutMs,
+ boolean enableKerberos) throws Exception {
this.defaultTimeoutMs = defaultTimeoutMs;
+ if (enableKerberos) {
+ miniKdc = MiniKdc.withDefaults();
+ miniKdc.start();
+
+ keytab = miniKdc.createServiceKeytab("kudu/" + bindHost);
+
+ miniKdc.createUserPrincipal("testuser");
+ miniKdc.kinit("testuser");
+ System.setProperty("java.security.krb5.conf",
miniKdc.getEnvVars().get("KRB5_CONFIG"));
+
+ Configuration conf = new Configuration() {
+ @Override
+ public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+ Map<String, String> options = new HashMap<>();
+ options.put("useKeyTab", "true");
+ options.put("useTicketCache", "true");
+ options.put("ticketCache", miniKdc.getEnvVars().get("KRB5CCNAME"));
+ options.put("principal", "testuser");
+ options.put("doNotPrompt", "true");
+ options.put("renewTGT", "true");
+ options.put("debug", "true");
+
+ return new AppConfigurationEntry[] {
+ new
AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
+
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+ options)
+ };
+ }
+ };
+
+ LoginContext context = new
LoginContext("com.sun.security.auth.module.Krb5LoginModule",
+ new Subject(), null, conf);
+ context.login();
+ context.getSubject();
+ subject = context.getSubject();
+ } else {
+ miniKdc = null;
+ keytab = null;
+ subject = null;
+ }
+
startCluster(numMasters, numTservers);
- syncClient = new KuduClient.KuduClientBuilder(getMasterAddresses())
- .defaultAdminOperationTimeoutMs(defaultTimeoutMs)
- .defaultOperationTimeoutMs(defaultTimeoutMs)
- .build();
+ PrivilegedAction<KuduClient> createClient = new
PrivilegedAction<KuduClient>() {
+ @Override
+ public KuduClient run() {
+ KuduClient.KuduClientBuilder kuduClientBuilder =
+ new KuduClient.KuduClientBuilder(getMasterAddresses());
+ kuduClientBuilder.defaultAdminOperationTimeoutMs(defaultTimeoutMs);
+ kuduClientBuilder.defaultOperationTimeoutMs(defaultTimeoutMs);
+ return kuduClientBuilder.build();
+ }
+ };
+
+ if (subject != null) {
+ syncClient = Subject.doAs(subject, createClient);
+ } else {
+ syncClient = createClient.run();
+ }
}
/**
@@ -104,18 +174,15 @@ public class MiniKuduCluster implements AutoCloseable {
* Starts a Kudu cluster composed of the provided masters and tablet servers.
* @param numMasters how many masters to start
* @param numTservers how many tablet servers to start
- * @throws Exception
*/
private void startCluster(int numMasters, int numTservers) throws Exception {
Preconditions.checkArgument(numMasters > 0, "Need at least one master");
- Preconditions.checkArgument(numTservers > 0, "Need at least one tablet
server");
// The following props are set via kudu-client's pom.
String baseDirPath = TestUtils.getBaseDir();
- String localhost = TestUtils.getUniqueLocalhost();
long now = System.currentTimeMillis();
LOG.info("Starting {} masters...", numMasters);
- int startPort = startMasters(PORT_START, numMasters, baseDirPath);
+ int startPort = startMasters(PORT_START, numMasters, baseDirPath,
bindHost);
LOG.info("Starting {} tablet servers...", numTservers);
List<Integer> ports = TestUtils.findFreePorts(startPort, numTservers * 2);
for (int i = 0; i < numTservers; i++) {
@@ -123,19 +190,27 @@ public class MiniKuduCluster implements AutoCloseable {
tserverPorts.add(rpcPort);
String dataDirPath = baseDirPath + "/ts-" + i + "-" + now;
String flagsPath = TestUtils.getFlagsPath();
- String[] tsCmdLine = {
+
+ List<String> commandLine = Lists.newArrayList(
TestUtils.findBinary("kudu-tserver"),
"--flagfile=" + flagsPath,
"--fs_wal_dir=" + dataDirPath,
"--fs_data_dirs=" + dataDirPath,
"--flush_threshold_mb=1",
"--tserver_master_addrs=" + masterAddresses,
- "--webserver_interface=" + localhost,
- "--local_ip_for_outbound_sockets=" + localhost,
+ "--webserver_interface=" + bindHost,
+ "--local_ip_for_outbound_sockets=" + bindHost,
"--webserver_port=" + (rpcPort + 1),
- "--rpc_bind_addresses=" + localhost + ":" + rpcPort};
- tserverProcesses.put(rpcPort, configureAndStartProcess(rpcPort,
tsCmdLine));
- commandLines.put(rpcPort, tsCmdLine);
+ "--rpc_bind_addresses=" + bindHost + ":" + rpcPort);
+
+ if (miniKdc != null) {
+ commandLine.add("--keytab=" + keytab);
+ commandLine.add("--kerberos_principal=kudu/" + bindHost);
+ commandLine.add("--server_require_kerberos");
+ }
+
+ tserverProcesses.put(rpcPort, configureAndStartProcess(rpcPort,
commandLine));
+ commandLines.put(rpcPort, commandLine);
if (flagsPath.startsWith(baseDirPath)) {
// We made a temporary copy of the flags; delete them later.
@@ -155,13 +230,14 @@ public class MiniKuduCluster implements AutoCloseable {
* @return the next free port
* @throws Exception if we are unable to start the masters
*/
- private int startMasters(int masterStartPort, int numMasters,
- String baseDirPath) throws Exception {
+ private int startMasters(int masterStartPort,
+ int numMasters,
+ String baseDirPath,
+ String bindHost) throws Exception {
LOG.info("Starting {} masters...", numMasters);
// Get the list of web and RPC ports to use for the master consensus
configuration:
// request NUM_MASTERS * 2 free ports as we want to also reserve the web
// ports for the consensus configuration.
- String localhost = TestUtils.getUniqueLocalhost();
List<Integer> ports = TestUtils.findFreePorts(masterStartPort, numMasters
* 2);
int lastFreePort = ports.get(ports.size() - 1);
List<Integer> masterRpcPorts = Lists.newArrayListWithCapacity(numMasters);
@@ -169,7 +245,7 @@ public class MiniKuduCluster implements AutoCloseable {
for (int i = 0; i < numMasters * 2; i++) {
if (i % 2 == 0) {
masterRpcPorts.add(ports.get(i));
- masterHostPorts.add(HostAndPort.fromParts(localhost, ports.get(i)));
+ masterHostPorts.add(HostAndPort.fromParts(bindHost, ports.get(i)));
} else {
masterWebPorts.add(ports.get(i));
}
@@ -187,20 +263,26 @@ public class MiniKuduCluster implements AutoCloseable {
// 3) master 1 happens to bind to port b for the web port, as master 2
hasn't been
// started yet and findFreePort(s) is "check-time-of-use" (it does not
reserve the
// ports, only checks that when it was last called, these ports could be
used).
- List<String> masterCmdLine = Lists.newArrayList(
+ List<String> commandLine = Lists.newArrayList(
TestUtils.findBinary("kudu-master"),
"--flagfile=" + flagsPath,
"--fs_wal_dir=" + dataDirPath,
"--fs_data_dirs=" + dataDirPath,
- "--webserver_interface=" + localhost,
- "--local_ip_for_outbound_sockets=" + localhost,
- "--rpc_bind_addresses=" + localhost + ":" + port,
+ "--webserver_interface=" + bindHost,
+ "--local_ip_for_outbound_sockets=" + bindHost,
+ "--rpc_bind_addresses=" + bindHost + ":" + port,
"--webserver_port=" + masterWebPorts.get(i),
"--raft_heartbeat_interval_ms=200"); // make leader elections faster
for faster tests
if (numMasters > 1) {
- masterCmdLine.add("--master_addresses=" + masterAddresses);
+ commandLine.add("--master_addresses=" + masterAddresses);
+ }
+
+ if (miniKdc != null) {
+ commandLine.add("--keytab=" + keytab);
+ commandLine.add("--kerberos_principal=kudu/" + bindHost);
+ commandLine.add("--server_require_kerberos");
}
- String[] commandLine = masterCmdLine.toArray(new
String[masterCmdLine.size()]);
+
masterProcesses.put(port, configureAndStartProcess(port, commandLine));
commandLines.put(port, commandLine);
@@ -224,24 +306,26 @@ public class MiniKuduCluster implements AutoCloseable {
* or if we were able to start the process but noticed that it was then
killed (in which case
* we'll log the exit value).
*/
- private Process configureAndStartProcess(int port, String[] command) throws
Exception {
- LOG.info("Starting process: {}", Joiner.on(" ").join(command));
+ private Process configureAndStartProcess(int port, List<String> command)
throws Exception {
ProcessBuilder processBuilder = new ProcessBuilder(command);
processBuilder.redirectErrorStream(true);
+ if (miniKdc != null) {
+ processBuilder.environment().putAll(miniKdc.getEnvVars());
+ }
Process proc = processBuilder.start();
ProcessInputStreamLogPrinterRunnable printer =
new ProcessInputStreamLogPrinterRunnable(proc.getInputStream());
Thread thread = new Thread(printer);
thread.setDaemon(true);
-
thread.setName(Iterables.getLast(Splitter.on(File.separatorChar).split(command[0]))
+ ":" + port);
+
thread.setName(Iterables.getLast(Splitter.on(File.separatorChar).split(command.get(0)))
+ ":" + port);
PROCESS_INPUT_PRINTERS.add(thread);
thread.start();
Thread.sleep(300);
try {
int ev = proc.exitValue();
- throw new Exception("We tried starting a process (" + command[0] + ")
but it exited with " +
- "value=" + ev);
+ throw new Exception(String.format(
+ "We tried starting a process (%s) but it exited with value=%s",
command.get(0), ev));
} catch (IllegalThreadStateException ex) {
// This means the process is still alive, it's like reverse psychology.
}
@@ -279,8 +363,7 @@ public class MiniKuduCluster implements AutoCloseable {
throw new RuntimeException(message);
}
- String[] commandLine = commandLines.get(port);
- map.put(port, configureAndStartProcess(port, commandLine));
+ map.put(port, configureAndStartProcess(port, commandLines.get(port)));
}
/**
@@ -385,7 +468,15 @@ public class MiniKuduCluster implements AutoCloseable {
f.delete();
}
} catch (Exception e) {
- LOG.warn("Could not delete path {}", path, e);
+ LOG.warn(String.format("Could not delete path %s", path), e);
+ }
+ }
+
+ if (miniKdc != null) {
+ try {
+ miniKdc.close();
+ } catch (IOException e) {
+ LOG.warn("Unable to close MiniKdc", e);
}
}
}
@@ -430,6 +521,14 @@ public class MiniKuduCluster implements AutoCloseable {
}
/**
+ * @return authenticated user credentials for this cluster,
+ * or {@code null} if it is not a secure cluster.
+ */
+ public Subject getLoggedInSubject() {
+ return subject;
+ }
+
+ /**
* Helper runnable that receives stdout and logs it along with the process'
identifier.
*/
public static class ProcessInputStreamLogPrinterRunnable implements Runnable
{
@@ -463,6 +562,7 @@ public class MiniKuduCluster implements AutoCloseable {
private int numMasters = 1;
private int numTservers = 3;
private int defaultTimeoutMs = 50000;
+ private boolean enableKerberos = false;
public MiniKuduClusterBuilder numMasters(int numMasters) {
this.numMasters = numMasters;
@@ -485,9 +585,17 @@ public class MiniKuduCluster implements AutoCloseable {
return this;
}
+ /**
+ * Enables Kerberos on the mini cluster and acquires client credentials for this process.
+ * @return this instance
+ */
+ public MiniKuduClusterBuilder enableKerberos() {
+ enableKerberos = true;
+ return this;
+ }
+
public MiniKuduCluster build() throws Exception {
- return new MiniKuduCluster(numMasters, numTservers, defaultTimeoutMs);
+ return new MiniKuduCluster(numMasters, numTservers, defaultTimeoutMs,
enableKerberos);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
----------------------------------------------------------------------
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
index 79ce7d9..9feb56f 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
@@ -16,7 +16,7 @@
// under the License.
package org.apache.kudu.client;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import org.junit.Test;
@@ -40,7 +40,7 @@ public class TestMiniKdc {
String klist = kdc.klist();
- assertTrue(klist.contains("alice@KRBTEST.COM"));
+ assertFalse(klist.contains("alice@KRBTEST.COM"));
assertTrue(klist.contains("bob@KRBTEST.COM"));
assertTrue(klist.contains("krbtgt/KRBTEST.COM@KRBTEST.COM"));
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
index baa5658..fbbe245 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
@@ -13,15 +13,11 @@
*/
package org.apache.kudu.client;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import java.io.IOException;
import java.net.Socket;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
public class TestMiniKuduCluster {
@@ -29,62 +25,63 @@ public class TestMiniKuduCluster {
private static final int NUM_TABLET_SERVERS = 3;
private static final int DEFAULT_NUM_MASTERS = 1;
- private MiniKuduCluster cluster;
-
- @Before
- public void before() throws Exception {
- cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
- .numMasters(DEFAULT_NUM_MASTERS)
- .numTservers(NUM_TABLET_SERVERS)
- .build();
- assertTrue(cluster.waitForTabletServers(NUM_TABLET_SERVERS));
- }
-
- @After
- public void after() {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
@Test(timeout = 50000)
public void test() throws Exception {
+ try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+ .numMasters(DEFAULT_NUM_MASTERS)
+ .numTservers(NUM_TABLET_SERVERS)
+ .build()) {
+ assertTrue(cluster.waitForTabletServers(NUM_TABLET_SERVERS));
+ assertEquals(DEFAULT_NUM_MASTERS, cluster.getMasterProcesses().size());
+ assertEquals(NUM_TABLET_SERVERS,
cluster.getTabletServerProcesses().size());
+
+ {
+ // Kill the master.
+ int masterPort =
cluster.getMasterProcesses().keySet().iterator().next();
+ testPort(masterPort, true, 1000);
+ cluster.killMasterOnPort(masterPort);
+
+ testPort(masterPort, false, 2000);
+
+ // Restart the master.
+ cluster.restartDeadMasterOnPort(masterPort);
+
+ // Test we can reach it.
+ testPort(masterPort, true, 3000);
+ }
- assertEquals(DEFAULT_NUM_MASTERS, cluster.getMasterProcesses().size());
- assertEquals(NUM_TABLET_SERVERS,
cluster.getTabletServerProcesses().size());
+ {
+ // Kill the first TS.
+ int tsPort =
cluster.getTabletServerProcesses().keySet().iterator().next();
+ testPort(tsPort, true, 1000);
+ cluster.killTabletServerOnPort(tsPort);
- {
- // Kill the master.
- int masterPort = cluster.getMasterProcesses().keySet().iterator().next();
- testPort(masterPort, true, 1000);
- cluster.killMasterOnPort(masterPort);
+ testPort(tsPort, false, 2000);
- testPort(masterPort, false, 2000);
+ // Restart it.
+ cluster.restartDeadTabletServerOnPort(tsPort);
- // Restart the master.
- cluster.restartDeadMasterOnPort(masterPort);
+ testPort(tsPort, true, 3000);
+ }
- // Test we can reach it.
- testPort(masterPort, true, 3000);
+ assertEquals(DEFAULT_NUM_MASTERS, cluster.getMasterProcesses().size());
+ assertEquals(NUM_TABLET_SERVERS,
cluster.getTabletServerProcesses().size());
}
+ }
-
- {
- // Kill the first TS.
- int tsPort =
cluster.getTabletServerProcesses().keySet().iterator().next();
- testPort(tsPort, true, 1000);
- cluster.killTabletServerOnPort(tsPort);
-
- testPort(tsPort, false, 2000);
-
- // Restart it.
- cluster.restartDeadTabletServerOnPort(tsPort);
-
- testPort(tsPort, true, 3000);
+ @Test(timeout = 50000)
+ public void testKerberos() throws Exception {
+ try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+ .numMasters(DEFAULT_NUM_MASTERS)
+ .numTservers(NUM_TABLET_SERVERS)
+ .enableKerberos()
+ .build()) {
+ try {
+ assertTrue(cluster.waitForTabletServers(NUM_TABLET_SERVERS));
+ } catch (RuntimeException e) {
+ assertTrue(e.getMessage().contains("incompatible RPC?"));
+ }
}
-
- assertEquals(DEFAULT_NUM_MASTERS, cluster.getMasterProcesses().size());
- assertEquals(NUM_TABLET_SERVERS,
cluster.getTabletServerProcesses().size());
}
/**
@@ -93,9 +90,10 @@ public class TestMiniKuduCluster {
* @param port the port to test
* @param testIsOpen true if we should want it to be open, false if we want
it closed
* @param timeout how long we're willing to wait before it happens
- * @throws InterruptedException
*/
- private void testPort(int port, boolean testIsOpen, long timeout) throws InterruptedException {
+ private static void testPort(int port,
+ boolean testIsOpen,
+ long timeout) throws InterruptedException {
DeadlineTracker tracker = new DeadlineTracker();
while (tracker.getElapsedMillis() < timeout) {
try {
http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
----------------------------------------------------------------------
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
index 16a5497..ff9801f 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
@@ -24,9 +24,11 @@ import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
+import java.net.SocketException;
import java.net.URL;
import java.net.URLDecoder;
import java.nio.file.Files;
@@ -175,6 +177,30 @@ public class TestUtils {
}
/**
+ * Finds the next free UDP port, starting with the one passed. Keep in mind the
+ * time-of-check-time-of-use nature of this method: the returned port might become occupied
+ * after it was checked for availability.
+ * @param startPort first port to be probed
+ * @return a currently usable port
+ * @throws IOException IOE is thrown if we can't close a socket we tried to open or if we run
+ * out of ports to try
+ */
+ public static int findFreeUdpPort(int startPort) throws IOException {
+ DatagramSocket ds;
+ for (int i = startPort; i < 65536; i++) {
+ try {
+ SocketAddress address = new InetSocketAddress(getUniqueLocalhost(), i);
+ ds = new DatagramSocket(address);
+ } catch (SocketException e) {
+ continue;
+ }
+ ds.close();
+ return i;
+ }
+ throw new IOException("Ran out of ports");
+ }
+
+ /**
* Finds a specified number of parts, starting with one passed. Keep in mind
the
* time-of-check-time-of-use nature of this method.
* @see {@link #findFreePort(int)}
http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/java/kudu-client/src/test/resources/flags
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/resources/flags
b/java/kudu-client/src/test/resources/flags
index 687676f..fb4c1aa 100644
--- a/java/kudu-client/src/test/resources/flags
+++ b/java/kudu-client/src/test/resources/flags
@@ -1,4 +1,4 @@
--logtostderr
--never_fsync
--unlock_experimental_flags
---unlock_unsafe_flags
\ No newline at end of file
+--unlock_unsafe_flags
http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/src/kudu/integration-tests/external_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.h
b/src/kudu/integration-tests/external_mini_cluster.h
index 48f6325..f1dad8c 100644
--- a/src/kudu/integration-tests/external_mini_cluster.h
+++ b/src/kudu/integration-tests/external_mini_cluster.h
@@ -403,7 +403,7 @@ class ExternalDaemon : public
RefCountedThreadSafe<ExternalDaemon> {
DISALLOW_COPY_AND_ASSIGN(ExternalDaemon);
};
-// Resumes a daemon that was stopped with ExteranlDaemon::Pause() upon
+// Resumes a daemon that was stopped with ExternalDaemon::Pause() upon
// exiting a scope.
class ScopedResumeExternalDaemon {
public:
http://git-wip-us.apache.org/repos/asf/kudu/blob/dad80bd5/src/kudu/rpc/sasl_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_server.cc b/src/kudu/rpc/sasl_server.cc
index 7170e35..6fa38c8 100644
--- a/src/kudu/rpc/sasl_server.cc
+++ b/src/kudu/rpc/sasl_server.cc
@@ -383,19 +383,13 @@ Status SaslServer::HandleInitiateRequest(const
SaslMessagePB& request) {
if (s.ok()) {
nego_ok_ = true;
RETURN_NOT_OK(SendSuccessResponse(server_out, server_out_len));
- } else { // s.IsComplete() (equivalent to SASL_CONTINUE)
+ } else { // s.IsIncomplete() (equivalent to SASL_CONTINUE)
RETURN_NOT_OK(SendChallengeResponse(server_out, server_out_len));
}
return Status::OK();
}
Status SaslServer::SendChallengeResponse(const char* challenge, unsigned clen) {
- if (clen < 1) {
- Status s = Status::NotAuthorized("SASL library did not provide challenge token!");
- RETURN_NOT_OK(SendSaslError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
- return s;
- }
-
SaslMessagePB response;
response.set_state(SaslMessagePB::CHALLENGE);
response.mutable_token()->assign(challenge, clen);