Repository: incubator-impala Updated Branches: refs/heads/master fe97579fe -> 6fc399ebc
IMPALA-2347: Reuse metastore client connections in Catalog. Currently, we create a new connection to the metastore every time the Catalog connects to HMS. This was done intentionally to work around HIVE-5181. Since that issue has already been fixed in Hive, this patch refactors the HMS client usage in the Catalog to reuse connections. Additionally, this patch makes MetaStoreClient implement the AutoCloseable interface, so all callers can use a try-with-resources statement to obtain a metastore client and no longer need to call release() explicitly. This patch also increases the default initial metastore client pool size from 5 to 10, since the previous value was too small even for a moderate DDL load. In terms of design, this patch switches the metastore client implementation from HiveMetaStoreClient to RetryingMetaStoreClient. The reason for this switch is to handle HMS failures on the Catalog side: when the metastore restarts, every client cached in the metastore client pool becomes stale, and there was previously no proper way to deal with that. RetryingMetaStoreClient has a built-in retry mechanism that reconnects stale connections in the event of failures. For more details on retries and the corresponding configuration options, see org.apache.hadoop.hive.metastore.RetryingMetaStoreClient. 
Change-Id: I517c0e1efef2584cd8d34017b33574f2ad69bd52 Reviewed-on: http://gerrit.cloudera.org:8080/3984 Reviewed-by: Bharath Vissapragada <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/15d20d8f Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/15d20d8f Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/15d20d8f Branch: refs/heads/master Commit: 15d20d8f12ffd01a1da807ea003939f1ff24a7bb Parents: fe97579 Author: Bharath Vissapragada <[email protected]> Authored: Tue Jun 14 04:54:58 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Mon Aug 15 20:32:35 2016 +0000 ---------------------------------------------------------------------- .../analysis/CreateTableAsSelectStmt.java | 5 +- .../com/cloudera/impala/catalog/Catalog.java | 2 +- .../impala/catalog/CatalogServiceCatalog.java | 20 +-- .../impala/catalog/DataSourceTable.java | 6 +- .../com/cloudera/impala/catalog/HBaseTable.java | 4 +- .../com/cloudera/impala/catalog/HdfsTable.java | 16 +-- .../cloudera/impala/catalog/ImpaladCatalog.java | 7 +- .../impala/catalog/IncompleteTable.java | 4 +- .../com/cloudera/impala/catalog/KuduTable.java | 6 +- .../impala/catalog/MetaStoreClientPool.java | 75 ++++++---- .../java/com/cloudera/impala/catalog/Table.java | 6 +- .../cloudera/impala/catalog/TableLoader.java | 6 +- .../java/com/cloudera/impala/catalog/View.java | 4 +- .../impala/service/CatalogOpExecutor.java | 140 ++++++------------- .../com/cloudera/impala/util/MetaStoreUtil.java | 6 +- .../cloudera/impala/catalog/CatalogTest.java | 7 +- 16 files changed, 124 insertions(+), 190 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/analysis/CreateTableAsSelectStmt.java 
---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/CreateTableAsSelectStmt.java index 72e30f9..7b59625 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/CreateTableAsSelectStmt.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/CreateTableAsSelectStmt.java @@ -177,8 +177,7 @@ public class CreateTableAsSelectStmt extends StatementBase { org.apache.hadoop.hive.metastore.api.Table msTbl = CatalogOpExecutor.createMetaStoreTable(createStmt_.toThrift()); - MetaStoreClient client = analyzer.getCatalog().getMetaStoreClient(); - try { + try (MetaStoreClient client = analyzer.getCatalog().getMetaStoreClient()) { // Set a valid location of this table using the same rules as the metastore. If the // user specified a location for the table this will be a no-op. msTbl.getSd().setLocation(analyzer.getCatalog().getTablePath(msTbl).toString()); @@ -199,8 +198,6 @@ public class CreateTableAsSelectStmt extends StatementBase { throw new AnalysisException(e.getMessage(), e); } catch (Exception e) { throw new AnalysisException(e.getMessage(), e); - } finally { - client.release(); } // Finally, run analysis on the insert statement. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java b/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java index 3858028..4cd1c42 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java @@ -60,7 +60,7 @@ public abstract class Catalog { // Initial catalog version. 
public final static long INITIAL_CATALOG_VERSION = 0L; public static final String DEFAULT_DB = "default"; - private static final int META_STORE_CLIENT_POOL_SIZE = 5; + private static final int META_STORE_CLIENT_POOL_SIZE = 10; public static final String BUILTINS_DB = "_impala_builtins"; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java index 7c7a7de..ae92996 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java @@ -557,8 +557,7 @@ public class CatalogServiceCatalog extends Catalog { // step. ConcurrentHashMap<String, Db> newDbCache = new ConcurrentHashMap<String, Db>(); List<TTableName> tblsToBackgroundLoad = Lists.newArrayList(); - MetaStoreClient msClient = metaStoreClientPool_.getClient(); - try { + try (MetaStoreClient msClient = getMetaStoreClient()) { for (String dbName: msClient.getHiveClient().getAllDatabases()) { List<org.apache.hadoop.hive.metastore.api.Function> javaFns = Lists.newArrayList(); @@ -592,8 +591,6 @@ public class CatalogServiceCatalog extends Catalog { } } } - } finally { - msClient.release(); } dbCache_.set(newDbCache); // Submit tables for background loading. 
@@ -859,8 +856,7 @@ public class CatalogServiceCatalog extends Catalog { synchronized(tbl) { long newCatalogVersion = incrementAndGetCatalogVersion(); catalogLock_.writeLock().unlock(); - MetaStoreClient msClient = getMetaStoreClient(); - try { + try (MetaStoreClient msClient = getMetaStoreClient()) { org.apache.hadoop.hive.metastore.api.Table msTbl = null; try { msTbl = msClient.getHiveClient().getTable(db.getName(), @@ -870,8 +866,6 @@ public class CatalogServiceCatalog extends Catalog { db.getName() + "." + tblName.getTable_name(), e); } tbl.load(true, msClient.getHiveClient(), msTbl); - } finally { - msClient.release(); } tbl.setCatalogVersion(newCatalogVersion); return tbl; @@ -954,8 +948,7 @@ public class CatalogServiceCatalog extends Catalog { // 3) unknown (null) - There was exception thrown by the metastore client. Boolean tableExistsInMetaStore; Db db = null; - MetaStoreClient msClient = getMetaStoreClient(); - try { + try (MetaStoreClient msClient = getMetaStoreClient()) { org.apache.hadoop.hive.metastore.api.Database msDb = null; try { tableExistsInMetaStore = msClient.getHiveClient().tableExists(dbName, tblName); @@ -995,8 +988,6 @@ public class CatalogServiceCatalog extends Catalog { return false; } } - } finally { - msClient.release(); } // Add a new uninitialized table to the table cache, effectively invalidating @@ -1193,8 +1184,7 @@ public class CatalogServiceCatalog extends Catalog { : hdfsPartition.getPartitionName(); LOG.debug(String.format("Refreshing Partition metadata: %s %s", hdfsTable.getFullName(), partitionName)); - MetaStoreClient msClient = getMetaStoreClient(); - try { + try (MetaStoreClient msClient = getMetaStoreClient()) { org.apache.hadoop.hive.metastore.api.Partition hmsPartition = null; try { hmsPartition = msClient.getHiveClient().getPartition( @@ -1212,8 +1202,6 @@ public class CatalogServiceCatalog extends Catalog { + hdfsTable.getFullName() + " " + partitionName, e); } hdfsTable.reloadPartition(hdfsPartition, hmsPartition); 
- } finally { - msClient.release(); } hdfsTable.setCatalogVersion(newCatalogVersion); return hdfsTable; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java b/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java index 44bd666..c42c804 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java @@ -20,7 +20,7 @@ package com.cloudera.impala.catalog; import java.util.List; import java.util.Set; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,7 +139,7 @@ public class DataSourceTable extends Table { * Throws a TableLoadingException if the metadata is incompatible with what we * support. 
*/ - private void loadColumns(List<FieldSchema> fieldSchemas, HiveMetaStoreClient client) + private void loadColumns(List<FieldSchema> fieldSchemas, IMetaStoreClient client) throws TableLoadingException { int pos = 0; for (FieldSchema s: fieldSchemas) { @@ -159,7 +159,7 @@ public class DataSourceTable extends Table { } @Override - public void load(boolean reuseMetadata, HiveMetaStoreClient client, + public void load(boolean reuseMetadata, IMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException { Preconditions.checkNotNull(msTbl); msTable_ = msTbl; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/HBaseTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HBaseTable.java b/fe/src/main/java/com/cloudera/impala/catalog/HBaseTable.java index a649c93..d96314e 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/HBaseTable.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/HBaseTable.java @@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hive.hbase.HBaseSerDe; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -325,7 +325,7 @@ public class HBaseTable extends Table { * to hdfs tables since we typically need to understand all columns to make sense * of the file at all. 
*/ - public void load(boolean reuseMetadata, HiveMetaStoreClient client, + public void load(boolean reuseMetadata, IMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException { Preconditions.checkNotNull(getMetaStoreTable()); try { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java index 4b35e66..2464376 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java @@ -41,7 +41,7 @@ import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -977,7 +977,7 @@ public class HdfsTable extends Table { } @Override - public void load(boolean reuseMetadata, HiveMetaStoreClient client, + public void load(boolean reuseMetadata, IMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException { load(reuseMetadata, client, msTbl, true, true, null); } @@ -1009,7 +1009,7 @@ public class HdfsTable extends Table { * If any of these occur, user has to execute "invalidate metadata" to invalidate the * metadata cache of the table and trigger a fresh load. 
*/ - public void load(boolean reuseMetadata, HiveMetaStoreClient client, + public void load(boolean reuseMetadata, IMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl, boolean loadFileMetadata, boolean loadTableSchema, Set<String> partitionsToUpdate) throws TableLoadingException { @@ -1098,7 +1098,7 @@ public class HdfsTable extends Table { * file/block metadata reload for the partitions specified in 'partitionsToUpdate', if * any, or for all the table partitions if 'partitionsToUpdate' is null. */ - private void updatePartitionsFromHms(HiveMetaStoreClient client, + private void updatePartitionsFromHms(IMetaStoreClient client, Set<String> partitionsToUpdate, boolean loadFileMetadata) throws Exception { LOG.debug("sync table partitions: " + name_); org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(); @@ -1262,7 +1262,7 @@ public class HdfsTable extends Table { * as Avro. Additionally, this method also reconciles the schema if the column * definitions from the metastore differ from the Avro schema. */ - private void setAvroSchema(HiveMetaStoreClient client, + private void setAvroSchema(IMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception { Preconditions.checkState(isSchemaLoaded_); String inputFormat = msTbl.getSd().getInputFormat(); @@ -1324,7 +1324,7 @@ public class HdfsTable extends Table { /** * Loads table schema and column stats from Hive Metastore. */ - private void loadSchema(HiveMetaStoreClient client, + private void loadSchema(IMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception { nonPartFieldSchemas_.clear(); // set nullPartitionKeyValue from the hive conf. @@ -1356,7 +1356,7 @@ public class HdfsTable extends Table { * table partitions. 
*/ private void loadPartitionsFromMetastore(List<HdfsPartition> partitions, - HiveMetaStoreClient client) throws Exception { + IMetaStoreClient client) throws Exception { Preconditions.checkNotNull(partitions); if (partitions.isEmpty()) return; LOG.info(String.format("Incrementally updating %d/%d partitions.", @@ -1373,7 +1373,7 @@ public class HdfsTable extends Table { * 'partitionNames' and adds them to the internal list of table partitions. */ private void loadPartitionsFromMetastore(Set<String> partitionNames, - HiveMetaStoreClient client) throws Exception { + IMetaStoreClient client) throws Exception { Preconditions.checkNotNull(partitionNames); if (partitionNames.isEmpty()) return; // Load partition metadata from Hive Metastore. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java index b8c3334..02c9747 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java @@ -207,19 +207,16 @@ public class ImpaladCatalog extends Catalog { */ public Path getTablePath(org.apache.hadoop.hive.metastore.api.Table msTbl) throws NoSuchObjectException, MetaException, TException { - MetaStoreClient client = getMetaStoreClient(); - try { + try (MetaStoreClient msClient = getMetaStoreClient()) { // If the table did not have its path set, build the path based on the the // location property of the parent database. 
if (msTbl.getSd().getLocation() == null || msTbl.getSd().getLocation().isEmpty()) { String dbLocation = - client.getHiveClient().getDatabase(msTbl.getDbName()).getLocationUri(); + msClient.getHiveClient().getDatabase(msTbl.getDbName()).getLocationUri(); return new Path(dbLocation, msTbl.getTableName().toLowerCase()); } else { return new Path(msTbl.getSd().getLocation()); } - } finally { - client.release(); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/IncompleteTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/IncompleteTable.java b/fe/src/main/java/com/cloudera/impala/catalog/IncompleteTable.java index e811046..88bab5e 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/IncompleteTable.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/IncompleteTable.java @@ -20,7 +20,7 @@ package com.cloudera.impala.catalog; import java.util.List; import java.util.Set; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import com.cloudera.impala.common.ImpalaException; import com.cloudera.impala.common.JniUtil; @@ -70,7 +70,7 @@ public class IncompleteTable extends Table { } @Override - public void load(boolean reuseMetadata, HiveMetaStoreClient client, + public void load(boolean reuseMetadata, IMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException { if (cause_ instanceof TableLoadingException) { throw (TableLoadingException) cause_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java b/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java index e60c639..28f4133 
100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java @@ -24,7 +24,7 @@ import java.util.Set; import javax.xml.bind.DatatypeConverter; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.log4j.Logger; @@ -133,7 +133,7 @@ public class KuduTable extends Table { /** * Load the columns from the schema list */ - private void loadColumns(List<FieldSchema> schema, HiveMetaStoreClient client, + private void loadColumns(List<FieldSchema> schema, IMetaStoreClient client, Set<String> keyColumns) throws TableLoadingException { if (keyColumns.size() == 0 || keyColumns.size() > schema.size()) { @@ -168,7 +168,7 @@ public class KuduTable extends Table { } @Override - public void load(boolean reuseMetadata, HiveMetaStoreClient client, + public void load(boolean reuseMetadata, IMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException { // TODO handle 'reuseMetadata' if (getMetaStoreTable() == null || !tableParamsAreValid(msTbl.getParameters())) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/MetaStoreClientPool.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/MetaStoreClientPool.java b/fe/src/main/java/com/cloudera/impala/catalog/MetaStoreClientPool.java index 8ea96fe..40eb4cf 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/MetaStoreClientPool.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/MetaStoreClientPool.java @@ -20,20 +20,28 @@ package com.cloudera.impala.catalog; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.hadoop.hive.conf.HiveConf; +import 
org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.HiveMetaHookLoader; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.log4j.Logger; import com.google.common.base.Preconditions; /** - * Manages a pool of HiveMetaStoreClient connections. If the connection pool is empty - * a new client is created and added to the pool. There is no size limit. + * Manages a pool of RetryingMetaStoreClient connections. If the connection pool is empty + * a new client is created and added to the pool. The idle pool can expand till a maximum + * size of MAX_HMS_CONNECTION_POOL_SIZE, beyond which the connections are closed. */ public class MetaStoreClientPool { // Key for config option read from hive-site.xml private static final String HIVE_METASTORE_CNXN_DELAY_MS_CONF = "impala.catalog.metastore.cnxn.creation.delay.ms"; private static final int DEFAULT_HIVE_METASTORE_CNXN_DELAY_MS_CONF = 0; + // Maximum number of idle metastore connections in the connection pool at any point. + private static final int MAX_HMS_CONNECTION_POOL_SIZE = 32; // Number of milliseconds to sleep between creation of HMS connections. Used to debug // IMPALA-825. private final int clientCreationDelayMs_; @@ -46,29 +54,40 @@ public class MetaStoreClientPool { private final Object poolCloseLock_ = new Object(); private final HiveConf hiveConf_; + // Required for creating an instance of RetryingMetaStoreClient. + private static final HiveMetaHookLoader dummyHookLoader = new HiveMetaHookLoader() { + @Override + public HiveMetaHook getHook(org.apache.hadoop.hive.metastore.api.Table tbl) + throws MetaException { + return null; + } + }; + /** - * A wrapper around the HiveMetaStoreClient that manages interactions with the - * connection pool. 
+ * A wrapper around the RetryingMetaStoreClient that manages interactions with the + * connection pool. This implements the AutoCloseable interface and hence the callers + * should use the try-with-resources statement while creating an instance. */ - public class MetaStoreClient { - private final HiveMetaStoreClient hiveClient_; + public class MetaStoreClient implements AutoCloseable { + private final IMetaStoreClient hiveClient_; private boolean isInUse_; private MetaStoreClient(HiveConf hiveConf) { try { LOG.debug("Creating MetaStoreClient. Pool Size = " + clientPool_.size()); - this.hiveClient_ = new HiveMetaStoreClient(hiveConf); + hiveClient_ = RetryingMetaStoreClient.getProxy(hiveConf, dummyHookLoader, + HiveMetaStoreClient.class.getName()); } catch (Exception e) { // Turn in to an unchecked exception throw new IllegalStateException(e); } - this.isInUse_ = false; + isInUse_ = false; } /** - * Returns the internal HiveMetaStoreClient object. + * Returns the internal RetryingMetaStoreClient object. */ - public HiveMetaStoreClient getHiveClient() { + public IMetaStoreClient getHiveClient() { return hiveClient_; } @@ -76,27 +95,26 @@ public class MetaStoreClientPool { * Returns this client back to the connection pool. If the connection pool has been * closed, just close the Hive client connection. */ - public void release() { + @Override + public void close() { Preconditions.checkState(isInUse_); isInUse_ = false; - // Ensure the connection isn't returned to the pool if the pool has been closed. + // Ensure the connection isn't returned to the pool if the pool has been closed + // or if the number of connections in the pool exceeds MAX_HMS_CONNECTION_POOL_SIZE. // This lock is needed to ensure proper behavior when a thread reads poolClosed // is false, but a call to pool.close() comes in immediately afterward. 
synchronized (poolCloseLock_) { - if (poolClosed_) { + if (poolClosed_ || clientPool_.size() >= MAX_HMS_CONNECTION_POOL_SIZE) { hiveClient_.close(); } else { - // TODO: Currently the pool does not work properly because we cannot - // reuse MetastoreClient connections. No reason to add this client back - // to the pool. See HIVE-5181. - // clientPool.add(this); - hiveClient_.close(); + clientPool_.offer(this); } } } // Marks this client as in use private void markInUse() { + Preconditions.checkState(!isInUse_); isInUse_ = true; } } @@ -106,7 +124,7 @@ public class MetaStoreClientPool { } public MetaStoreClientPool(int initialSize, HiveConf hiveConf) { - this.hiveConf_ = hiveConf; + hiveConf_ = hiveConf; clientCreationDelayMs_ = hiveConf_.getInt(HIVE_METASTORE_CNXN_DELAY_MS_CONF, DEFAULT_HIVE_METASTORE_CNXN_DELAY_MS_CONF); addClients(initialSize); @@ -138,18 +156,13 @@ public class MetaStoreClientPool { // The pool was empty so create a new client and return that. // Serialize client creation to defend against possible race conditions accessing // local Kerberos state (see IMPALA-825). - synchronized (this) { - try { - Thread.sleep(clientCreationDelayMs_); - } catch (InterruptedException e) { - /* ignore */ - } - if (client == null) { - client = new MetaStoreClient(hiveConf_); - } else { - // TODO: Due to Hive Metastore bugs, there is leftover state from previous client - // connections so we are unable to reuse the same connection. For now simply - // reconnect each time. One possible culprit is HIVE-5181. 
+ if (client == null) { + synchronized (this) { + try { + Thread.sleep(clientCreationDelayMs_); + } catch (InterruptedException e) { + /* ignore */ + } client = new MetaStoreClient(hiveConf_); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/Table.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Table.java b/fe/src/main/java/com/cloudera/impala/catalog/Table.java index 87f8970..8ac2499 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/Table.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/Table.java @@ -24,7 +24,7 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -113,7 +113,7 @@ public abstract class Table implements CatalogObject { * Populate members of 'this' from metastore info. If 'reuseMetadata' is true, reuse * valid existing metadata. */ - public abstract void load(boolean reuseMetadata, HiveMetaStoreClient client, + public abstract void load(boolean reuseMetadata, IMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException; public void addColumn(Column col) { @@ -157,7 +157,7 @@ public abstract class Table implements CatalogObject { * errors are logged and ignored, since the absence of column stats is not critical to * the correctness of the system. 
*/ - protected void loadAllColumnStats(HiveMetaStoreClient client) { + protected void loadAllColumnStats(IMetaStoreClient client) { LOG.debug("Loading column stats for table: " + name_); List<ColumnStatisticsObj> colStats; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java b/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java index f935c0b..78b58f6 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java @@ -57,11 +57,9 @@ public class TableLoader { public Table load(Db db, String tblName) { String fullTblName = db.getName() + "." + tblName; LOG.info("Loading metadata for: " + fullTblName); - MetaStoreClient msClient = null; Table table; // turn all exceptions into TableLoadingException - try { - msClient = catalog_.getMetaStoreClient(); + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { org.apache.hadoop.hive.metastore.api.Table msTbl = null; // All calls to getTable() need to be serialized due to HIVE-5457. synchronized (metastoreAccessLock_) { @@ -97,8 +95,6 @@ public class TableLoader { catalog_.getNextTableId(), db, tblName, new TableLoadingException( "Failed to load metadata for table: " + fullTblName + ". 
Running " + "'invalidate metadata " + fullTblName + "' may resolve this problem.", e)); - } finally { - if (msClient != null) msClient.release(); } return table; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/catalog/View.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/View.java b/fe/src/main/java/com/cloudera/impala/catalog/View.java index f062172..cc82f95 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/View.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/View.java @@ -21,7 +21,7 @@ import java.io.StringReader; import java.util.List; import java.util.Set; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import com.cloudera.impala.analysis.ParseNode; @@ -99,7 +99,7 @@ public class View extends Table { } @Override - public void load(boolean reuseMetadata, HiveMetaStoreClient client, + public void load(boolean reuseMetadata, IMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException { try { clearColumns(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java b/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java index ec7b6ca..4814503 100644 --- a/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java @@ -470,8 +470,7 @@ public class CatalogOpExecutor { boolean reloadFileMetadata, boolean reloadTableSchema, Set<String> partitionsToUpdate) throws CatalogException { 
Preconditions.checkState(Thread.holdsLock(tbl)); - MetaStoreClient msClient = catalog_.getMetaStoreClient(); - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(msClient, tbl); if (tbl instanceof HdfsTable) { @@ -480,8 +479,6 @@ public class CatalogOpExecutor { } else { tbl.load(true, msClient.getHiveClient(), msTbl); } - } finally { - msClient.release(); } tbl.setCatalogVersion(newCatalogVersion); } @@ -544,7 +541,9 @@ public class CatalogOpExecutor { setViewAttributes(params, msTbl); LOG.debug(String.format("Altering view %s", tableName)); applyAlterTable(msTbl); - tbl.load(true, catalog_.getMetaStoreClient().getHiveClient(), msTbl); + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { + tbl.load(true, msClient.getHiveClient(), msTbl); + } tbl.setCatalogVersion(newCatalogVersion); addTableToCatalogUpdate(tbl, resp.result); } @@ -583,10 +582,9 @@ public class CatalogOpExecutor { } } - MetaStoreClient msClient = catalog_.getMetaStoreClient(); int numTargetedPartitions = 0; int numUpdatedColumns = 0; - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { // Update the table and partition row counts based on the query results. List<HdfsPartition> modifiedParts = Lists.newArrayList(); if (params.isSetTable_stats()) { @@ -616,8 +614,6 @@ public class CatalogOpExecutor { // Update the table stats. Apply the table alteration last to ensure the // lastDdlTime is as accurate as possible. applyAlterTable(msTbl); - } finally { - msClient.release(); } // Set the results to be reported to the client. 
@@ -812,33 +808,32 @@ public class CatalogOpExecutor { LOG.debug("Creating database " + dbName); Db newDb = null; synchronized (metastoreDdlLock_) { - MetaStoreClient msClient = catalog_.getMetaStoreClient(); - try { - msClient.getHiveClient().createDatabase(db); - newDb = catalog_.addDb(dbName, db); - } catch (AlreadyExistsException e) { - if (!params.if_not_exists) { - throw new ImpalaRuntimeException( - String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e); - } - LOG.debug(String.format("Ignoring '%s' when creating database %s because " + - "IF NOT EXISTS was specified.", e, dbName)); - newDb = catalog_.getDb(dbName); - if (newDb == null) { - try { - org.apache.hadoop.hive.metastore.api.Database msDb = - msClient.getHiveClient().getDatabase(dbName); - newDb = catalog_.addDb(dbName, msDb); - } catch (TException e1) { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { + try { + msClient.getHiveClient().createDatabase(db); + newDb = catalog_.addDb(dbName, db); + } catch (AlreadyExistsException e) { + if (!params.if_not_exists) { throw new ImpalaRuntimeException( - String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e1); + String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e); } + LOG.debug(String.format("Ignoring '%s' when creating database %s because " + + "IF NOT EXISTS was specified.", e, dbName)); + newDb = catalog_.getDb(dbName); + if (newDb == null) { + try { + org.apache.hadoop.hive.metastore.api.Database msDb = + msClient.getHiveClient().getDatabase(dbName); + newDb = catalog_.addDb(dbName, msDb); + } catch (TException e1) { + throw new ImpalaRuntimeException( + String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e1); + } + } + } catch (TException e) { + throw new ImpalaRuntimeException( + String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e); } - } catch (TException e) { - throw new ImpalaRuntimeException( - String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e); - } finally { - 
msClient.release(); } Preconditions.checkNotNull(newDb); @@ -1031,8 +1026,7 @@ public class CatalogOpExecutor { private int dropColumnStats(Table table) throws ImpalaRuntimeException { Preconditions.checkState(Thread.holdsLock(table)); int numColsUpdated = 0; - MetaStoreClient msClient = catalog_.getMetaStoreClient(); - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { for (Column col: table.getColumns()) { // Skip columns that don't have stats. if (!col.getStats().hasStats()) continue; @@ -1051,8 +1045,6 @@ public class CatalogOpExecutor { "delete_table_column_statistics"), e); } } - } finally { - msClient.release(); } return numColsUpdated; } @@ -1127,16 +1119,13 @@ public class CatalogOpExecutor { } TCatalogObject removedObject = new TCatalogObject(); - MetaStoreClient msClient = catalog_.getMetaStoreClient(); synchronized (metastoreDdlLock_) { - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { msClient.getHiveClient().dropDatabase( params.getDb(), true, params.if_exists, params.cascade); } catch (TException e) { throw new ImpalaRuntimeException( String.format(HMS_RPC_ERROR_FORMAT_STR, "dropDatabase"), e); - } finally { - msClient.release(); } Db removedDb = catalog_.removeDb(params.getDb()); // If no db was removed as part of this operation just return the current catalog @@ -1179,7 +1168,6 @@ public class CatalogOpExecutor { // Do nothing } - MetaStoreClient msClient = catalog_.getMetaStoreClient(); Db db = catalog_.getDb(params.getTable_name().db_name); if (db == null) { if (params.if_exists) return; @@ -1203,14 +1191,12 @@ public class CatalogOpExecutor { "not allowed on a " + (params.is_table ? 
"view: " : "table: ") + tableName; throw new CatalogException(errorMsg); } - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { msClient.getHiveClient().dropTable( tableName.getDb(), tableName.getTbl(), true, params.if_exists, params.purge); } catch (TException e) { throw new ImpalaRuntimeException( String.format(HMS_RPC_ERROR_FORMAT_STR, "dropTable"), e); - } finally { - msClient.release(); } Table table = catalog_.removeTable(params.getTable_name().db_name, @@ -1499,10 +1485,9 @@ public class CatalogOpExecutor { boolean ifNotExists, THdfsCachingOp cacheOp, List<TDistributeParam> distribute_by, TDdlExecResponse response) throws ImpalaException { - MetaStoreClient msClient = catalog_.getMetaStoreClient(); synchronized (metastoreDdlLock_) { - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { msClient.getHiveClient().createTable(newTable); // If this table should be cached, and the table location was not specified by // the user, an extra step is needed to read the table to find the location. @@ -1523,8 +1508,6 @@ public class CatalogOpExecutor { } catch (TException e) { throw new ImpalaRuntimeException( String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e); - } finally { - msClient.release(); } // Forward the operation to a specific storage backend. 
If the operation fails, @@ -1532,15 +1515,12 @@ public class CatalogOpExecutor { try { createDdlDelegate(newTable).setDistributeParams(distribute_by).createTable(); } catch (ImpalaRuntimeException e) { - MetaStoreClient c = catalog_.getMetaStoreClient(); - try { + try (MetaStoreClient c = catalog_.getMetaStoreClient()) { c.getHiveClient().dropTable(newTable.getDbName(), newTable.getTableName(), false, ifNotExists); } catch (Exception hE) { throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR, "dropTable"), hE); - } finally { - c.release(); } throw e; } @@ -1672,10 +1652,9 @@ public class CatalogOpExecutor { Long parentTblCacheDirId = HdfsCachingUtil.getCacheDirectiveId(msTbl.getParameters()); - MetaStoreClient msClient = catalog_.getMetaStoreClient(); partition = createHmsPartition(partitionSpec, msTbl, tableName, location); - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { // Add the new partition. partition = msClient.getHiveClient().add_partition(partition); String cachePoolName = null; @@ -1721,8 +1700,6 @@ public class CatalogOpExecutor { } catch (TException e) { throw new ImpalaRuntimeException( String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partition"), e); - } finally { - msClient.release(); } if (cacheIds != null) catalog_.watchCacheDirs(cacheIds, tableName.toThrift()); // Return the table object with an updated catalog version after creating the @@ -1764,10 +1741,9 @@ public class CatalogOpExecutor { } } } - MetaStoreClient msClient = catalog_.getMetaStoreClient(); PartitionDropOptions dropOptions = PartitionDropOptions.instance(); dropOptions.purgeData(purge); - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { msClient.getHiveClient().dropPartition(tableName.getDb(), tableName.getTbl(), values, dropOptions); updateLastDdlTime(msTbl, msClient); @@ -1784,8 +1760,6 @@ public class CatalogOpExecutor { } catch (TException e) { throw new ImpalaRuntimeException( 
String.format(HMS_RPC_ERROR_FORMAT_STR, "dropPartition"), e); - } finally { - msClient.release(); } return catalog_.dropPartition(tbl, partitionSpec); } @@ -1827,8 +1801,7 @@ public class CatalogOpExecutor { oldTbl.getMetaStoreTable().deepCopy(); msTbl.setDbName(newTableName.getDb()); msTbl.setTableName(newTableName.getTbl()); - MetaStoreClient msClient = catalog_.getMetaStoreClient(); - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { // Workaround for HIVE-9720/IMPALA-1711: When renaming a table with column // stats across databases, we save, drop and restore the column stats because // the HMS does not properly move them to the new table via alteration. @@ -1864,8 +1837,6 @@ public class CatalogOpExecutor { } catch (TException e) { throw new ImpalaRuntimeException( String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e); - } finally { - msClient.release(); } // Rename the table in the Catalog and get the resulting catalog object. // ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP. @@ -2222,8 +2193,7 @@ public class CatalogOpExecutor { } // Add partitions to metastore. - MetaStoreClient msClient = catalog_.getMetaStoreClient(); - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { // ifNotExists and needResults are true. 
hmsPartitions = msClient.getHiveClient().add_partitions(hmsPartitions, true, true); @@ -2252,8 +2222,6 @@ public class CatalogOpExecutor { } catch (TException e) { throw new ImpalaRuntimeException( String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partition"), e); - } finally { - msClient.release(); } if (!cacheIds.isEmpty()) { @@ -2305,8 +2273,7 @@ public class CatalogOpExecutor { public boolean addJavaFunctionToHms(String db, org.apache.hadoop.hive.metastore.api.Function fn, boolean ifNotExists) throws ImpalaRuntimeException{ - MetaStoreClient msClient = catalog_.getMetaStoreClient(); - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { msClient.getHiveClient().createFunction(fn); } catch(AlreadyExistsException e) { if (!ifNotExists) { @@ -2319,8 +2286,6 @@ public class CatalogOpExecutor { fn.getFunctionName(), e); throw new ImpalaRuntimeException( String.format(HMS_RPC_ERROR_FORMAT_STR, "createFunction"), e); - } finally { - msClient.release(); } return true; } @@ -2331,8 +2296,7 @@ public class CatalogOpExecutor { */ public boolean dropJavaFunctionFromHms(String db, String fn, boolean ifExists) throws ImpalaRuntimeException { - MetaStoreClient msClient = catalog_.getMetaStoreClient(); - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { msClient.getHiveClient().dropFunction(db, fn); } catch (NoSuchObjectException e) { if (!ifExists) { @@ -2344,8 +2308,6 @@ public class CatalogOpExecutor { LOG.error("Error executing dropFunction() metastore call: " + fn, e); throw new ImpalaRuntimeException( String.format(HMS_RPC_ERROR_FORMAT_STR, "dropFunction"), e); - } finally { - msClient.release(); } return true; } @@ -2355,14 +2317,11 @@ public class CatalogOpExecutor { */ private void applyAlterDatabase(Db db) throws ImpalaRuntimeException { - MetaStoreClient msClient = catalog_.getMetaStoreClient(); - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { msClient.getHiveClient().alterDatabase(db.getName(), 
db.getMetaStoreDb()); } catch (TException e) { throw new ImpalaRuntimeException( String.format(HMS_RPC_ERROR_FORMAT_STR, "alterDatabase"), e); - } finally { - msClient.release(); } } @@ -2376,9 +2335,8 @@ public class CatalogOpExecutor { */ private void applyAlterTable(org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaRuntimeException { - MetaStoreClient msClient = catalog_.getMetaStoreClient(); long lastDdlTime = -1; - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { lastDdlTime = calculateDdlTime(msTbl); msTbl.putToParameters("transient_lastDdlTime", Long.toString(lastDdlTime)); msClient.getHiveClient().alter_table( @@ -2387,7 +2345,6 @@ public class CatalogOpExecutor { throw new ImpalaRuntimeException( String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e); } finally { - msClient.release(); catalog_.updateLastDdlTime( new TTableName(msTbl.getDbName(), msTbl.getTableName()), lastDdlTime); } @@ -2395,8 +2352,7 @@ public class CatalogOpExecutor { private void applyAlterPartition(Table tbl, HdfsPartition partition) throws ImpalaException { - MetaStoreClient msClient = catalog_.getMetaStoreClient(); - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { TableName tableName = tbl.getTableName(); msClient.getHiveClient().alter_partition( tableName.getDb(), tableName.getTbl(), partition.toHmsPartition()); @@ -2406,8 +2362,6 @@ public class CatalogOpExecutor { } catch (TException e) { throw new ImpalaRuntimeException( String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_partition"), e); - } finally { - msClient.release(); } } @@ -2549,8 +2503,7 @@ public class CatalogOpExecutor { } if (hmsPartitions.size() == 0) return; - MetaStoreClient msClient = catalog_.getMetaStoreClient(); - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { // Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'. 
for (int i = 0; i < hmsPartitions.size(); i += MAX_PARTITION_UPDATES_PER_RPC) { int numPartitionsToUpdate = @@ -2575,8 +2528,6 @@ public class CatalogOpExecutor { String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_partitions"), e); } } - } finally { - msClient.release(); } } @@ -2879,8 +2830,7 @@ public class CatalogOpExecutor { } if (!partsToCreate.isEmpty()) { - MetaStoreClient msClient = catalog_.getMetaStoreClient(); - try { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { org.apache.hadoop.hive.metastore.api.Table msTbl = table.getMetaStoreTable().deepCopy(); List<org.apache.hadoop.hive.metastore.api.Partition> hmsParts = @@ -2958,8 +2908,6 @@ public class CatalogOpExecutor { "AlreadyExistsException thrown although ifNotExists given", e); } catch (Exception e) { throw new InternalException("Error adding partitions", e); - } finally { - msClient.release(); } } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/main/java/com/cloudera/impala/util/MetaStoreUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/util/MetaStoreUtil.java b/fe/src/main/java/com/cloudera/impala/util/MetaStoreUtil.java index d3618f0..540c749 100644 --- a/fe/src/main/java/com/cloudera/impala/util/MetaStoreUtil.java +++ b/fe/src/main/java/com/cloudera/impala/util/MetaStoreUtil.java @@ -21,7 +21,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.log4j.Logger; @@ -93,7 +93,7 @@ public class MetaStoreUtil { * configuring retires at the connection level so it can be enabled independently. 
*/ public static List<org.apache.hadoop.hive.metastore.api.Partition> fetchAllPartitions( - HiveMetaStoreClient client, String dbName, String tblName, int numRetries) + IMetaStoreClient client, String dbName, String tblName, int numRetries) throws MetaException, TException { Preconditions.checkArgument(numRetries >= 0); int retryAttempt = 0; @@ -124,7 +124,7 @@ public class MetaStoreUtil { * Will throw a MetaException if any partitions in 'partNames' do not exist. */ public static List<Partition> fetchPartitionsByName( - HiveMetaStoreClient client, List<String> partNames, String dbName, String tblName) + IMetaStoreClient client, List<String> partNames, String dbName, String tblName) throws MetaException, TException { LOG.trace(String.format("Fetching %d partitions for: %s.%s using partition " + "batch size: %d", partNames.size(), dbName, tblName, maxPartitionsPerRpc_)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/15d20d8f/fe/src/test/java/com/cloudera/impala/catalog/CatalogTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/com/cloudera/impala/catalog/CatalogTest.java b/fe/src/test/java/com/cloudera/impala/catalog/CatalogTest.java index a55da33..9ed09ed 100644 --- a/fe/src/test/java/com/cloudera/impala/catalog/CatalogTest.java +++ b/fe/src/test/java/com/cloudera/impala/catalog/CatalogTest.java @@ -420,8 +420,7 @@ public class CatalogTest { // Now attempt to update a column's stats with mismatched stats data and ensure // we get the expected results. - MetaStoreClient client = catalog_.getMetaStoreClient(); - try { + try (MetaStoreClient client = catalog_.getMetaStoreClient()) { // Load some string stats data and use it to update the stats of different // typed columns. ColumnStatisticsData stringColStatsData = client.getHiveClient() @@ -447,10 +446,6 @@ public class CatalogTest { // Now try to apply a matching column stats data and ensure it succeeds. 
assertTrue(table.getColumn("string_col").updateStats(stringColStatsData)); assertEquals(1178, table.getColumn("string_col").getStats().getNumDistinctValues()); - } finally { - // Make sure to invalidate the metadata so the next test isn't using bad col stats - //catalog_.refreshTable("functional", "alltypesagg", false); - client.release(); } }
