Repository: hive Updated Branches: refs/heads/master 41cb67e61 -> 925f19552
HIVE-13754: Fix resource leak in HiveClientCache (Chris Drome, via Mithun Radhakrishnan) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/925f1955 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/925f1955 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/925f1955 Branch: refs/heads/master Commit: 925f195523aba1e2bfbfebf06c6b87fafa4dbd67 Parents: 41cb67e Author: Mithun RK <mith...@yahoo-inc.com> Authored: Wed Aug 10 10:10:32 2016 -0700 Committer: Mithun RK <mith...@yahoo-inc.com> Committed: Wed Aug 10 10:11:38 2016 -0700 ---------------------------------------------------------------------- .../hive/hcatalog/common/HCatConstants.java | 10 + .../apache/hive/hcatalog/common/HCatUtil.java | 13 +- .../hive/hcatalog/common/HiveClientCache.java | 219 ++++++++++++++----- .../hcatalog/common/TestHiveClientCache.java | 14 +- .../hadoop/hive/metastore/IMetaStoreClient.java | 2 + .../hive/metastore/RetryingMetaStoreClient.java | 18 +- .../hive/metastore/annotation/NoReconnect.java | 29 +++ 7 files changed, 233 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/925f1955/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java ---------------------------------------------------------------------- diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java index d165e7e..72930eb 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java @@ -61,11 +61,21 @@ public final class HCatConstants { // hcatalog specific configurations, that can be put in hive-site.xml public static final String HCAT_HIVE_CLIENT_EXPIRY_TIME = 
"hcatalog.hive.client.cache.expiry.time"; + // config parameter that suggests to hcat that metastore clients not be cached - default is false // this parameter allows highly-parallel hcat usescases to not gobble up too many connections that // sit in the cache, while not in use. public static final String HCAT_HIVE_CLIENT_DISABLE_CACHE = "hcatalog.hive.client.cache.disabled"; + // Indicates the initial capacity of the cache. + public static final String HCAT_HIVE_CLIENT_CACHE_INITIAL_CAPACITY = "hcatalog.hive.client.cache.initial.capacity"; + + // Indicates the maximum capacity of the cache. Minimum value should be the number of threads. + public static final String HCAT_HIVE_CLIENT_CACHE_MAX_CAPACITY = "hcatalog.hive.client.cache.max.capacity"; + + // Indicates whether cache statistics should be collected. + public static final String HCAT_HIVE_CLIENT_CACHE_STATS_ENABLED = "hcatalog.hive.client.cache.stats.enabled"; + private HCatConstants() { // restrict instantiation } http://git-wip-us.apache.org/repos/asf/hive/blob/925f1955/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java ---------------------------------------------------------------------- diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java index f3bfcfa..8b927af 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java @@ -573,15 +573,10 @@ public class HCatUtil { */ @Deprecated public static HiveMetaStoreClient getHiveClient(HiveConf hiveConf) throws MetaException, IOException { - IMetaStoreClient imsc = getHiveMetastoreClient(hiveConf); - // Try piggybacking on the function that returns IMSC. Current implementation of the IMSC cache - // has CacheableMetaStoreClients, which are HMSC, so we can return them as-is. 
If not, it's okay - // for us to ignore the caching aspect and return a vanilla HMSC. - if (imsc instanceof HiveMetaStoreClient){ - return (HiveMetaStoreClient)imsc; - } else { - return new HiveMetaStoreClient(hiveConf); - } + LOG.warn("HCatUtil.getHiveClient is unsafe and can be a resource leak depending on HMSC " + + "implementation and caching mechanism. Use HCatUtil.getHiveMetastoreClient instead."); + + return new HiveMetaStoreClient(hiveConf); } public static void closeHiveClientQuietly(IMetaStoreClient client) { http://git-wip-us.apache.org/repos/asf/hive/blob/925f1955/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java ---------------------------------------------------------------------- diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java index 9b30703..51f516e 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java @@ -32,11 +32,13 @@ import javax.security.auth.login.LoginException; import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.annotation.NoReconnect; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; @@ -55,8 +57,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; */ class HiveClientCache { public final static int 
DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS = 2 * 60; + public final static int DEFAULT_HIVE_CACHE_INITIAL_CAPACITY = 50; + public final static int DEFAULT_HIVE_CACHE_MAX_CAPACITY = 50; + public final static boolean DEFAULT_HIVE_CLIENT_CACHE_STATS_ENABLED = false; - final private Cache<HiveClientCacheKey, ICacheableMetaStoreClient> hiveCache; + private final Cache<HiveClientCacheKey, ICacheableMetaStoreClient> hiveCache; private static final Logger LOG = LoggerFactory.getLogger(HiveClientCache.class); private final int timeout; // This lock is used to make sure removalListener won't close a client that is being contemplated for returning by get() @@ -66,6 +71,8 @@ class HiveClientCache { private final ScheduledFuture<?> cleanupHandle; // used to cleanup cache + private boolean enableStats; + // Since HiveMetaStoreClient is not threadsafe, hive clients are not shared across threads. // Thread local variable containing each thread's unique ID, is used as one of the keys for the cache // causing each thread to get a different client even if the hiveConf is same. @@ -86,20 +93,78 @@ class HiveClientCache { } public HiveClientCache(HiveConf hiveConf) { - this(hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, - DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS)); + this(hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS), + hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_CACHE_INITIAL_CAPACITY, DEFAULT_HIVE_CACHE_INITIAL_CAPACITY), + hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_CACHE_MAX_CAPACITY, DEFAULT_HIVE_CACHE_MAX_CAPACITY), + hiveConf.getBoolean(HCatConstants.HCAT_HIVE_CLIENT_CACHE_STATS_ENABLED, DEFAULT_HIVE_CLIENT_CACHE_STATS_ENABLED)); + } + /** - * @param timeout the length of time in seconds after a client is created that it should be automatically removed + * @deprecated This constructor will be made private or removed as more configuration properties are required. 
*/ + @Deprecated public HiveClientCache(final int timeout) { + this(timeout, DEFAULT_HIVE_CACHE_INITIAL_CAPACITY, DEFAULT_HIVE_CACHE_MAX_CAPACITY, DEFAULT_HIVE_CLIENT_CACHE_STATS_ENABLED); + } + + /** + * @param timeout the length of time in seconds after a client was last accessed that it should be automatically removed + */ + private HiveClientCache(final int timeout, final int initialCapacity, final int maxCapacity, final boolean enableStats) { this.timeout = timeout; - RemovalListener<HiveClientCacheKey, ICacheableMetaStoreClient> removalListener = + this.enableStats = enableStats; + + LOG.info("Initializing cache: eviction-timeout=" + timeout + " initial-capacity=" + initialCapacity + " maximum-capacity=" + maxCapacity); + + CacheBuilder builder = CacheBuilder.newBuilder() + .initialCapacity(initialCapacity) + .maximumSize(maxCapacity) + .expireAfterAccess(timeout, TimeUnit.SECONDS) + .removalListener(createRemovalListener()); + + /* + * Guava versions <12.0 have stats collection enabled by default and do not expose a recordStats method. + * On newer versions, invoke recordStats reflectively so that stats collection is always enabled. + */ + try { + java.lang.reflect.Method m = builder.getClass().getMethod("recordStats", null); + m.invoke(builder, null); + } catch (NoSuchMethodException e) { + LOG.debug("Using a version of guava <12.0. Stats collection is enabled by default."); + } catch (Exception e) { + LOG.warn("Unable to invoke recordStats method.", e); + } + + this.hiveCache = builder.build(); + + /* + * We need to use a cleanup interval, which is how often the cleanup thread will kick in + * and go do a check to see if any of the connections can be expired. We don't want to + * do this too often, because it'd be like having a mini-GC going off every so often, + * so we limit it to a minimum of DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS.
If the client + * has explicitly set a larger timeout on the cache, though, we respect that, and use that + */ + long cleanupInterval = timeout > DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS ? timeout : DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS; + + this.cleanupHandle = createCleanupThread(cleanupInterval); + + createShutdownHook(); + } + + private RemovalListener<HiveClientCacheKey, ICacheableMetaStoreClient> createRemovalListener() { + RemovalListener<HiveClientCacheKey, ICacheableMetaStoreClient> listener = new RemovalListener<HiveClientCacheKey, ICacheableMetaStoreClient>() { @Override public void onRemoval(RemovalNotification<HiveClientCacheKey, ICacheableMetaStoreClient> notification) { ICacheableMetaStoreClient hiveMetaStoreClient = notification.getValue(); if (hiveMetaStoreClient != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Evicting client: " + Integer.toHexString(System.identityHashCode(hiveMetaStoreClient))); + } + + // TODO: This global lock may not be necessary as all concurrent methods in ICacheableMetaStoreClient + // are synchronized. synchronized (CACHE_TEARDOWN_LOCK) { hiveMetaStoreClient.setExpiredFromCache(); hiveMetaStoreClient.tearDownIfUnused(); @@ -107,33 +172,20 @@ class HiveClientCache { } } }; - hiveCache = CacheBuilder.newBuilder() - .expireAfterWrite(timeout, TimeUnit.SECONDS) - .removalListener(removalListener) - .build(); + return listener; + } + + private ScheduledFuture<?> createCleanupThread(long interval) { // Add a maintenance thread that will attempt to trigger a cache clean continuously Runnable cleanupThread = new Runnable() { @Override public void run() { - hiveCache.cleanUp(); + cleanup(); } }; /** - * We need to use a cleanup interval, which is how often the cleanup thread will kick in - * and go do a check to see if any of the connections can be expired. 
We don't want to - * do this too often, because it'd be like having a mini-GC going off every so often, - * so we limit it to a minimum of DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS. If the client - * has explicitly set a larger timeout on the cache, though, we respect that, and use that - */ - long cleanupInterval = DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS; - - if (timeout > cleanupInterval){ - cleanupInterval = timeout; - } - - /** * Create the cleanup handle. In addition to cleaning up every cleanupInterval, we add * a slight offset, so that the very first time it runs, it runs with a slight delay, so * as to catch any other connections that were closed when the first timeout happened. @@ -142,13 +194,14 @@ class HiveClientCache { * it can be cleaned every max(DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS,timeout) seconds */ ThreadFactory daemonThreadFactory = (new ThreadFactoryBuilder()).setDaemon(true) - .setNameFormat("HiveClientCache-cleaner-%d").build(); - - cleanupHandle = Executors.newScheduledThreadPool(1, daemonThreadFactory).scheduleWithFixedDelay( - cleanupThread, - timeout + 5, cleanupInterval, TimeUnit.SECONDS); + .setNameFormat("HiveClientCache-cleaner-%d") + .build(); + return Executors.newScheduledThreadPool(1, daemonThreadFactory) + .scheduleWithFixedDelay(cleanupThread, timeout + 5, interval, TimeUnit.SECONDS); + } + private void createShutdownHook() { // Add a shutdown hook for cleanup, if there are elements remaining in the cache which were not cleaned up. // This is the best effort approach. Ignore any error while doing so. Notice that most of the clients // would get cleaned up via either the removalListener or the close() call, only the active clients @@ -163,6 +216,7 @@ class HiveClientCache { closeAllClientsQuietly(); } }; + Runtime.getRuntime().addShutdownHook(cleanupHiveClientShutdownThread); } @@ -178,10 +232,19 @@ class HiveClientCache { } catch (Exception e) { LOG.warn("Clean up of hive clients in the cache failed. 
Ignored", e); } + + if (this.enableStats) { + LOG.info("Cache statistics after shutdown: size=" + hiveCache.size() + " " + hiveCache.stats()); + } } public void cleanup() { + // TODO: periodically reload a new HiveConf to check if stats reporting is enabled. hiveCache.cleanUp(); + + if (enableStats) { + LOG.info("Cache statistics after cleanup: size=" + hiveCache.size() + " " + hiveCache.stats()); + } } /** @@ -193,9 +256,10 @@ class HiveClientCache { * @throws IOException * @throws LoginException */ - public ICacheableMetaStoreClient get(final HiveConf hiveConf) throws MetaException, IOException, LoginException { + public IMetaStoreClient get(final HiveConf hiveConf) throws MetaException, IOException, LoginException { final HiveClientCacheKey cacheKey = HiveClientCacheKey.fromHiveConf(hiveConf, getThreadId()); ICacheableMetaStoreClient cacheableHiveMetaStoreClient = null; + // the hmsc is not shared across threads. So the only way it could get closed while we are doing healthcheck // is if removalListener closes it. The synchronization takes care that removalListener won't do it synchronized (CACHE_TEARDOWN_LOCK) { @@ -255,7 +319,7 @@ class HiveClientCache { * UserGroupInformation and metaStoreURIs are same. This function can evolve to express * the cases when HiveConf is different but the same hiveMetaStoreClient can be used */ - public static class HiveClientCacheKey { + static class HiveClientCacheKey { final private String metaStoreURIs; final private UserGroupInformation ugi; final private HiveConf hiveConf; @@ -295,24 +359,38 @@ class HiveClientCache { append(ugi). 
append(threadId).toHashCode(); } + + @Override + public String toString() { + return "HiveClientCacheKey: uri=" + this.metaStoreURIs + " ugi=" + this.ugi + " thread=" + this.threadId; + } } + @InterfaceAudience.Private public interface ICacheableMetaStoreClient extends IMetaStoreClient { - + @NoReconnect void acquire(); - void release(); - + @NoReconnect void setExpiredFromCache(); + @NoReconnect AtomicInteger getUsers(); + @NoReconnect boolean isClosed(); + /** + * @deprecated This method is not used internally and should not be visible through HCatClient.create. + */ + @Deprecated + @NoReconnect boolean isOpen(); + @NoReconnect void tearDownIfUnused(); + @NoReconnect void tearDown(); } @@ -324,26 +402,48 @@ class HiveClientCache { private final AtomicInteger users = new AtomicInteger(0); private volatile boolean expiredFromCache = false; private boolean isClosed = false; - private final long expiryTime; - private static final int EXPIRY_TIME_EXTENSION_IN_MILLIS = 60 * 1000; CacheableHiveMetaStoreClient(final HiveConf conf, final Integer timeout, Boolean allowEmbedded) throws MetaException { super(conf, null, allowEmbedded); - // Extend the expiry time with some extra time on top of guava expiry time to make sure - // that items closed() are for sure expired and would never be returned by guava. - this.expiryTime = System.currentTimeMillis() + timeout * 1000 + EXPIRY_TIME_EXTENSION_IN_MILLIS; } - public void acquire() { + /** + * Increments the user count. + * The expiration time is not touched here: expiration is governed by the cache's + * <code>expireAfterAccess</code> policy, under which each cache access renews the entry. + * Clients are not shared across threads, so a user count above one is unexpected; + * such an increment is logged as a warning rather than rejected. + * Each call should be balanced by a call to {@link #close()}, which releases the user + * count and tears the client down once it is unused and evicted.
+ */ + public synchronized void acquire() { users.incrementAndGet(); + if (users.get() > 1) { + LOG.warn("Unexpected increment of user count beyond one: " + users.get() + " " + this); + } } - public void release() { - users.decrementAndGet(); + /** + * Decrements the user count. + */ + private void release() { + if (users.get() > 0) { + users.decrementAndGet(); + } else { + LOG.warn("Unexpected attempt to decrement user count of zero: " + users.get() + " " + this); + } } - public void setExpiredFromCache() { + /** + * Communicate to the client that it is no longer in the cache. + * Marks the client as expired so that its connection can be closed at the first opportunity, i.e. once no user holds it. + */ + public synchronized void setExpiredFromCache() { + if (users.get() != 0) { + LOG.warn("Evicted client has non-zero user count: " + users.get()); + } + expiredFromCache = true; } @@ -363,6 +463,7 @@ class HiveClientCache { * invalid data renders the client unusable for future use (example: create a table with very long table name) * @return */ + @Deprecated public boolean isOpen() { try { // Look for an unlikely database name and see if either MetaException or TException is thrown @@ -378,28 +479,31 @@ class HiveClientCache { * This *MUST* be called by anyone who uses this client. */ @Override - public void close() { + public synchronized void close() { release(); - if (System.currentTimeMillis() >= expiryTime) - setExpiredFromCache(); tearDownIfUnused(); } /** - * Tear down only if - * 1. There are no active user - * 2. It has expired from the cache + * Attempt to tear down the client connection. + * The connection will be closed if the following conditions hold: + * 1. No user is currently holding the client (the user count is zero). + * 2. The client has been evicted from the cache.
*/ - public void tearDownIfUnused() { + public synchronized void tearDownIfUnused() { + if (users.get() != 0) { + LOG.warn("Non-zero user count preventing client tear down: users=" + users.get() + " expired=" + expiredFromCache); + } + if (users.get() == 0 && expiredFromCache) { this.tearDown(); } } /** - * Close if not closed already + * Close the underlying objects irrespective of whether they are in use or not. */ - public synchronized void tearDown() { + public void tearDown() { try { if (!isClosed) { super.close(); @@ -410,12 +514,23 @@ class HiveClientCache { } } + @Override + public String toString() { + return "HCatClient: thread: " + Thread.currentThread().getId() + " users=" + users.get() + + " expired=" + expiredFromCache + " closed=" + isClosed; + } + /** - * Last effort to clean up, may not even get called. + * GC is attempting to destroy the object. + * No one references this client anymore, so it can be torn down without worrying about user counts. * @throws Throwable */ @Override protected void finalize() throws Throwable { + if (users.get() != 0) { + LOG.warn("Closing client with non-zero user count: users=" + users.get() + " expired=" + expiredFromCache); + } + try { this.tearDown(); } finally { http://git-wip-us.apache.org/repos/asf/hive/blob/925f1955/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java ---------------------------------------------------------------------- diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java index b2c9c7a..c77bc48 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHiveClientCache.java @@ -41,6 +41,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; 
import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -77,14 +78,15 @@ public class TestHiveClientCache { @Test public void testCacheHit() throws IOException, MetaException, LoginException { HiveClientCache cache = new HiveClientCache(1000); - HiveClientCache.ICacheableMetaStoreClient client = cache.get(hiveConf); + HiveClientCache.ICacheableMetaStoreClient client = (HiveClientCache.ICacheableMetaStoreClient) cache.get(hiveConf); assertNotNull(client); client.close(); // close shouldn't matter // Setting a non important configuration should return the same client only hiveConf.setIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS, 10); - HiveClientCache.ICacheableMetaStoreClient client2 = cache.get(hiveConf); + HiveClientCache.ICacheableMetaStoreClient client2 = (HiveClientCache.ICacheableMetaStoreClient) cache.get(hiveConf); assertNotNull(client2); + assertSame(client, client2); assertEquals(client.getUsers(), client2.getUsers()); client2.close(); } @@ -109,11 +111,11 @@ public class TestHiveClientCache { @Test public void testCacheExpiry() throws IOException, MetaException, LoginException, InterruptedException { HiveClientCache cache = new HiveClientCache(1); - HiveClientCache.ICacheableMetaStoreClient client = cache.get(hiveConf); + HiveClientCache.ICacheableMetaStoreClient client = (HiveClientCache.ICacheableMetaStoreClient) cache.get(hiveConf); assertNotNull(client); Thread.sleep(2500); - HiveClientCache.ICacheableMetaStoreClient client2 = cache.get(hiveConf); + HiveClientCache.ICacheableMetaStoreClient client2 = (HiveClientCache.ICacheableMetaStoreClient) cache.get(hiveConf); client.close(); assertTrue(client.isClosed()); // close() after *expiry time* and *a cache access* should have tore down the client @@ -154,9 +156,9 @@ public class TestHiveClientCache { @Test public void testCloseAllClients() throws IOException, MetaException, 
LoginException { final HiveClientCache cache = new HiveClientCache(1000); - HiveClientCache.ICacheableMetaStoreClient client1 = cache.get(hiveConf); + HiveClientCache.ICacheableMetaStoreClient client1 = (HiveClientCache.ICacheableMetaStoreClient) cache.get(hiveConf); hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, " "); // URIs are checked for string equivalence, even spaces make them different - HiveClientCache.ICacheableMetaStoreClient client2 = cache.get(hiveConf); + HiveClientCache.ICacheableMetaStoreClient client2 = (HiveClientCache.ICacheableMetaStoreClient) cache.get(hiveConf); cache.closeAllClientsQuietly(); assertTrue(client1.isClosed()); assertTrue(client2.isClosed()); http://git-wip-us.apache.org/repos/asf/hive/blob/925f1955/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index b6fe502..8dc4e28 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public; import org.apache.hadoop.hive.common.classification.InterfaceStability.Evolving; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.annotation.NoReconnect; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; @@ -127,6 +128,7 @@ public interface IMetaStoreClient { /** * close connection to meta store */ + @NoReconnect void close(); /** 
http://git-wip-us.apache.org/repos/asf/hive/blob/925f1955/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java index c0d9c0c..4895bff 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java @@ -32,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.annotation.NoReconnect; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TApplicationException; @@ -139,13 +140,20 @@ public class RetryingMetaStoreClient implements InvocationHandler { Object ret = null; int retriesMade = 0; TException caughtException = null; + + boolean allowReconnect = ! 
method.isAnnotationPresent(NoReconnect.class); + while (true) { try { reloginExpiringKeytabUser(); - if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) { - base.reconnect(); - lastConnectionTime = System.currentTimeMillis(); + + if (allowReconnect) { + if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) { + base.reconnect(); + lastConnectionTime = System.currentTimeMillis(); + } } + if (metaCallTimeMap == null) { ret = method.invoke(base, args); } else { @@ -228,10 +236,10 @@ public class RetryingMetaStoreClient implements InvocationHandler { } private boolean hasConnectionLifeTimeReached(Method method) { - if (connectionLifeTimeInMillis <= 0 || localMetaStore || - method.getName().equalsIgnoreCase("close")) { + if (connectionLifeTimeInMillis <= 0 || localMetaStore) { return false; } + boolean shouldReconnect = (System.currentTimeMillis() - lastConnectionTime) >= connectionLifeTimeInMillis; if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hive/blob/925f1955/metastore/src/java/org/apache/hadoop/hive/metastore/annotation/NoReconnect.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/annotation/NoReconnect.java b/metastore/src/java/org/apache/hadoop/hive/metastore/annotation/NoReconnect.java new file mode 100644 index 0000000..edf0831 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/annotation/NoReconnect.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface NoReconnect { +}