Repository: incubator-impala Updated Branches: refs/heads/master 1af0aa4ad -> 32294220c
IMPALA-3983/IMPALA-3974: Delete function jar resources after load The Catalog copies the UDF jar files to the local file system to load the Java UDF classes for validation purposes. However, we do not clean them up after the UDF load, and hence on a deployment with a large number of functions registered, these jars can accumulate over a period of time and can fill up the tmp space. We fix this by deleting the jar resource once the function is loaded. Also, this patch switches to --local_library_dir for copying these temporary jars instead of using the path from java.io.tmpdir. Change-Id: I5f9dedb5b342415380c83e61a72eb497371a8199 Reviewed-on: http://gerrit.cloudera.org:8080/4617 Reviewed-by: Bharath Vissapragada <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c03cfe51 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c03cfe51 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c03cfe51 Branch: refs/heads/master Commit: c03cfe51fc6640a4b5d0587582123b56b7575888 Parents: 1af0aa4 Author: Bharath Vissapragada <[email protected]> Authored: Mon Oct 3 14:34:59 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Fri Nov 4 00:07:18 2016 +0000 ---------------------------------------------------------------------- be/src/catalog/catalog.cc | 7 +++++-- .../apache/impala/catalog/CatalogServiceCatalog.java | 15 +++++++++------ .../org/apache/impala/common/FileSystemUtil.java | 13 +++++++++++++ .../java/org/apache/impala/service/JniCatalog.java | 6 ++++-- .../impala/testutil/CatalogServiceTestCatalog.java | 3 ++- tests/custom_cluster/test_permanent_udfs.py | 9 +++++++++ 6 files changed, 42 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c03cfe51/be/src/catalog/catalog.cc 
---------------------------------------------------------------------- diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc index 44771c8..9e7af4e 100644 --- a/be/src/catalog/catalog.cc +++ b/be/src/catalog/catalog.cc @@ -30,6 +30,7 @@ using namespace impala; DECLARE_bool(load_auth_to_local_rules); DECLARE_string(principal); +DECLARE_string(local_library_dir); DEFINE_bool(load_catalog_in_background, false, "If true, loads catalog metadata in the background. If false, metadata is loaded " @@ -44,7 +45,8 @@ DECLARE_int32(non_impala_java_vlog); Catalog::Catalog() { JniMethodDescriptor methods[] = { - {"<init>", "(ZILjava/lang/String;IIZLjava/lang/String;)V", &catalog_ctor_}, + {"<init>", "(ZILjava/lang/String;IIZLjava/lang/String;Ljava/lang/String;)V", + &catalog_ctor_}, {"updateCatalog", "([B)[B", &update_metastore_id_}, {"execDdl", "([B)[B", &exec_ddl_id_}, {"resetMetadata", "([B)[B", &reset_metadata_id_}, @@ -74,10 +76,11 @@ Catalog::Catalog() { // and impala is kerberized. 
jboolean auth_to_local = FLAGS_load_auth_to_local_rules && !FLAGS_principal.empty(); jstring principal = jni_env->NewStringUTF(FLAGS_principal.c_str()); + jstring local_library_dir = jni_env->NewStringUTF(FLAGS_local_library_dir.c_str()); jobject catalog = jni_env->NewObject(catalog_class_, catalog_ctor_, load_in_background, num_metadata_loading_threads, sentry_config, FlagToTLogLevel(FLAGS_v), FlagToTLogLevel(FLAGS_non_impala_java_vlog), - auth_to_local, principal); + auth_to_local, principal, local_library_dir); EXIT_IF_EXC(jni_env); ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, catalog, &catalog_)); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c03cfe51/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 10f6e4c..f56f502 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -145,15 +145,15 @@ public class CatalogServiceCatalog extends Catalog { private final SentryProxy sentryProxy_; // Local temporary directory to copy UDF Jars. - private static final String LOCAL_LIBRARY_PATH = new String("file://" + - System.getProperty("java.io.tmpdir")); + private static String localLibraryPath_; /** * Initialize the CatalogServiceCatalog. 
If loadInBackground is true, table metadata * will be loaded in the background */ public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads, - SentryConfig sentryConfig, TUniqueId catalogServiceId, String kerberosPrincipal) { + SentryConfig sentryConfig, TUniqueId catalogServiceId, String kerberosPrincipal, + String localLibraryPath) { super(true); catalogServiceId_ = catalogServiceId; tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads); @@ -173,6 +173,7 @@ public class CatalogServiceCatalog extends Catalog { } else { sentryProxy_ = null; } + localLibraryPath_ = new String("file://" + localLibraryPath); } /** @@ -428,7 +429,7 @@ public class CatalogServiceCatalog extends Catalog { /** * Returns a list of Impala Functions, one per compatible "evaluate" method in the UDF * class referred to by the given Java function. This method copies the UDF Jar - * referenced by "function" to a temporary file in "LOCAL_LIBRARY_PATH" and loads it + * referenced by "function" to a temporary file in localLibraryPath_ and loads it * into the jvm. Then we scan all the methods in the class using reflection and extract * those methods and create corresponding Impala functions. 
Currently Impala supports * only "JAR" files for symbols and also a single Jar containing all the dependent @@ -447,9 +448,9 @@ public class CatalogServiceCatalog extends Catalog { } String jarUri = function.getResourceUris().get(0).getUri(); Class<?> udfClass = null; + Path localJarPath = null; try { - Path localJarPath = new Path(LOCAL_LIBRARY_PATH, - UUID.randomUUID().toString() + ".jar"); + localJarPath = new Path(localLibraryPath_, UUID.randomUUID().toString() + ".jar"); try { FileSystemUtil.copyToLocal(new Path(jarUri), localJarPath); } catch (IOException e) { @@ -501,6 +502,8 @@ public class CatalogServiceCatalog extends Catalog { function.getFunctionName(); LOG.error(errorMsg); throw new ImpalaRuntimeException(errorMsg, e); + } finally { + if (localJarPath != null) FileSystemUtil.deleteIfExists(localJarPath); } return result; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c03cfe51/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java index d9fd6e8..c81e27f 100644 --- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java +++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java @@ -377,6 +377,19 @@ public class FileSystemUtil { } /** + * Delete the file at 'path' if it exists. + */ + public static void deleteIfExists(Path path) { + try { + FileSystem fs = path.getFileSystem(CONF); + if (!fs.exists(path)) return; + fs.delete(path); + } catch (IOException e) { + LOG.warn("Encountered an exception deleting file at path " + path.toString(), e); + } + } + + /** * Returns true if the given path is a location which supports caching (e.g. HDFS). 
*/ public static boolean isPathCacheable(Path path) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c03cfe51/fe/src/main/java/org/apache/impala/service/JniCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java index 7d0af54..b35877f 100644 --- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java +++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java @@ -81,7 +81,8 @@ public class JniCatalog { public JniCatalog(boolean loadInBackground, int numMetadataLoadingThreads, String sentryServiceConfig, int impalaLogLevel, int otherLogLevel, - boolean allowAuthToLocal, String kerberosPrincipal) throws InternalException { + boolean allowAuthToLocal, String kerberosPrincipal, String localLibraryPath) + throws InternalException { BackendConfig.setAuthToLocal(allowAuthToLocal); Preconditions.checkArgument(numMetadataLoadingThreads > 0); // This trick saves having to pass a TLogLevel enum, which is an object and more @@ -98,7 +99,8 @@ public class JniCatalog { LOG.info(JniUtil.getJavaVersion()); catalog_ = new CatalogServiceCatalog(loadInBackground, - numMetadataLoadingThreads, sentryConfig, getServiceId(), kerberosPrincipal); + numMetadataLoadingThreads, sentryConfig, getServiceId(), kerberosPrincipal, + localLibraryPath); try { catalog_.reset(); } catch (CatalogException e) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c03cfe51/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java index 92b127f..c364ba5 100644 --- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java +++ 
b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java @@ -31,7 +31,8 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog { public CatalogServiceTestCatalog(boolean loadInBackground, int numLoadingThreads, SentryConfig sentryConfig, TUniqueId catalogServiceId) { - super(loadInBackground, numLoadingThreads, sentryConfig, catalogServiceId, null); + super(loadInBackground, numLoadingThreads, sentryConfig, catalogServiceId, null, + System.getProperty("java.io.tmpdir")); // Cache pools are typically loaded asynchronously, but as there is no fixed execution // order for tests, the cache pools are loaded synchronously before the tests are http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c03cfe51/tests/custom_cluster/test_permanent_udfs.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_permanent_udfs.py b/tests/custom_cluster/test_permanent_udfs.py index c979cd1..0170b8e 100644 --- a/tests/custom_cluster/test_permanent_udfs.py +++ b/tests/custom_cluster/test_permanent_udfs.py @@ -15,10 +15,13 @@ # specific language governing permissions and limitations # under the License. 
+import glob import os import pytest +import shutil import subprocess +from tempfile import mkdtemp from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal from tests.common.test_dimensions import create_uncompressed_text_dimension @@ -33,6 +36,7 @@ class TestUdfPersistence(CustomClusterTestSuite): HIVE_IMPALA_INTEGRATION_DB = 'hive_impala_integration_db' HIVE_UDF_JAR = os.getenv('DEFAULT_FS') + '/test-warehouse/hive-exec.jar'; JAVA_UDF_JAR = os.getenv('DEFAULT_FS') + '/test-warehouse/impala-hive-udfs.jar'; + LOCAL_LIBRARY_DIR = mkdtemp(dir="/tmp") @classmethod def get_workload(cls): @@ -76,6 +80,7 @@ class TestUdfPersistence(CustomClusterTestSuite): self.client.execute("DROP DATABASE IF EXISTS %s CASCADE" % self.JAVA_FN_TEST_DB) self.client.execute("DROP DATABASE IF EXISTS %s CASCADE" % self.HIVE_IMPALA_INTEGRATION_DB) + shutil.rmtree(self.LOCAL_LIBRARY_DIR, ignore_errors=True) def run_stmt_in_hive(self, stmt): """ @@ -176,6 +181,8 @@ class TestUdfPersistence(CustomClusterTestSuite): @SkipIfS3.hive @SkipIfLocal.hive @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + catalogd_args= "--local_library_dir=%s" % LOCAL_LIBRARY_DIR) def test_java_udfs_hive_integration(self): ''' This test checks the integration between Hive and Impala on CREATE FUNCTION and DROP FUNCTION statements for persistent Java UDFs. @@ -225,6 +232,8 @@ class TestUdfPersistence(CustomClusterTestSuite): self.client.execute("INVALIDATE METADATA") self.verify_function_count( "SHOW FUNCTIONS in {0}".format(self.HIVE_IMPALA_INTEGRATION_DB), 0) + # Make sure we deleted all the temporary jars we copied to the local fs + assert len(glob.glob(self.LOCAL_LIBRARY_DIR + "/*.jar")) == 0 @pytest.mark.execute_serially def test_java_udfs_from_impala(self):
