abstractdog commented on code in PR #4882:
URL: https://github.com/apache/hive/pull/4882#discussion_r1759117586


##########
llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapUgiFactoryFactory.java:
##########
@@ -15,42 +15,115 @@
 package org.apache.hadoop.hive.llap.security;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.common.UgiFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** No Java application is complete until it has a FactoryFactory. */
 public class LlapUgiFactoryFactory {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LlapUgiFactoryFactory.class);
+
   private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
 
-  private static class KerberosUgiFactory implements UgiFactory {
+  /**
+   * This class implements abstract logic for maintaining a single ugi per query identifier.
+   * Subclasses implement createNewUgiInternal for creating a new ugi object when needed.
+   * The ugi cache is a concurrent map, because createUgi reads it without holding the lock
+   * while other threads may be adding or removing entries.
+   */
+  private abstract static class AbstractLlapUgiFactory implements UgiFactory {
+    // queryIdentifier -> ugi; ConcurrentHashMap does not accept null keys or values
+    final Map<String, UserGroupInformation> ugis = new ConcurrentHashMap<>();
+
+    /**
+     * Creates an ugi for tasks in the same query and merges the credentials.
+     * This is valid to be done once per query: no vertex-level ugi and credentials are needed,
+     * both of them are the same within the same query.
+     * Regarding vertex user: LlapTaskCommunicator has a single "user" field,
+     * which is passed into the SignableVertexSpec.
+     * Regarding credentials: LlapTaskCommunicator creates SubmitWorkRequestProto instances,
+     * into which dag-level credentials are passed.
+     * The most performant way would be to use a single UGI for the same user in the daemon,
+     * but that's not possible, because the credentials can theoretically change across queries.
+     *
+     * @param queryIdentifier key of the cached ugi, must not be null
+     * @param user passed through to createNewUgiInternal when a new ugi has to be created
+     * @param credentials passed through to createNewUgiInternal when a new ugi has to be created
+     * @return the ugi already cached for the query, or a newly created (and now cached) one
+     * @throws IOException if createNewUgiInternal fails
+     */
+    @Override
+    public UserGroupInformation createUgi(String queryIdentifier, String user, Credentials credentials)
+        throws IOException {
+      // lock-free fast path for 99% of the calls in case of many tasks
+      UserGroupInformation ugi = ugis.get(queryIdentifier);
+      if (ugi != null) {
+        LOG.debug("Ugi ({}) already exists for queryIdentifier '{}'", ugi, queryIdentifier);
+        return ugi;
+      }
+      synchronized (this) {
+        // double-check and return if previous thread already created ugi for this query
+        ugi = ugis.get(queryIdentifier);
+        if (ugi == null) {
+          ugi = createNewUgiInternal(queryIdentifier, user, credentials);
+          ugis.put(queryIdentifier, ugi);
+        }
+        return ugi;
+      }
+    }
+
+    /**
+     * Removes the ugi cached for the query and closes all FileSystems that belong to it.
+     * This is a no-op if no ugi is cached for the given query identifier.
+     *
+     * @param queryIdentifier key of the cached ugi, must not be null
+     * @throws RuntimeException wrapping the IOException if closing the FileSystems fails
+     */
+    @Override
+    public void closeFileSystemsForQuery(String queryIdentifier) {
+      LOG.debug("Closing all FileSystems for queryIdentifier '{}'", queryIdentifier);
+      // remove first, so the entry is released even if closing the FileSystems fails
+      UserGroupInformation ugi = ugis.remove(queryIdentifier);
+      if (ugi == null) {
+        LOG.debug("No ugi found for queryIdentifier '{}', nothing to close", queryIdentifier);
+        return;
+      }
+      try {
+        FileSystem.closeAllForUGI(ugi);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Failed to close FileSystems for queryIdentifier '" + queryIdentifier + "'", e);
+      }
+    }
+
+    /**
+     * Creates a new ugi object; createUgi calls this when no ugi is cached for the query yet.
+     *
+     * @param queryIdentifier identifier of the query the ugi is created for
+     * @param user user name passed to createUgi
+     * @param credentials credentials passed to createUgi
+     * @return the new ugi, must not be null because it is stored in a concurrent map
+     * @throws IOException if the ugi cannot be created
+     */
+    protected abstract UserGroupInformation createNewUgiInternal(String queryIdentifier, String user,
+        Credentials credentials) throws IOException;
+  }
+
+  private static class KerberosUgiFactory extends AbstractLlapUgiFactory 
implements UgiFactory {
     private final UserGroupInformation baseUgi;
 
     public KerberosUgiFactory(String keytab, String principal) throws 
IOException {
       baseUgi = LlapUtil.loginWithKerberos(principal, keytab);
     }
 
     @Override
-    public UserGroupInformation createUgi() throws IOException {
+    protected UserGroupInformation createNewUgiInternal(String 
queryIdentifier, String user, Credentials credentials)
+        throws IOException {
       // Make sure the UGI is current.
       baseUgi.checkTGTAndReloginFromKeytab();
       // TODO: the only reason this is done this way is because we want unique 
Subject-s so that
       //       the FS.get gives different FS objects to different fragments.
-      // TODO: could we log in from ticket cache instead? no good method on 
UGI right now.
-      return SHIMS.cloneUgi(baseUgi);
+      // TODO: could we log in from ticket cache instead? no good method on 
UGI right now
+      UserGroupInformation ugi = SHIMS.cloneUgi(baseUgi);
+      ugi.addCredentials(credentials);

Review Comment:
   ack, will do



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to