Repository: hive
Updated Branches:
  refs/heads/master 41cb67e61 -> 925f19552


HIVE-13754: Fix resource leak in HiveClientCache (Chris Drome, via Mithun 
Radhakrishnan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/925f1955
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/925f1955
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/925f1955

Branch: refs/heads/master
Commit: 925f195523aba1e2bfbfebf06c6b87fafa4dbd67
Parents: 41cb67e
Author: Mithun RK <mith...@yahoo-inc.com>
Authored: Wed Aug 10 10:10:32 2016 -0700
Committer: Mithun RK <mith...@yahoo-inc.com>
Committed: Wed Aug 10 10:11:38 2016 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/common/HCatConstants.java     |  10 +
 .../apache/hive/hcatalog/common/HCatUtil.java   |  13 +-
 .../hive/hcatalog/common/HiveClientCache.java   | 219 ++++++++++++++-----
 .../hcatalog/common/TestHiveClientCache.java    |  14 +-
 .../hadoop/hive/metastore/IMetaStoreClient.java |   2 +
 .../hive/metastore/RetryingMetaStoreClient.java |  18 +-
 .../hive/metastore/annotation/NoReconnect.java  |  29 +++
 7 files changed, 233 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/925f1955/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
 
b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
index d165e7e..72930eb 100644
--- 
a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+++ 
b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
@@ -61,11 +61,21 @@ public final class HCatConstants {
 
   // hcatalog specific configurations, that can be put in hive-site.xml
   public static final String HCAT_HIVE_CLIENT_EXPIRY_TIME = 
"hcatalog.hive.client.cache.expiry.time";
+
   // config parameter that suggests to hcat that metastore clients not be 
cached - default is false
   // this parameter allows highly-parallel hcat usescases to not gobble up too 
many connections that
   // sit in the cache, while not in use.
   public static final String HCAT_HIVE_CLIENT_DISABLE_CACHE = 
"hcatalog.hive.client.cache.disabled";
 
+  // Indicates the initial capacity of the cache.
+  public static final String HCAT_HIVE_CLIENT_CACHE_INITIAL_CAPACITY = 
"hcatalog.hive.client.cache.initial.capacity";
+
+  // Indicates the maximum capacity of the cache. Minimum value should be the 
number of threads.
+  public static final String HCAT_HIVE_CLIENT_CACHE_MAX_CAPACITY = 
"hcatalog.hive.client.cache.max.capacity";
+
+  // Indicates whether cache statistics should be collected.
+  public static final String HCAT_HIVE_CLIENT_CACHE_STATS_ENABLED = 
"hcatalog.hive.client.cache.stats.enabled";
+
   private HCatConstants() { // restrict instantiation
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/925f1955/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java 
b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
index f3bfcfa..8b927af 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
@@ -573,15 +573,10 @@ public class HCatUtil {
    */
   @Deprecated
   public static HiveMetaStoreClient getHiveClient(HiveConf hiveConf) throws 
MetaException, IOException {
-    IMetaStoreClient imsc = getHiveMetastoreClient(hiveConf);
-    // Try piggybacking on the function that returns IMSC. Current 
implementation of the IMSC cache
-    // has CacheableMetaStoreClients, which are HMSC, so we can return them 
as-is. If not, it's okay
-    // for us to ignore the caching aspect and return a vanilla HMSC.
-    if (imsc instanceof HiveMetaStoreClient){
-      return (HiveMetaStoreClient)imsc;
-    } else {
-      return new HiveMetaStoreClient(hiveConf);
-    }
+    LOG.warn("HCatUtil.getHiveClient is unsafe and can be a resource leak 
depending on HMSC "
+        + "implementation and caching mechanism. Use 
HCatUtil.getHiveMetastoreClient instead.");
+
+    return new HiveMetaStoreClient(hiveConf);
   }
 
   public static void closeHiveClientQuietly(IMetaStoreClient client) {

http://git-wip-us.apache.org/repos/asf/hive/blob/925f1955/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java
 
b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java
index 9b30703..51f516e 100644
--- 
a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java
+++ 
b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java
@@ -32,11 +32,13 @@ import javax.security.auth.login.LoginException;
 
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.annotation.NoReconnect;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -55,8 +57,11 @@ import 
com.google.common.util.concurrent.ThreadFactoryBuilder;
  */
 class HiveClientCache {
   public final static int DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS = 2 * 60;
+  public final static int DEFAULT_HIVE_CACHE_INITIAL_CAPACITY = 50;
+  public final static int DEFAULT_HIVE_CACHE_MAX_CAPACITY = 50;
+  public final static boolean DEFAULT_HIVE_CLIENT_CACHE_STATS_ENABLED = false;
 
-  final private Cache<HiveClientCacheKey, ICacheableMetaStoreClient> hiveCache;
+  private final Cache<HiveClientCacheKey, ICacheableMetaStoreClient> hiveCache;
   private static final Logger LOG = 
LoggerFactory.getLogger(HiveClientCache.class);
   private final int timeout;
   // This lock is used to make sure removalListener won't close a client that 
is being contemplated for returning by get()
@@ -66,6 +71,8 @@ class HiveClientCache {
 
   private final ScheduledFuture<?> cleanupHandle; // used to cleanup cache
 
+  private boolean enableStats;
+
   // Since HiveMetaStoreClient is not threadsafe, hive clients are not  shared 
across threads.
   // Thread local variable containing each thread's unique ID, is used as one 
of the keys for the cache
   // causing each thread to get a different client even if the hiveConf is 
same.
@@ -86,20 +93,78 @@ class HiveClientCache {
   }
 
   public HiveClientCache(HiveConf hiveConf) {
-    this(hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME,
-        DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS));
+    this(hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, 
DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS),
+        hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_CACHE_INITIAL_CAPACITY, 
DEFAULT_HIVE_CACHE_INITIAL_CAPACITY),
+        hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_CACHE_MAX_CAPACITY, 
DEFAULT_HIVE_CACHE_MAX_CAPACITY),
+        
hiveConf.getBoolean(HCatConstants.HCAT_HIVE_CLIENT_CACHE_STATS_ENABLED, 
DEFAULT_HIVE_CLIENT_CACHE_STATS_ENABLED));
+
   }
+
   /**
-   * @param timeout the length of time in seconds after a client is created 
that it should be automatically removed
+   * @deprecated This constructor will be made private or removed as more 
configuration properties are required.
    */
+  @Deprecated
   public HiveClientCache(final int timeout) {
+    this(timeout, DEFAULT_HIVE_CACHE_INITIAL_CAPACITY, 
DEFAULT_HIVE_CACHE_MAX_CAPACITY, DEFAULT_HIVE_CLIENT_CACHE_STATS_ENABLED);
+  }
+
+  /**
+   * @param timeout the length of time in seconds after a client was last accessed that it should be automatically removed
+   */
+  private HiveClientCache(final int timeout, final int initialCapacity, final 
int maxCapacity, final boolean enableStats) {
     this.timeout = timeout;
-    RemovalListener<HiveClientCacheKey, ICacheableMetaStoreClient> 
removalListener =
+    this.enableStats = enableStats;
+
+    LOG.info("Initializing cache: eviction-timeout=" + timeout + " 
initial-capacity=" + initialCapacity + " maximum-capacity=" + maxCapacity);
+
+    CacheBuilder builder = CacheBuilder.newBuilder()
+      .initialCapacity(initialCapacity)
+      .maximumSize(maxCapacity)
+      .expireAfterAccess(timeout, TimeUnit.SECONDS)
+      .removalListener(createRemovalListener());
+
+    /*
+     * Guava versions <12.0 have stats collection enabled by default and do 
not expose a recordStats method.
+     * Check for newer versions of the library and ensure that stats 
collection is enabled by default.
+     */
+    try {
+      java.lang.reflect.Method m = builder.getClass().getMethod("recordStats", 
null);
+      m.invoke(builder, null);
+    } catch (NoSuchMethodException e) {
+      LOG.debug("Using a version of guava <12.0. Stats collection is enabled 
by default.");
+    } catch (Exception e) {
+      LOG.warn("Unable to invoke recordStats method.", e);
+    }
+
+    this.hiveCache = builder.build();
+
+    /*
+     * We need to use a cleanup interval, which is how often the cleanup 
thread will kick in
+     * and go do a check to see if any of the connections can be expired. We 
don't want to
+     * do this too often, because it'd be like having a mini-GC going off 
every so often,
+     * so we limit it to a minimum of DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS. 
If the client
+     * has explicitly set a larger timeout on the cache, though, we respect 
that, and use that
+     */
+    long cleanupInterval = timeout > DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS ? 
timeout : DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS;
+
+    this.cleanupHandle = createCleanupThread(cleanupInterval);
+
+    createShutdownHook();
+  }
+
+  private RemovalListener<HiveClientCacheKey, ICacheableMetaStoreClient> 
createRemovalListener() {
+    RemovalListener<HiveClientCacheKey, ICacheableMetaStoreClient> listener =
       new RemovalListener<HiveClientCacheKey, ICacheableMetaStoreClient>() {
         @Override
         public void onRemoval(RemovalNotification<HiveClientCacheKey, 
ICacheableMetaStoreClient> notification) {
           ICacheableMetaStoreClient hiveMetaStoreClient = 
notification.getValue();
           if (hiveMetaStoreClient != null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Evicting client: " + 
Integer.toHexString(System.identityHashCode(hiveMetaStoreClient)));
+            }
+
+            // TODO: This global lock may not be necessary as all concurrent 
methods in ICacheableMetaStoreClient
+            // are synchronized.
             synchronized (CACHE_TEARDOWN_LOCK) {
               hiveMetaStoreClient.setExpiredFromCache();
               hiveMetaStoreClient.tearDownIfUnused();
@@ -107,33 +172,20 @@ class HiveClientCache {
           }
         }
       };
-    hiveCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(timeout, TimeUnit.SECONDS)
-      .removalListener(removalListener)
-      .build();
 
+    return listener;
+  }
+
+  private ScheduledFuture<?> createCleanupThread(long interval) {
     // Add a maintenance thread that will attempt to trigger a cache clean 
continuously
     Runnable cleanupThread = new Runnable() {
       @Override
       public void run() {
-        hiveCache.cleanUp();
+        cleanup();
       }
     };
 
     /**
-     * We need to use a cleanup interval, which is how often the cleanup 
thread will kick in
-     * and go do a check to see if any of the connections can be expired. We 
don't want to
-     * do this too often, because it'd be like having a mini-GC going off 
every so often,
-     * so we limit it to a minimum of DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS. 
If the client
-     * has explicitly set a larger timeout on the cache, though, we respect 
that, and use that
-     */
-    long cleanupInterval = DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS;
-
-    if (timeout > cleanupInterval){
-      cleanupInterval = timeout;
-    }
-
-    /**
      * Create the cleanup handle. In addition to cleaning up every 
cleanupInterval, we add
      * a slight offset, so that the very first time it runs, it runs with a 
slight delay, so
      * as to catch any other connections that were closed when the first 
timeout happened.
@@ -142,13 +194,14 @@ class HiveClientCache {
      * it can be cleaned every 
max(DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS,timeout) seconds
      */
     ThreadFactory daemonThreadFactory = (new 
ThreadFactoryBuilder()).setDaemon(true)
-        .setNameFormat("HiveClientCache-cleaner-%d").build();
-
-    cleanupHandle = Executors.newScheduledThreadPool(1, 
daemonThreadFactory).scheduleWithFixedDelay(
-        cleanupThread,
-        timeout + 5, cleanupInterval, TimeUnit.SECONDS);
+      .setNameFormat("HiveClientCache-cleaner-%d")
+      .build();
 
+    return Executors.newScheduledThreadPool(1, daemonThreadFactory)
+      .scheduleWithFixedDelay(cleanupThread, timeout + 5, interval, 
TimeUnit.SECONDS);
+  }
 
+  private void createShutdownHook() {
     // Add a shutdown hook for cleanup, if there are elements remaining in the 
cache which were not cleaned up.
     // This is the best effort approach. Ignore any error while doing so. 
Notice that most of the clients
     // would get cleaned up via either the removalListener or the close() 
call, only the active clients
@@ -163,6 +216,7 @@ class HiveClientCache {
         closeAllClientsQuietly();
       }
     };
+
     Runtime.getRuntime().addShutdownHook(cleanupHiveClientShutdownThread);
   }
 
@@ -178,10 +232,19 @@ class HiveClientCache {
     } catch (Exception e) {
       LOG.warn("Clean up of hive clients in the cache failed. Ignored", e);
     }
+
+    if (this.enableStats) {
+      LOG.info("Cache statistics after shutdown: size=" + hiveCache.size() + " 
" + hiveCache.stats());
+    }
   }
 
   public void cleanup() {
+    // TODO: periodically reload a new HiveConf to check if stats reporting is 
enabled.
     hiveCache.cleanUp();
+
+    if (enableStats) {
+      LOG.info("Cache statistics after cleanup: size=" + hiveCache.size() + " 
" + hiveCache.stats());
+    }
   }
 
   /**
@@ -193,9 +256,10 @@ class HiveClientCache {
    * @throws IOException
    * @throws LoginException
    */
-  public ICacheableMetaStoreClient get(final HiveConf hiveConf) throws 
MetaException, IOException, LoginException {
+  public IMetaStoreClient get(final HiveConf hiveConf) throws MetaException, 
IOException, LoginException {
     final HiveClientCacheKey cacheKey = 
HiveClientCacheKey.fromHiveConf(hiveConf, getThreadId());
     ICacheableMetaStoreClient cacheableHiveMetaStoreClient = null;
+
     // the hmsc is not shared across threads. So the only way it could get 
closed while we are doing healthcheck
     // is if removalListener closes it. The synchronization takes care that 
removalListener won't do it
     synchronized (CACHE_TEARDOWN_LOCK) {
@@ -255,7 +319,7 @@ class HiveClientCache {
    * UserGroupInformation and metaStoreURIs are same. This function can evolve 
to express
    * the cases when HiveConf is different but the same hiveMetaStoreClient can 
be used
    */
-  public static class HiveClientCacheKey {
+  static class HiveClientCacheKey {
     final private String metaStoreURIs;
     final private UserGroupInformation ugi;
     final private HiveConf hiveConf;
@@ -295,24 +359,38 @@ class HiveClientCache {
         append(ugi).
         append(threadId).toHashCode();
     }
+
+    @Override
+    public String toString() {
+      return "HiveClientCacheKey: uri=" + this.metaStoreURIs + " ugi=" + 
this.ugi + " thread=" + this.threadId; 
+    }
   }
 
+  @InterfaceAudience.Private
   public interface ICacheableMetaStoreClient extends IMetaStoreClient {
-
+    @NoReconnect
     void acquire();
 
-    void release();
-
+    @NoReconnect
     void setExpiredFromCache();
 
+    @NoReconnect
     AtomicInteger getUsers();
 
+    @NoReconnect
     boolean isClosed();
 
+    /**
+     * @deprecated This method is not used internally and should not be 
visible through HCatClient.create.
+     */
+    @Deprecated
+    @NoReconnect
     boolean isOpen();
 
+    @NoReconnect
     void tearDownIfUnused();
 
+    @NoReconnect
     void tearDown();
   }
 
@@ -324,26 +402,48 @@ class HiveClientCache {
     private final AtomicInteger users = new AtomicInteger(0);
     private volatile boolean expiredFromCache = false;
     private boolean isClosed = false;
-    private final long expiryTime;
-    private static final int EXPIRY_TIME_EXTENSION_IN_MILLIS = 60 * 1000;
 
     CacheableHiveMetaStoreClient(final HiveConf conf, final Integer timeout, 
Boolean allowEmbedded)
         throws MetaException {
       super(conf, null, allowEmbedded);
-      // Extend the expiry time with some extra time on top of guava expiry 
time to make sure
-      // that items closed() are for sure expired and would never be returned 
by guava.
-      this.expiryTime = System.currentTimeMillis() + timeout * 1000 + 
EXPIRY_TIME_EXTENSION_IN_MILLIS;
     }
 
-    public void acquire() {
+    /**
+     * Increments the user count.
+     * Clients are cached per thread, so the count is not expected to rise
+     * above one; a warning is logged when it does.
+     * This method takes no arguments and does not renew any expiration time:
+     * the client no longer tracks its own expiry, and learns of eviction only
+     * through setExpiredFromCache().
+     * Synchronized along with close() and tearDownIfUnused().
+     */
+    public synchronized void acquire() {
       users.incrementAndGet();
+      if (users.get() > 1) {
+        LOG.warn("Unexpected increment of user count beyond one: " + 
users.get() + " " + this);
+      }
     }
 
-    public void release() {
-      users.decrementAndGet();
+    /**
+     * Decrements the user count.
+     */
+    private void release() {
+      if (users.get() > 0) {
+        users.decrementAndGet();
+      } else {
+        LOG.warn("Unexpected attempt to decrement user count of zero: " + 
users.get() + " " + this);
+      }
     }
 
-    public void setExpiredFromCache() {
+    /**
+     * Communicate to the client that it is no longer in the cache.
+     * Once flagged, the connection is closed as soon as the user count is zero.
+     */
+    public synchronized void setExpiredFromCache() {
+      if (users.get() != 0) {
+        LOG.warn("Evicted client has non-zero user count: " + users.get());
+      }
+
       expiredFromCache = true;
     }
 
@@ -363,6 +463,7 @@ class HiveClientCache {
      * invalid data renders the client unusable for future use (example: 
create a table with very long table name)
      * @return
      */
+    @Deprecated
     public boolean isOpen() {
       try {
         // Look for an unlikely database name and see if either MetaException 
or TException is thrown
@@ -378,28 +479,31 @@ class HiveClientCache {
      * This *MUST* be called by anyone who uses this client.
      */
     @Override
-    public void close() {
+    public synchronized void close() {
       release();
-      if (System.currentTimeMillis() >= expiryTime)
-        setExpiredFromCache();
       tearDownIfUnused();
     }
 
     /**
-     * Tear down only if
-     *  1. There are no active user
-     *  2. It has expired from the cache
+     * Attempt to tear down the client connection.
+     * The connection will be closed if the following conditions hold:
+     *  1. There are no active user holding the client.
+     *  2. The client has been evicted from the cache.
      */
-    public void tearDownIfUnused() {
+    public synchronized void tearDownIfUnused() {
+      if (users.get() != 0) {
+        LOG.warn("Non-zero user count preventing client tear down: users=" + 
users.get() + " expired=" + expiredFromCache);
+      }
+
       if (users.get() == 0 && expiredFromCache) {
         this.tearDown();
       }
     }
 
     /**
-     * Close if not closed already
+     * Close the underlying objects irrespective of whether they are in use or 
not.
      */
-    public synchronized void tearDown() {
+    public void tearDown() {
       try {
         if (!isClosed) {
           super.close();
@@ -410,12 +514,23 @@ class HiveClientCache {
       }
     }
 
+    @Override
+    public String toString() {
+      return "HCatClient: thread: " + Thread.currentThread().getId() + " 
users=" + users.get()
+        + " expired=" + expiredFromCache + " closed=" + isClosed;
+    }
+
     /**
-     * Last effort to clean up, may not even get called.
+     * GC is attempting to destroy the object.
+     * No one references this client anymore, so it can be torn down without 
worrying about user counts.
      * @throws Throwable
      */
     @Override
     protected void finalize() throws Throwable {
+      if (users.get() != 0) {
+        LOG.warn("Closing client with non-zero user count: users=" + 
users.get() + " expired=" + expiredFromCache);
+      }
+
       try {
         this.tearDown();
       } finally {

http://git-wip-us.apache.org/repos/asf/hive/blob/925f1955/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java
 
b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java
index b2c9c7a..c77bc48 100644
--- 
a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java
+++ 
b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java
@@ -41,6 +41,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -77,14 +78,15 @@ public class TestHiveClientCache {
   @Test
   public void testCacheHit() throws IOException, MetaException, LoginException 
{
     HiveClientCache cache = new HiveClientCache(1000);
-    HiveClientCache.ICacheableMetaStoreClient client = cache.get(hiveConf);
+    HiveClientCache.ICacheableMetaStoreClient client = 
(HiveClientCache.ICacheableMetaStoreClient) cache.get(hiveConf);
     assertNotNull(client);
     client.close(); // close shouldn't matter
 
     // Setting a non important configuration should return the same client only
     hiveConf.setIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS, 10);
-    HiveClientCache.ICacheableMetaStoreClient client2 = cache.get(hiveConf);
+    HiveClientCache.ICacheableMetaStoreClient client2 = 
(HiveClientCache.ICacheableMetaStoreClient) cache.get(hiveConf);
     assertNotNull(client2);
+    assertSame(client, client2);
     assertEquals(client.getUsers(), client2.getUsers());
     client2.close();
   }
@@ -109,11 +111,11 @@ public class TestHiveClientCache {
   @Test
   public void testCacheExpiry() throws IOException, MetaException, 
LoginException, InterruptedException {
     HiveClientCache cache = new HiveClientCache(1);
-    HiveClientCache.ICacheableMetaStoreClient client = cache.get(hiveConf);
+    HiveClientCache.ICacheableMetaStoreClient client = 
(HiveClientCache.ICacheableMetaStoreClient) cache.get(hiveConf);
     assertNotNull(client);
 
     Thread.sleep(2500);
-    HiveClientCache.ICacheableMetaStoreClient client2 = cache.get(hiveConf);
+    HiveClientCache.ICacheableMetaStoreClient client2 = 
(HiveClientCache.ICacheableMetaStoreClient) cache.get(hiveConf);
     client.close();
     assertTrue(client.isClosed()); // close() after *expiry time* and *a cache 
access* should  have tore down the client
 
@@ -154,9 +156,9 @@ public class TestHiveClientCache {
   @Test
   public void testCloseAllClients() throws IOException, MetaException, 
LoginException {
     final HiveClientCache cache = new HiveClientCache(1000);
-    HiveClientCache.ICacheableMetaStoreClient client1 = cache.get(hiveConf);
+    HiveClientCache.ICacheableMetaStoreClient client1 = 
(HiveClientCache.ICacheableMetaStoreClient) cache.get(hiveConf);
     hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, " "); // URIs are checked 
for string equivalence, even spaces make them different
-    HiveClientCache.ICacheableMetaStoreClient client2 = cache.get(hiveConf);
+    HiveClientCache.ICacheableMetaStoreClient client2 = 
(HiveClientCache.ICacheableMetaStoreClient) cache.get(hiveConf);
     cache.closeAllClientsQuietly();
     assertTrue(client1.isClosed());
     assertTrue(client2.isClosed());

http://git-wip-us.apache.org/repos/asf/hive/blob/925f1955/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index b6fe502..8dc4e28 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -25,6 +25,7 @@ import 
org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public;
 import 
org.apache.hadoop.hive.common.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.annotation.NoReconnect;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -127,6 +128,7 @@ public interface IMetaStoreClient {
   /**
    * close connection to meta store
    */
+  @NoReconnect
   void close();
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/925f1955/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
 
b/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
index c0d9c0c..4895bff 100644
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
+++ 
b/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.annotation.NoReconnect;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TApplicationException;
@@ -139,13 +140,20 @@ public class RetryingMetaStoreClient implements 
InvocationHandler {
     Object ret = null;
     int retriesMade = 0;
     TException caughtException = null;
+
+    boolean allowReconnect = ! method.isAnnotationPresent(NoReconnect.class);
+
     while (true) {
       try {
         reloginExpiringKeytabUser();
-        if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) {
-          base.reconnect();
-          lastConnectionTime = System.currentTimeMillis();
+
+        if (allowReconnect) {
+          if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) {
+            base.reconnect();
+            lastConnectionTime = System.currentTimeMillis();
+          }
         }
+
         if (metaCallTimeMap == null) {
           ret = method.invoke(base, args);
         } else {
@@ -228,10 +236,10 @@ public class RetryingMetaStoreClient implements 
InvocationHandler {
   }
 
   private boolean hasConnectionLifeTimeReached(Method method) {
-    if (connectionLifeTimeInMillis <= 0 || localMetaStore ||
-        method.getName().equalsIgnoreCase("close")) {
+    if (connectionLifeTimeInMillis <= 0 || localMetaStore) {
       return false;
     }
+
     boolean shouldReconnect =
         (System.currentTimeMillis() - lastConnectionTime) >= 
connectionLifeTimeInMillis;
     if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/925f1955/metastore/src/java/org/apache/hadoop/hive/metastore/annotation/NoReconnect.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/annotation/NoReconnect.java
 
b/metastore/src/java/org/apache/hadoop/hive/metastore/annotation/NoReconnect.java
new file mode 100644
index 0000000..edf0831
--- /dev/null
+++ 
b/metastore/src/java/org/apache/hadoop/hive/metastore/annotation/NoReconnect.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface NoReconnect {
+}

Reply via email to