IMPALA-6215: Removes race when using LibCache.

Re-do the previously reverted change for IMPALA-6215. This patch
addresses the flakes listed in IMPALA-6092, which have become more
urgent recently.

LibCache's API for accessing locally cached files has a race.
Currently, the client of the cache accesses the locally cached path
as a string, but nothing guarantees that the associated file is not
removed before the client is done using it. This race is suspected
to be the root cause of the flakiness seen in IMPALA-6092, whose tests
occasionally fail with classloader errors because Java UDF classes
cannot be loaded. In these tests, the lib cache makes no guarantee that
the path to the jar remains valid from the time the path is acquired
until the jar has been fetched and the needed classes resolved.

LibCache offers liveness guarantees for shared objects via reference
counting. The fix in this patch extends this API to also cover paths
to locally cached files.

This fix *only* addresses the path race. General cleanup of the API will
be done separately.

Testing:
   - Added a test to test_udfs.py that performs many concurrent UDF uses
     and removals. With 100 concurrent operations, the issue in IMPALA-6092
     reproduces locally on every run; with this fix, it no longer
     reproduces with this test.

Change-Id: I72ac0dfb13cf37d79e25c5b8a258b65f2dad097f
Reviewed-on: http://gerrit.cloudera.org:8080/9968
Reviewed-by: Vuk Ercegovac <vercego...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/226b914d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/226b914d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/226b914d

Branch: refs/heads/2.x
Commit: 226b914dd02dba4aa389ed7b4c0023ddfd229096
Parents: bbe5342
Author: Vuk Ercegovac <vercego...@cloudera.com>
Authored: Tue Nov 21 08:41:03 2017 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Thu Apr 12 03:00:07 2018 +0000

----------------------------------------------------------------------
 be/src/codegen/llvm-codegen.cc               |  5 +-
 be/src/exec/external-data-source-executor.cc |  6 +-
 be/src/exprs/hive-udf-call.cc                | 57 +++++++--------
 be/src/exprs/hive-udf-call.h                 |  3 -
 be/src/runtime/lib-cache.cc                  | 18 +++--
 be/src/runtime/lib-cache.h                   | 39 +++++++++--
 be/src/service/fe-support.cc                 | 10 +--
 tests/query_test/test_udfs.py                | 84 +++++++++++++++++++++--
 8 files changed, 167 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/226b914d/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 5ac19ad..c8fd8eb 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -339,9 +339,10 @@ Status LlvmCodeGen::LinkModuleFromLocalFs(const string& 
file) {
 
 Status LlvmCodeGen::LinkModuleFromHdfs(const string& hdfs_location, const 
time_t mtime) {
   if (linked_modules_.find(hdfs_location) != linked_modules_.end()) return 
Status::OK();
+  LibCacheEntryHandle handle;
   string local_path;
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
-      hdfs_location, LibCache::TYPE_IR, mtime, &local_path));
+  RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(
+      hdfs_location, LibCache::TYPE_IR, mtime, &handle, &local_path));
   RETURN_IF_ERROR(LinkModuleFromLocalFs(local_path));
   linked_modules_.insert(hdfs_location);
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/226b914d/be/src/exec/external-data-source-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/external-data-source-executor.cc 
b/be/src/exec/external-data-source-executor.cc
index e8217b7..5c06d3a 100644
--- a/be/src/exec/external-data-source-executor.cc
+++ b/be/src/exec/external-data-source-executor.cc
@@ -136,12 +136,12 @@ ExternalDataSourceExecutor::~ExternalDataSourceExecutor() 
{
 Status ExternalDataSourceExecutor::Init(const string& jar_path,
     const string& class_name, const string& api_version, const string& 
init_string) {
   DCHECK(!is_initialized_);
+  LibCacheEntryHandle handle;
   string local_jar_path;
   // TODO(IMPALA-6727): pass the mtime from the coordinator. for now, skip the 
mtime
   // check (-1).
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
-      jar_path, LibCache::TYPE_JAR, -1, &local_jar_path));
-
+  RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(
+      jar_path, LibCache::TYPE_JAR, -1, &handle, &local_jar_path));
   JNIEnv* jni_env = getJNIEnv();
 
   // Add a scoped cleanup jni reference object. This cleans up local refs made 
