ayushtkn commented on code in PR #3626:
URL: https://github.com/apache/hive/pull/3626#discussion_r1040866467


##########
llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java:
##########
@@ -117,10 +121,72 @@ private Token<LlapTokenIdentifier> 
extractToken(ByteString tokenBytes) throws IO
 
   private ByteString getTokenBytes(final String appId) throws IOException, 
ServiceException {
     assert clientInstance != null;
+    if (currentToken != null) {
+      try {
+        return fetchTokenWithCurrentToken(appId);
+      } catch (Exception e) {
+        LOG.error("IOException while getting delgation token, trying with 
kerberos login", e);

Review Comment:
   How do you know it is only an IOE? You are catching ``Exception`` here, so 
this can be any RuntimeException as well.
   I also wonder whether we should catch only the checked exceptions (IOE and 
ServiceException) and let everything else propagate rather than hiding it. As 
written, an NPE or similar would get hidden.
   
   Second, a typo: delgation -> delegation
   
   I think the LOG level should be warn?
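
   A minimal sketch of the narrower catch, not the actual ``LlapTokenClient`` 
code. ``ServiceException`` is redeclared locally and the ``Fetcher`` interface 
is hypothetical, both only so that the snippet compiles on its own, and 
``System.err`` stands in for the logger:

```java
import java.io.IOException;

// Hypothetical stand-in for the protobuf ServiceException, declared here only
// so that the sketch is self-contained.
class ServiceException extends Exception {
  ServiceException(String message) {
    super(message);
  }
}

class TokenFetchSketch {
  // Hypothetical abstraction over "fetch with current token" / "fetch with kerberos login".
  interface Fetcher {
    String fetch(String appId) throws IOException, ServiceException;
  }

  static String getTokenBytes(String appId, Fetcher withCurrentToken, Fetcher withKerberosLogin)
      throws IOException, ServiceException {
    try {
      return withCurrentToken.fetch(appId);
    } catch (IOException | ServiceException e) {
      // Only the checked failures trigger the fallback; unchecked exceptions
      // (e.g. an NPE caused by a bug) propagate to the caller.
      System.err.println("WARN: failed to get delegation token with the current token, "
          + "trying with kerberos login: " + e);
      return withKerberosLogin.fetch(appId);
    }
  }
}
```

   With this shape, an expired token still falls back to the kerberos login, 
while a programming error surfaces instead of being logged and retried.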



##########
llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java:
##########
@@ -117,10 +121,72 @@ private Token<LlapTokenIdentifier> 
extractToken(ByteString tokenBytes) throws IO
 
   private ByteString getTokenBytes(final String appId) throws IOException, 
ServiceException {
     assert clientInstance != null;
+    if (currentToken != null) {
+      try {
+        return fetchTokenWithCurrentToken(appId);
+      } catch (Exception e) {
+        LOG.error("IOException while getting delgation token, trying with 
kerberos login", e);
+        return fetchTokenWithKerberosLogin(appId);
+      }
+    } else {
+      LOG.info("currentToken is null, let's try to fetch one with kerberos 
login");
+      return fetchTokenWithKerberosLogin(appId);
+    }
+  }
+
+  /**
+   * Tries to fetch token by LlapManagementProtocolClientImpl from a daemon on 
the cluster.
+   * Assumes that we already have a valid currentToken for communicating with 
the daemons.
+   */
+  private ByteString fetchTokenWithCurrentToken(final String appId) throws 
IOException, ServiceException {
+    UserGroupInformation ugi = getUgiWithCurrentToken();
+    return getTokenWithUgi(appId, ugi);
+  }
+
+  /**
+   * Tries to fetch token by LlapManagementProtocolClientImpl with a kerberos 
login.
+   * 1. Logs in to kerberos
+   * 2. Obtains an authenticated ugi
+   * 3. Fetches a delegation token from a deamon.

Review Comment:
   typo deamon -> daemon



##########
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java:
##########
@@ -584,6 +581,17 @@ public void indicateError(Throwable t) {
         });
   }
 
+  /**
+   * This is a best effort action in case of an invalid LLAP_TOKEN. The 
communicator (LlapProtocolClientProxy) periodically
+   * refreshes tokens, but there is a relatively small time window (compared 
to the whole token maxLifetime),
+   * when the token is cancelled and it's not yet fetched from daemons. By the 
time we detect this problem, the current
+   * task attempt is already failed, but usually there are at least 3 task 
attempts before failing the task and dag, therefore
+   * fetching an LLAP_TOKEN synchronously here definitely solves the invalid 
token problem (as the next task attempt will use it).
+   */
+  private void handleInvalidToken(Throwable t) {
+    this.communicator.refreshToken();

Review Comment:
   You don't need the argument ``Throwable t``



##########
llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java:
##########
@@ -31,6 +31,9 @@
 public class LlapServerSecurityInfo extends SecurityInfo {
   private static final Logger LOG = 
LoggerFactory.getLogger(LlapServerSecurityInfo.class);
 
+  public LlapServerSecurityInfo() {
+  }
+

Review Comment:
   why?



##########
llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java:
##########
@@ -30,6 +30,9 @@
 public class LlapTokenSelector implements TokenSelector<LlapTokenIdentifier> {
   private static final Logger LOG = 
LoggerFactory.getLogger(LlapTokenSelector.class);
 
+  public LlapTokenSelector() {
+  }
+

Review Comment:
   Who is using this and the empty constructor above?



##########
llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java:
##########
@@ -94,6 +105,35 @@ public void sendUpdateFragment(final 
UpdateFragmentRequestProto request, final S
     queueRequest(new SendUpdateFragmentCallable(nodeId, request, callback));
   }
 
+  protected void initPeriodicTokenRefresh(Configuration conf) {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    long tokenRenewInterval =
+        HiveConf.getTimeVar(conf, 
ConfVars.LLAP_DELEGATION_TOKEN_RENEW_INTERVAL, TimeUnit.SECONDS);
+    // if the tokenRenewInterval is low (e.g. testing), let's use the half of 
it as interval instead of the constant
+    long interval = Math.min(tokenRenewInterval / 2, 
LLAP_TOKEN_REFRESH_INTERVAL_IN_AM_SECONDS);
+
+    LOG.info("Initializing periodic token refresh in AM, will run in every 
{}s", interval);
+    tokenClient = new LlapTokenClient(conf);
+
+    newTokenChecker.scheduleAtFixedRate(() -> {
+      fetchToken();
+    }, 0, interval, TimeUnit.SECONDS);

Review Comment:
   Can use a method reference here:
   ```
       newTokenChecker.scheduleAtFixedRate(this::fetchToken, 0, interval, TimeUnit.SECONDS);
   ```



##########
llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java:
##########
@@ -94,6 +105,35 @@ public void sendUpdateFragment(final 
UpdateFragmentRequestProto request, final S
     queueRequest(new SendUpdateFragmentCallable(nodeId, request, callback));
   }
 
+  protected void initPeriodicTokenRefresh(Configuration conf) {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    long tokenRenewInterval =
+        HiveConf.getTimeVar(conf, 
ConfVars.LLAP_DELEGATION_TOKEN_RENEW_INTERVAL, TimeUnit.SECONDS);
+    // if the tokenRenewInterval is low (e.g. testing), let's use the half of 
it as interval instead of the constant
+    long interval = Math.min(tokenRenewInterval / 2, 
LLAP_TOKEN_REFRESH_INTERVAL_IN_AM_SECONDS);

Review Comment:
   I don't follow this logic.
   Why don't we use the configured ``LLAP_DELEGATION_TOKEN_RENEW_INTERVAL`` 
value directly? Why divide it by 2, and why cap it at 
``LLAP_TOKEN_REFRESH_INTERVAL_IN_AM_SECONDS``? Why can't I have a bigger value 
than that? That constant is also hard-coded rather than configurable.
   
   So, can't we leave the control to the user's conf rather than having our own 
logic here? If in some situations the configured value is incorrect or too big, 
we can add validation checks and throw an exception so that the user configures 
a correct value.
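
   A rough sketch of the validation idea. The helper name is hypothetical, and 
the rule that the refresh interval must be shorter than the renew interval is 
only an assumption for illustration, not something the PR defines:

```java
class RefreshIntervalSketch {
  // Hypothetical helper: accepts the user-configured refresh interval as is,
  // or fails fast with a message that tells the user what to fix.
  static long validatedRefreshIntervalSeconds(long configuredRefreshSeconds,
      long tokenRenewIntervalSeconds) {
    if (configuredRefreshSeconds <= 0) {
      throw new IllegalArgumentException(
          "Token refresh interval must be positive, got " + configuredRefreshSeconds + "s");
    }
    // Assumed rule: refreshing less often than the renew interval is treated as a misconfiguration.
    if (configuredRefreshSeconds >= tokenRenewIntervalSeconds) {
      throw new IllegalArgumentException("Token refresh interval (" + configuredRefreshSeconds
          + "s) must be shorter than the token renew interval (" + tokenRenewIntervalSeconds + "s)");
    }
    return configuredRefreshSeconds;
  }
}
```

   This keeps the value under the user's control and replaces the silent 
``Math.min`` adjustment with an explicit error.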



##########
llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java:
##########
@@ -94,6 +105,35 @@ public void sendUpdateFragment(final 
UpdateFragmentRequestProto request, final S
     queueRequest(new SendUpdateFragmentCallable(nodeId, request, callback));
   }
 
+  protected void initPeriodicTokenRefresh(Configuration conf) {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    long tokenRenewInterval =
+        HiveConf.getTimeVar(conf, 
ConfVars.LLAP_DELEGATION_TOKEN_RENEW_INTERVAL, TimeUnit.SECONDS);
+    // if the tokenRenewInterval is low (e.g. testing), let's use the half of 
it as interval instead of the constant
+    long interval = Math.min(tokenRenewInterval / 2, 
LLAP_TOKEN_REFRESH_INTERVAL_IN_AM_SECONDS);
+
+    LOG.info("Initializing periodic token refresh in AM, will run in every 
{}s", interval);
+    tokenClient = new LlapTokenClient(conf);
+
+    newTokenChecker.scheduleAtFixedRate(() -> {
+      fetchToken();
+    }, 0, interval, TimeUnit.SECONDS);
+  }
+
+  private synchronized void fetchToken() {
+    LOG.debug("Trying to fetch a new token...");
+    try {
+      Token<LlapTokenIdentifier> newToken =
+          tokenClient.withCurrentToken(new 
Token<LlapTokenIdentifier>(token)).getDelegationToken(null);
+      LOG.debug("Received new token: {}, old was: {}", newToken, token);

Review Comment:
   Is logging the token safe?
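
   If it turns out not to be safe, one option is to log only a short 
fingerprint of the token bytes. The class and method below are hypothetical 
and use only the JDK; this is a sketch, not a statement about what 
``Token.toString()`` exposes:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class TokenFingerprintSketch {
  // Returns the first 4 bytes of the SHA-256 digest as 8 hex characters, which
  // is enough to tell two tokens apart in a log without printing their content.
  static String fingerprint(byte[] tokenBytes) {
    try {
      byte[] digest = MessageDigest.getInstance("SHA-256").digest(tokenBytes);
      StringBuilder hex = new StringBuilder();
      for (int i = 0; i < 4; i++) {
        hex.append(String.format("%02x", digest[i]));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      // Every Java platform implementation is required to support SHA-256.
      throw new IllegalStateException(e);
    }
  }
}
```

   The debug line could then print the fingerprints of the new and old tokens 
instead of the tokens themselves.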



##########
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java:
##########
@@ -403,9 +401,8 @@ public void indicateError(Throwable t) {
                   + " appId=" + 
currentQueryIdentifierProto.getApplicationIdString()
                   + " dagId=" + currentQueryIdentifierProto.getDagIndex()
                   + " to node " + node.getHost());
-              if (!processSendError(t)) {
-                callback.setError(null, t);
-              }
+              processSendError(t);

Review Comment:
   We have the appId available here, so why don't we pass it along and use it 
in ``fetchToken`` instead of passing ``null`` there?



##########
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java:
##########
@@ -190,31 +173,46 @@ public LlapTaskCommunicator(
     }
   }
 
+  @SuppressWarnings("unchecked")
+  private Token<LlapTokenIdentifier> getLlapToken() {
+    Token<LlapTokenIdentifier> token = null;
+    Credentials credentials = getContext().getAMCredentials();
+    if (credentials != null) {
+      token = (Token<LlapTokenIdentifier>) 
credentials.getToken(LlapTokenIdentifier.KIND_NAME);
+    }
+    Preconditions.checkState((token != null) == 
UserGroupInformation.isSecurityEnabled());
+    LOG.info("Task communicator with a token {}", token);

Review Comment:
   The Precondition is pretty dense, it took me a second to understand. Can you 
add a comment above it saying that there should be a token if security is 
enabled?
   BTW, if security is disabled, the log line will log the token as ``null``.
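
   A sketch of the same invariant spelled out with messages. It uses plain 
Java instead of Guava so that it stands alone, and the method name and message 
texts are made up for illustration:

```java
class TokenInvariantSketch {
  // Invariant: a token must be present exactly when security is enabled.
  // Throws IllegalStateException, like Preconditions.checkState does.
  static void checkTokenMatchesSecurity(Object token, boolean securityEnabled) {
    boolean hasToken = token != null;
    if (hasToken != securityEnabled) {
      throw new IllegalStateException(securityEnabled
          ? "Security is enabled but no LLAP token was found in the AM credentials"
          : "Security is disabled but an LLAP token was found in the AM credentials");
    }
  }
}
```

   The two messages make it obvious which side of the equality failed, and the 
log line could be skipped when the token is ``null``.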



##########
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenManager.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.IOException;
+
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * LlapTokenManager is for renewing and recreating LLAP tokens on a cluster.
+ * See further details in implementations.
+ */
+public interface LlapTokenManager {
+  public static final long LLAP_TOKEN_CHECK_INTERVAL_IN_DEAMON_SECONDS = 300;

Review Comment:
   ``public static final`` is the default for interface fields, it isn't 
required :-)
   Typo: deamon -> daemon
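
   For reference, a small sketch that checks the implicit modifiers via 
reflection. The interface and field names here are made up for the example:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical interface: the field is declared without any modifiers.
interface IntervalConstantsSketch {
  long CHECK_INTERVAL_IN_DAEMON_SECONDS = 300;
}

class InterfaceFieldModifiers {
  // Returns the modifiers of a public field as text, e.g. "public static final".
  static String modifiersOf(Class<?> type, String fieldName) throws NoSuchFieldException {
    Field field = type.getField(fieldName);
    return Modifier.toString(field.getModifiers());
  }
}
```

   Calling ``modifiersOf(IntervalConstantsSketch.class, 
"CHECK_INTERVAL_IN_DAEMON_SECONDS")`` reports all three modifiers even though 
none of them is written in the declaration.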



##########
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java:
##########
@@ -313,6 +311,7 @@ public GetTokenResponseProto 
getDelegationToken(RpcController controller,
     }
     ByteString bs = ByteString.copyFrom(out.toByteArray());
     GetTokenResponseProto response = 
GetTokenResponseProto.newBuilder().setToken(bs).build();
+

Review Comment:
   nit: revert, this change is not required



##########
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java:
##########
@@ -190,31 +173,46 @@ public LlapTaskCommunicator(
     }
   }
 
+  @SuppressWarnings("unchecked")
+  private Token<LlapTokenIdentifier> getLlapToken() {
+    Token<LlapTokenIdentifier> token = null;
+    Credentials credentials = getContext().getAMCredentials();
+    if (credentials != null) {
+      token = (Token<LlapTokenIdentifier>) 
credentials.getToken(LlapTokenIdentifier.KIND_NAME);
+    }
+    Preconditions.checkState((token != null) == 
UserGroupInformation.isSecurityEnabled());
+    LOG.info("Task communicator with a token {}", token);
+    return token;
+  }
+
   void setScheduler(LlapTaskSchedulerService peer) {
     this.scheduler = peer;
   }
 
   private static final String LLAP_TOKEN_NAME = 
LlapTokenIdentifier.KIND_NAME.toString();
 
-  /**
-   * @return true iff the error is fatal and we should give up on everything.
-   */
-  private boolean processSendError(Throwable t) {
+  private void processSendError(Throwable t) {
     Throwable cause = t;
     while (cause != null) {
-      if (cause instanceof RetriableException) return false;
-      if (((cause instanceof InvalidToken && cause.getMessage() != null)
-          || (cause instanceof RemoteException && cause.getCause() == null
-              && cause.getMessage() != null && 
cause.getMessage().contains("InvalidToken")))
-          && cause.getMessage().contains(LLAP_TOKEN_NAME)) {
-        break;
+      if (isInvalidTokenError(cause)) {
+        handleInvalidToken(cause);
+        return;
       }
       cause = cause.getCause();
     }
-    if (cause == null) return false;
-    LOG.error("Reporting fatal error - LLAP token appears to be invalid.", t);
-    getContext().reportError(ServicePluginErrorDefaults.OTHER_FATAL, 
cause.getMessage(), null);
-    return true;
+  }
+
+  private boolean isInvalidTokenError(Throwable cause) {
+    LOG.debug("Checking for invalid token error, cause: {}, cause.getCause(): 
{}", cause, cause.getCause());
+    /*
+     * The lastest message discovered (while doing HIVE-26569) for a 
RemoteException is:
+     * "Current (LLAP_TOKEN; LLAP_TOKEN owner=hive/***, renewer=hive, 
realUser=, issueDate=1670317803579, maxDate=1671527403579,
+     * sequenceNumber=297, masterKeyId=296, cluster ***, app ID , signing 
false) can't be found in cache"
+     */
+    return cause.getMessage().contains(LLAP_TOKEN_NAME)

Review Comment:
   Can ``cause.getMessage()`` be ``null`` in any case? If so, we'd better have 
a ``null`` check.
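
   A null-safe variant could look roughly like this. The hunk above cuts off 
mid-expression, so this sketch covers only the visible ``contains`` check, and 
the ``"LLAP_TOKEN"`` value is assumed from the quoted error message:

```java
class InvalidTokenCheckSketch {
  // Assumed value, taken from the error message quoted in the code comment.
  static final String LLAP_TOKEN_NAME = "LLAP_TOKEN";

  // Throwable.getMessage() may return null (e.g. new RuntimeException()),
  // so guard before calling contains().
  static boolean isInvalidTokenError(Throwable cause) {
    String message = cause.getMessage();
    return message != null && message.contains(LLAP_TOKEN_NAME);
  }

  // Walks the cause chain the same way processSendError does in the diff.
  static boolean chainHasInvalidToken(Throwable t) {
    for (Throwable cause = t; cause != null; cause = cause.getCause()) {
      if (isInvalidTokenError(cause)) {
        return true;
      }
    }
    return false;
  }
}
```

   Without the guard, the first exception in the chain that has no message 
would throw an NPE from inside the error handler itself.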



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

