Repository: impala Updated Branches: refs/heads/master 830e3346f -> 818cd8fa2
IMPALA-6215: Removes race when using LibCache. Re-do the previously reverted change for IMPALA-6215. This patch addresses the flaky tests listed in IMPALA-6092, which have become more urgent recently. LibCache's API for accessing locally cached files has a race. Currently, a client of the cache receives the locally cached path as a string, but nothing guarantees that the associated file is not removed before the client is done using it. This race is suspected to be the root cause of the flakiness seen in IMPALA-6092. These tests fail once in a while with classloader errors because the Java UDF classes cannot be loaded. In these tests, the lib cache makes no guarantee that the path to the jar will remain valid from the time the path is acquired through the time needed to fetch the jar and resolve the needed classes. LibCache offers liveness guarantees for shared objects via reference counting. The fix in this patch extends this API to also cover paths to locally cached files. This fix *only* addresses the path race. General cleanup of the API will be done separately. Testing: - added a test to test_udfs.py that performs many concurrent UDF uses and removals. By increasing the concurrent operations to 100, the issue in IMPALA-6092 is locally reproducible on every run. With this fix, the problem is no longer reproducible with this test. 
Change-Id: I72ac0dfb13cf37d79e25c5b8a258b65f2dad097f Reviewed-on: http://gerrit.cloudera.org:8080/9968 Reviewed-by: Vuk Ercegovac <vercego...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/27c028f0 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/27c028f0 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/27c028f0 Branch: refs/heads/master Commit: 27c028f0578ac5edb42ba55c2c54ab8b7c195422 Parents: 830e334 Author: Vuk Ercegovac <vercego...@cloudera.com> Authored: Tue Nov 21 08:41:03 2017 -0800 Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Committed: Wed Apr 11 02:14:28 2018 +0000 ---------------------------------------------------------------------- be/src/codegen/llvm-codegen.cc | 5 +- be/src/exec/external-data-source-executor.cc | 6 +- be/src/exprs/hive-udf-call.cc | 57 +++++++-------- be/src/exprs/hive-udf-call.h | 3 - be/src/runtime/lib-cache.cc | 18 +++-- be/src/runtime/lib-cache.h | 39 +++++++++-- be/src/service/fe-support.cc | 10 +-- tests/query_test/test_udfs.py | 84 +++++++++++++++++++++-- 8 files changed, 167 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/27c028f0/be/src/codegen/llvm-codegen.cc ---------------------------------------------------------------------- diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc index 5ac19ad..c8fd8eb 100644 --- a/be/src/codegen/llvm-codegen.cc +++ b/be/src/codegen/llvm-codegen.cc @@ -339,9 +339,10 @@ Status LlvmCodeGen::LinkModuleFromLocalFs(const string& file) { Status LlvmCodeGen::LinkModuleFromHdfs(const string& hdfs_location, const time_t mtime) { if (linked_modules_.find(hdfs_location) != linked_modules_.end()) return Status::OK(); + LibCacheEntryHandle handle; string local_path; - 
RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath( - hdfs_location, LibCache::TYPE_IR, mtime, &local_path)); + RETURN_IF_ERROR(LibCache::instance()->GetLocalPath( + hdfs_location, LibCache::TYPE_IR, mtime, &handle, &local_path)); RETURN_IF_ERROR(LinkModuleFromLocalFs(local_path)); linked_modules_.insert(hdfs_location); return Status::OK(); http://git-wip-us.apache.org/repos/asf/impala/blob/27c028f0/be/src/exec/external-data-source-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/external-data-source-executor.cc b/be/src/exec/external-data-source-executor.cc index e8217b7..5c06d3a 100644 --- a/be/src/exec/external-data-source-executor.cc +++ b/be/src/exec/external-data-source-executor.cc @@ -136,12 +136,12 @@ ExternalDataSourceExecutor::~ExternalDataSourceExecutor() { Status ExternalDataSourceExecutor::Init(const string& jar_path, const string& class_name, const string& api_version, const string& init_string) { DCHECK(!is_initialized_); + LibCacheEntryHandle handle; string local_jar_path; // TODO(IMPALA-6727): pass the mtime from the coordinator. for now, skip the mtime // check (-1). - RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath( - jar_path, LibCache::TYPE_JAR, -1, &local_jar_path)); - + RETURN_IF_ERROR(LibCache::instance()->GetLocalPath( + jar_path, LibCache::TYPE_JAR, -1, &handle, &local_jar_path)); JNIEnv* jni_env = getJNIEnv(); // Add a scoped cleanup jni reference object. This cleans up local refs made below. http://git-wip-us.apache.org/repos/asf/impala/blob/27c028f0/be/src/exprs/hive-udf-call.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc index be3965a..b7715b7 100644 --- a/be/src/exprs/hive-udf-call.cc +++ b/be/src/exprs/hive-udf-call.cc @@ -174,10 +174,6 @@ Status HiveUdfCall::Init(const RowDescriptor& row_desc, RuntimeState* state) { // Initialize children first. 
RETURN_IF_ERROR(ScalarExpr::Init(row_desc, state)); - // Copy the Hive Jar from hdfs to local file system. - RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath( - fn_.hdfs_location, LibCache::TYPE_JAR, fn_.last_modified_time, &local_location_)); - // Initialize input_byte_offsets_ and input_buffer_size_ for (int i = 0; i < GetNumChildren(); ++i) { input_byte_offsets_.push_back(input_buffer_size_); @@ -202,30 +198,35 @@ Status HiveUdfCall::OpenEvaluator(FunctionContext::FunctionStateScope scope, JNIEnv* env = getJNIEnv(); if (env == NULL) return Status("Failed to get/create JVM"); - THiveUdfExecutorCtorParams ctor_params; - ctor_params.fn = fn_; - ctor_params.local_location = local_location_; - ctor_params.input_byte_offsets = input_byte_offsets_; - - jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_]; - jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()]; - jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()]; - - ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer; - ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer; - ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer; - ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value; - - jbyteArray ctor_params_bytes; - - // Add a scoped cleanup jni reference object. This cleans up local refs made - // below. 
- JniLocalFrame jni_frame; - RETURN_IF_ERROR(jni_frame.push(env)); - - RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); - // Create the java executor object - jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes); + { + LibCacheEntryHandle handle; + string local_location; + RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(fn_.hdfs_location, + LibCache::TYPE_JAR, fn_.last_modified_time, &handle, &local_location)); + THiveUdfExecutorCtorParams ctor_params; + ctor_params.fn = fn_; + ctor_params.local_location = local_location; + ctor_params.input_byte_offsets = input_byte_offsets_; + + jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_]; + jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()]; + jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()]; + + ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer; + ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer; + ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer; + ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value; + + jbyteArray ctor_params_bytes; + + // Add a scoped cleanup jni reference object. This cleans up local refs made below. 
+ JniLocalFrame jni_frame; + RETURN_IF_ERROR(jni_frame.push(env)); + + RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); + // Create the java executor object + jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes); + } RETURN_ERROR_IF_EXC(env); RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor)); http://git-wip-us.apache.org/repos/asf/impala/blob/27c028f0/be/src/exprs/hive-udf-call.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/hive-udf-call.h b/be/src/exprs/hive-udf-call.h index 7ce5eb0..8ca0372 100644 --- a/be/src/exprs/hive-udf-call.h +++ b/be/src/exprs/hive-udf-call.h @@ -116,9 +116,6 @@ class HiveUdfCall : public ScalarExpr { /// error. AnyVal* Evaluate(ScalarExprEvaluator* eval, const TupleRow* row) const; - /// The path on the local FS to the UDF's jar - std::string local_location_; - /// input_byte_offsets_[i] is the byte offset child ith's input argument should /// be written to. 
std::vector<int> input_byte_offsets_; http://git-wip-us.apache.org/repos/asf/impala/blob/27c028f0/be/src/runtime/lib-cache.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc index 83bb4dc..cb1bc74 100644 --- a/be/src/runtime/lib-cache.cc +++ b/be/src/runtime/lib-cache.cc @@ -131,6 +131,10 @@ LibCacheEntry::~LibCacheEntry() { unlink(local_path.c_str()); } +LibCacheEntryHandle::~LibCacheEntryHandle() { + if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry_); +} + Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& symbol, time_t exp_mtime, void** fn_ptr, LibCacheEntry** ent, bool quiet) { if (hdfs_lib_file.empty()) { @@ -181,14 +185,16 @@ void LibCache::DecrementUseCount(LibCacheEntry* entry) { if (can_delete) delete entry; } -Status LibCache::GetLocalLibPath( - const string& hdfs_lib_file, LibType type, time_t exp_mtime, string* local_path) { - unique_lock<mutex> lock; +Status LibCache::GetLocalPath(const std::string& hdfs_lib_file, LibType type, + time_t exp_mtime, LibCacheEntryHandle* handle, string* path) { + DCHECK(handle != nullptr && handle->entry() == nullptr); LibCacheEntry* entry = nullptr; + unique_lock<mutex> lock; RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, exp_mtime, &lock, &entry)); DCHECK(entry != nullptr); - DCHECK_EQ(entry->type, type); - *local_path = entry->local_path; + ++entry->use_count; + handle->SetEntry(entry); + *path = entry->local_path; return Status::OK(); } @@ -422,7 +428,7 @@ Status LibCache::RefreshCacheEntry(const string& hdfs_lib_file, LibType type, // Let the caller propagate any error that occurred when loading the entry. 
RETURN_IF_ERROR((*entry)->copy_file_status); - DCHECK_EQ((*entry)->type, type); + DCHECK_EQ((*entry)->type, type) << (*entry)->local_path; DCHECK(!(*entry)->local_path.empty()); } return Status::OK(); http://git-wip-us.apache.org/repos/asf/impala/blob/27c028f0/be/src/runtime/lib-cache.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h index 820a1a8..b8a2981 100644 --- a/be/src/runtime/lib-cache.h +++ b/be/src/runtime/lib-cache.h @@ -49,11 +49,16 @@ class RuntimeState; /// using the library. When the caller requests a ptr into the library, they /// are given the entry handle and must decrement the ref count when they /// are done. +/// Note: Explicitly managing this reference count at the client is error-prone. See the +/// api for accessing a path, GetLocalPath(), that uses the handle's scope to manage the +/// reference count. // /// TODO: /// - refresh libraries -/// - better cached module management. +/// - better cached module management +/// - improve the api to be less error-prone (IMPALA-6439) struct LibCacheEntry; +class LibCacheEntryHandle; class LibCache { public: @@ -71,15 +76,19 @@ class LibCache { /// Initializes the libcache. Must be called before any other APIs. static Status Init(); - /// Gets the local file system path for the library at 'hdfs_lib_file'. If + /// Gets the local 'path' used to cache the file stored at the global 'hdfs_lib_file'. If /// this file is not already on the local fs, or if the cached entry's last modified /// is older than expected mtime, 'exp_mtime', it copies it and caches the result. /// An 'exp_mtime' of -1 makes the mtime check a no-op. + /// + /// 'handle' must remain in scope while 'path' is used. The reference count to the + /// underlying cache entry is decremented when 'handle' goes out-of-scope. 
+ /// /// Returns an error if 'hdfs_lib_file' cannot be copied to the local fs or if /// exp_mtime differs from the mtime on the file system. /// If error is due to refresh, then the entry will be removed from the cache. - Status GetLocalLibPath(const std::string& hdfs_lib_file, LibType type, time_t exp_mtime, - std::string* local_path); + Status GetLocalPath(const std::string& hdfs_lib_file, LibType type, time_t exp_mtime, + LibCacheEntryHandle* handle, string* path); /// Returns status.ok() if the symbol exists in 'hdfs_lib_file', non-ok otherwise. /// If status.ok() is true, 'mtime' is set to the cache entry's last modified time. @@ -107,6 +116,7 @@ class LibCache { /// An 'exp_mtime' of -1 makes the mtime check a no-op. /// An error is returned if exp_mtime differs from the mtime on the file system. /// If error is due to refresh, then the entry will be removed from the cache. + /// TODO: api is error-prone. upgrade to LibCacheEntryHandle (see IMPALA-6439). Status GetSoFunctionPtr(const std::string& hdfs_lib_file, const std::string& symbol, time_t exp_mtime, void** fn_ptr, LibCacheEntry** entry, bool quiet = false); @@ -204,6 +214,27 @@ class LibCache { const std::string& hdfs_lib_file, const LibMap::iterator& entry_iterator); }; +/// Handle for a LibCacheEntry that decrements its reference count when the handle is +/// destroyed or re-used for another entry. 
+class LibCacheEntryHandle { + public: + LibCacheEntryHandle() {} + ~LibCacheEntryHandle(); + + private: + friend class LibCache; + + LibCacheEntry* entry() const { return entry_; } + void SetEntry(LibCacheEntry* entry) { + if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry_); // old ref + entry_ = entry; + } + + LibCacheEntry* entry_ = nullptr; + + DISALLOW_COPY_AND_ASSIGN(LibCacheEntryHandle); +}; + } #endif http://git-wip-us.apache.org/repos/asf/impala/blob/27c028f0/be/src/service/fe-support.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc index a8906a0..187d14e 100644 --- a/be/src/service/fe-support.cc +++ b/be/src/service/fe-support.cc @@ -300,9 +300,10 @@ static void ResolveSymbolLookup(const TSymbolLookupParams params, // Refresh the library if necessary. LibCache::instance()->SetNeedsRefresh(params.location); } + LibCacheEntryHandle handle; string dummy_local_path; - Status status = LibCache::instance()->GetLocalLibPath( - params.location, type, -1, &dummy_local_path); + Status status = LibCache::instance()->GetLocalPath( + params.location, type, -1, &handle, &dummy_local_path); if (!status.ok()) { result->__set_result_code(TSymbolLookupResultCode::BINARY_NOT_FOUND); result->__set_error_msg(status.GetDetail()); @@ -397,10 +398,11 @@ Java_org_apache_impala_service_FeSupport_NativeCacheJar( JniUtil::internal_exc_class(), nullptr); TCacheJarResult result; + LibCacheEntryHandle handle; string local_path; // TODO(IMPALA-6727): used for external data sources; add proper mtime. 
- Status status = LibCache::instance()->GetLocalLibPath( - params.hdfs_location, LibCache::TYPE_JAR, -1, &local_path); + Status status = LibCache::instance()->GetLocalPath( + params.hdfs_location, LibCache::TYPE_JAR, -1, &handle, &local_path); status.ToThrift(&result.status); if (status.ok()) result.__set_local_path(local_path); http://git-wip-us.apache.org/repos/asf/impala/blob/27c028f0/tests/query_test/test_udfs.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py index 644d80f..dc5491b 100644 --- a/tests/query_test/test_udfs.py +++ b/tests/query_test/test_udfs.py @@ -394,8 +394,6 @@ class TestUdfExecution(TestUdfBase): self.run_test_case('QueryTest/udf-non-deterministic', vector, use_db=unique_database) - # Runs serially as a temporary workaround for IMPALA_6092. - @pytest.mark.execute_serially def test_java_udfs(self, vector, unique_database): self.run_test_case('QueryTest/load-java-udfs', vector, use_db=unique_database) self.run_test_case('QueryTest/java-udf', vector, use_db=unique_database) @@ -507,9 +505,6 @@ class TestUdfTargeted(TestUdfBase): unique_database, tgt_udf_path)) query = "select `{0}`.fn_invalid_symbol('test')".format(unique_database) - # Dropping the function can interact with other tests whose Java classes are in - # the same jar. Use a copy of the jar to avoid unintended interactions. - # See IMPALA-6215 and IMPALA-6092 for examples. check_call(["hadoop", "fs", "-put", "-f", src_udf_path, tgt_udf_path]) self.client.execute(drop_fn_stmt) self.client.execute(create_fn_stmt) @@ -518,6 +513,85 @@ class TestUdfTargeted(TestUdfBase): assert "Unable to find class" in str(ex) self.client.execute(drop_fn_stmt) + def test_concurrent_jar_drop_use(self, vector, unique_database): + """IMPALA-6215: race between dropping/using java udf's defined in the same jar. 
+ This test runs concurrent drop/use threads that result in class not found + exceptions when the race is present. + """ + udf_src_path = os.path.join( + os.environ['IMPALA_HOME'], "testdata/udfs/impala-hive-udfs.jar") + udf_tgt_path = get_fs_path( + '/test-warehouse/{0}.db/impala-hive-udfs.jar'.format(unique_database)) + + create_fn_to_drop = """create function {0}.foo_{1}() returns string + LOCATION '{2}' SYMBOL='org.apache.impala.TestUpdateUdf'""" + create_fn_to_use = """create function {0}.use_it(string) returns string + LOCATION '{1}' SYMBOL='org.apache.impala.TestUdf'""" + drop_fn = "drop function if exists {0}.foo_{1}()" + use_fn = """select * from (select max(int_col) from functional.alltypesagg + where {0}.use_it(string_col) = 'blah' union all + (select max(int_col) from functional.alltypesagg + where {0}.use_it(String_col) > '1' union all + (select max(int_col) from functional.alltypesagg + where {0}.use_it(string_col) > '1'))) v""" + num_drops = 100 + num_uses = 100 + + # use a unique jar for this test to avoid interactions with other tests + # that use the same jar + check_call(["hadoop", "fs", "-put", "-f", udf_src_path, udf_tgt_path]) + + # create all the functions. 
+ setup_client = self.create_impala_client() + try: + s = create_fn_to_use.format(unique_database, udf_tgt_path) + setup_client.execute(s) + except Exception as e: + print e + assert False + for i in range(0, num_drops): + try: + setup_client.execute(create_fn_to_drop.format(unique_database, i, udf_tgt_path)) + except Exception as e: + print e + assert False + + errors = [] + def use_fn_method(): + time.sleep(5 + random.random()) + client = self.create_impala_client() + try: + client.execute(use_fn.format(unique_database)) + except Exception as e: errors.append(e) + + def drop_fn_method(i): + time.sleep(1 + random.random()) + client = self.create_impala_client() + try: + client.execute(drop_fn.format(unique_database, i)) + except Exception as e: errors.append(e) + + # create threads to use functions. + runner_threads = [] + for i in range(0, num_uses): + runner_threads.append(threading.Thread(target=use_fn_method)) + + # create threads to drop functions. + drop_threads = [] + for i in range(0, num_drops): + runner_threads.append(threading.Thread(target=drop_fn_method, args=(i, ))) + + # launch all runner threads. + for t in runner_threads: t.start() + + # join all threads. + for t in runner_threads: t.join(); + + # Check for any errors. + for e in errors: print e + assert len(errors) == 0 + + @SkipIfLocal.multiple_impalad def test_hive_udfs_missing_jar(self, vector, unique_database): """ IMPALA-2365: Impalad shouldn't crash if the udf jar isn't present