below.

http://git-wip-us.apache.org/repos/asf/impala/blob/226b914d/be/src/exprs/hive-udf-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc
index be3965a..b7715b7 100644
--- a/be/src/exprs/hive-udf-call.cc
+++ b/be/src/exprs/hive-udf-call.cc
@@ -174,10 +174,6 @@ Status HiveUdfCall::Init(const RowDescriptor& row_desc, 
RuntimeState* state) {
   // Initialize children first.
   RETURN_IF_ERROR(ScalarExpr::Init(row_desc, state));
 
-  // Copy the Hive Jar from hdfs to local file system.
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
-      fn_.hdfs_location, LibCache::TYPE_JAR, fn_.last_modified_time, 
&local_location_));
-
   // Initialize input_byte_offsets_ and input_buffer_size_
   for (int i = 0; i < GetNumChildren(); ++i) {
     input_byte_offsets_.push_back(input_buffer_size_);
@@ -202,30 +198,35 @@ Status 
HiveUdfCall::OpenEvaluator(FunctionContext::FunctionStateScope scope,
   JNIEnv* env = getJNIEnv();
   if (env == NULL) return Status("Failed to get/create JVM");
 
-  THiveUdfExecutorCtorParams ctor_params;
-  ctor_params.fn = fn_;
-  ctor_params.local_location = local_location_;
-  ctor_params.input_byte_offsets = input_byte_offsets_;
-
-  jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_];
-  jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()];
-  jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()];
-
-  ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer;
-  ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer;
-  ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer;
-  ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value;
-
-  jbyteArray ctor_params_bytes;
-
-  // Add a scoped cleanup jni reference object. This cleans up local refs made
-  // below.
-  JniLocalFrame jni_frame;
-  RETURN_IF_ERROR(jni_frame.push(env));
-
-  RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
-  // Create the java executor object
-  jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, 
ctor_params_bytes);
+  {
+    LibCacheEntryHandle handle;
+    string local_location;
+    RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(fn_.hdfs_location,
+        LibCache::TYPE_JAR, fn_.last_modified_time, &handle, &local_location));
+    THiveUdfExecutorCtorParams ctor_params;
+    ctor_params.fn = fn_;
+    ctor_params.local_location = local_location;
+    ctor_params.input_byte_offsets = input_byte_offsets_;
+
+    jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_];
+    jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()];
+    jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()];
+
+    ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer;
+    ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer;
+    ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer;
+    ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value;
+
+    jbyteArray ctor_params_bytes;
+
+    // Add a scoped cleanup jni reference object. This cleans up local refs 
made below.
+    JniLocalFrame jni_frame;
+    RETURN_IF_ERROR(jni_frame.push(env));
+
+    RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+    // Create the java executor object
+    jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, 
ctor_params_bytes);
+  }
   RETURN_ERROR_IF_EXC(env);
   RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, 
&jni_ctx->executor));
 

