Repository: incubator-impala Updated Branches: refs/heads/master b4343895d -> b27827744
IMPALA-5259: Add REFRESH FUNCTIONS <db> statement Before this patch, Impala relied on INVALIDATE METADATA to load externally added UDFs from HMS. The problem with this approach is that INVALIDATE METADATA affects all databases and tables in the entire cluster. In this patch, we add a REFRESH FUNCTIONS <db> statement that reloads the functions of a database from HMS. We return a list of updated and removed db functions to the issuing Impalad in order to update its local catalog cache. Testing: - Ran a private build which passed. Change-Id: I3625c88bb51cca833f3293c224d3f0feb00e6e0b Reviewed-on: http://gerrit.cloudera.org:8080/6878 Reviewed-by: Taras Bobrovytsky <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/24ff0f2f Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/24ff0f2f Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/24ff0f2f Branch: refs/heads/master Commit: 24ff0f2fc21f0111f4c4fa9cbab40105e1e7ee01 Parents: b434389 Author: Taras Bobrovytsky <[email protected]> Authored: Mon May 8 13:18:10 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu May 25 03:30:03 2017 +0000 ---------------------------------------------------------------------- common/thrift/CatalogService.thrift | 3 + fe/src/main/cup/sql-parser.cup | 10 +- .../impala/analysis/ResetMetadataStmt.java | 61 +++++-- .../impala/catalog/CatalogServiceCatalog.java | 63 ++++++++ .../main/java/org/apache/impala/catalog/Db.java | 6 + .../org/apache/impala/catalog/Function.java | 9 ++ .../apache/impala/catalog/ImpaladCatalog.java | 3 + .../impala/service/CatalogOpExecutor.java | 30 ++-- .../org/apache/impala/service/Frontend.java | 18 +-- .../org/apache/impala/analysis/ParserTest.java | 2 + .../org/apache/impala/analysis/ToSqlTest.java | 21 +++ tests/custom_cluster/test_permanent_udfs.py | 160 
++++++++++++++++--- 12 files changed, 323 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/common/thrift/CatalogService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift index 188f71a..95ac7e9 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -193,6 +193,9 @@ struct TResetMetadataRequest { // If set, refreshes the specified partition, otherwise // refreshes the whole table 5: optional list<CatalogObjects.TPartitionKeyValue> partition_spec + + // If set, refreshes functions in the specified database. + 6: optional string db_name } // Response from TResetMetadataRequest http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/cup/sql-parser.cup ---------------------------------------------------------------------- diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup index ebc7804..b88d7d0 100644 --- a/fe/src/main/cup/sql-parser.cup +++ b/fe/src/main/cup/sql-parser.cup @@ -661,13 +661,15 @@ overwrite_val ::= reset_metadata_stmt ::= KW_INVALIDATE KW_METADATA - {: RESULT = new ResetMetadataStmt(null, false, null); :} + {: RESULT = ResetMetadataStmt.createInvalidateStmt(null); :} | KW_INVALIDATE KW_METADATA table_name:table - {: RESULT = new ResetMetadataStmt(table, false, null); :} + {: RESULT = ResetMetadataStmt.createInvalidateStmt(table); :} | KW_REFRESH table_name:table - {: RESULT = new ResetMetadataStmt(table, true, null); :} + {: RESULT = ResetMetadataStmt.createRefreshTableStmt(table, null); :} | KW_REFRESH table_name:table partition_spec:partition - {: RESULT = new ResetMetadataStmt(table, true, partition); :} + {: RESULT = ResetMetadataStmt.createRefreshTableStmt(table, partition); :} + | KW_REFRESH KW_FUNCTIONS ident_or_default:db + 
{: RESULT = ResetMetadataStmt.createRefreshFunctionsStmt(db); :} ; explain_stmt ::= http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java index 6f6a3f1..ac7ca2e 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java @@ -27,10 +27,16 @@ import org.apache.impala.thrift.TTableName; import com.google.common.base.Preconditions; /** - * Representation of a REFRESH/INVALIDATE METADATA statement. + * Representation of the following statements: + * INVALIDATE METADATA + * INVALIDATE METADATA <table> + * REFRESH <table> + * REFRESH <table> PARTITION <partition> + * REFRESH FUNCTIONS <database> */ public class ResetMetadataStmt extends StatementBase { - // Updated during analysis. Null if invalidating the entire catalog. + // Updated during analysis. Null if invalidating the entire catalog or refreshing + // database functions. private TableName tableName_; // true if it is a REFRESH statement. @@ -39,16 +45,36 @@ public class ResetMetadataStmt extends StatementBase { // not null when refreshing a single partition private final PartitionSpec partitionSpec_; - public ResetMetadataStmt(TableName name, boolean isRefresh, - PartitionSpec partitionSpec) { - Preconditions.checkArgument(!isRefresh || name != null); - Preconditions.checkArgument(isRefresh || partitionSpec == null); - this.tableName_ = name; + // not null when refreshing functions in a database. 
+ private final String database_; + + private ResetMetadataStmt(TableName tableName, boolean isRefresh, + PartitionSpec partitionSpec, String db) { + Preconditions.checkArgument(!isRefresh || (tableName != null || db != null)); + Preconditions.checkArgument(isRefresh || (partitionSpec == null && db == null)); + Preconditions.checkArgument(db == null || ( + tableName == null && isRefresh && partitionSpec == null)); + + this.database_ = db; + this.tableName_ = tableName; this.isRefresh_ = isRefresh; this.partitionSpec_ = partitionSpec; if (partitionSpec_ != null) partitionSpec_.setTableName(tableName_); } + public static ResetMetadataStmt createInvalidateStmt(TableName tableName) { + return new ResetMetadataStmt(tableName, false, null, null); + } + + public static ResetMetadataStmt createRefreshTableStmt(TableName tableName, + PartitionSpec partitionSpec) { + return new ResetMetadataStmt(tableName, true, partitionSpec, null); + } + + public static ResetMetadataStmt createRefreshFunctionsStmt(String database) { + return new ResetMetadataStmt(null, true, null, database); + } + public TableName getTableName() { return tableName_; } @Override @@ -85,25 +111,28 @@ public class ResetMetadataStmt extends StatementBase { public String toSql() { StringBuilder result = new StringBuilder(); if (isRefresh_) { - result.append("INVALIDATE METADATA"); - } else { result.append("REFRESH"); + if (database_ == null) { + result.append(" ").append(tableName_); + if (partitionSpec_ != null) result.append(" " + partitionSpec_.toSql()); + } else { + result.append(" FUNCTIONS ").append(database_); + } + } else { + result.append("INVALIDATE METADATA"); + if (tableName_ != null) result.append(" ").append(tableName_); } - - if (tableName_ != null) result.append(" ").append(tableName_); - if (partitionSpec_ != null) result.append(" " + partitionSpec_.toSql()); return result.toString(); } public TResetMetadataRequest toThrift() { - TResetMetadataRequest params = new TResetMetadataRequest(); + 
TResetMetadataRequest params = new TResetMetadataRequest(); params.setIs_refresh(isRefresh_); if (tableName_ != null) { params.setTable_name(new TTableName(tableName_.getDb(), tableName_.getTbl())); } - if (partitionSpec_ != null) { - params.setPartition_spec(partitionSpec_.toThrift()); - } + if (partitionSpec_ != null) params.setPartition_spec(partitionSpec_.toThrift()); + if (database_ != null) params.setDb_name(database_); return params; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 18ece9b..d2a0a82 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.metastore.api.ResourceType; import org.apache.hadoop.hive.metastore.api.ResourceUri; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.ql.exec.FunctionUtils; +import org.apache.impala.analysis.Analyzer; import org.apache.impala.authorization.SentryConfig; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.common.FileSystemUtil; @@ -604,6 +605,68 @@ public class CatalogServiceCatalog extends Catalog { } /** + * Reloads function metadata for 'dbName' database. Populates the 'addedFuncs' list + * with functions that were added as a result of this operation. Populates the + * 'removedFuncs' list with functions that were removed. 
+ */ + public void refreshFunctions(MetaStoreClient msClient, String dbName, + List<TCatalogObject> addedFuncs, List<TCatalogObject> removedFuncs) + throws CatalogException { + // Create a temporary database that will contain all the functions from the HMS. + Db tmpDb; + try { + List<org.apache.hadoop.hive.metastore.api.Function> javaFns = + Lists.newArrayList(); + for (String javaFn : msClient.getHiveClient().getFunctions(dbName, "*")) { + javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn)); + } + // Contains native functions in its params map. + org.apache.hadoop.hive.metastore.api.Database msDb = + msClient.getHiveClient().getDatabase(dbName); + tmpDb = new Db(dbName, this, null); + // Load native UDFs into the temporary db. + loadFunctionsFromDbParams(tmpDb, msDb); + // Load Java UDFs from HMS into the temporary db. + loadJavaFunctions(tmpDb, javaFns); + + Db db = dbCache_.get().get(dbName); + if (db == null) { + throw new DatabaseNotFoundException("Database does not exist: " + dbName); + } + // Load transient functions into the temporary db. + for (Function fn: db.getTransientFunctions()) tmpDb.addFunction(fn); + + // Compute the removed functions and remove them from the db. + for (Map.Entry<String, List<Function>> e: db.getAllFunctions().entrySet()) { + for (Function fn: e.getValue()) { + if (tmpDb.getFunction( + fn, Function.CompareMode.IS_INDISTINGUISHABLE) == null) { + fn.setCatalogVersion(incrementAndGetCatalogVersion()); + removedFuncs.add(fn.toTCatalogObject()); + } + } + } + + // We will re-add all the functions to the db because it's possible that a + // function was dropped and a different function (for example, the binary is + // different) with the same name and signature was re-added in Hive. 
+ db.removeAllFunctions(); + for (Map.Entry<String, List<Function>> e: tmpDb.getAllFunctions().entrySet()) { + for (Function fn: e.getValue()) { + // We do not need to increment and acquire a new catalog version for this + // function here because this already happens when the functions are loaded + // into tmpDb. + db.addFunction(fn); + addedFuncs.add(fn.toTCatalogObject()); + } + } + + } catch (Exception e) { + throw new CatalogException("Error refreshing functions in " + dbName + ": ", e); + } + } + + /** * Invalidates the database 'db'. This method can have potential race * conditions with external changes to the Hive metastore and hence any * conflicting changes to the objects can manifest in the form of exceptions http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/java/org/apache/impala/catalog/Db.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java index d8f5719..074ff92 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Db.java +++ b/fe/src/main/java/org/apache/impala/catalog/Db.java @@ -365,6 +365,12 @@ public class Db implements CatalogObject { } } + public void removeAllFunctions() { + synchronized (functions_) { + functions_.clear(); + } + } + /** * Removes a Function with the matching signature string. Returns the removed Function * if a Function was removed as a result of this call, null otherwise. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/java/org/apache/impala/catalog/Function.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/Function.java b/fe/src/main/java/org/apache/impala/catalog/Function.java index f7f2632..2f0859f 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Function.java +++ b/fe/src/main/java/org/apache/impala/catalog/Function.java @@ -25,6 +25,7 @@ import org.apache.impala.common.AnalysisException; import org.apache.impala.common.InternalException; import org.apache.impala.service.FeSupport; import org.apache.impala.thrift.TAggregateFunction; +import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TColumnType; import org.apache.impala.thrift.TFunction; @@ -310,6 +311,14 @@ public class Function implements CatalogObject { // Child classes must override this function. 
public String toSql(boolean ifNotExists) { return ""; } + public TCatalogObject toTCatalogObject () { + TCatalogObject result = new TCatalogObject(); + result.setType(TCatalogObjectType.FUNCTION); + result.setFn(toThrift()); + result.setCatalog_version(catalogVersion_); + return result; + } + public TFunction toThrift() { TFunction fn = new TFunction(); fn.setSignature(signatureString()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java index 521a844..45db6a2 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java @@ -253,6 +253,9 @@ public class ImpaladCatalog extends Catalog { addTable(catalogObject.getTable(), catalogObject.getCatalog_version()); break; case FUNCTION: + // Remove the function first, in case there is an existing function with the same + // name and signature. 
+ removeFunction(catalogObject.getFn(), catalogObject.getCatalog_version()); addFunction(catalogObject.getFn(), catalogObject.getCatalog_version()); break; case DATA_SOURCE: http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 3f7365f..c0246e8 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -987,14 +987,6 @@ public class CatalogOpExecutor { resp.result.getUpdated_catalog_object_DEPRECATED().getCatalog_version()); } - private TCatalogObject buildTCatalogFnObject(Function fn) { - TCatalogObject result = new TCatalogObject(); - result.setType(TCatalogObjectType.FUNCTION); - result.setFn(fn.toThrift()); - result.setCatalog_version(fn.getCatalogVersion()); - return result; - } - private void createFunction(TCreateFunctionParams params, TDdlExecResponse resp) throws ImpalaException { Function fn = Function.fromThrift(params.getFn()); @@ -1042,14 +1034,14 @@ public class CatalogOpExecutor { addedFn.signatureString())); } Preconditions.checkState(catalog_.addFunction(addedFn)); - addedFunctions.add(buildTCatalogFnObject(addedFn)); + addedFunctions.add(addedFn.toTCatalogObject()); } } } else { if (catalog_.addFunction(fn)) { // Flush DB changes to metastore applyAlterDatabase(catalog_.getDb(fn.dbName())); - addedFunctions.add(buildTCatalogFnObject(fn)); + addedFunctions.add(fn.toTCatalogObject()); } } @@ -1514,7 +1506,7 @@ public class CatalogOpExecutor { continue; } Preconditions.checkNotNull(catalog_.removeFunction(fn)); - removedFunctions.add(buildTCatalogFnObject(fn)); + removedFunctions.add(fn.toTCatalogObject()); } } else { ArrayList<Type> argTypes = 
Lists.newArrayList(); @@ -1531,7 +1523,7 @@ public class CatalogOpExecutor { } else { // Flush DB changes to metastore applyAlterDatabase(catalog_.getDb(fn.dbName())); - removedFunctions.add(buildTCatalogFnObject(fn)); + removedFunctions.add(fn.toTCatalogObject()); } } @@ -3099,7 +3091,19 @@ public class CatalogOpExecutor { resp.setResult(new TCatalogUpdateResult()); resp.getResult().setCatalog_service_id(JniCatalog.getServiceId()); - if (req.isSetTable_name()) { + if (req.isSetDb_name()) { + // This is a "refresh functions" operation. + synchronized (metastoreDdlLock_) { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { + List<TCatalogObject> addedFuncs = Lists.newArrayList(); + List<TCatalogObject> removedFuncs = Lists.newArrayList(); + catalog_.refreshFunctions(msClient, req.getDb_name(), addedFuncs, removedFuncs); + resp.result.setUpdated_catalog_objects(addedFuncs); + resp.result.setRemoved_catalog_objects(removedFuncs); + resp.result.setVersion(catalog_.getCatalogVersion()); + } + } + } else if (req.isSetTable_name()) { // Results of an invalidate operation, indicating whether the table was removed // from the Metastore, and whether a new database was added to Impala as a result // of the invalidate operation. Always false for refresh. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/java/org/apache/impala/service/Frontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 13faba5..0345260 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -222,20 +222,20 @@ public class Frontend { TUpdateCatalogCacheRequest req) throws CatalogException { ImpaladCatalog catalog = impaladCatalog_; + if (req.is_delta) return catalog.updateCatalog(req); + // If this is not a delta, this update should replace the current // Catalog contents so create a new catalog and populate it. - if (!req.is_delta) catalog = new ImpaladCatalog(defaultKuduMasterHosts_); + catalog = new ImpaladCatalog(defaultKuduMasterHosts_); TUpdateCatalogCacheResponse response = catalog.updateCatalog(req); - if (!req.is_delta) { - // This was not a delta update. Now that the catalog has been updated, - // replace the references to impaladCatalog_/authzChecker_ ensure - // clients continue don't see the catalog disappear. - impaladCatalog_ = catalog; - authzChecker_.set(new AuthorizationChecker(authzConfig_, - impaladCatalog_.getAuthPolicy())); - } + // Now that the catalog has been updated, replace the references to + // impaladCatalog_/authzChecker_. This ensures that clients don't see + // the catalog disappear. 
+ impaladCatalog_ = catalog; + authzChecker_.set(new AuthorizationChecker(authzConfig_, + impaladCatalog_.getAuthPolicy())); return response; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/test/java/org/apache/impala/analysis/ParserTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java index 3650ace..bda9125 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java @@ -3076,6 +3076,7 @@ public class ParserTest extends FrontendTestBase { ParsesOk("refresh Foo partition (col=2)"); ParsesOk("refresh Foo.S partition (col=2)"); ParsesOk("refresh Foo.S partition (col1 = 2, col2 = 3)"); + ParsesOk("refresh functions Foo"); ParserError("invalidate"); ParserError("invalidate metadata Foo.S.S"); @@ -3085,6 +3086,7 @@ public class ParserTest extends FrontendTestBase { ParserError("refresh"); ParserError("refresh Foo.S partition (col1 = 2, col2)"); ParserError("refresh Foo.S partition ()"); + ParserError("refresh functions Foo.S"); } @Test http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java index 98e0e3a..3094df4 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java @@ -1308,4 +1308,25 @@ public class ToSqlTest extends FrontendTestBase { "WITH t AS (SELECT * FROM functional.alltypes TABLESAMPLE SYSTEM(5)) " + "SELECT * FROM t"); } + + /** + * Tests invalidate statements are output correctly. 
+ */ + @Test + public void testInvalidate() { + testToSql("INVALIDATE METADATA", "INVALIDATE METADATA"); + testToSql("INVALIDATE METADATA functional.alltypes", + "INVALIDATE METADATA functional.alltypes"); + } + + /** + * Tests refresh statements are output correctly. + */ + @Test + public void testRefresh() { + testToSql("REFRESH functional.alltypes", "REFRESH functional.alltypes"); + testToSql("REFRESH functional.alltypes PARTITION (year=2009, month=1)", + "REFRESH functional.alltypes PARTITION (year=2009, month=1)"); + testToSql("REFRESH FUNCTIONS functional", "REFRESH FUNCTIONS functional"); + } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/tests/custom_cluster/test_permanent_udfs.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_permanent_udfs.py b/tests/custom_cluster/test_permanent_udfs.py index ae1c19f..3823c55 100644 --- a/tests/custom_cluster/test_permanent_udfs.py +++ b/tests/custom_cluster/test_permanent_udfs.py @@ -18,6 +18,7 @@ import glob import os import pytest +import re import shutil import subprocess @@ -28,8 +29,9 @@ from tests.common.test_dimensions import create_uncompressed_text_dimension from tests.util.filesystem_utils import get_fs_path class TestUdfPersistence(CustomClusterTestSuite): - """ Tests the behavior of UDFs and UDAs between catalog restarts. With IMPALA-1748, these - functions are persisted to the metastore and are loaded again during catalog startup""" + """ Tests the behavior of UDFs and UDAs between catalog restarts. 
With IMPALA-1748, + these functions are persisted to the metastore and are loaded again during catalog + startup""" DATABASE = 'udf_permanent_test' JAVA_FN_TEST_DB = 'java_permanent_test' @@ -183,7 +185,7 @@ class TestUdfPersistence(CustomClusterTestSuite): @SkipIfLocal.hive @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( - catalogd_args= "--local_library_dir=%s" % LOCAL_LIBRARY_DIR) + catalogd_args= "--local_library_dir={0}".format(LOCAL_LIBRARY_DIR)) def test_java_udfs_hive_integration(self): ''' This test checks the integration between Hive and Impala on CREATE FUNCTION and DROP FUNCTION statements for persistent Java UDFs. @@ -215,24 +217,140 @@ class TestUdfPersistence(CustomClusterTestSuite): assert "does not exist" in hive_stdout # Create the same set of functions from Hive and make sure they are visible - # in Impala. - for (fn, fn_symbol) in self.SAMPLE_JAVA_UDFS: - self.run_stmt_in_hive(self.CREATE_HIVE_UDF_TEMPLATE.format( - db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn, - location=self.HIVE_UDF_JAR, symbol=fn_symbol)) - self.client.execute("INVALIDATE METADATA") - for (fn, fn_symbol) in self.SAMPLE_JAVA_UDFS: - result = self.client.execute("SHOW FUNCTIONS IN %s" % - self.HIVE_IMPALA_INTEGRATION_DB) - assert result is not None and len(result.data) > 0 and\ - fn in str(result.data) - self.__verify_udf_in_impala(fn) - # Drop the function in Hive and make sure it reflects in Impala. - self.run_stmt_in_hive(self.DROP_JAVA_UDF_TEMPLATE.format( - db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn)) - self.client.execute("INVALIDATE METADATA") - self.verify_function_count( - "SHOW FUNCTIONS in {0}".format(self.HIVE_IMPALA_INTEGRATION_DB), 0) + # in Impala. There are two ways to make functions visible in Impala: invalidate + # metadata and refresh functions <db>. 
+ REFRESH_COMMANDS = ["INVALIDATE METADATA", + "REFRESH FUNCTIONS {0}".format(self.HIVE_IMPALA_INTEGRATION_DB)] + for refresh_command in REFRESH_COMMANDS: + for (fn, fn_symbol) in self.SAMPLE_JAVA_UDFS: + self.run_stmt_in_hive(self.CREATE_HIVE_UDF_TEMPLATE.format( + db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn, + location=self.HIVE_UDF_JAR, symbol=fn_symbol)) + self.client.execute(refresh_command) + for (fn, fn_symbol) in self.SAMPLE_JAVA_UDFS: + result = self.client.execute("SHOW FUNCTIONS IN {0}".format( + self.HIVE_IMPALA_INTEGRATION_DB)) + assert result is not None and len(result.data) > 0 and\ + fn in str(result.data) + self.__verify_udf_in_impala(fn) + # Drop the function in Hive and make sure it reflects in Impala. + self.run_stmt_in_hive(self.DROP_JAVA_UDF_TEMPLATE.format( + db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn)) + self.client.execute(refresh_command) + self.verify_function_count( + "SHOW FUNCTIONS in {0}".format(self.HIVE_IMPALA_INTEGRATION_DB), 0) + # Make sure we deleted all the temporary jars we copied to the local fs + assert len(glob.glob(self.LOCAL_LIBRARY_DIR + "/*.jar")) == 0 + + @SkipIfIsilon.hive + @SkipIfS3.hive + @SkipIfLocal.hive + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + catalogd_args= "--local_library_dir={0}".format(LOCAL_LIBRARY_DIR)) + def test_refresh_native(self): + ''' This test checks that a native function is visible in Impala after a + REFRESH FUNCTIONS command. We will add the native function through Hive + by setting DBPROPERTIES of a database.''' + # First we create the function in Impala. + create_func_impala = ("create function {database}.identity_tmp(bigint) " + "returns bigint location '{location}' symbol='Identity'") + self.client.execute(create_func_impala.format( + database=self.HIVE_IMPALA_INTEGRATION_DB, + location=get_fs_path('/test-warehouse/libTestUdfs.so'))) + + # Impala puts the native function into a database property table. 
We extract the key + # value pair that represents the function from the table. + describe_db_hive = "DESCRIBE DATABASE EXTENDED {database}".format( + database=self.HIVE_IMPALA_INTEGRATION_DB) + result = self.run_stmt_in_hive(describe_db_hive) + regex = r"{(.*?)=(.*?)}" + match = re.search(regex, result) + func_name = match.group(1) + func_contents = match.group(2) + + # Recreate the database, this deletes the function. + self.client.execute("DROP DATABASE {database} CASCADE".format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + self.client.execute("CREATE DATABASE {database}".format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + result = self.client.execute("SHOW FUNCTIONS IN {database}".format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + assert result is not None and len(result.data) == 0 + + # Place the function into the recreated database by modifying its properties. + alter_db_hive = "ALTER DATABASE {database} SET DBPROPERTIES ('{fn_name}'='{fn_val}')" + self.run_stmt_in_hive(alter_db_hive.format( + database=self.HIVE_IMPALA_INTEGRATION_DB, + fn_name=func_name, + fn_val=func_contents)) + result = self.client.execute("SHOW FUNCTIONS IN {database}".format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + assert result is not None and len(result.data) == 0 + + # The function should be visible in Impala after a REFRESH FUNCTIONS. + self.client.execute("REFRESH FUNCTIONS {database}".format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + result = self.client.execute("SHOW FUNCTIONS IN {database}".format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + assert result is not None and len(result.data) > 0 and\ + "identity_tmp" in str(result.data) + + # Verify that the function returns a correct result. 
+ result = self.client.execute("SELECT {database}.identity_tmp(10)".format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + assert result.data[0] == "10" + # Make sure we deleted all the temporary jars we copied to the local fs + assert len(glob.glob(self.LOCAL_LIBRARY_DIR + "/*.jar")) == 0 + + @SkipIfIsilon.hive + @SkipIfS3.hive + @SkipIfLocal.hive + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + catalogd_args= "--local_library_dir={0}".format(LOCAL_LIBRARY_DIR)) + def test_refresh_replace(self): + ''' This test checks that if we drop a function and then create a + different function with the same name in Hive, the new function will + be visible in Impala after REFRESH FUNCTIONS.''' + # Create an original function. + create_orig_func_hive = ("create function {database}.test_func as " + "'org.apache.hadoop.hive.ql.udf.UDFHex' using jar '{jar}'") + self.run_stmt_in_hive(create_orig_func_hive.format( + database=self.HIVE_IMPALA_INTEGRATION_DB, jar=self.JAVA_UDF_JAR)) + result = self.client.execute("SHOW FUNCTIONS IN {database}".format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + assert result is not None and len(result.data) == 0 + # Verify the function becomes visible in Impala after REFRESH FUNCTIONS. + self.client.execute("REFRESH FUNCTIONS {database}".format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + result = self.client.execute("SHOW FUNCTIONS IN {database}".format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + assert (result is not None and len(result.data) == 3 and + "test_func" in str(result.data)) + result = self.client.execute("SELECT {database}.test_func(123)".format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + assert result.data[0] == "7B" + + # Drop the original function and create a different function with the same name as + # the original, but a different JAR. 
+ drop_orig_func_hive = "DROP FUNCTION {database}.test_func" + self.run_stmt_in_hive(drop_orig_func_hive.format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + create_replacement_func_hive = ("create function {database}.test_func as " + "'org.apache.hadoop.hive.ql.udf.UDFBin' using jar '{jar}'") + self.run_stmt_in_hive(create_replacement_func_hive.format( + database=self.HIVE_IMPALA_INTEGRATION_DB, jar=self.JAVA_UDF_JAR)) + self.client.execute("REFRESH FUNCTIONS {database}".format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + result = self.client.execute("SHOW FUNCTIONS IN {database}".format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + assert (result is not None and len(result.data) == 1 and + "test_func" in str(result.data)) + # Verify that the function has actually been updated. + result = self.client.execute("SELECT {database}.test_func(123)".format( + database=self.HIVE_IMPALA_INTEGRATION_DB)) + assert result.data[0] == "1111011" # Make sure we deleted all the temporary jars we copied to the local fs assert len(glob.glob(self.LOCAL_LIBRARY_DIR + "/*.jar")) == 0
