IMPALA-2518: DROP DATABASE CASCADE removes cache directives of tables

This commit fixes an issue where the DROP DATABASE CASCADE statement did not remove the HDFS cache directives of the underlying tables and their partitions.
Change-Id: I83ef5a33e06728c2b3f833a0309d9da64dce7b88 Reviewed-on: http://gerrit.cloudera.org:8080/5815 Reviewed-by: Dimitris Tsirogiannis <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3ee98107 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3ee98107 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3ee98107 Branch: refs/heads/master Commit: 3ee981079d0dd086d656a1ecd54bc873b91c67e3 Parents: 3b36e93 Author: Dimitris Tsirogiannis <[email protected]> Authored: Fri Jan 27 15:18:42 2017 -0800 Committer: Impala Public Jenkins <[email protected]> Committed: Fri Feb 3 04:52:46 2017 +0000 ---------------------------------------------------------------------- .../impala/service/CatalogOpExecutor.java | 68 ++++++++++++-------- .../org/apache/impala/util/HdfsCachingUtil.java | 13 ++-- tests/query_test/test_hdfs_caching.py | 19 ++++++ 3 files changed, 66 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3ee98107/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 3c1dad8..76571cc 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -1241,7 +1241,8 @@ public class CatalogOpExecutor { /** * Drops a database from the metastore and removes the database's metadata from the - * internal cache. Re-throws any HMS exceptions encountered during the drop. + * internal cache. Attempts to remove the HDFS cache directives of the underlying + * tables. 
Re-throws any HMS exceptions encountered during the drop. */ private void dropDatabase(TDropDbParams params, TDdlExecResponse resp) throws ImpalaException { @@ -1273,6 +1274,10 @@ public class CatalogOpExecutor { resp.result.setVersion(catalog_.getCatalogVersion()); return; } + // Make sure the cache directives, if any, of the underlying tables are removed + for (String tableName: removedDb.getAllTableNames()) { + uncacheTable(removedDb.getTable(tableName)); + } removedObject.setCatalog_version(removedDb.getCatalogVersion()); } removedObject.setType(TCatalogObjectType.DATABASE); @@ -1394,28 +1399,7 @@ public class CatalogOpExecutor { return; } resp.result.setVersion(table.getCatalogVersion()); - if (table instanceof HdfsTable) { - HdfsTable hdfsTable = (HdfsTable) table; - if (hdfsTable.isMarkedCached()) { - try { - HdfsCachingUtil.uncacheTbl(table.getMetaStoreTable()); - } catch (Exception e) { - LOG.error("Unable to uncache table: " + table.getFullName(), e); - } - } - if (table.getNumClusteringCols() > 0) { - for (HdfsPartition partition: hdfsTable.getPartitions()) { - if (partition.isMarkedCached()) { - try { - HdfsCachingUtil.uncachePartition(partition); - } catch (Exception e) { - LOG.error("Unable to uncache partition: " + - partition.getPartitionName(), e); - } - } - } - } - } + uncacheTable(table); } removedObject.setType(TCatalogObjectType.TABLE); removedObject.setTable(new TTable()); @@ -1426,6 +1410,34 @@ public class CatalogOpExecutor { } /** + * Drops all associated caching requests on the table and/or table's partitions, + * uncaching all table data, if applicable. Throws no exceptions, only logs errors. + * Does not update the HMS. 
+ */ + private static void uncacheTable(Table table) { + if (!(table instanceof HdfsTable)) return; + HdfsTable hdfsTable = (HdfsTable) table; + if (hdfsTable.isMarkedCached()) { + try { + HdfsCachingUtil.removeTblCacheDirective(table.getMetaStoreTable()); + } catch (Exception e) { + LOG.error("Unable to uncache table: " + table.getFullName(), e); + } + } + if (table.getNumClusteringCols() > 0) { + for (HdfsPartition part: hdfsTable.getPartitions()) { + if (part.isMarkedCached()) { + try { + HdfsCachingUtil.removePartitionCacheDirective(part); + } catch (Exception e) { + LOG.error("Unable to uncache partition: " + part.getPartitionName(), e); + } + } + } + } + } + + /** * Truncate a table by deleting all files in its partition directories, and dropping * all column and table statistics. Acquires a table lock to protect against * concurrent table modifications. @@ -2025,7 +2037,7 @@ public class CatalogOpExecutor { part.getPartitionValuesAsStrings(true), dropOptions); ++numTargetedPartitions; if (part.isMarkedCached()) { - HdfsCachingUtil.uncachePartition(part); + HdfsCachingUtil.removePartitionCacheDirective(part); } } catch (NoSuchObjectException e) { if (!ifExists) { @@ -2396,7 +2408,7 @@ public class CatalogOpExecutor { catalog_.watchCacheDirs(cacheDirIds, tableName.toThrift()); } else { // Uncache the table. - if (cacheDirId != null) HdfsCachingUtil.uncacheTbl(msTbl); + if (cacheDirId != null) HdfsCachingUtil.removeTblCacheDirective(msTbl); // Uncache all table partitions. 
if (tbl.getNumClusteringCols() > 0) { for (HdfsPartition partition: hdfsTable.getPartitions()) { @@ -2404,7 +2416,7 @@ public class CatalogOpExecutor { continue; } if (partition.isMarkedCached()) { - HdfsCachingUtil.uncachePartition(partition); + HdfsCachingUtil.removePartitionCacheDirective(partition); try { applyAlterPartition(tbl, partition); } finally { @@ -2472,7 +2484,7 @@ public class CatalogOpExecutor { } else { for (HdfsPartition partition : partitions) { if (partition.isMarkedCached()) { - HdfsCachingUtil.uncachePartition(partition); + HdfsCachingUtil.removePartitionCacheDirective(partition); modifiedParts.add(partition); } } @@ -3156,7 +3168,7 @@ public class CatalogOpExecutor { for (org.apache.hadoop.hive.metastore.api.Partition part: cachedHmsParts) { try { - HdfsCachingUtil.uncachePartition(part); + HdfsCachingUtil.removePartitionCacheDirective(part); } catch (ImpalaException e1) { String msg = String.format( "Partition %s.%s(%s): State: Leaked caching directive. " + http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3ee98107/fe/src/main/java/org/apache/impala/util/HdfsCachingUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/util/HdfsCachingUtil.java b/fe/src/main/java/org/apache/impala/util/HdfsCachingUtil.java index a14c810..a52ad91 100644 --- a/fe/src/main/java/org/apache/impala/util/HdfsCachingUtil.java +++ b/fe/src/main/java/org/apache/impala/util/HdfsCachingUtil.java @@ -127,8 +127,8 @@ public class HdfsCachingUtil { * Removes the cache directive associated with the table from HDFS, uncaching all * data. Also updates the table's metadata. No-op if the table is not cached. 
*/ - public static void uncacheTbl(org.apache.hadoop.hive.metastore.api.Table table) - throws ImpalaRuntimeException { + public static void removeTblCacheDirective( + org.apache.hadoop.hive.metastore.api.Table table) throws ImpalaRuntimeException { Preconditions.checkNotNull(table); if (LOG.isTraceEnabled()) { LOG.trace("Uncaching table: " + table.getDbName() + "." + table.getTableName()); @@ -145,7 +145,8 @@ public class HdfsCachingUtil { * data. Also updates the partition's metadata to remove the cache directive ID. * No-op if the table is not cached. */ - public static void uncachePartition(HdfsPartition part) throws ImpalaException { + public static void removePartitionCacheDirective(HdfsPartition part) + throws ImpalaException { Preconditions.checkNotNull(part); Long id = getCacheDirectiveId(part.getParameters()); if (id == null) return; @@ -156,10 +157,10 @@ public class HdfsCachingUtil { /** * Convenience method for working directly on a metastore partition. See - * uncachePartition(HdfsPartition) for more details. + * removePartitionCacheDirective(HdfsPartition) for more details. 
*/ - public static void uncachePartition( - org.apache.hadoop.hive.metastore.api.Partition part) throws ImpalaException { + public static void removePartitionCacheDirective( + org.apache.hadoop.hive.metastore.api.Partition part) throws ImpalaException { Preconditions.checkNotNull(part); Long id = getCacheDirectiveId(part.getParameters()); if (id == null) return; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3ee98107/tests/query_test/test_hdfs_caching.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_hdfs_caching.py b/tests/query_test/test_hdfs_caching.py index c0f9cfb..a837ce3 100644 --- a/tests/query_test/test_hdfs_caching.py +++ b/tests/query_test/test_hdfs_caching.py @@ -208,6 +208,25 @@ class TestHdfsCachingDdl(ImpalaTestSuite): assert num_entries_pre == get_num_cache_requests() @pytest.mark.execute_serially + def test_caching_ddl_drop_database(self, vector): + """IMPALA-2518: DROP DATABASE CASCADE should properly drop all impacted cache + directives""" + num_entries_pre = get_num_cache_requests() + # Populates the `cachedb` database with some cached tables and partitions + self.client.execute("use cachedb") + self.client.execute("create table cached_tbl_nopart (i int) cached in 'testPool'") + self.client.execute("insert into cached_tbl_nopart select 1") + self.client.execute("create table cached_tbl_part (i int) partitioned by (j int) \ + cached in 'testPool'") + self.client.execute("insert into cached_tbl_part (i,j) select 1, 2") + # We expect the number of cached entities to grow + assert num_entries_pre < get_num_cache_requests() + self.client.execute("use default") + self.client.execute("drop database cachedb cascade") + # We want to see the number of cached entities return to the original count + assert num_entries_pre == get_num_cache_requests() + + @pytest.mark.execute_serially def test_cache_reload_validation(self, vector): """This is a set of tests asserting that cache 
directives modified outside of Impala are picked up after reload, cf IMPALA-1645"""
