Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1379#discussion_r199975945
  
    --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java ---
    @@ -89,32 +188,103 @@ public Boolean run() throws IOException, ServletException {
                 };
     
                 // create proxy user UGI from the UGI of the logged in user and execute the servlet chain as that user
    -            UserGroupInformation proxyUGI = null;
    +            TimedProxyUGI timedProxyUGI = getTimedProxyUGI(user, session);
                 try {
    -                LOG.debug("Creating proxy user = " + user);
    -                proxyUGI = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
    -                proxyUGI.doAs(action);
    +                timedProxyUGI.proxyUGI.doAs(action);
                 } catch (UndeclaredThrowableException ute) {
                     // unwrap the real exception thrown by the action
                     throw new ServletException(ute.getCause());
                 } catch (InterruptedException ie) {
                     throw new ServletException(ie);
    -            } finally {
    -                try {
    -                    if (proxyUGI != null) {
    -                        LOG.debug("Closing FileSystem for proxy user = " + proxyUGI.getUserName());
    -                        FileSystem.closeAllForUGI(proxyUGI);
    -                    }
    -                } catch (Throwable t) {
    -                    LOG.warn("Error closing FileSystem for proxy user = " + proxyUGI.getUserName());
    -                }
    +            }
    +            finally {
    +                release(timedProxyUGI, fragmentIndex, fragmentCount);
                 }
             } else {
                 // no user impersonation is configured
                 chain.doFilter(request, response);
             }
         }
     
    +   private TimedProxyUGI getTimedProxyUGI(String user, SegmentTransactionId session) throws IOException {
    +        synchronized (session.segmentTransactionId.intern()) {
    +            TimedProxyUGI timedProxyUGI = cache.get(session);
    +            if (timedProxyUGI == null || timedProxyUGI.getDelayMillis() < 0) {
    +                cleanup();
    +                LOG.info(session.toString() + " Creating proxy user = " + user);
    +                UserGroupInformation proxyUGI =
    +                        UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
    +                timedProxyUGI = new TimedProxyUGI(proxyUGI, session);
    +                delayQueue.offer(timedProxyUGI);
    +                cache.put(session, timedProxyUGI);
    +            } else {
    +                timedProxyUGI.incrementCounter();
    +            }
    +            return timedProxyUGI;
    +        }
    +    }
    +
    +    private void release(TimedProxyUGI timedProxyUGI, Integer fragmentIndex, Integer fragmentCount) {
    +        synchronized (timedProxyUGI.session.segmentTransactionId.intern()) {
    +            timedProxyUGI.resetTime();
    +            timedProxyUGI.decrementCounter();
    +            if (fragmentIndex != null && fragmentCount.equals(fragmentIndex))
    +                closeUGI(timedProxyUGI);
    +        }
    +    }
    +
    +    private void cleanup() {
    +        TimedProxyUGI timedProxyUGI = delayQueue.poll();
    +        while (timedProxyUGI != null) {
    +            closeUGI(timedProxyUGI);
    +            LOG.info(timedProxyUGI.session.toString() + " Delay Queue Size = " + delayQueue.size());
    +            timedProxyUGI = delayQueue.poll();
    +        }
    +    }
    +
    +    private void closeUGI(TimedProxyUGI timedProxyUGI) {
    +        synchronized (timedProxyUGI.session.segmentTransactionId.intern()) {
    --- End diff --
    
    This can deadlock if two cleanup attempts happen at the same time for
session A and session B. Session A comes first and gets a lock on, say,
"seg1:tx1" in the release() method, then comes here and tries to get a lock on
session B's "seg2:tx1". Meanwhile session B already holds that lock and will,
in turn, try to get the lock on session A. This might happen when new requests
for both sessions arrive simultaneously after a delay during which their
previous UGIs have expired. Granted, the probability of this is low, but it is
non-zero, so this is dangerous and cannot be ignored. In other words, to
prevent deadlocks, do not acquire new locks while holding other locks if
another thread can do the same in reverse order, which is exactly the case here.
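
    To illustrate the rule, here is a minimal sketch of one common remedy:
always taking per-session locks in a single global order (here, sorted by key).
It is not the fix adopted in this PR, and OrderedSessionLocks, withBoth and
demo are hypothetical names made up for the example. The other obvious option
is to not nest the locks at all, for instance by closing expired UGIs only
after the current session's lock has been released.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper, not part of PXF: one lock per session key,
// always acquired in sorted key order so two threads cannot cross over.
public class OrderedSessionLocks {
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    private ReentrantLock lockFor(String key) {
        return locks.computeIfAbsent(key, k -> new ReentrantLock());
    }

    // Runs the action while holding the locks for both keys.
    // The caller's argument order does not matter: the smaller key is locked first.
    public void withBoth(String a, String b, Runnable action) {
        boolean aFirst = a.compareTo(b) <= 0;
        ReentrantLock first = lockFor(aFirst ? a : b);
        ReentrantLock second = lockFor(aFirst ? b : a);
        first.lock();
        try {
            second.lock(); // reentrant, so a == b is also safe
            try {
                action.run();
            } finally {
                second.unlock();
            }
        } finally {
            first.unlock();
        }
    }

    private static Thread worker(OrderedSessionLocks locks, CountDownLatch start,
                                 AtomicInteger completed, String own, String other,
                                 int iterations) {
        Thread t = new Thread(() -> {
            try {
                start.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            for (int i = 0; i < iterations; i++) {
                locks.withBoth(own, other, completed::incrementAndGet);
            }
        });
        t.setDaemon(true); // do not keep the JVM alive if something hangs
        return t;
    }

    // Two threads request the same pair of locks in opposite argument order.
    // Returns the number of completed critical sections, or -1 on timeout/interrupt.
    public static int demo(int iterations) {
        OrderedSessionLocks locks = new OrderedSessionLocks();
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread t1 = worker(locks, start, completed, "seg1:tx1", "seg2:tx1", iterations);
        Thread t2 = worker(locks, start, completed, "seg2:tx1", "seg1:tx1", iterations);
        t1.start();
        t2.start();
        start.countDown();
        try {
            t1.join(10_000);
            t2.join(10_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return (t1.isAlive() || t2.isAlive()) ? -1 : completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed = " + demo(10000));
    }
}
```

    With plain nested synchronized blocks taken in caller order, the same two
threads could each hold one lock and wait forever on the other. Sorting the
keys first removes the reverse-order case described above.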

