http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/docs/src/main/asciidoc/chapters/kerberos.txt ---------------------------------------------------------------------- diff --git a/docs/src/main/asciidoc/chapters/kerberos.txt b/docs/src/main/asciidoc/chapters/kerberos.txt new file mode 100644 index 0000000..3dcac6d --- /dev/null +++ b/docs/src/main/asciidoc/chapters/kerberos.txt @@ -0,0 +1,355 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +== Kerberos + +=== Overview + +Kerberos is a network authentication protocol that provides a secure way for +peers to prove their identity over an unsecure network in a client-server model. +A centralized key-distribution center (KDC) is the service that coordinates +authentication between a client and a server. Clients and servers use "tickets", +obtained from the KDC via a password or a special file called a "keytab", to +communicate with the KDC and prove their identity. A KDC administrator must +create the principal (name for the client/server identity) and the password +or keytab, securely passing the necessary information to the actual user/service. 
+Properly securing the KDC and generated ticket material is central to the security +model and is mentioned only as a warning to administrators running their own KDC. + +To interact with Kerberos programmatically, GSSAPI and SASL are two standards +which allow cross-language integration with Kerberos for authentication. GSSAPI, +the generic security service application program interface, is a standard which +Kerberos implements. In the Java programming language, the language itself also implements +GSSAPI which is leveraged by other applications, like Apache Hadoop and Apache Thrift. +SASL, simple authentication and security layer, is a framework for authentication and +security over the network. SASL provides a number of mechanisms for authentication, +one of which is GSSAPI. Thus, SASL provides the transport which authenticates +using GSSAPI that Kerberos implements. + +Kerberos is a very complicated software application and is deserving of much +more description than can be provided here. An http://www.roguelynn.com/words/explain-like-im-5-kerberos/[explain like +I'm 5] blog post is very good at distilling the basics, while http://web.mit.edu/kerberos/[MIT Kerberos's project page] +contains lots of documentation for users or administrators. Various Hadoop "vendors" +also provide free documentation that includes step-by-step instructions for +configuring Hadoop and ZooKeeper (which will be henceforth considered as prerequisites). + +=== Within Hadoop + +Out of the box, HDFS and YARN have no ability to enforce that a user is who +they claim they are. Thus, any basic Hadoop installation should be treated as +unsecure: any user with access to the cluster has the ability to access any data. +Using Kerberos to provide authentication, users can be strongly identified, delegating +to Kerberos to determine who a user is and enforce that a user is who they claim to be. +As such, Kerberos is widely used across the entire Hadoop ecosystem for strong +authentication. 
Since server processes accessing HDFS or YARN are required +to use Kerberos to authenticate with HDFS, it makes sense that they also require +Kerberos authentication from their clients, in addition to other features provided +by SASL. + +A typical deployment involves the creation of Kerberos principals for all server +processes (Hadoop datanodes and namenode(s), ZooKeepers), the creation of a keytab +file for each principal and then proper configuration for the Hadoop site xml files. +Users also need Kerberos principals created for them; however, a user typically +uses a password to identify themselves instead of a keytab. Users can obtain a +ticket granting ticket (TGT) from the KDC using their password which allows them +to authenticate for the lifetime of the TGT (typically one day by default) and alleviates +the need for further password authentication. + +For client-server applications, like web servers, a keytab can be created which +allows for fully-automated Kerberos identification removing the need to enter any +password, at the cost of needing to protect the keytab file. These principals +will apply directly to authentication for clients accessing Accumulo and the +Accumulo processes accessing HDFS. + +=== Configuring Accumulo + +To configure Accumulo for use with Kerberos, both client-facing and server-facing +changes must be made for a functional system on secured Hadoop. As previously mentioned, +numerous guidelines already exist on the subject of configuring Hadoop and ZooKeeper for +use with Kerberos and won't be covered here. It is assumed that you have functional +Hadoop and ZooKeeper already installed. + +==== Servers + +The first step is to obtain a Kerberos identity for the Accumulo server processes. +When running Accumulo with Kerberos enabled, a valid Kerberos identity will be required +to initiate any RPC between Accumulo processes (e.g. Master and TabletServer) in addition +to any HDFS action (e.g. client to HDFS or TabletServer to HDFS). 
+ +===== Generate Principal and Keytab + +In the +kadmin.local+ shell or using the +-q+ option on +kadmin.local+, create a +principal for Accumulo for all hosts that are running Accumulo processes. A Kerberos +principal is of the form "primary/instance@REALM". "accumulo" is commonly the "primary" +(although not required) and the "instance" is the fully-qualified domain name for +the host that will be running the Accumulo process -- this is required. + +---- +kadmin.local -q "addprinc -randkey accumulo/host.domain.com" +---- + +Perform the above for each node running Accumulo processes in the instance, modifying +"host.domain.com" for your network. The +randkey+ option generates a random password +because we will use a keytab for authentication, not a password, since the Accumulo +server processes don't have an interactive console to enter a password into. + +---- +kadmin.local -q "xst -k accumulo.hostname.keytab accumulo/host.domain.com" +---- + +To simplify deployments, at the cost of security, all Accumulo principals could +be globbed into a single keytab. + +---- +kadmin.local -q "xst -k accumulo.service.keytab -glob accumulo*" +---- + +To ensure that the SASL handshake can occur from clients to servers and servers to servers, +all Accumulo servers must share the same primary and realm principal components as the +"client" must know these to set up the connection with the "server". + +===== Server Configuration + +A number of properties need to be changed to properly configure servers +in +accumulo-site.xml+. + +* *general.kerberos.keytab*=_/etc/security/keytabs/accumulo.service.keytab_ +** The path to the keytab for Accumulo on local filesystem. +** Change the value to the actual path on your system. +* *general.kerberos.principal*=_accumulo/_HOST@REALM_ +** The Kerberos principal for Accumulo, needs to match the keytab. 
+** "_HOST" can be used instead of the actual hostname in the principal and will be +automatically expanded to the current FQDN which reduces the configuration file burden. +* *instance.rpc.sasl.enabled*=_true_ +** Enables SASL for the Thrift Servers (supports GSSAPI) +* *instance.security.authenticator*=_org.apache.accumulo.server.security.handler.KerberosAuthenticator_ +** Configures Accumulo to use the Kerberos principal as the Accumulo username/principal +* *instance.security.authorizor*=_org.apache.accumulo.server.security.handler.KerberosAuthorizor_ +** Configures Accumulo to use the Kerberos principal for authorization purposes +* *instance.security.permissionHandler*=_org.apache.accumulo.server.security.handler.KerberosPermissionHandler_ +** Configures Accumulo to use the Kerberos principal for permission purposes +* *trace.token.type*=_org.apache.accumulo.core.client.security.tokens.KerberosToken_ +** Configures the Accumulo Tracer to use the KerberosToken for authentication when +serializing traces to the trace table. +* *trace.user*=_accumulo/_HOST@REALM_ +** The tracer process needs valid credentials to serialize traces to Accumulo. +** While the other server processes are creating a SystemToken from the provided keytab and principal, we can +still use a normal KerberosToken and the same keytab/principal to serialize traces. Like +non-Kerberized instances, the table must be created and permissions granted to the trace.user. +** The same +_HOST+ replacement is performed on this value, substituting the FQDN for +_HOST+. + +Although it should be a prerequisite, it is very important that you have DNS properly +configured for your nodes and that Accumulo is configured to use the FQDN. It +is extremely important to use the FQDN in each of the "hosts" files for each +Accumulo process: +masters+, +monitors+, +slaves+, +tracers+, and +gc+. 
+ +===== KerberosAuthenticator + +The +KerberosAuthenticator+ is an implementation of the pluggable security interfaces +that Accumulo provides. It builds on top of the default ZooKeeper-based implementation, +but removes the need to create user accounts with passwords in Accumulo for clients. As +long as a client has a valid Kerberos identity, they can connect to and interact with +Accumulo, but without any permissions (e.g. cannot create tables or write data). Leveraging +ZooKeeper removes the need to change the permission handler and authorizor, so other Accumulo +functions regarding permissions and cell-level authorizations do not change. + +It is extremely important to note that, while user operations like +SecurityOperations.listLocalUsers()+, ++SecurityOperations.dropLocalUser()+, and +SecurityOperations.createLocalUser()+ will not return +errors, these methods are not equivalent to normal installations, as they will only operate on +users which have, at one point in time, authenticated with Accumulo using their Kerberos identity. +The KDC is still the authoritative entity for user management. The previously mentioned methods +are provided as they simplify management of users within Accumulo, especially with respect +to granting Authorizations and Permissions to new users. + +===== Verifying secure access + +To verify that servers have correctly started with Kerberos enabled, ensure that the processes +are actually running (they should exit immediately if login fails) and verify that you see +something similar to the following in the application log. 
+ +---- +2015-01-07 11:57:56,826 [security.SecurityUtil] INFO : Attempting to login with keytab as accumulo/[email protected] +2015-01-07 11:57:56,830 [security.UserGroupInformation] INFO : Login successful for user accumulo/[email protected] using keytab file /etc/security/keytabs/accumulo.service.keytab +---- + +==== Clients + +===== Create client principal + +Like the Accumulo servers, clients must also have a Kerberos principal created for them. The +primary difference from a server principal is that principals for users are created +with a password and are not qualified to a specific instance (host). + +---- +kadmin.local -q "addprinc $user" +---- + +The above will prompt for a password for that user which will be used to identify that $user. +The user can verify that they can authenticate with the KDC using the command `kinit $user`. +Upon entering the correct password, a local credentials cache will be made which can be used +to authenticate with Accumulo, access HDFS, etc. + +The user can verify the state of their local credentials cache by using the command `klist`. + +---- +$ klist +Ticket cache: FILE:/tmp/krb5cc_123 +Default principal: [email protected] + +Valid starting Expires Service principal +01/07/2015 11:56:35 01/08/2015 11:56:35 krbtgt/[email protected] + renew until 01/14/2015 11:56:35 +---- + +===== Configuration + +The second thing clients need to do is to set up their client configuration file. By +default, this file is stored in +~/.accumulo/conf+, +$ACCUMULO_CONF_DIR/client.conf+ or ++$ACCUMULO_HOME/conf/client.conf+. Accumulo utilities also allow you to provide your own +copy of this file in any location using the +--config-file+ command line option. 
+ +Three items need to be set to enable access to Accumulo: + +* +instance.rpc.sasl.enabled+=_true_ +* +kerberos.server.primary+=_accumulo_ +* +kerberos.server.realm+=_EXAMPLE.COM_ + +The second and third properties *must* match the configuration of the Accumulo servers; this is +required to set up the SASL transport. + +==== Debugging + +*Q*: I have valid Kerberos credentials and a correct client configuration file but +I still get errors like: + +---- +java.io.IOException: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] +---- + +*A*: When you have a valid client configuration and Kerberos TGT, it is possible that the search +path for your local credentials cache is incorrect. Check the value of the KRB5CCNAME environment +variable, and ensure it matches the value reported by `klist`. + +---- +$ echo $KRB5CCNAME + +$ klist +Ticket cache: FILE:/tmp/krb5cc_123 +Default principal: [email protected] + +Valid starting Expires Service principal +01/07/2015 11:56:35 01/08/2015 11:56:35 krbtgt/[email protected] + renew until 01/14/2015 11:56:35 +$ export KRB5CCNAME=/tmp/krb5cc_123 +$ echo $KRB5CCNAME +/tmp/krb5cc_123 +---- + +*Q*: I thought I had everything configured correctly, but my client/server still fails to log in. +I don't know what is actually failing. + +*A*: Add the following system property to the JVM invocation: + +---- +-Dsun.security.krb5.debug=true +---- + +This will enable lots of extra debugging at the JVM level which is often sufficient to +diagnose some high-level configuration problem. Client applications can add this system property by +hand to the command line; Accumulo server processes or applications started using the `accumulo` +script can add the property to +ACCUMULO_GENERAL_OPTS+ in +$ACCUMULO_CONF_DIR/accumulo-env.sh+. 
+ +Additionally, you can increase the log4j levels on +org.apache.hadoop.security+, which includes the +Hadoop +UserGroupInformation+ class, which will include some high-level debug statements. This +can be controlled in your client application, or using +$ACCUMULO_CONF_DIR/generic_logger.xml+ + +*Q*: All of my Accumulo processes successfully start and log in with their +keytab, but they are unable to communicate with each other, showing the +following errors: + +---- +2015-01-12 14:47:27,055 [transport.TSaslTransport] ERROR: SASL negotiation failure +javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)] + at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212) + at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94) + at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253) + at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37) + at org.apache.accumulo.core.rpc.UGIAssumingTransport$1.run(UGIAssumingTransport.java:53) + at org.apache.accumulo.core.rpc.UGIAssumingTransport$1.run(UGIAssumingTransport.java:49) + at java.security.AccessController.doPrivileged(Native Method) + at javax.security.auth.Subject.doAs(Subject.java:415) + at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) + at org.apache.accumulo.core.rpc.UGIAssumingTransport.open(UGIAssumingTransport.java:49) + at org.apache.accumulo.core.rpc.ThriftUtil.createClientTransport(ThriftUtil.java:357) + at org.apache.accumulo.core.rpc.ThriftUtil.createTransport(ThriftUtil.java:255) + at org.apache.accumulo.server.master.LiveTServerSet$TServerConnection.getTableMap(LiveTServerSet.java:106) + at org.apache.accumulo.master.Master.gatherTableInformation(Master.java:996) + at 
org.apache.accumulo.master.Master.access$600(Master.java:160) + at org.apache.accumulo.master.Master$StatusThread.updateStatus(Master.java:911) + at org.apache.accumulo.master.Master$StatusThread.run(Master.java:901) +Caused by: GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER) + at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:710) + at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:248) + at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179) + at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193) + ... 16 more +Caused by: KrbException: Server not found in Kerberos database (7) - LOOKING_UP_SERVER + at sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:73) + at sun.security.krb5.KrbTgsReq.getReply(KrbTgsReq.java:192) + at sun.security.krb5.KrbTgsReq.sendAndGetCreds(KrbTgsReq.java:203) + at sun.security.krb5.internal.CredentialsUtil.serviceCreds(CredentialsUtil.java:309) + at sun.security.krb5.internal.CredentialsUtil.acquireServiceCreds(CredentialsUtil.java:115) + at sun.security.krb5.Credentials.acquireServiceCreds(Credentials.java:454) + at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:641) + ... 19 more +Caused by: KrbException: Identifier doesn't match expected value (906) + at sun.security.krb5.internal.KDCRep.init(KDCRep.java:143) + at sun.security.krb5.internal.TGSRep.init(TGSRep.java:66) + at sun.security.krb5.internal.TGSRep.<init>(TGSRep.java:61) + at sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:55) + ... 25 more +---- + +or + +---- +2015-01-12 14:47:29,440 [server.TThreadPoolServer] ERROR: Error occurred during processing of message. 
+java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: Peer indicated failure: GSS initiate failed + at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219) + at org.apache.accumulo.core.rpc.UGIAssumingTransportFactory$1.run(UGIAssumingTransportFactory.java:51) + at org.apache.accumulo.core.rpc.UGIAssumingTransportFactory$1.run(UGIAssumingTransportFactory.java:48) + at java.security.AccessController.doPrivileged(Native Method) + at javax.security.auth.Subject.doAs(Subject.java:356) + at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1608) + at org.apache.accumulo.core.rpc.UGIAssumingTransportFactory.getTransport(UGIAssumingTransportFactory.java:48) + at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:208) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) + at java.lang.Thread.run(Thread.java:745) +Caused by: org.apache.thrift.transport.TTransportException: Peer indicated failure: GSS initiate failed + at org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:190) + at org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125) + at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253) + at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41) + at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216) + ... 10 more +---- + +*A*: As previously mentioned, the hostname, and subsequently the address each Accumulo process is bound/listening +on, is extremely important when negotiating an SASL connection. This problem commonly arises when the Accumulo +servers are not configured to listen on the address denoted by their FQDN. 
+ +The values in the Accumulo "hosts" files (In +$ACCUMULO_CONF_DIR+: +masters+, +monitors+, +slaves+, +tracers+, +and +gc+) should match the instance component of the Kerberos server principal (e.g. +host+ in +accumulo/host\@EXAMPLE.COM+).
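The hostname check described in the last debugging answer can be illustrated with a small, self-contained sketch. It is not part of this commit and is not how Accumulo performs the check; the class `PrincipalCheck` and its helper methods are hypothetical names introduced only for illustration. It assumes a principal of the form "primary/instance@REALM", expands the `_HOST` placeholder by plain string replacement, and compares each "hosts" file entry to the instance component by exact string equality.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only; not Accumulo code.
public class PrincipalCheck {

  /** Splits "primary/instance@REALM" into {primary, instance, realm}. */
  static String[] parse(String principal) {
    int slash = principal.indexOf('/');
    int at = principal.lastIndexOf('@');
    if (slash < 0 || at < slash) {
      throw new IllegalArgumentException("Expected primary/instance@REALM but got " + principal);
    }
    return new String[] {principal.substring(0, slash), principal.substring(slash + 1, at), principal.substring(at + 1)};
  }

  /** Replaces the _HOST placeholder with the given FQDN (simple string replacement). */
  static String expandHost(String principal, String fqdn) {
    return principal.replace("_HOST", fqdn);
  }

  /** True when a hosts-file entry equals the instance component of the principal. */
  static boolean hostMatches(String hostsEntry, String principal) {
    return parse(principal)[1].equals(hostsEntry.trim());
  }

  public static void main(String[] args) {
    // Assumed example values: the FQDN and realm are placeholders.
    String principal = expandHost("accumulo/_HOST@EXAMPLE.COM", "host.domain.com");
    // One entry uses the FQDN, the other a short hostname that would not match.
    List<String> hostsFile = Arrays.asList("host.domain.com", "host");
    for (String entry : hostsFile) {
      System.out.println(entry + " matches " + principal + ": " + hostMatches(entry, principal));
    }
  }
}
```

Running a check like this against each entry in +masters+, +monitors+, +slaves+, +tracers+ and +gc+ is one way to spot a short hostname that would lead to the "Server not found in Kerberos database" failure shown above.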
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java ---------------------------------------------------------------------- diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java index d2a999d..76f332b 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java @@ -38,6 +38,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -557,7 +558,21 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { s.close(); } } - Process initProcess = exec(Initialize.class, "--instance-name", config.getInstanceName(), "--password", config.getRootPassword()); + + LinkedList<String> args = new LinkedList<>(); + args.add("--instance-name"); + args.add(config.getInstanceName()); + args.add("--user"); + args.add(config.getRootUserName()); + + // If we aren't using SASL, add in the root password + final String saslEnabled = config.getSiteConfig().get(Property.INSTANCE_RPC_SASL_ENABLED.getKey()); + if (null == saslEnabled || !Boolean.parseBoolean(saslEnabled)) { + args.add("--password"); + args.add(config.getRootPassword()); + } + + Process initProcess = exec(Initialize.class, args.toArray(new String[0])); int ret = initProcess.waitFor(); if (ret != 0) { throw new RuntimeException("Initialize process returned " + ret + ". 
Check the logs in " + config.getLogDir() + " for errors."); http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java ---------------------------------------------------------------------- diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java index 26c23ed..6d674f3 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java @@ -52,6 +52,7 @@ public class MiniAccumuloConfigImpl { private Map<String,String> systemProperties = new HashMap<String,String>(); private String instanceName = "miniInstance"; + private String rootUserName = "root"; private File libDir; private File libExtDir; @@ -667,4 +668,23 @@ public class MiniAccumuloConfigImpl { public Configuration getHadoopConfiguration() { return hadoopConf; } + + /** + * @return the default Accumulo "superuser" + * @since 1.7.0 + */ + public String getRootUserName() { + return rootUserName; + } + + /** + * Sets the default Accumulo "superuser". 
+ * + * @param rootUserName + * The name of the user to create with administrative permissions during initialization + * @since 1.7.0 + */ + public void setRootUserName(String rootUserName) { + this.rootUserName = rootUserName; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 6bffbe1..2c21ff6 100644 --- a/pom.xml +++ b/pom.xml @@ -122,7 +122,7 @@ <!-- surefire/failsafe plugin option --> <forkCount>1</forkCount> <!-- overwritten in profiles hadoop-2 --> - <hadoop.version>2.2.0</hadoop.version> + <hadoop.version>2.3.0</hadoop.version> <htrace.version>3.0.4</htrace.version> <httpclient.version>3.1</httpclient.version> <java.ver>1.7</java.ver> @@ -359,6 +359,11 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minikdc</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-tools</artifactId> <version>${hadoop.version}</version> </dependency> @@ -877,6 +882,12 @@ </rules> </configuration> </plugin> + <plugin> + <!-- Allows us to get the apache-ds bundle artifacts --> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <version>2.4.0</version> + </plugin> </plugins> </pluginManagement> <plugins> @@ -1070,6 +1081,13 @@ </execution> </executions> </plugin> + <plugin> + <!-- Allows us to get the apache-ds bundle artifacts --> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <extensions>true</extensions> + <inherited>true</inherited> + </plugin> </plugins> <extensions> <extension> @@ -1303,7 +1321,7 @@ <!-- Denotes intention and allows the enforcer plugin to pass when the user is relying on default behavior; won't work to activate profile --> <hadoop.profile>2</hadoop.profile> - <hadoop.version>2.2.0</hadoop.version> + 
<hadoop.version>2.3.0</hadoop.version> <httpclient.version>3.1</httpclient.version> <slf4j.version>1.7.5</slf4j.version> </properties> @@ -1320,7 +1338,7 @@ </property> </activation> <properties> - <hadoop.version>2.2.0</hadoop.version> + <hadoop.version>2.3.0</hadoop.version> <httpclient.version>3.1</httpclient.version> <slf4j.version>1.7.5</slf4j.version> </properties> http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java ---------------------------------------------------------------------- diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java index 7eb4fbf..81509ee 100644 --- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java +++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java @@ -28,6 +28,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.accumulo.proxy.thrift.AccumuloProxy; import org.apache.accumulo.server.rpc.RpcWrapper; +import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper; import org.apache.log4j.Logger; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TCompactProtocol; @@ -135,7 +136,7 @@ public class Proxy { @SuppressWarnings("unchecked") Constructor<? extends TProcessor> proxyProcConstructor = (Constructor<? 
extends TProcessor>) proxyProcClass.getConstructor(proxyIfaceClass); - final TProcessor processor = proxyProcConstructor.newInstance(RpcWrapper.service(impl)); + final TProcessor processor = proxyProcConstructor.newInstance(TCredentialsUpdatingWrapper.service(RpcWrapper.service(impl), impl.getClass())); THsHaServer.Args args = new THsHaServer.Args(socket); args.processor(processor); http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java index 09ae4f4..84c3853 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java @@ -16,14 +16,24 @@ */ package org.apache.accumulo.server; +import java.io.IOException; + import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.impl.ClientContext; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.rpc.SaslConnectionParams; import org.apache.accumulo.core.rpc.SslConnectionParams; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.server.conf.ServerConfigurationFactory; +import org.apache.accumulo.server.rpc.ThriftServerType; +import org.apache.accumulo.server.security.SecurityUtil; import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.common.base.Preconditions; /** * Provides a server context for Accumulo server components that 
operate with the system credentials and have access to the system files and configuration. @@ -38,6 +48,31 @@ public class AccumuloServerContext extends ClientContext { public AccumuloServerContext(ServerConfigurationFactory confFactory) { super(confFactory.getInstance(), getCredentials(confFactory.getInstance()), confFactory.getConfiguration()); this.confFactory = confFactory; + if (null != getServerSaslParams()) { + // Server-side "client" check to make sure we're logged in as a user we expect to be + enforceKerberosLogin(); + } + } + + /** + * A "client-side" assertion for servers to validate that they are logged in as the expected user, per the configuration, before performing any RPC + */ + // Should be private, but package-protected so EasyMock will work + void enforceKerberosLogin() { + final AccumuloConfiguration conf = confFactory.getSiteConfiguration(); + // Unwrap _HOST into the FQDN to make the kerberos principal we'll compare against + final String kerberosPrincipal = SecurityUtil.getServerPrincipal(conf.get(Property.GENERAL_KERBEROS_PRINCIPAL)); + UserGroupInformation loginUser; + try { + // The system user should be logged in via keytab when the process is started, not the currentUser() like KerberosToken + loginUser = UserGroupInformation.getLoginUser(); + } catch (IOException e) { + throw new RuntimeException("Could not get login user", e); + } + + Preconditions.checkArgument(loginUser.hasKerberosCredentials(), "Server does not have Kerberos credentials"); + Preconditions.checkArgument(kerberosPrincipal.equals(loginUser.getUserName()), + "Expected login user to be " + kerberosPrincipal + " but was " + loginUser.getUserName()); } /** @@ -64,4 +99,35 @@ public class AccumuloServerContext extends ClientContext { return SslConnectionParams.forServer(getConfiguration()); } + public SaslConnectionParams getServerSaslParams() { + // Not functionally different than the client SASL params, just uses the site configuration + return 
SaslConnectionParams.forConfig(getServerConfigurationFactory().getSiteConfiguration()); + } + + /** + * Determine the type of Thrift server to instantiate given the server's configuration. + * + * @return A {@link ThriftServerType} value to denote the type of Thrift server to construct + */ + public ThriftServerType getThriftServerType() { + AccumuloConfiguration conf = getConfiguration(); + if (conf.getBoolean(Property.INSTANCE_RPC_SSL_ENABLED)) { + if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + throw new IllegalStateException("Cannot create a Thrift server capable of both SASL and SSL"); + } + + return ThriftServerType.SSL; + } else if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + if (conf.getBoolean(Property.INSTANCE_RPC_SSL_ENABLED)) { + throw new IllegalStateException("Cannot create a Thrift server capable of both SASL and SSL"); + } + + return ThriftServerType.SASL; + } else { + // Lets us control the type of Thrift server created, primarily for benchmarking purposes + String serverTypeName = conf.get(Property.GENERAL_RPC_SERVER_TYPE); + return ThriftServerType.get(serverTypeName); + } + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java index 2da6ba0..4a9f1e7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java @@ -95,11 +95,13 @@ import org.apache.accumulo.server.tablets.TabletTime; import org.apache.accumulo.server.util.ReplicationTableUtil; import org.apache.accumulo.server.util.TablePropUtil; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.commons.lang.StringUtils; 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; @@ -280,11 +282,27 @@ public class Initialize { log.fatal("Failed to talk to zookeeper", e); return false; } - opts.rootpass = getRootPassword(opts); - return initialize(opts, instanceNamePath, fs); + + String rootUser; + try { + rootUser = getRootUserName(opts); + } catch (Exception e) { + log.fatal("Failed to obtain user for administrative privileges"); + return false; + } + + // Don't prompt for a password when we're running SASL(Kerberos) + final AccumuloConfiguration siteConf = SiteConfiguration.getInstance(); + if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + opts.rootpass = UUID.randomUUID().toString().getBytes(UTF_8); + } else { + opts.rootpass = getRootPassword(opts, rootUser); + } + + return initialize(opts, instanceNamePath, fs, rootUser); } - private static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs) { + private static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs, String rootUser) { UUID uuid = UUID.randomUUID(); // the actual disk locations of the root table and tablets @@ -320,9 +338,38 @@ public class Initialize { return false; } + final ServerConfigurationFactory confFactory = new ServerConfigurationFactory(HdfsZooInstance.getInstance()); + + // When we're using Kerberos authentication, we need valid credentials to perform initialization. If the user provided some, use them. + // If they did not, fall back to the credentials present in accumulo-site.xml that the servers will use themselves. 
try { - AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance())); - initSecurity(context, opts, uuid.toString()); + final SiteConfiguration siteConf = confFactory.getSiteConfiguration(); + if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + final UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + // We don't have any valid creds to talk to HDFS + if (!ugi.hasKerberosCredentials()) { + final String accumuloKeytab = siteConf.get(Property.GENERAL_KERBEROS_KEYTAB), accumuloPrincipal = siteConf.get(Property.GENERAL_KERBEROS_PRINCIPAL); + + // Fail if the site configuration doesn't contain appropriate credentials to login as servers + if (StringUtils.isBlank(accumuloKeytab) || StringUtils.isBlank(accumuloPrincipal)) { + log.fatal("No Kerberos credentials provided, and Accumulo is not properly configured for server login"); + return false; + } + + log.info("Logging in as " + accumuloPrincipal + " with " + accumuloKeytab); + + // Login using the keytab as the 'accumulo' user + UserGroupInformation.loginUserFromKeytab(accumuloPrincipal, accumuloKeytab); + } + } + } catch (IOException e) { + log.fatal("Failed to get the Kerberos user", e); + return false; + } + + try { + AccumuloServerContext context = new AccumuloServerContext(confFactory); + initSecurity(context, opts, uuid.toString(), rootUser); } catch (Exception e) { log.fatal("Failed to initialize security", e); return false; @@ -525,18 +572,43 @@ public class Initialize { return instanceNamePath; } - private static byte[] getRootPassword(Opts opts) throws IOException { + private static String getRootUserName(Opts opts) throws IOException { + AccumuloConfiguration conf = SiteConfiguration.getInstance(); + final String keytab = conf.get(Property.GENERAL_KERBEROS_KEYTAB); + if (keytab.equals(Property.GENERAL_KERBEROS_KEYTAB.getDefaultValue()) || !conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + return DEFAULT_ROOT_USER; 
+ } + + ConsoleReader c = getConsoleReader(); + c.println("Running against secured HDFS"); + + if (null != opts.rootUser) { + return opts.rootUser; + } + + do { + String user = c.readLine("Principal (user) to grant administrative privileges to : "); + if (user == null) { + // should not happen + System.exit(1); + } + if (!user.isEmpty()) { + return user; + } + } while (true); + } + + private static byte[] getRootPassword(Opts opts, String rootUser) throws IOException { if (opts.cliPassword != null) { return opts.cliPassword.getBytes(UTF_8); } String rootpass; String confirmpass; do { - rootpass = getConsoleReader() - .readLine("Enter initial password for " + DEFAULT_ROOT_USER + " (this may not be applicable for your security setup): ", '*'); + rootpass = getConsoleReader().readLine("Enter initial password for " + rootUser + " (this may not be applicable for your security setup): ", '*'); if (rootpass == null) System.exit(0); - confirmpass = getConsoleReader().readLine("Confirm initial password for " + DEFAULT_ROOT_USER + ": ", '*'); + confirmpass = getConsoleReader().readLine("Confirm initial password for " + rootUser + ": ", '*'); if (confirmpass == null) System.exit(0); if (!rootpass.equals(confirmpass)) @@ -545,8 +617,9 @@ public class Initialize { return rootpass.getBytes(UTF_8); } - private static void initSecurity(AccumuloServerContext context, Opts opts, String iid) throws AccumuloSecurityException, ThriftSecurityException { - AuditedSecurityOperation.getInstance(context, true).initializeSecurity(context.rpcCreds(), DEFAULT_ROOT_USER, opts.rootpass); + private static void initSecurity(AccumuloServerContext context, Opts opts, String iid, String rootUser) throws AccumuloSecurityException, + ThriftSecurityException, IOException { + AuditedSecurityOperation.getInstance(context, true).initializeSecurity(context.rpcCreds(), rootUser, opts.rootpass); } public static void initSystemTablesConfig() throws IOException { @@ -635,6 +708,8 @@ public class Initialize { 
String cliInstanceName; @Parameter(names = "--password", description = "set the password on the command line") String cliPassword; + @Parameter(names = {"-u", "--user"}, description = "the name of the user to grant system permissions to") + String rootUser = null; byte[] rootpass = null; } @@ -653,8 +728,9 @@ public class Initialize { if (opts.resetSecurity) { AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance())); if (isInitialized(fs)) { - opts.rootpass = getRootPassword(opts); - initSecurity(context, opts, HdfsZooInstance.getInstance().getInstanceID()); + final String rootUser = getRootUserName(opts); + opts.rootpass = getRootPassword(opts, rootUser); + initSecurity(context, opts, HdfsZooInstance.getInstance().getInstanceID(), rootUser); } else { log.fatal("Attempted to reset security on accumulo before it was initialized"); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java new file mode 100644 index 0000000..f8400e2 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.rpc; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.server.security.SystemCredentials.SystemToken; +import org.apache.accumulo.server.thrift.UGIAssumingProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extracts the TCredentials object from the RPC argument list and asserts that the Accumulo principal is equal to the Kerberos 'primary' component of the + * Kerberos principal (e.g. Accumulo principal of "frank" equals "frank" from "frank/hostname@DOMAIN"). + */ +public class TCredentialsUpdatingInvocationHandler<I> implements InvocationHandler { + private static final Logger log = LoggerFactory.getLogger(TCredentialsUpdatingInvocationHandler.class); + + private static final ConcurrentHashMap<String,Class<? 
extends AuthenticationToken>> TOKEN_CLASS_CACHE = new ConcurrentHashMap<>(); + private final I instance; + + protected TCredentialsUpdatingInvocationHandler(final I serverInstance) { + instance = serverInstance; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + updateArgs(args); + + return invokeMethod(method, args); + } + + /** + * Try to find a TCredentials object in the argument list, and, when the AuthenticationToken is a KerberosToken, set the principal from the SASL server as the + * TCredentials principal. This ensures that users can't spoof a different principal into the Credentials than what they used to authenticate. + */ + protected void updateArgs(Object[] args) throws ThriftSecurityException { + // If we don't have at least two args + if (args == null || args.length < 2) { + return; + } + + TCredentials tcreds = null; + if (args[0] != null && args[0] instanceof TCredentials) { + tcreds = (TCredentials) args[0]; + } else if (args[1] != null && args[1] instanceof TCredentials) { + tcreds = (TCredentials) args[1]; + } + + // If we don't find a tcredentials in the first two positions + if (null == tcreds) { + // Not all calls require authentication (e.g. closeMultiScan). We need to let these pass through. + log.trace("Did not find a TCredentials object in the first two positions of the argument list, not updating principal"); + return; + } + + Class<? extends AuthenticationToken> tokenClass = getTokenClassFromName(tcreds.tokenClassName); + // If the authentication token isn't a KerberosToken + if (!KerberosToken.class.isAssignableFrom(tokenClass) && !SystemToken.class.isAssignableFrom(tokenClass)) { + // Don't include messages about SystemToken since it's internal + log.debug("Will not update principal on authentication tokens other than KerberosToken. 
Received " + tokenClass); + throw new ThriftSecurityException("Did not receive a valid token", SecurityErrorCode.BAD_CREDENTIALS); + } + + // The Accumulo principal extracted from the SASL transport + final String principal = UGIAssumingProcessor.currentPrincipal(); + + if (null == principal) { + log.debug("Found KerberosToken in TCredentials, but did not receive principal from SASL processor"); + throw new ThriftSecurityException("Did not extract principal from Thrift SASL processor", SecurityErrorCode.BAD_CREDENTIALS); + } + + // The principal from the SASL transport should match what the user requested as their Accumulo principal + if (!principal.equals(tcreds.principal)) { + final String msg = "Principal in credentials object should match kerberos principal. Expected '" + principal + "' but was '" + tcreds.principal + "'"; + log.warn(msg); + throw new ThriftSecurityException(msg, SecurityErrorCode.BAD_CREDENTIALS); + } + } + + protected Class<? extends AuthenticationToken> getTokenClassFromName(String tokenClassName) { + Class<? extends AuthenticationToken> typedClz = TOKEN_CLASS_CACHE.get(tokenClassName); + if (null == typedClz) { + Class<?> clz; + try { + clz = Class.forName(tokenClassName); + } catch (ClassNotFoundException e) { + log.debug("Could not create class from token name: " + tokenClassName, e); + return null; + } + typedClz = clz.asSubclass(AuthenticationToken.class); + } + TOKEN_CLASS_CACHE.putIfAbsent(tokenClassName, typedClz); + return typedClz; + } + + private Object invokeMethod(Method method, Object[] args) throws Throwable { + try { + return method.invoke(instance, args); + } catch (InvocationTargetException ex) { + throw ex.getCause(); + } + } + + /** + * Visible for testing + */ + protected ConcurrentHashMap<String,Class<? 
extends AuthenticationToken>> getTokenCache() { + return TOKEN_CLASS_CACHE; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java new file mode 100644 index 0000000..4621d36 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.rpc; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; + +/** + * Utility method to ensure that the instance of TCredentials which is passed to the implementation of a Thrift service has the correct principal from SASL at + * the Thrift transport layer when SASL/GSSAPI (kerberos) is enabled. 
This ensures that we use the strong authentication provided to us and disallow any other + * principal names that client (malicious or otherwise) might pass in. + */ +public class TCredentialsUpdatingWrapper { + + public static <T> T service(final T instance, final Class<? extends T> originalClass) { + InvocationHandler handler = new TCredentialsUpdatingInvocationHandler<T>(instance); + + @SuppressWarnings("unchecked") + T proxiedInstance = (T) Proxy.newProxyInstance(originalClass.getClassLoader(), originalClass.getInterfaces(), handler); + + return proxiedInstance; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 641c0bf..985df9c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.server.rpc; +import java.io.IOException; import java.lang.reflect.Field; import java.net.BindException; import java.net.InetAddress; @@ -33,26 +34,34 @@ import javax.net.ssl.SSLServerSocket; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.rpc.SaslConnectionParams; import org.apache.accumulo.core.rpc.SslConnectionParams; import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.LoggingRunnable; import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.AccumuloServerContext; +import 
org.apache.accumulo.server.thrift.UGIAssumingProcessor; import org.apache.accumulo.server.util.Halt; import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TProcessor; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TSSLTransportFactory; +import org.apache.thrift.transport.TSaslServerTransport; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; /** @@ -115,6 +124,11 @@ public class TServerUtils { portSearch = config.getBoolean(portSearchProperty); final int simpleTimerThreadpoolSize = config.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE); + final ThriftServerType serverType = service.getThriftServerType(); + + if (ThriftServerType.SASL == serverType) { + processor = updateSaslProcessor(serverType, processor); + } // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once TimedProcessor timedProcessor = new TimedProcessor(config, processor, serverName, threadName); @@ -135,8 +149,9 @@ public class TServerUtils { port = 1024 + port % (65535 - 1024); try { HostAndPort addr = HostAndPort.fromParts(hostname, port); - return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads, simpleTimerThreadpoolSize, timeBetweenThreadChecks, - maxMessageSize, service.getServerSslParams(), service.getClientTimeoutInMillis()); + return TServerUtils.startTServer(addr, serverType, timedProcessor, serverName, threadName, minThreads, 
+ simpleTimerThreadpoolSize, timeBetweenThreadChecks, maxMessageSize, + service.getServerSslParams(), service.getServerSaslParams(), service.getClientTimeoutInMillis()); } catch (TTransportException ex) { log.error("Unable to start TServer", ex); if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) { @@ -209,7 +224,31 @@ public class TServerUtils { } /** - * Create a TThreadPoolServer with the given transport and processor + * Creates a TThreadPoolServer for normal unsecure operation. Useful for comparing performance against SSL or SASL transports. + * + * @param address + * Address to bind to + * @param processor + * TProcessor for the server + * @param maxMessageSize + * Maximum size of a Thrift message allowed + * @return A configured TThreadPoolServer and its bound address information + */ + public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, long maxMessageSize) throws TTransportException { + + TServerSocket transport = new TServerSocket(address.getPort()); + TThreadPoolServer server = createThreadPoolServer(transport, processor, ThriftUtil.transportFactory(maxMessageSize)); + + if (address.getPort() == 0) { + address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort()); + } + + return new ServerAddress(server, address); + + } + + /** + * Create a TThreadPoolServer with the given transport and processor with the default transport factory. * * @param transport * TServerTransport for the server @@ -218,9 +257,23 @@ public class TServerUtils { * @return A configured TThreadPoolServer */ public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor) { - final TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport); + return createThreadPoolServer(transport, processor, ThriftUtil.transportFactory()); + } + + /** + * Create a TServer with the provided server transport, processor and transport factory. 
+ * + * @param transport + * TServerTransport for the server + * @param processor + * TProcessor for the server + * @param transportFactory + * TTransportFactory for the server + */ + public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory) { + TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport); options.protocolFactory(ThriftUtil.protocolFactory()); - options.transportFactory(ThriftUtil.transportFactory()); + options.transportFactory(transportFactory); options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor)); return new TThreadPoolServer(options); } @@ -284,7 +337,7 @@ public class TServerUtils { */ public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams) throws TTransportException { - org.apache.thrift.transport.TServerSocket transport; + TServerSocket transport; try { transport = getSslServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams); } catch (UnknownHostException e) { @@ -296,14 +349,63 @@ public class TServerUtils { return new ServerAddress(createThreadPoolServer(transport, processor), address); } - /** - * Create a Thrift server given the provided and Accumulo configuration. 
- */ - public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, TProcessor processor, String serverName, String threadName, - int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) + public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SaslConnectionParams params, + final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException { - return startTServer(address, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads, - timeBetweenThreadChecks, maxMessageSize, sslParams, sslSocketTimeout); + // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the TThreadPoolServer does, + // but sadly that isn't possible: TSaslTransport needs to issue a handshake when it open()'s, which will fail + // when the server does an accept() to (presumably) wake up the eventing system. 
+ log.info("Creating SASL thread pool thrift server on port=" + address.getPort()); + TServerSocket transport = new TServerSocket(address.getPort()); + + final String hostname; + try { + hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName(); + } catch (UnknownHostException e) { + throw new TTransportException(e); + } + + final UserGroupInformation serverUser; + try { + serverUser = UserGroupInformation.getLoginUser(); + } catch (IOException e) { + throw new TTransportException(e); + } + + log.trace("Logged in as {}, creating TSaslServerTransport factory as {}/{}", serverUser, params.getKerberosServerPrimary(), hostname); + + // Make the SASL transport factory with the instance and primary from the kerberos server principal, SASL properties + // and the SASL callback handler from Hadoop to ensure authorization ID is the authentication ID. Despite the 'protocol' argument seeming to be useless, it + // *must* be the primary of the server. + TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory(); + saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(), + new SaslRpcServer.SaslGssCallbackHandler()); + + // Updates the clientAddress threadlocal so we know the client's address + final ClientInfoProcessorFactory clientInfoFactory = new ClientInfoProcessorFactory(clientAddress, processor); + + // Make sure the TTransportFactory is performing a UGI.doAs + TTransportFactory ugiTransportFactory = new UGIAssumingTransportFactory(saslTransportFactory, serverUser); + + if (address.getPort() == 0) { + address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort()); + } + + return new ServerAddress(new TThreadPoolServer(new TThreadPoolServer.Args(transport).transportFactory(ugiTransportFactory) + .processorFactory(clientInfoFactory) + .protocolFactory(ThriftUtil.protocolFactory())), address); + } + + public 
static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, ThriftServerType serverType, TProcessor processor, + String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, + SaslConnectionParams saslParams, long serverSocketTimeout) throws TTransportException { + + if (ThriftServerType.SASL == serverType) { + processor = updateSaslProcessor(serverType, processor); + } + + return startTServer(address, serverType, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads, + timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, serverSocketTimeout); } /** @@ -311,14 +413,33 @@ public class TServerUtils { * * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is bound to. */ - public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads, - int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException { + public static ServerAddress startTServer(HostAndPort address, ThriftServerType serverType, TimedProcessor processor, String serverName, String threadName, int numThreads, + int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, + SaslConnectionParams saslParams, long serverSocketTimeout) throws TTransportException { + + // This is presently not supported. It's hypothetically possible, I believe, to work, but it would require changes in how the transports + // work at the Thrift layer to ensure that both the SSL and SASL handshakes function. SASL's quality of protection addresses privacy issues. 
+ Preconditions.checkArgument(!(sslParams != null && saslParams != null), "Cannot start a Thrift server using both SSL and SASL"); ServerAddress serverAddress; - if (sslParams != null) { - serverAddress = createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams); - } else { - serverAddress = createNonBlockingServer(address, processor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize); + switch (serverType) { + case SSL: + log.debug("Instantiating SSL Thrift server"); + serverAddress = createSslThreadPoolServer(address, processor, serverSocketTimeout, sslParams); + break; + case SASL: + log.debug("Instantiating SASL Thrift server"); + serverAddress = createSaslThreadPoolServer(address, processor, serverSocketTimeout, saslParams, serverName, threadName, numThreads, numSTThreads, + timeBetweenThreadChecks, maxMessageSize); + break; + case THREADPOOL: + log.debug("Instantiating unsecure TThreadPool Thrift server"); + serverAddress = createBlockingServer(address, processor, maxMessageSize); + break; + case CUSTOM_HS_HA: // Intentional passthrough -- Our custom wrapper around HsHa is the default + default: + log.debug("Instantiating default, unsecure custom half-async Thrift server"); + serverAddress = createNonBlockingServer(address, processor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize); } final TServer finalServer = serverAddress.server; @@ -368,4 +489,21 @@ public class TServerUtils { log.error("Unable to call shutdownNow", e); } } + + /** + * Wrap the provided processor in the {@link UGIAssumingProcessor} so Kerberos authentication works. Requires the <code>serverType</code> to be + * {@link ThriftServerType#SASL} and throws an exception when it is not. 
+ * + * @return A {@link UGIAssumingProcessor} which wraps the provided processor + */ + private static TProcessor updateSaslProcessor(ThriftServerType serverType, TProcessor processor) { + Preconditions.checkArgument(ThriftServerType.SASL == serverType); + + // Wrap the provided processor in our special processor which proxies the provided UGI on the logged-in UGI + // Important that we have Timed -> UGIAssuming -> [provided] to make sure that the metrics are still reported + // as the logged-in user. + log.info("Wrapping {} in UGIAssumingProcessor", processor.getClass()); + + return new UGIAssumingProcessor(processor); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java new file mode 100644 index 0000000..60d5402 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.rpc; + +import org.apache.commons.lang.StringUtils; + +/** + * The type of configured Thrift server to start. This is meant more as a developer knob to ensure that appropriate Thrift servers can be constructed to make a + * better test on the overhead of SSL or SASL. + * + * Both SSL and SASL don't presently work with TFramedTransport which means that the Thrift servers with asynchronous support will fail with these transports. + * As such, we want to ensure that any benchmarks against "unsecure" Accumulo use the same type of Thrift server. + */ +public enum ThriftServerType { + CUSTOM_HS_HA("custom_hs_ha"), THREADPOOL("threadpool"), SSL("ssl"), SASL("sasl"); + + private final String name; + + private ThriftServerType(String name) { + this.name = name; + } + + public static ThriftServerType get(String name) { + // Our custom HsHa server is the default (if none is provided) + if (StringUtils.isBlank(name)) { + return CUSTOM_HS_HA; + } + return ThriftServerType.valueOf(name.trim().toUpperCase()); + } + + @Override + public String toString() { + return name; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java index 5fe57b7..7adb46e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java @@ -51,6 +51,7 @@ import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.security.handler.Authenticator; import 
org.apache.accumulo.server.security.handler.Authorizor; +import org.apache.accumulo.server.security.handler.KerberosAuthenticator; import org.apache.accumulo.server.security.handler.PermissionHandler; import org.apache.accumulo.server.security.handler.ZKAuthenticator; import org.apache.accumulo.server.security.handler.ZKAuthorizor; @@ -68,6 +69,7 @@ public class SecurityOperation { protected Authorizor authorizor; protected Authenticator authenticator; protected PermissionHandler permHandle; + protected boolean isKerberos; private static String rootUserName = null; private final ZooCache zooCache; private final String ZKUserPath; @@ -126,11 +128,11 @@ public class SecurityOperation { || !permHandle.validSecurityHandlers(authent, author)) throw new RuntimeException(authorizor + ", " + authenticator + ", and " + pm + " do not play nice with eachother. Please choose authentication and authorization mechanisms that are compatible with one another."); + + isKerberos = KerberosAuthenticator.class.isAssignableFrom(authenticator.getClass()); } public void initializeSecurity(TCredentials credentials, String rootPrincipal, byte[] token) throws AccumuloSecurityException, ThriftSecurityException { - authenticate(credentials); - if (!isSystemUser(credentials)) throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); @@ -160,11 +162,31 @@ public class SecurityOperation { throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.INVALID_INSTANCEID); Credentials creds = Credentials.fromThrift(credentials); + if (isSystemUser(credentials)) { if (!(context.getCredentials().equals(creds))) { + log.debug("Provided credentials did not match server's expected credentials. 
Expected " + context.getCredentials() + " but got " + creds); throw new ThriftSecurityException(creds.getPrincipal(), SecurityErrorCode.BAD_CREDENTIALS); } } else { + // Not the system user + + if (isKerberos) { + // If we have kerberos credentials for a user from the network but no account + // in the system, we need to make one before proceeding + try { + if (!authenticator.userExists(creds.getPrincipal())) { + // If we call the normal createUser method, it will loop back into this method + // when it tries to check if the user has permission to create users + _createUser(credentials, creds, Authorizations.EMPTY); + } + } catch (AccumuloSecurityException e) { + log.debug("Failed to determine if user exists", e); + throw e.asThriftException(); + } + } + + // Check that the user is authenticated (a no-op at this point for kerberos) try { if (!authenticator.authenticateUser(creds.getPrincipal(), creds.getToken())) { throw new ThriftSecurityException(creds.getPrincipal(), SecurityErrorCode.BAD_CREDENTIALS); @@ -190,6 +212,15 @@ public class SecurityOperation { return true; try { Credentials toCreds = Credentials.fromThrift(toAuth); + + if (isKerberos) { + // If we have kerberos credentials for a user from the network but no account + // in the system, we need to make one before proceeding + if (!authenticator.userExists(toCreds.getPrincipal())) { + createUser(credentials, toCreds, Authorizations.EMPTY); + } + } + return authenticator.authenticateUser(toCreds.getPrincipal(), toCreds.getToken()); } catch (AccumuloSecurityException e) { throw e.asThriftException(); @@ -579,14 +610,23 @@ public class SecurityOperation { public void createUser(TCredentials credentials, Credentials newUser, Authorizations authorizations) throws ThriftSecurityException { if (!canCreateUser(credentials, newUser.getPrincipal())) throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + _createUser(credentials, newUser, authorizations); + if 
(canChangeAuthorizations(credentials, newUser.getPrincipal())) { + try { + authorizor.changeAuthorizations(newUser.getPrincipal(), authorizations); + } catch (AccumuloSecurityException ase) { + throw ase.asThriftException(); + } + } + } + + protected void _createUser(TCredentials credentials, Credentials newUser, Authorizations authorizations) throws ThriftSecurityException { try { AuthenticationToken token = newUser.getToken(); authenticator.createUser(newUser.getPrincipal(), token); authorizor.initUser(newUser.getPrincipal()); permHandle.initUser(newUser.getPrincipal()); log.info("Created user " + newUser.getPrincipal() + " at the request of user " + credentials.getPrincipal()); - if (canChangeAuthorizations(credentials, newUser.getPrincipal())) - authorizor.changeAuthorizations(newUser.getPrincipal(), authorizations); } catch (AccumuloSecurityException ase) { throw ase.asThriftException(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java index 42d1313..6014139 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java @@ -69,10 +69,11 @@ public class SecurityUtil { */ public static boolean login(String principalConfig, String keyTabPath) { try { - String principalName = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principalConfig, InetAddress.getLocalHost().getCanonicalHostName()); + String principalName = getServerPrincipal(principalConfig); if (keyTabPath != null && principalName != null && keyTabPath.length() != 0 && principalName.length() != 0) { + log.info("Attempting to login with keytab as 
" + principalName); UserGroupInformation.loginUserFromKeytab(principalName, keyTabPath); - log.info("Succesfully logged in as user " + principalConfig); + log.info("Succesfully logged in as user " + principalName); return true; } } catch (IOException io) { @@ -80,4 +81,15 @@ public class SecurityUtil { } return false; } + + /** + * {@link org.apache.hadoop.security.SecurityUtil#getServerPrincipal(String, String)} + */ + public static String getServerPrincipal(String configuredPrincipal) { + try { + return org.apache.hadoop.security.SecurityUtil.getServerPrincipal(configuredPrincipal, InetAddress.getLocalHost().getCanonicalHostName()); + } catch (IOException e) { + throw new RuntimeException("Could not convert configured server principal: " + configuredPrincipal, e); + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java index 79201b1..51d50a1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java @@ -30,8 +30,10 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.rpc.SaslConnectionParams; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.security.thrift.TCredentials; 
import org.apache.accumulo.core.util.Base64; @@ -51,8 +53,8 @@ public final class SystemCredentials extends Credentials { private final TCredentials AS_THRIFT; - SystemCredentials(Instance instance) { - super(SYSTEM_PRINCIPAL, SystemToken.get(instance)); + SystemCredentials(Instance instance, String principal, AuthenticationToken token) { + super(principal, token); AS_THRIFT = super.toThrift(instance); } @@ -65,7 +67,16 @@ public final class SystemCredentials extends Credentials { public static SystemCredentials get(Instance instance) { check_permission(); - return new SystemCredentials(instance); + String principal = SYSTEM_PRINCIPAL; + AccumuloConfiguration conf = SiteConfiguration.getInstance(); + SaslConnectionParams saslParams = SaslConnectionParams.forConfig(conf); + if (null != saslParams) { + // Use the server's kerberos principal as the Accumulo principal. We could also unwrap the principal server-side, but the principal for SystemCredentials + // isn't actually used anywhere, so it really doesn't matter.
We can't include the kerberos principal in the SystemToken as it would break equality when + // different Accumulo servers are using different kerberos principals as their accumulo principal + principal = SecurityUtil.getServerPrincipal(conf.get(Property.GENERAL_KERBEROS_PRINCIPAL)); + } + return new SystemCredentials(instance, principal, SystemToken.get(instance)); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java new file mode 100644 index 0000000..61b8db0 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.accumulo.server.security.handler; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.util.Base64; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.server.security.SystemCredentials.SystemToken; +import org.apache.accumulo.server.thrift.UGIAssumingProcessor; +import org.apache.accumulo.server.zookeeper.ZooCache; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Sets; + +public class KerberosAuthenticator implements Authenticator { + private static final Logger log = LoggerFactory.getLogger(KerberosAuthenticator.class); + + private static final Set<Class<? extends AuthenticationToken>> SUPPORTED_TOKENS = Sets.newHashSet(Arrays.<Class<? 
extends AuthenticationToken>> asList( + KerberosToken.class, SystemToken.class)); + private static final Set<String> SUPPORTED_TOKEN_NAMES = Sets.newHashSet(KerberosToken.class.getName(), SystemToken.class.getName()); + + private final ZKAuthenticator zkAuthenticator = new ZKAuthenticator(); + private String zkUserPath; + private final ZooCache zooCache; + + public KerberosAuthenticator() { + zooCache = new ZooCache(); + } + + @Override + public void initialize(String instanceId, boolean initialize) { + zkAuthenticator.initialize(instanceId, initialize); + zkUserPath = Constants.ZROOT + "/" + instanceId + "/users"; + } + + @Override + public boolean validSecurityHandlers(Authorizor auth, PermissionHandler pm) { + return true; + } + + private void createUserNodeInZk(String principal) throws KeeperException, InterruptedException { + synchronized (zooCache) { + zooCache.clear(); + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + zoo.putPrivatePersistentData(zkUserPath + "/" + principal, new byte[0], NodeExistsPolicy.FAIL); + } + } + + @Override + public void initializeSecurity(TCredentials credentials, String principal, byte[] token) throws AccumuloSecurityException, ThriftSecurityException { + try { + // remove old settings from zookeeper first, if any + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + synchronized (zooCache) { + zooCache.clear(); + if (zoo.exists(zkUserPath)) { + zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP); + log.info("Removed " + zkUserPath + "/" + " from zookeeper"); + } + + principal = Base64.encodeBase64String(principal.getBytes(UTF_8)); + + // prep parent node of users with root username + zoo.putPersistentData(zkUserPath, principal.getBytes(UTF_8), NodeExistsPolicy.FAIL); + + createUserNodeInZk(principal); + } + } catch (KeeperException | InterruptedException e) { + log.error("Failed to initialize security", e); + throw new RuntimeException(e); + } + } + + @Override + public boolean authenticateUser(String principal, 
AuthenticationToken token) throws AccumuloSecurityException { + final String rpcPrincipal = UGIAssumingProcessor.currentPrincipal(); + + if (!rpcPrincipal.equals(principal)) { + // KerberosAuthenticator can't perform this because KerberosToken is just a shim and doesn't contain the actual credentials + throw new AccumuloSecurityException(principal, SecurityErrorCode.AUTHENTICATOR_FAILED); + } + + // User is authenticated at the transport layer -- nothing extra is necessary + if (token instanceof KerberosToken) { + return true; + } + return false; + } + + @Override + public Set<String> listUsers() throws AccumuloSecurityException { + Set<String> base64Users = zkAuthenticator.listUsers(); + Set<String> readableUsers = new HashSet<>(); + for (String base64User : base64Users) { + readableUsers.add(new String(Base64.decodeBase64(base64User), UTF_8)); + } + return readableUsers; + } + + @Override + public synchronized void createUser(String principal, AuthenticationToken token) throws AccumuloSecurityException { + if (!(token instanceof KerberosToken)) { + throw new UnsupportedOperationException("Expected a KerberosToken but got a " + token.getClass().getSimpleName()); + } + + principal = Base64.encodeBase64String(principal.getBytes(UTF_8)); + + try { + createUserNodeInZk(principal); + } catch (KeeperException e) { + if (e.code().equals(KeeperException.Code.NODEEXISTS)) { + log.error("User already exists in ZooKeeper", e); + throw new AccumuloSecurityException(principal, SecurityErrorCode.USER_EXISTS, e); + } + log.error("Failed to create user in ZooKeeper", e); + throw new AccumuloSecurityException(principal, SecurityErrorCode.CONNECTION_ERROR, e); + } catch (InterruptedException e) { + log.error("Interrupted trying to create node for user", e); + throw new RuntimeException(e); + } + } + + @Override + public synchronized void dropUser(String user) throws AccumuloSecurityException { + user = Base64.encodeBase64String(user.getBytes(UTF_8)); +
zkAuthenticator.dropUser(user); + } + + @Override + public void changePassword(String principal, AuthenticationToken token) throws AccumuloSecurityException { + throw new UnsupportedOperationException("Cannot change password with Kerberos authentication"); + } + + @Override + public synchronized boolean userExists(String user) throws AccumuloSecurityException { + user = Base64.encodeBase64String(user.getBytes(UTF_8)); + return zkAuthenticator.userExists(user); + } + + @Override + public Set<Class<? extends AuthenticationToken>> getSupportedTokenTypes() { + return SUPPORTED_TOKENS; + } + + @Override + public boolean validTokenClass(String tokenClass) { + return SUPPORTED_TOKEN_NAMES.contains(tokenClass); + } + +}
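KerberosAuthenticator Base64-encodes each principal before creating, checking, or dropping its user node, and listUsers() decodes the stored names again. The commit does not state why the names are encoded. The round trip can be sketched as below; this is only an illustration, assuming java.util.Base64 as a stand-in for Accumulo's own org.apache.accumulo.core.util.Base64 helper. The class name PrincipalNodeNames and the sample principal are hypothetical and not part of the commit.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of the commit: mirrors the encode/decode
// steps that KerberosAuthenticator applies to principal names.
final class PrincipalNodeNames {

  // Encode a principal the way createUser/userExists/dropUser do before
  // touching the user node (java.util.Base64 is an assumed stand-in here).
  static String encode(String principal) {
    return Base64.getEncoder().encodeToString(principal.getBytes(StandardCharsets.UTF_8));
  }

  // Decode a stored node name back to a readable principal, as listUsers does.
  static String decode(String nodeName) {
    return new String(Base64.getDecoder().decode(nodeName), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    String principal = "accumulo/host.example.com@EXAMPLE.COM"; // sample value only
    String nodeName = encode(principal);
    System.out.println("node name: " + nodeName);
    System.out.println("decoded:   " + decode(nodeName));
  }
}
```

Every method that takes a principal has to apply the same encoding before delegating to the ZooKeeper-backed authenticator; otherwise a lookup with the readable name would miss the stored node.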

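The authenticateUser() method in KerberosAuthenticator relies on the transport layer having already authenticated the caller: it only confirms that the principal named in the credentials matches the principal of the current RPC, and that the token is a KerberosToken. A minimal sketch of that check follows. It assumes a thread-local holds the RPC principal; the commit only shows the call UGIAssumingProcessor.currentPrincipal(), not how that value is stored. RpcPrincipalCheck, setRpcPrincipal, and the boolean flag standing in for the instanceof test are all hypothetical.

```java
// Hypothetical sketch, not the commit's code: shows the shape of the
// "claimed principal must equal the RPC principal" check.
final class RpcPrincipalCheck {

  // Assumed stand-in for wherever the SASL transport records the
  // authenticated principal of the current RPC thread.
  private static final ThreadLocal<String> RPC_PRINCIPAL = new ThreadLocal<>();

  static void setRpcPrincipal(String principal) {
    RPC_PRINCIPAL.set(principal);
  }

  // Returns true only for a Kerberos-style token; throws if the caller
  // claims a principal other than the one the transport authenticated.
  // Unlike the commit's code, this sketch also rejects a missing RPC principal.
  static boolean authenticate(String claimedPrincipal, boolean isKerberosToken) {
    String rpcPrincipal = RPC_PRINCIPAL.get();
    if (rpcPrincipal == null || !rpcPrincipal.equals(claimedPrincipal)) {
      throw new SecurityException("Cannot authenticate " + claimedPrincipal + " from this RPC");
    }
    return isKerberosToken;
  }

  public static void main(String[] args) {
    setRpcPrincipal("alice@EXAMPLE.COM"); // sample value only
    System.out.println(authenticate("alice@EXAMPLE.COM", true));
    try {
      authenticate("bob@EXAMPLE.COM", true);
    } catch (SecurityException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Because the token carries no secret, one principal cannot be authenticated on behalf of another; the only identity the server can vouch for is the one attached to the connection.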