Repository: hive
Updated Branches:
  refs/heads/master d3a5f20b4 -> f9d1b6ab7


HIVE-13151 : Clean up UGI objects in FileSystem cache for transactions (Wei Zheng, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f9d1b6ab
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f9d1b6ab
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f9d1b6ab

Branch: refs/heads/master
Commit: f9d1b6ab77ab15b8337c17fbe38557c1f7b5ce58
Parents: d3a5f20
Author: Wei Zheng <w...@apache.org>
Authored: Thu Mar 24 17:29:59 2016 -0700
Committer: Wei Zheng <w...@apache.org>
Committed: Thu Mar 24 17:29:59 2016 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/HiveEndPoint.java   | 11 +++++
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   |  5 +++
 .../hive/ql/txn/compactor/CompactorThread.java  |  5 +++
 .../hadoop/hive/ql/txn/compactor/Initiator.java |  9 +++-
 .../hadoop/hive/ql/txn/compactor/Worker.java    |  8 +++-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 47 ++++++++++++++++++++
 6 files changed, 82 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f9d1b6ab/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 4c77842..baeafad 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -18,6 +18,7 @@
 
 package org.apache.hive.hcatalog.streaming;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.cli.CliSessionState;
@@ -342,6 +343,11 @@ public class HiveEndPoint {
                 return null;
               }
             } );
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+        }
       } catch (IOException e) {
         LOG.error("Error closing connection to " + endPt, e);
       } catch (InterruptedException e) {
@@ -937,6 +943,11 @@ public class HiveEndPoint {
                   }
                 }
         );
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+        }
       } catch (IOException e) {
        throw new ImpersonationFailed("Failed closing Txn Batch as user '" + username +
                 "' on  endPoint :" + endPt, e);

http://git-wip-us.apache.org/repos/asf/hive/blob/f9d1b6ab/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 9ffeaec..4c31a49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -272,6 +272,11 @@ public class Cleaner extends CompactorThread {
             return null;
           }
         });
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception + " for " +
+              ci.getFullPartitionName());        }
       }
       txnHandler.markCleaned(ci);
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/f9d1b6ab/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 8495c66..4d6e24e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -174,6 +174,11 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
           return null;
         }
       });
+      try {
+        FileSystem.closeAllForUGI(ugi);
+      } catch (IOException exception) {
+        LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+      }
 
       if (wrapper.size() == 1) {
         LOG.debug("Running job as " + wrapper.get(0));

http://git-wip-us.apache.org/repos/asf/hive/blob/f9d1b6ab/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 916d9dc..98ebf53 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -226,12 +226,19 @@ public class Initiator extends CompactorThread {
       LOG.info("Going to initiate as user " + runAs);
       UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
         UserGroupInformation.getLoginUser());
-      return ugi.doAs(new PrivilegedExceptionAction<CompactionType>() {
+      CompactionType compactionType = ugi.doAs(new PrivilegedExceptionAction<CompactionType>() {
         @Override
         public CompactionType run() throws Exception {
           return determineCompactionType(ci, txns, sd);
         }
       });
+      try {
+        FileSystem.closeAllForUGI(ugi);
+      } catch (IOException exception) {
+        LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception + " for " +
+            ci.getFullPartitionName());
+      }
+      return compactionType;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f9d1b6ab/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index adffa8c..e21ca27 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -34,8 +35,6 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -173,6 +172,11 @@ public class Worker extends CompactorThread {
                 return null;
               }
             });
+            try {
+              FileSystem.closeAllForUGI(ugi);
+            } catch (IOException exception) {
+              LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception + " for " +
+                  ci.getFullPartitionName());            }
           }
           txnHandler.markCompacted(ci);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/f9d1b6ab/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 0786c21..04c1d17 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -51,10 +51,13 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -546,6 +549,50 @@ public class TestTxnCommands2 {
     Assert.assertEquals("Unexpected num succeeded", 1, cbs.succeeded);
     Assert.assertEquals("Unexpected num total5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
   }
+
+  /**
+   * Make sure there's no FileSystem$Cache$Key leak due to UGI use
+   * @throws Exception
+   */
+  @Test
+  public void testFileSystemUnCaching() throws Exception {
+    int cacheSizeBefore;
+    int cacheSizeAfter;
+
+    // get the size of cache BEFORE
+    cacheSizeBefore = getFileSystemCacheSize();
+
+    // Insert a row to ACID table
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+
+    // Perform a major compaction
+    runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'major'");
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+
+    // get the size of cache AFTER
+    cacheSizeAfter = getFileSystemCacheSize();
+
+    Assert.assertEquals(cacheSizeBefore, cacheSizeAfter);
+  }
+
+  private int getFileSystemCacheSize() throws Exception {
+    try {
+      Field cache = FileSystem.class.getDeclaredField("CACHE");
+      cache.setAccessible(true);
+      Object o = cache.get(null); // FileSystem.CACHE
+
+      Field mapField = o.getClass().getDeclaredField("map");
+      mapField.setAccessible(true);
+      Map map = (HashMap)mapField.get(o); // FileSystem.CACHE.map
+
+      return map.size();
+    } catch (NoSuchFieldException e) {
+      System.out.println(e);
+    }
+    return 0;
+  }
+
   private static class CompactionsByState {
     private int attempted;
     private int failed;

Reply via email to