Repository: incubator-impala
Updated Branches:
  refs/heads/master bbf5255d0 -> 707f71b6e


IMPALA-4410: Safer tear-down of RuntimeState

* Add RuntimeState::ReleaseResources() which is guaranteed to release
  resources safely, rather than having PFE::Close() do the same piecemeal.
* Fix for crash where PFE::Prepare() fails before descriptor table is
  created.
* Remove some dead code from TestEnv, and rename some methods for clarity.

Testing: Found by debug-actions, which has a reproducible test where
PFE::Prepare() fails. Manually tested on master.

Change-Id: Ie416e4d57240142bf685385299b749c3a6792c45
Reviewed-on: http://gerrit.cloudera.org:8080/4893
Tested-by: Internal Jenkins
Reviewed-by: Henry Robinson <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/707f71b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/707f71b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/707f71b6

Branch: refs/heads/master
Commit: 707f71b6ea13487c707337785e785487d2f470f2
Parents: bbf5255
Author: Henry Robinson <[email protected]>
Authored: Mon Oct 31 15:20:51 2016 -0700
Committer: Henry Robinson <[email protected]>
Committed: Wed Nov 23 23:25:04 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hash-table-test.cc               |  9 +++--
 be/src/runtime/buffered-block-mgr-test.cc    |  6 +--
 be/src/runtime/buffered-tuple-stream-test.cc | 12 ++++--
 be/src/runtime/plan-fragment-executor.cc     |  5 +--
 be/src/runtime/runtime-state.cc              |  8 ++++
 be/src/runtime/runtime-state.h               | 13 ++++--
 be/src/runtime/test-env.cc                   | 49 +++++++++--------------
 be/src/runtime/test-env.h                    | 22 ++++------
 be/src/service/fe-support.cc                 | 19 +++++----
 be/src/util/scope-exit-trigger.h             | 40 ++++++++++++++++++
 10 files changed, 111 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 9fb5038..a539bb1 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -58,6 +58,7 @@ class HashTableTest : public testing::Test {
   MemPool mem_pool_;
   vector<ExprContext*> build_expr_ctxs_;
   vector<ExprContext*> probe_expr_ctxs_;
+  int next_query_id_ = 0;
 
   virtual void SetUp() {
     test_env_.reset(new TestEnv());
@@ -184,8 +185,8 @@ class HashTableTest : public testing::Test {
   bool CreateHashTable(bool quadratic, int64_t initial_num_buckets,
       scoped_ptr<HashTable>* table, int block_size = 8 * 1024 * 1024,
       int max_num_blocks = 100, int reserved_blocks = 10) {
-    EXPECT_OK(
-        test_env_->CreateQueryState(0, max_num_blocks, block_size, 
&runtime_state_));
+    EXPECT_OK(test_env_->CreatePerQueryState(
+        next_query_id_++, max_num_blocks, block_size, &runtime_state_));
     MemTracker* client_tracker = pool_.Add(
         new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
     BufferedBlockMgr::Client* client;
@@ -602,8 +603,8 @@ TEST_F(HashTableTest, QuadraticInsertFullTest) {
 
 // Test that hashing empty string updates hash value.
 TEST_F(HashTableTest, HashEmpty) {
-  EXPECT_TRUE(test_env_->CreateQueryState(0, 100, 8 * 1024 * 1024,
-      &runtime_state_).ok());
+  EXPECT_TRUE(
+      test_env_->CreatePerQueryState(0, 100, 8 * 1024 * 1024, 
&runtime_state_).ok());
   scoped_ptr<HashTableCtx> ht_ctx;
   Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
       probe_expr_ctxs_, false /* !stores_nulls_ */,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc 
b/be/src/runtime/buffered-block-mgr-test.cc
index 9f0222d..5e99c09 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -142,7 +142,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
   BufferedBlockMgr* CreateMgr(int64_t query_id, int max_buffers, int 
block_size,
       RuntimeState** query_state = NULL, TQueryOptions* query_options = NULL) {
     RuntimeState* state;
-    EXPECT_OK(test_env_->CreateQueryState(
+    EXPECT_OK(test_env_->CreatePerQueryState(
         query_id, max_buffers, block_size, &state, query_options));
     if (query_state != NULL) *query_state = state;
     return state->block_mgr();
@@ -185,7 +185,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
   void TearDownMgrs() {
     // Tear down the query states, which DCHECKs that the memory consumption of
     // the query's trackers is zero.
-    test_env_->TearDownQueryStates();
+    test_env_->TearDownRuntimeStates();
   }
 
   void AllocateBlocks(BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* 
client,
@@ -924,7 +924,7 @@ void BufferedBlockMgrTest::TestRuntimeStateTeardown(
   // scenario by holding onto a reference to the block mgr. This should be 
safe so
   // long as blocks are properly deleted before the runtime state is torn down.
   DeleteBlocks(blocks);
-  test_env_->TearDownQueryStates();
+  test_env_->TearDownRuntimeStates();
 
   // Optionally wait for writes to complete after cancellation.
   if (wait_for_writes) WaitForWrites(block_mgr.get());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc 
b/be/src/runtime/buffered-tuple-stream-test.cc
index 9063a7d..e98c334 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -101,7 +101,7 @@ class SimpleTupleStreamTest : public testing::Test {
   /// Setup a block manager with the provided settings and client with no 
reservation,
   /// tracked by tracker_.
   void InitBlockMgr(int64_t limit, int block_size) {
-    ASSERT_OK(test_env_->CreateQueryState(0, limit, block_size, 
&runtime_state_));
+    ASSERT_OK(test_env_->CreatePerQueryState(0, limit, block_size, 
&runtime_state_));
     MemTracker* client_tracker = pool_.Add(
         new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
     ASSERT_OK(runtime_state_->block_mgr()->RegisterClient(
@@ -723,14 +723,20 @@ void SimpleTupleStreamTest::TestTransferMemory(bool 
pin_stream, bool read_write)
 }
 
 /// Test attaching memory to a row batch from a pinned stream.
-TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStream) {
+TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStreamReadWrite) {
   TestTransferMemory(true, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStreamNoReadWrite) {
   TestTransferMemory(true, false);
 }
 
 /// Test attaching memory to a row batch from an unpinned stream.
-TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStream) {
+TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamReadWrite) {
   TestTransferMemory(false, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamNoReadWrite) {
   TestTransferMemory(false, false);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc 
b/be/src/runtime/plan-fragment-executor.cc
index f673f34..ad781b5 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -534,10 +534,7 @@ void PlanFragmentExecutor::Close() {
   // Prepare should always have been called, and so runtime_state_ should be 
set
   DCHECK(prepared_promise_.IsSet());
   if (exec_tree_ != NULL) exec_tree_->Close(runtime_state_.get());
-  runtime_state_->UnregisterReaderContexts();
-  exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());
-  runtime_state_->desc_tbl().ClosePartitionExprs(runtime_state_.get());
-  runtime_state_->filter_bank()->Close();
+  runtime_state_->ReleaseResources();
 
   if (mem_usage_sampled_counter_ != NULL) {
     PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 37d5fb0..d0e1172 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -306,4 +306,12 @@ void RuntimeState::UnregisterReaderContexts() {
   reader_contexts_.clear();
 }
 
+void RuntimeState::ReleaseResources() {
+  UnregisterReaderContexts();
+  if (desc_tbl_ != nullptr) desc_tbl_->ClosePartitionExprs(this);
+  if (filter_bank_ != nullptr) filter_bank_->Close();
+  if (resource_pool_ != nullptr) {
+    ExecEnv::GetInstance()->thread_mgr()->UnregisterPool(resource_pool_);
+  }
+}
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 7003e59..de3ab94 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -62,8 +62,10 @@ typedef std::map<std::string, TInsertStats> 
PartitionInsertStats;
 /// deleted.
 typedef std::map<std::string, std::string> FileMoveMap;
 
-/// A collection of items that are part of the global state of a
-/// query and shared across all execution nodes of that query.
+/// A collection of items that are part of the global state of a query and 
shared across
+/// all execution nodes of that query. After initialisation, callers must call
+/// ReleaseResources() to ensure that all resources are correctly freed before
+/// destruction.
 class RuntimeState {
  public:
   RuntimeState(const TExecPlanFragmentParams& fragment_params, ExecEnv* 
exec_env);
@@ -311,6 +313,9 @@ class RuntimeState {
   /// TODO: Fix IMPALA-4233
   Status CodegenScalarFns();
 
+  /// Release resources and prepare this object for destruction.
+  void ReleaseResources();
+
  private:
   /// Allow TestEnv to set block_mgr manually for testing.
   friend class TestEnv;
@@ -325,7 +330,7 @@ class RuntimeState {
 
   static const int DEFAULT_BATCH_SIZE = 1024;
 
-  DescriptorTbl* desc_tbl_;
+  DescriptorTbl* desc_tbl_ = nullptr;
   boost::scoped_ptr<ObjectPool> obj_pool_;
 
   /// Lock protecting error_log_
@@ -351,7 +356,7 @@ class RuntimeState {
 
   /// Thread resource management object for this fragment's execution.  The 
runtime
   /// state is responsible for returning this pool to the thread mgr.
-  ThreadResourceMgr::ResourcePool* resource_pool_;
+  ThreadResourceMgr::ResourcePool* resource_pool_ = nullptr;
 
   /// Temporary Hdfs files created, and where they should be moved to 
ultimately.
   /// Mapping a filename to a blank destination causes it to be deleted.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index f0caab7..5f98d13 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -19,6 +19,8 @@
 #include "util/disk-info.h"
 #include "util/impalad-metrics.h"
 
+#include "gutil/strings/substitute.h"
+
 #include <memory>
 
 #include "common/names.h"
@@ -47,8 +49,7 @@ void TestEnv::InitMetrics() {
   metrics_.reset(new MetricGroup("test-env-metrics"));
 }
 
-void TestEnv::InitTmpFileMgr(const std::vector<std::string>& tmp_dirs,
-    bool one_dir_per_device) {
+void TestEnv::InitTmpFileMgr(const vector<string>& tmp_dirs, bool 
one_dir_per_device) {
   // Need to recreate metrics to avoid error when registering metric twice.
   InitMetrics();
   tmp_file_mgr_.reset(new TmpFileMgr);
@@ -57,29 +58,26 @@ void TestEnv::InitTmpFileMgr(const 
std::vector<std::string>& tmp_dirs,
 
 TestEnv::~TestEnv() {
   // Queries must be torn down first since they are dependent on global state.
-  TearDownQueryStates();
+  TearDownRuntimeStates();
   exec_env_.reset();
   io_mgr_tracker_.reset();
   tmp_file_mgr_.reset();
   metrics_.reset();
 }
 
-RuntimeState* TestEnv::CreateRuntimeState(int64_t query_id,
-    TQueryOptions* query_options) {
+Status TestEnv::CreatePerQueryState(int64_t query_id, int max_buffers, int 
block_size,
+    RuntimeState** runtime_state, TQueryOptions* query_options) {
+  // Enforce invariant that each query ID can be registered at most once.
+  if (runtime_states_.find(query_id) != runtime_states_.end()) {
+    return Status(Substitute("Duplicate query id found: $0", query_id));
+  }
+
   TExecPlanFragmentParams plan_params = TExecPlanFragmentParams();
   if (query_options != NULL) plan_params.query_ctx.request.query_options = 
*query_options;
   plan_params.query_ctx.query_id.hi = 0;
   plan_params.query_ctx.query_id.lo = query_id;
-  return new RuntimeState(plan_params, exec_env_.get());
-}
-
-Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int 
block_size,
-    RuntimeState** runtime_state, TQueryOptions* query_options) {
-  *runtime_state = CreateRuntimeState(query_id, query_options);
-  if (*runtime_state == NULL) {
-    return Status("Unexpected error creating RuntimeState");
-  }
 
+  *runtime_state = new RuntimeState(plan_params, exec_env_.get());
   (*runtime_state)->InitMemTrackers(TUniqueId(), NULL, -1);
 
   shared_ptr<BufferedBlockMgr> mgr;
@@ -88,24 +86,13 @@ Status TestEnv::CreateQueryState(int64_t query_id, int 
max_buffers, int block_si
       tmp_file_mgr_.get(), CalculateMemLimit(max_buffers, block_size), 
block_size, &mgr));
   (*runtime_state)->set_block_mgr(mgr);
 
-  query_states_.push_back(shared_ptr<RuntimeState>(*runtime_state));
-  return Status::OK();
-}
-
-Status TestEnv::CreateQueryStates(int64_t start_query_id, int num_mgrs,
-    int buffers_per_mgr, int block_size,
-    vector<RuntimeState*>* runtime_states) {
-  for (int i = 0; i < num_mgrs; ++i) {
-    RuntimeState* runtime_state;
-    RETURN_IF_ERROR(CreateQueryState(start_query_id + i, buffers_per_mgr, 
block_size,
-        &runtime_state));
-    runtime_states->push_back(runtime_state);
-  }
+  runtime_states_[query_id] = shared_ptr<RuntimeState>(*runtime_state);
   return Status::OK();
 }
 
-void TestEnv::TearDownQueryStates() {
-  query_states_.clear();
+void TestEnv::TearDownRuntimeStates() {
+  for (auto& runtime_state : runtime_states_) 
runtime_state.second->ReleaseResources();
+  runtime_states_.clear();
 }
 
 
@@ -117,8 +104,8 @@ int64_t TestEnv::CalculateMemLimit(int max_buffers, int 
block_size) {
 
 int64_t TestEnv::TotalQueryMemoryConsumption() {
   int64_t total = 0;
-  for (shared_ptr<RuntimeState>& query_state : query_states_) {
-    total += query_state->query_mem_tracker()->consumption();
+  for (const auto& runtime_state : runtime_states_) {
+    total += runtime_state.second->query_mem_tracker()->consumption();
   }
   return total;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/runtime/test-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index d30424d..6399110 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -38,17 +38,13 @@ class TestEnv {
   void InitTmpFileMgr(const std::vector<std::string>& tmp_dirs, bool 
one_dir_per_device);
 
   /// Create a RuntimeState for a query with a new block manager and the given 
query
-  /// options. The RuntimeState is owned by the TestEnv.
-  Status CreateQueryState(int64_t query_id, int max_buffers, int block_size,
+  /// options. The RuntimeState is owned by the TestEnv. Returns an error if
+  /// CreatePerQueryState() has been called with the same query ID already.
+  Status CreatePerQueryState(int64_t query_id, int max_buffers, int block_size,
       RuntimeState** runtime_state, TQueryOptions* query_options = NULL);
 
-  /// Create multiple separate RuntimeStates with associated block managers, 
e.g. as if
-  /// multiple queries were executing. The RuntimeStates are owned by TestEnv.
-  Status CreateQueryStates(int64_t start_query_id, int num_mgrs, int 
buffers_per_mgr,
-      int block_size, std::vector<RuntimeState*>* runtime_states);
-
   /// Destroy all RuntimeStates and block managers created by this TestEnv.
-  void TearDownQueryStates();
+  void TearDownRuntimeStates();
 
   /// Calculate memory limit accounting for overflow and negative values.
   /// If max_buffers is -1, no memory limit will apply.
@@ -63,14 +59,9 @@ class TestEnv {
   TmpFileMgr* tmp_file_mgr() { return tmp_file_mgr_.get(); }
 
  private:
-
   /// Recreate global metric groups.
   void InitMetrics();
 
-  /// Create a new RuntimeState sharing global environment with given query 
options
-  RuntimeState* CreateRuntimeState(int64_t query_id,
-      TQueryOptions* query_options = NULL);
-
   /// Global state for test environment.
   static boost::scoped_ptr<MetricGroup> static_metrics_;
   boost::scoped_ptr<ExecEnv> exec_env_;
@@ -78,8 +69,9 @@ class TestEnv {
   boost::scoped_ptr<MetricGroup> metrics_;
   boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_;
 
-  /// Per-query states with associated block managers.
-  vector<std::shared_ptr<RuntimeState>> query_states_;
+  /// Per-query states with associated block managers. Key is the integer 
query ID passed
+  /// to CreatePerQueryState().
+  std::unordered_map<int64_t, std::shared_ptr<RuntimeState>> runtime_states_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index b33cee1..ef951d6 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -26,25 +26,26 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "exec/catalog-op-executor.h"
-#include "exprs/expr.h"
 #include "exprs/expr-context.h"
+#include "exprs/expr.h"
+#include "gen-cpp/Data_types.h"
+#include "gen-cpp/Frontend_types.h"
+#include "rpc/jni-thrift-util.h"
+#include "rpc/thrift-server.h"
+#include "runtime/client-cache.h"
 #include "runtime/exec-env.h"
-#include "runtime/runtime-state.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/lib-cache.h"
-#include "runtime/client-cache.h"
+#include "runtime/runtime-state.h"
 #include "service/impala-server.h"
 #include "util/cpu-info.h"
+#include "util/debug-util.h"
 #include "util/disk-info.h"
 #include "util/dynamic-util.h"
 #include "util/jni-util.h"
 #include "util/mem-info.h"
+#include "util/scope-exit-trigger.h"
 #include "util/symbols-util.h"
-#include "rpc/jni-thrift-util.h"
-#include "rpc/thrift-server.h"
-#include "util/debug-util.h"
-#include "gen-cpp/Data_types.h"
-#include "gen-cpp/Frontend_types.h"
 
 #include "common/names.h"
 
@@ -94,6 +95,8 @@ Java_org_apache_impala_service_FeSupport_NativeEvalConstExprs(
   // Java exception.
   query_ctx.request.query_options.max_errors = 1;
   RuntimeState state(query_ctx);
+  // Make sure to close the runtime state no matter how this scope is exited.
+  ScopeExitTrigger close_runtime_state([&state]() { state.ReleaseResources(); 
});
 
   THROW_IF_ERROR_RET(jni_frame.push(env), env, JniUtil::internal_exc_class(),
       result_bytes);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/util/scope-exit-trigger.h
----------------------------------------------------------------------
diff --git a/be/src/util/scope-exit-trigger.h b/be/src/util/scope-exit-trigger.h
new file mode 100644
index 0000000..42b808e
--- /dev/null
+++ b/be/src/util/scope-exit-trigger.h
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_UTIL_SCOPE_EXIT_TRIGGER_H
+#define IMPALA_UTIL_SCOPE_EXIT_TRIGGER_H
+
+#include <functional>
+
+namespace impala {
+
+/// Utility class that calls a client-supplied function when it is destroyed.
+///
+/// Use judiciously - scope exits can be hard to reason about, and this class 
should not
+/// act as proxy for work-performing d'tors, which we try to avoid.
+class ScopeExitTrigger {
+ public:
+  ScopeExitTrigger(const auto& trigger) : trigger_(trigger) {}
+
+  ~ScopeExitTrigger() { trigger_(); }
+
+ private:
+  std::function<void()> trigger_;
+};
+}
+
+#endif

Reply via email to