joshelser commented on a change in pull request #4019:
URL: https://github.com/apache/hbase/pull/4019#discussion_r781586954



##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/security/oauthbearer/internals/OAuthBearerSaslClient.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.oauthbearer.internals;
+
+import static 
org.apache.hadoop.hbase.security.token.OAuthBearerTokenUtil.OAUTHBEARER_MECHANISM;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslException;
+import org.apache.hadoop.hbase.exceptions.IllegalSaslStateException;
+import org.apache.hadoop.hbase.security.auth.AuthenticateCallbackHandler;
+import org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerToken;
+import org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerTokenCallback;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code SaslClient} implementation for SASL/OAUTHBEARER in Kafka. This
+ * implementation requires an instance of {@code AuthenticateCallbackHandler}
+ * that can handle an instance of {@link OAuthBearerTokenCallback} and return
+ * the {@link OAuthBearerToken} generated by the {@code login()} event on the
+ * {@code LoginContext}.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc6750#section-2.1";>RFC 6750 
Section 2.1</a>
+ *
+ *   This class has been copy-and-pasted from Kafka codebase.
+ */
[email protected]
+public class OAuthBearerSaslClient implements SaslClient {
+  static final byte BYTE_CONTROL_A = (byte) 0x01;
+  private static final Logger LOG = 
LoggerFactory.getLogger(OAuthBearerSaslClient.class);
+  private final CallbackHandler callbackHandler;
+
+  enum State {
+    SEND_CLIENT_FIRST_MESSAGE, RECEIVE_SERVER_FIRST_MESSAGE, 
RECEIVE_SERVER_MESSAGE_AFTER_FAILURE,
+    COMPLETE, FAILED
+  }
+
+  private State state;
+
+  public OAuthBearerSaslClient(AuthenticateCallbackHandler callbackHandler) {
+    this.callbackHandler = Objects.requireNonNull(callbackHandler);
+    setState(State.SEND_CLIENT_FIRST_MESSAGE);
+  }
+
+  public CallbackHandler callbackHandler() {
+    return callbackHandler;
+  }
+
+  @Override
+  public String getMechanismName() {
+    return OAUTHBEARER_MECHANISM;
+  }
+
+  @Override
+  public boolean hasInitialResponse() {
+    return true;
+  }
+
+  @Override
+  public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
+    try {
+      OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
+      switch (state) {
+        case SEND_CLIENT_FIRST_MESSAGE:
+          if (challenge != null && challenge.length != 0) {
+            throw new SaslException("Expected empty challenge");
+          }
+          callbackHandler().handle(new Callback[] {callback});
+          setState(State.RECEIVE_SERVER_FIRST_MESSAGE);
+          return new 
OAuthBearerClientInitialResponse(callback.token().value()).toBytes();
+        case RECEIVE_SERVER_FIRST_MESSAGE:
+          if (challenge != null && challenge.length != 0) {
+            String jsonErrorResponse = new String(challenge, 
StandardCharsets.UTF_8);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Sending %%x01 response to server after receiving an 
error: {}",
+                jsonErrorResponse);
+            }
+            setState(State.RECEIVE_SERVER_MESSAGE_AFTER_FAILURE);
+            return new byte[] {BYTE_CONTROL_A};
+          }
+          callbackHandler().handle(new Callback[] {callback});
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Successfully authenticated as {}", 
callback.token().principalName());
+          }
+          setState(State.COMPLETE);
+          return null;
+        default:
+          throw new IllegalSaslStateException("Unexpected challenge in Sasl 
client state " + state);
+      }
+    } catch (SaslException e) {
+      setState(State.FAILED);
+      throw e;
+    } catch (IOException | UnsupportedCallbackException e) {
+      setState(State.FAILED);
+      throw new SaslException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public boolean isComplete() {
+    return state == State.COMPLETE;
+  }
+
+  @Override
+  public byte[] unwrap(byte[] incoming, int offset, int len) {
+    if (!isComplete()) {
+      throw new IllegalStateException("Authentication exchange has not 
completed");
+    }
+    return Arrays.copyOfRange(incoming, offset, offset + len);
+  }
+
+  @Override
+  public byte[] wrap(byte[] outgoing, int offset, int len) {
+    if (!isComplete()) {
+      throw new IllegalStateException("Authentication exchange has not 
completed");
+    }
+    return Arrays.copyOfRange(outgoing, offset, offset + len);
+  }
+
+  @Override
+  public Object getNegotiatedProperty(String propName) {
+    if (!isComplete()) {
+      throw new IllegalStateException("Authentication exchange has not 
completed");
+    }
+    return null;
+  }
+
+  @Override
+  public void dispose() {
+  }
+
+  private void setState(State state) {
+    LOG.debug("Setting SASL/{} client state to {}", OAUTHBEARER_MECHANISM, 
state);
+    this.state = state;
+  }
+
+  public static String[] mechanismNamesCompatibleWithPolicy(Map<String, ?> 
props) {
+    return props != null && 
"true".equals(String.valueOf(props.get(Sasl.POLICY_NOPLAINTEXT)))
+      ? new String[] {}
+      : new String[] { OAUTHBEARER_MECHANISM};
+  }
+
+  public static class OAuthBearerSaslClientFactory implements 
SaslClientFactory {
+    @Override
+    public SaslClient createSaslClient(String[] mechanisms, String 
authorizationId, String protocol,
+      String serverName, Map<String, ?> props, CallbackHandler 
callbackHandler) {
+      String[] mechanismNamesCompatibleWithPolicy = getMechanismNames(props);
+      for (String mechanism : mechanisms) {
+        for (String s : mechanismNamesCompatibleWithPolicy) {
+          if (s.equals(mechanism)) {
+            if (!(Objects.requireNonNull(callbackHandler) instanceof 
AuthenticateCallbackHandler)) {
+              throw new IllegalArgumentException(
+                String.format("Callback handler must be castable to %s: %s",
+                  AuthenticateCallbackHandler.class.getName(),
+                  callbackHandler.getClass().getName()));
+            }
+            return new OAuthBearerSaslClient((AuthenticateCallbackHandler) 
callbackHandler);
+          }
+        }
+      }
+      return null;
+    }
+
+    @Override
+    public String[] getMechanismNames(Map<String, ?> props) {
+      return OAuthBearerSaslClient.mechanismNamesCompatibleWithPolicy(props);

Review comment:
       If this code is being executed, it is because someone explicitly 
activated it via configuration. We should throw a RuntimeException if the 
configuration disallows us to use this implementation (e.g. if 
`Sasl.POLICY_NOPLAINTEXT == true`). We should fail in this manner until we have 
RPC encryption tested/working.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/security/oauthbearer/internals/OAuthBearerSaslServerProvider.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.oauthbearer.internals;
+
+import static 
org.apache.hadoop.hbase.security.token.OAuthBearerTokenUtil.OAUTHBEARER_MECHANISM;
+import java.security.Provider;
+import java.security.Security;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public class OAuthBearerSaslServerProvider extends Provider {

Review comment:
       Same here -- I don't think we need to register this provider.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/security/oauthbearer/internals/OAuthBearerSaslClient.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.oauthbearer.internals;
+
+import static 
org.apache.hadoop.hbase.security.token.OAuthBearerTokenUtil.OAUTHBEARER_MECHANISM;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslException;
+import org.apache.hadoop.hbase.exceptions.IllegalSaslStateException;
+import org.apache.hadoop.hbase.security.auth.AuthenticateCallbackHandler;
+import org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerToken;
+import org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerTokenCallback;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code SaslClient} implementation for SASL/OAUTHBEARER in Kafka. This
+ * implementation requires an instance of {@code AuthenticateCallbackHandler}
+ * that can handle an instance of {@link OAuthBearerTokenCallback} and return
+ * the {@link OAuthBearerToken} generated by the {@code login()} event on the
+ * {@code LoginContext}.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc6750#section-2.1";>RFC 6750 
Section 2.1</a>
+ *
+ *   This class has been copy-and-pasted from Kafka codebase.
+ */
[email protected]
+public class OAuthBearerSaslClient implements SaslClient {

Review comment:
       I wonder if we just need to extend the HBase base SASL client in order 
to get the RPC encryption. That's probably why it didn't work OOTB.

##########
File path: 
hbase-common/src/main/java/org/apache/hadoop/hbase/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.oauthbearer.internals;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.security.sasl.SaslException;
+import org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerStringUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OAuthBearer SASL client's initial message to the server.
+ *
+ * This class has been copy-and-pasted from Kafka codebase.
+ */
[email protected]
+public class OAuthBearerClientInitialResponse {

Review comment:
       I'm not a fan of the regex-based parsing happening here. I think we 
could just replace this with some protobuf code which is what the rest of HBase 
RPCs use consistently.
   
   If we are going the feature branch route, I'm happy to defer making that 
change until after we have some known-good test cases to start with.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/OAuthBearerSaslAuthenticationProvider.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import static 
org.apache.hadoop.hbase.security.token.OAuthBearerTokenUtil.TOKEN_KIND;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Base client for client/server implementations for the OAuth Bearer (JWT) 
token auth'n method.
+ */
[email protected]
+public class OAuthBearerSaslAuthenticationProvider extends 
BuiltInSaslAuthenticationProvider {
+
+  public static final SaslAuthMethod SASL_AUTH_METHOD = new SaslAuthMethod(
+      "OAUTHBEARER", (byte)83, "OAUTHBEARER", 
UserGroupInformation.AuthenticationMethod.TOKEN);
+
+  @Override
+  public SaslAuthMethod getSaslAuthMethod() {
+    return SASL_AUTH_METHOD;
+  }

Review comment:
       For clarity, this SaslAuthMethod (HBase class) is how HBase's RPC 
classes are figuring out the correct client-side logic to use given a token 
(secret material).

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/security/oauthbearer/internals/OAuthBearerSaslClientProvider.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.oauthbearer.internals;
+
+import static 
org.apache.hadoop.hbase.security.token.OAuthBearerTokenUtil.OAUTHBEARER_MECHANISM;
+import java.security.Provider;
+import java.security.Security;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public class OAuthBearerSaslClientProvider extends Provider {
+  private static final long serialVersionUID = 1L;
+
+  protected OAuthBearerSaslClientProvider() {
+    super("SASL/OAUTHBEARER Client Provider", 1.0, "SASL/OAUTHBEARER Client 
Provider for HBase");
+    put("SaslClientFactory." + OAUTHBEARER_MECHANISM,
+      OAuthBearerSaslClient.OAuthBearerSaslClientFactory.class.getName());
+  }
+
+  public static void initialize() {
+    Security.addProvider(new OAuthBearerSaslClientProvider());

Review comment:
       I don't think we actually need to register a provider with the JCA 
security provider framework because we don't actually use it. IIRC, Kafka 
relies on JAAS to automatically perform the logins for them, whereas HBase 
explicitly sets up the implementation to use (via the 
AuthenticationProviderSelector). I think we can drop this class entirely.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/OAuthBearerTokenUtil.java
##########
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.token;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import javax.security.auth.Subject;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerToken;
+import 
org.apache.hadoop.hbase.security.oauthbearer.internals.OAuthBearerSaslClientProvider;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods for obtaining OAuthBearer / JWT authentication tokens.
+ */
[email protected]
+public final class OAuthBearerTokenUtil {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OAuthBearerTokenUtil.class);
+  public static final String OAUTHBEARER_MECHANISM = "OAUTHBEARER";
+  public static final String TOKEN_KIND = "JWT_AUTH_TOKEN";
+
+  static {
+    OAuthBearerSaslClientProvider.initialize(); // not part of public API
+    LOG.info("OAuthBearer SASL client provider has been initialized");
+  }
+
+  private OAuthBearerTokenUtil() {  }
+
+  /**
+   * Add token to user's subject private credentials and a hint to provider 
selector
+   * to correctly select OAuthBearer SASL provider.
+   */
+  public static void addTokenForUser(User user, String encodedToken) {
+    user.addToken(new Token<>(null, null, new Text(TOKEN_KIND), null));
+    user.runAs(new PrivilegedAction<Object>() {
+      @Override public Object run() {
+        Subject subject = Subject.getSubject(AccessController.getContext());
+        OAuthBearerToken jwt = new OAuthBearerToken() {
+          @Override public String value() {
+            return encodedToken;
+          }
+
+          @Override public long lifetimeMs() {
+            return 0;
+          }
+
+          @Override public String principalName() {
+            return null;

Review comment:
       Isn't `User user` our principal?

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/OAuthBearerSaslClientAuthenticationProvider.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.provider;
+
+import static 
org.apache.hadoop.hbase.security.token.OAuthBearerTokenUtil.OAUTHBEARER_MECHANISM;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.AccessController;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.security.SecurityInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.auth.AuthenticateCallbackHandler;
+import org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerToken;
+import org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerTokenCallback;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
+
[email protected]
+public class OAuthBearerSaslClientAuthenticationProvider
+    extends OAuthBearerSaslAuthenticationProvider
+    implements SaslClientAuthenticationProvider {
+
+  @Override
+  public SaslClient createClient(Configuration conf, InetAddress serverAddr,
+                                 SecurityInfo securityInfo, Token<? extends 
TokenIdentifier> token,
+                                 boolean fallbackAllowed,
+                                 Map<String, String> saslProps) throws 
IOException {
+    AuthenticateCallbackHandler callbackHandler = new 
OAuthBearerSaslClientCallbackHandler();
+    callbackHandler.configure(conf, getSaslAuthMethod().getSaslMechanism(), 
saslProps);
+    return Sasl.createSaslClient(new String[] { 
getSaslAuthMethod().getSaslMechanism() }, null,
+        null, SaslUtil.SASL_DEFAULT_REALM, saslProps, callbackHandler);
+  }
+
+  public static class OAuthBearerSaslClientCallbackHandler implements 
AuthenticateCallbackHandler {
+    private static final Logger LOG =
+      LoggerFactory.getLogger(OAuthBearerSaslClientCallbackHandler.class);
+    private boolean configured = false;
+
+    @Override public void configure(Configuration configs, String 
saslMechanism,
+      Map<String, String> saslProps) {
+      if (!OAUTHBEARER_MECHANISM.equals(saslMechanism)) {
+        throw new IllegalArgumentException(
+          String.format("Unexpected SASL mechanism: %s", saslMechanism));
+      }
+      this.configured = true;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, 
UnsupportedCallbackException {
+      if (!configured) {
+        throw new IllegalStateException(
+          "OAuthBearerSaslClientCallbackHandler handler must be configured 
first.");
+      }
+
+      for (Callback callback : callbacks) {
+        if (callback instanceof OAuthBearerTokenCallback) {
+          handleCallback((OAuthBearerTokenCallback) callback);
+        } else {
+          throw new UnsupportedCallbackException(callback);
+        }
+      }
+    }
+
+    private void handleCallback(OAuthBearerTokenCallback callback) throws 
IOException {
+      if (callback.token() != null) {
+        throw new IllegalArgumentException("Callback had a token already");
+      }
+      Subject subject = Subject.getSubject(AccessController.getContext());
+      Set<OAuthBearerToken> privateCredentials = subject != null
+        ? subject.getPrivateCredentials(OAuthBearerToken.class)
+        : Collections.emptySet();
+      if (privateCredentials.size() == 0) {
+        throw new IOException("No OAuth Bearer tokens in Subject's private 
credentials");
+      }
+      if (privateCredentials.size() == 1) {
+        LOG.debug("Found 1 OAuthBearer token");
+        callback.token(privateCredentials.iterator().next());
+      } else {
+        /*
+         * There a very small window of time upon token refresh (on the order 
of milliseconds)
+         * where both an old and a new token appear on the Subject's private 
credentials.
+         * Rather than implement a lock to eliminate this window, we will deal 
with it by
+         * checking for the existence of multiple tokens and choosing the one 
that has the
+         * longest lifetime.  It is also possible that a bug could cause 
multiple tokens to
+         * exist (e.g. KAFKA-7902), so dealing with the unlikely possibility 
that occurs
+         * during normal operation also allows us to deal more robustly with 
potential bugs.
+         */
+        SortedSet<OAuthBearerToken> sortedByLifetime =
+          new TreeSet<>(
+            new Comparator<OAuthBearerToken>() {
+              @Override
+              public int compare(OAuthBearerToken o1, OAuthBearerToken o2) {
+                return Long.compare(o1.lifetimeMs(), o2.lifetimeMs());
+              }
+            });
+        sortedByLifetime.addAll(privateCredentials);
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Found {} OAuth Bearer tokens in Subject's private 
credentials; " +
+              "the oldest expires at {}, will use the newest, which expires at 
{}",
+            sortedByLifetime.size(), new 
Date(sortedByLifetime.first().lifetimeMs()),
+            new Date(sortedByLifetime.last().lifetimeMs()));
+        }
+        callback.token(sortedByLifetime.last());
+      }

Review comment:
       It would be nice to move choosing the private credential to its own 
method and simplifying this to be:
   ```
   callback.token(choosePrivateCredential(privateCredentials));
   ```

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/OAuthBearerTokenUtil.java
##########
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.token;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import javax.security.auth.Subject;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerToken;
+import 
org.apache.hadoop.hbase.security.oauthbearer.internals.OAuthBearerSaslClientProvider;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods for obtaining OAuthBearer / JWT authentication tokens.
+ */
[email protected]
+public final class OAuthBearerTokenUtil {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OAuthBearerTokenUtil.class);
+  public static final String OAUTHBEARER_MECHANISM = "OAUTHBEARER";
+  public static final String TOKEN_KIND = "JWT_AUTH_TOKEN";
+
+  static {
+    OAuthBearerSaslClientProvider.initialize(); // not part of public API
+    LOG.info("OAuthBearer SASL client provider has been initialized");
+  }
+
+  private OAuthBearerTokenUtil() {  }
+
+  /**
+   * Add token to user's subject private credentials and a hint to provider 
selector
+   * to correctly select OAuthBearer SASL provider.
+   */
+  public static void addTokenForUser(User user, String encodedToken) {
+    user.addToken(new Token<>(null, null, new Text(TOKEN_KIND), null));
+    user.runAs(new PrivilegedAction<Object>() {
+      @Override public Object run() {
+        Subject subject = Subject.getSubject(AccessController.getContext());
+        OAuthBearerToken jwt = new OAuthBearerToken() {
+          @Override public String value() {
+            return encodedToken;
+          }
+
+          @Override public long lifetimeMs() {
+            return 0;

Review comment:
       Can we determine the lifetime of the JWT while holding it? (without 
asking the authorization server)

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java
##########
@@ -33,7 +33,7 @@ public AccessDeniedException() {
   }
 
   public AccessDeniedException(Class<?> clazz, String s) {
-    super( "AccessDenied [" + clazz.getName() + "]: " + s);
+    super("AccessDenied [" + clazz.getName() + "]: " + s);

Review comment:
       Please revert this change for this commit.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/security/oauthbearer/internals/OAuthBearerSaslServer.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.oauthbearer.internals;
+
+import static 
org.apache.hadoop.hbase.security.token.OAuthBearerTokenUtil.OAUTHBEARER_MECHANISM;
+import com.nimbusds.jose.shaded.json.JSONObject;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.exceptions.SaslAuthenticationException;
+import org.apache.hadoop.hbase.security.auth.AuthenticateCallbackHandler;
+import org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerToken;
+import 
org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code SaslServer} implementation for SASL/OAUTHBEARER in Kafka. An instance
+ * of {@link OAuthBearerToken} is available upon successful authentication via
+ * the negotiated property "{@code OAUTHBEARER.token}"; the token could be used
+ * in a custom authorizer (to authorize based on JWT claims rather than ACLs,
+ * for example).
+ */
[email protected]
+public class OAuthBearerSaslServer implements SaslServer {
+  public static final Logger LOG = 
LoggerFactory.getLogger(OAuthBearerSaslServer.class);
+  private static final String NEGOTIATED_PROPERTY_KEY_TOKEN = 
OAUTHBEARER_MECHANISM + ".token";
+  private static final String INTERNAL_ERROR_ON_SERVER =
+    "Authentication could not be performed due to an internal error on the 
server";
+  static final String CREDENTIAL_LIFETIME_MS_SASL_NEGOTIATED_PROPERTY_KEY =
+    "CREDENTIAL.LIFETIME.MS";
+
+  private final AuthenticateCallbackHandler callbackHandler;
+
+  private boolean complete;
+  private OAuthBearerToken tokenForNegotiatedProperty = null;
+  private String errorMessage = null;
+
+  public OAuthBearerSaslServer(CallbackHandler callbackHandler) {
+    if (!(callbackHandler instanceof AuthenticateCallbackHandler)) {
+      throw new IllegalArgumentException(
+        String.format("Callback handler must be castable to %s: %s",
+          AuthenticateCallbackHandler.class.getName(), 
callbackHandler.getClass().getName()));
+    }
+    this.callbackHandler = (AuthenticateCallbackHandler) callbackHandler;
+  }
+
+  /**
+   * @throws SaslAuthenticationException
+   *             if access token cannot be validated
+   *             <p>
+   *             <b>Note:</b> This method may throw
+   *             {@link SaslAuthenticationException} to provide custom error
+   *             messages to clients. But care should be taken to avoid 
including
+   *             any information in the exception message that should not be
+   *             leaked to unauthenticated clients. It may be safer to throw
+   *             {@link SaslException} in some cases so that a standard error
+   *             message is returned to clients.
+   *             </p>
+   */
+  @Override
+  public byte[] evaluateResponse(byte[] response)
+    throws SaslException, SaslAuthenticationException {
+    try {
+      if (response.length == 1 && response[0] == 
OAuthBearerSaslClient.BYTE_CONTROL_A &&

Review comment:
       Using protobuf will also have the benefit of being able to strongly 
represent state (rather than relying on special bytes, like this is doing).

##########
File path: 
hbase-server/src/test/java/org/apache/hadoop/hbase/security/oauthbearer/internals/knox/OAuthBearerSignedJwtTest.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.oauthbearer.internals.knox;

Review comment:
       Not necessarily unique to Knox, is it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to