http://git-wip-us.apache.org/repos/asf/impala/blob/226b914d/be/src/exprs/hive-udf-call.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.h b/be/src/exprs/hive-udf-call.h
index 7ce5eb0..8ca0372 100644
--- a/be/src/exprs/hive-udf-call.h
+++ b/be/src/exprs/hive-udf-call.h
@@ -116,9 +116,6 @@ class HiveUdfCall : public ScalarExpr {
   /// error.
   AnyVal* Evaluate(ScalarExprEvaluator* eval, const TupleRow* row) const;
 
-  /// The path on the local FS to the UDF's jar
-  std::string local_location_;
-
   /// input_byte_offsets_[i] is the byte offset child ith's input argument 
should
   /// be written to.
   std::vector<int> input_byte_offsets_;

http://git-wip-us.apache.org/repos/asf/impala/blob/226b914d/be/src/runtime/lib-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc
index 83bb4dc..cb1bc74 100644
--- a/be/src/runtime/lib-cache.cc
+++ b/be/src/runtime/lib-cache.cc
@@ -131,6 +131,10 @@ LibCacheEntry::~LibCacheEntry() {
   unlink(local_path.c_str());
 }
 
+LibCacheEntryHandle::~LibCacheEntryHandle() {
+  if (entry_) LibCache::instance()->DecrementUseCount(entry_);  // Drop our reference.
+}
+
 Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& 
symbol,
     time_t exp_mtime, void** fn_ptr, LibCacheEntry** ent, bool quiet) {
   if (hdfs_lib_file.empty()) {
@@ -181,14 +185,16 @@ void LibCache::DecrementUseCount(LibCacheEntry* entry) {
   if (can_delete) delete entry;
 }
 
-Status LibCache::GetLocalLibPath(
-    const string& hdfs_lib_file, LibType type, time_t exp_mtime, string* local_path) {
-  unique_lock<mutex> lock;
+Status LibCache::GetLocalPath(const string& hdfs_lib_file, LibType type,
+    time_t exp_mtime, LibCacheEntryHandle* handle, string* path) {
+  DCHECK(handle != nullptr && handle->entry() == nullptr);  // Handle must be unset.
   LibCacheEntry* entry = nullptr;
+  unique_lock<mutex> lock;
   RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, exp_mtime, &lock, &entry));
   DCHECK(entry != nullptr);
-  DCHECK_EQ(entry->type, type);
-  *local_path = entry->local_path;
+  ++entry->use_count;  // Released by ~LibCacheEntryHandle().
+  handle->SetEntry(entry);
+  *path = entry->local_path;
   return Status::OK();
 }
 
@@ -422,7 +428,7 @@ Status LibCache::RefreshCacheEntry(const string& 
hdfs_lib_file, LibType type,
 
     // Let the caller propagate any error that occurred when loading the entry.
     RETURN_IF_ERROR((*entry)->copy_file_status);
-    DCHECK_EQ((*entry)->type, type);
+    DCHECK_EQ((*entry)->type, type) << (*entry)->local_path;
     DCHECK(!(*entry)->local_path.empty());
   }
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/226b914d/be/src/runtime/lib-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index 820a1a8..b8a2981 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -49,11 +49,16 @@ class RuntimeState;
 /// using the library. When the caller requests a ptr into the library, they
 /// are given the entry handle and must decrement the ref count when they
 /// are done.
+/// Note: Explicitly managing this reference count at the client is 
error-prone. See the
+/// api for accessing a path, GetLocalPath(), that uses the handle's scope to 
manage the
+/// reference count.
 //
 /// TODO:
 /// - refresh libraries
-/// - better cached module management.
+/// - better cached module management
+/// - improve the api to be less error-prone (IMPALA-6439)
 struct LibCacheEntry;
+class LibCacheEntryHandle;
 
 class LibCache {
  public:
@@ -71,15 +76,19 @@ class LibCache {
   /// Initializes the libcache. Must be called before any other APIs.
   static Status Init();
 
-  /// Gets the local file system path for the library at 'hdfs_lib_file'. If
+  /// Gets the local 'path' of the cached copy of the global 'hdfs_lib_file'. If
   /// this file is not already on the local fs, or if the cached entry's last modified
   /// is older than expected mtime, 'exp_mtime', it copies it and caches the result.
   /// An 'exp_mtime' of -1 makes the mtime check a no-op.
+  ///
+  /// 'handle' must not hold an entry yet and must remain in scope while 'path' is used.
+  /// The cache entry's reference count is decremented when 'handle' is destroyed.
+  ///
   /// Returns an error if 'hdfs_lib_file' cannot be copied to the local fs or if
   /// exp_mtime differs from the mtime on the file system.
   /// If error is due to refresh, then the entry will be removed from the cache.
-  Status GetLocalLibPath(const std::string& hdfs_lib_file, LibType type, time_t exp_mtime,
-      std::string* local_path);
+  Status GetLocalPath(const std::string& hdfs_lib_file, LibType type, time_t exp_mtime,
+      LibCacheEntryHandle* handle, std::string* path);
 
   /// Returns status.ok() if the symbol exists in 'hdfs_lib_file', non-ok 
otherwise.
   /// If status.ok() is true, 'mtime' is set to the cache entry's last 
modified time.
@@ -107,6 +116,7 @@ class LibCache {
   /// An 'exp_mtime' of -1 makes the mtime check a no-op.
   /// An error is returned if exp_mtime differs from the mtime on the file 
system.
   /// If error is due to refresh, then the entry will be removed from the 
cache.
+  /// TODO: api is error-prone. upgrade to LibCacheEntryHandle (see 
IMPALA-6439).
   Status GetSoFunctionPtr(const std::string& hdfs_lib_file, const std::string& 
symbol,
       time_t exp_mtime, void** fn_ptr, LibCacheEntry** entry, bool quiet = 
false);
 
@@ -204,6 +214,27 @@ class LibCache {
       const std::string& hdfs_lib_file, const LibMap::iterator& 
entry_iterator);
 };
 
+/// Handle for a LibCacheEntry that decrements its reference count when the handle is
+/// destroyed or re-used for another entry.
+class LibCacheEntryHandle {
+ public:
+  LibCacheEntryHandle() {}
+  ~LibCacheEntryHandle();
+
+ private:
+  friend class LibCache;
+
+  LibCacheEntry* entry() const { return entry_; }
+  void SetEntry(LibCacheEntry* entry) {  // Releases the previously-held entry_, if any.
+    if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry_);
+    entry_ = entry;
+  }
+
+  LibCacheEntry* entry_ = nullptr;
+
+  DISALLOW_COPY_AND_ASSIGN(LibCacheEntryHandle);
+};
+
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/impala/blob/226b914d/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index a8906a0..187d14e 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -300,9 +300,10 @@ static void ResolveSymbolLookup(const TSymbolLookupParams 
params,
       // Refresh the library if necessary.
       LibCache::instance()->SetNeedsRefresh(params.location);
     }
+    LibCacheEntryHandle handle;
     string dummy_local_path;
-    Status status = LibCache::instance()->GetLocalLibPath(
-        params.location, type, -1, &dummy_local_path);
+    Status status = LibCache::instance()->GetLocalPath(
+        params.location, type, -1, &handle, &dummy_local_path);
     if (!status.ok()) {
       result->__set_result_code(TSymbolLookupResultCode::BINARY_NOT_FOUND);
       result->__set_error_msg(status.GetDetail());
@@ -397,10 +398,11 @@ Java_org_apache_impala_service_FeSupport_NativeCacheJar(
       JniUtil::internal_exc_class(), nullptr);
 
   TCacheJarResult result;
+  LibCacheEntryHandle handle;
   string local_path;
   // TODO(IMPALA-6727): used for external data sources; add proper mtime.
-  Status status = LibCache::instance()->GetLocalLibPath(
-      params.hdfs_location, LibCache::TYPE_JAR, -1, &local_path);
+  Status status = LibCache::instance()->GetLocalPath(
+      params.hdfs_location, LibCache::TYPE_JAR, -1, &handle, &local_path);
   status.ToThrift(&result.status);
   if (status.ok()) result.__set_local_path(local_path);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/226b914d/tests/query_test/test_udfs.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 644d80f..dc5491b 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -394,8 +394,6 @@ class TestUdfExecution(TestUdfBase):
       self.run_test_case('QueryTest/udf-non-deterministic', vector,
           use_db=unique_database)
 
-  # Runs serially as a temporary workaround for IMPALA_6092.
-  @pytest.mark.execute_serially
   def test_java_udfs(self, vector, unique_database):
     self.run_test_case('QueryTest/load-java-udfs', vector, 
use_db=unique_database)
     self.run_test_case('QueryTest/java-udf', vector, use_db=unique_database)
@@ -507,9 +505,6 @@ class TestUdfTargeted(TestUdfBase):
             unique_database, tgt_udf_path))
     query = "select `{0}`.fn_invalid_symbol('test')".format(unique_database)
 
-    # Dropping the function can interact with other tests whose Java classes 
are in
-    # the same jar. Use a copy of the jar to avoid unintended interactions.
-    # See IMPALA-6215 and IMPALA-6092 for examples.
     check_call(["hadoop", "fs", "-put", "-f", src_udf_path, tgt_udf_path])
     self.client.execute(drop_fn_stmt)
     self.client.execute(create_fn_stmt)
@@ -518,6 +513,85 @@ class TestUdfTargeted(TestUdfBase):
       assert "Unable to find class" in str(ex)
     self.client.execute(drop_fn_stmt)
 
+  def test_concurrent_jar_drop_use(self, vector, unique_database):
+    """IMPALA-6215: race between dropping/using Java UDFs defined in the same jar.
+       This test runs concurrent drop/use threads that result in class not found
+       exceptions when the race is present.
+    """
+    udf_src_path = os.path.join(
+      os.environ['IMPALA_HOME'], "testdata/udfs/impala-hive-udfs.jar")
+    udf_tgt_path = get_fs_path(
+      '/test-warehouse/{0}.db/impala-hive-udfs.jar'.format(unique_database))
+
+    create_fn_to_drop = """create function {0}.foo_{1}() returns string
+                           LOCATION '{2}' SYMBOL='org.apache.impala.TestUpdateUdf'"""
+    create_fn_to_use = """create function {0}.use_it(string) returns string
+                          LOCATION '{1}' SYMBOL='org.apache.impala.TestUdf'"""
+    drop_fn = "drop function if exists {0}.foo_{1}()"
+    use_fn = """select * from (select max(int_col) from functional.alltypesagg
+                where {0}.use_it(string_col) = 'blah' union all
+                (select max(int_col) from functional.alltypesagg
+                 where {0}.use_it(String_col) > '1' union all
+                (select max(int_col) from functional.alltypesagg
+                 where {0}.use_it(string_col) > '1'))) v"""
+    num_drops = 100
+    num_uses = 100
+
+    # use a unique jar for this test to avoid interactions with other tests
+    # that use the same jar
+    check_call(["hadoop", "fs", "-put", "-f", udf_src_path, udf_tgt_path])
+
+    # create all the functions; setup errors propagate to pytest unchanged.
+    setup_client = self.create_impala_client()
+    try:
+      setup_client.execute(create_fn_to_use.format(unique_database, udf_tgt_path))
+      for i in range(num_drops):
+        setup_client.execute(
+            create_fn_to_drop.format(unique_database, i, udf_tgt_path))
+    finally:
+      setup_client.close()
+
+    errors = []
+    def run_in_new_client(stmt):
+      """Executes 'stmt' on its own client, recording any exception in 'errors'."""
+      client = None
+      try:
+        client = self.create_impala_client()
+        client.execute(stmt)
+      except Exception as e:
+        errors.append(e)
+      finally:
+        if client is not None: client.close()
+
+    def use_fn_method():
+      time.sleep(5 + random.random())
+      run_in_new_client(use_fn.format(unique_database))
+
+    def drop_fn_method(i):
+      time.sleep(1 + random.random())
+      run_in_new_client(drop_fn.format(unique_database, i))
+
+    # create threads to use functions.
+    runner_threads = []
+    for _ in range(num_uses):
+      runner_threads.append(threading.Thread(target=use_fn_method))
+
+    # create threads to drop functions. They are added to 'runner_threads' so that
+    # drops and uses are started together and run concurrently.
+    for i in range(num_drops):
+      runner_threads.append(threading.Thread(target=drop_fn_method, args=(i,)))
+
+    # launch all runner threads.
+    for t in runner_threads: t.start()
+
+    # join all threads.
+    for t in runner_threads: t.join()
+
+    # Check for any errors.
+    for e in errors: print e
+    assert len(errors) == 0
+
+
   @SkipIfLocal.multiple_impalad
   def test_hive_udfs_missing_jar(self, vector, unique_database):
     """ IMPALA-2365: Impalad shouldn't crash if the udf jar isn't present

Reply via email to