Repository: incubator-impala Updated Branches: refs/heads/master bbf5255d0 -> 707f71b6e
IMPALA-4410: Safer tear-down of RuntimeState * Add RuntimeState::ReleaseResources(), which is guaranteed to release resources safely, rather than having PFE::Close() do the same piecemeal. * Fix for crash where PFE::Prepare() fails before descriptor table is created. * Remove some dead code from TestEnv, and rename some methods for clarity. Testing: Found by debug-actions, which has a reproducible test where PFE::Prepare() fails. Manually tested on master. Change-Id: Ie416e4d57240142bf685385299b749c3a6792c45 Reviewed-on: http://gerrit.cloudera.org:8080/4893 Tested-by: Internal Jenkins Reviewed-by: Henry Robinson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/707f71b6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/707f71b6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/707f71b6 Branch: refs/heads/master Commit: 707f71b6ea13487c707337785e785487d2f470f2 Parents: bbf5255 Author: Henry Robinson <[email protected]> Authored: Mon Oct 31 15:20:51 2016 -0700 Committer: Henry Robinson <[email protected]> Committed: Wed Nov 23 23:25:04 2016 +0000 ---------------------------------------------------------------------- be/src/exec/hash-table-test.cc | 9 +++-- be/src/runtime/buffered-block-mgr-test.cc | 6 +-- be/src/runtime/buffered-tuple-stream-test.cc | 12 ++++-- be/src/runtime/plan-fragment-executor.cc | 5 +-- be/src/runtime/runtime-state.cc | 8 ++++ be/src/runtime/runtime-state.h | 13 ++++-- be/src/runtime/test-env.cc | 49 +++++++++-------------- be/src/runtime/test-env.h | 22 ++++------ be/src/service/fe-support.cc | 19 +++++---- be/src/util/scope-exit-trigger.h | 40 ++++++++++++++++++ 10 files changed, 111 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/exec/hash-table-test.cc
---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc index 9fb5038..a539bb1 100644 --- a/be/src/exec/hash-table-test.cc +++ b/be/src/exec/hash-table-test.cc @@ -58,6 +58,7 @@ class HashTableTest : public testing::Test { MemPool mem_pool_; vector<ExprContext*> build_expr_ctxs_; vector<ExprContext*> probe_expr_ctxs_; + int next_query_id_ = 0; virtual void SetUp() { test_env_.reset(new TestEnv()); @@ -184,8 +185,8 @@ class HashTableTest : public testing::Test { bool CreateHashTable(bool quadratic, int64_t initial_num_buckets, scoped_ptr<HashTable>* table, int block_size = 8 * 1024 * 1024, int max_num_blocks = 100, int reserved_blocks = 10) { - EXPECT_OK( - test_env_->CreateQueryState(0, max_num_blocks, block_size, &runtime_state_)); + EXPECT_OK(test_env_->CreatePerQueryState( + next_query_id_++, max_num_blocks, block_size, &runtime_state_)); MemTracker* client_tracker = pool_.Add( new MemTracker(-1, "client", runtime_state_->instance_mem_tracker())); BufferedBlockMgr::Client* client; @@ -602,8 +603,8 @@ TEST_F(HashTableTest, QuadraticInsertFullTest) { // Test that hashing empty string updates hash value.
TEST_F(HashTableTest, HashEmpty) { - EXPECT_TRUE(test_env_->CreateQueryState(0, 100, 8 * 1024 * 1024, - &runtime_state_).ok()); + EXPECT_TRUE( + test_env_->CreatePerQueryState(0, 100, 8 * 1024 * 1024, &runtime_state_).ok()); scoped_ptr<HashTableCtx> ht_ctx; Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_, probe_expr_ctxs_, false /* !stores_nulls_ */, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/runtime/buffered-block-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc index 9f0222d..5e99c09 100644 --- a/be/src/runtime/buffered-block-mgr-test.cc +++ b/be/src/runtime/buffered-block-mgr-test.cc @@ -142,7 +142,7 @@ class BufferedBlockMgrTest : public ::testing::Test { BufferedBlockMgr* CreateMgr(int64_t query_id, int max_buffers, int block_size, RuntimeState** query_state = NULL, TQueryOptions* query_options = NULL) { RuntimeState* state; - EXPECT_OK(test_env_->CreateQueryState( + EXPECT_OK(test_env_->CreatePerQueryState( query_id, max_buffers, block_size, &state, query_options)); if (query_state != NULL) *query_state = state; return state->block_mgr(); @@ -185,7 +185,7 @@ class BufferedBlockMgrTest : public ::testing::Test { void TearDownMgrs() { // Tear down the query states, which DCHECKs that the memory consumption of // the query's trackers is zero. - test_env_->TearDownQueryStates(); + test_env_->TearDownRuntimeStates(); } void AllocateBlocks(BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* client, @@ -924,7 +924,7 @@ void BufferedBlockMgrTest::TestRuntimeStateTeardown( // scenario by holding onto a reference to the block mgr. This should be safe so // long as blocks are properly deleted before the runtime state is torn down.
DeleteBlocks(blocks); - test_env_->TearDownQueryStates(); + test_env_->TearDownRuntimeStates(); // Optionally wait for writes to complete after cancellation. if (wait_for_writes) WaitForWrites(block_mgr.get()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/runtime/buffered-tuple-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc index 9063a7d..e98c334 100644 --- a/be/src/runtime/buffered-tuple-stream-test.cc +++ b/be/src/runtime/buffered-tuple-stream-test.cc @@ -101,7 +101,7 @@ class SimpleTupleStreamTest : public testing::Test { /// Setup a block manager with the provided settings and client with no reservation, /// tracked by tracker_. void InitBlockMgr(int64_t limit, int block_size) { - ASSERT_OK(test_env_->CreateQueryState(0, limit, block_size, &runtime_state_)); + ASSERT_OK(test_env_->CreatePerQueryState(0, limit, block_size, &runtime_state_)); MemTracker* client_tracker = pool_.Add( new MemTracker(-1, "client", runtime_state_->instance_mem_tracker())); ASSERT_OK(runtime_state_->block_mgr()->RegisterClient( @@ -723,14 +723,20 @@ void SimpleTupleStreamTest::TestTransferMemory(bool pin_stream, bool read_write) } /// Test attaching memory to a row batch from a pinned stream. -TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStream) { +TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStreamReadWrite) { TestTransferMemory(true, true); +} + +TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStreamNoReadWrite) { TestTransferMemory(true, false); } /// Test attaching memory to a row batch from an unpinned stream.
-TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStream) { +TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamReadWrite) { TestTransferMemory(false, true); +} + +TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamNoReadWrite) { TestTransferMemory(false, false); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/runtime/plan-fragment-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc index f673f34..ad781b5 100644 --- a/be/src/runtime/plan-fragment-executor.cc +++ b/be/src/runtime/plan-fragment-executor.cc @@ -534,10 +534,7 @@ void PlanFragmentExecutor::Close() { // Prepare should always have been called, and so runtime_state_ should be set DCHECK(prepared_promise_.IsSet()); if (exec_tree_ != NULL) exec_tree_->Close(runtime_state_.get()); - runtime_state_->UnregisterReaderContexts(); - exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool()); - runtime_state_->desc_tbl().ClosePartitionExprs(runtime_state_.get()); - runtime_state_->filter_bank()->Close(); + runtime_state_->ReleaseResources(); if (mem_usage_sampled_counter_ != NULL) { PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/runtime/runtime-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index 37d5fb0..d0e1172 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -306,4 +306,12 @@ void RuntimeState::UnregisterReaderContexts() { reader_contexts_.clear(); } +void RuntimeState::ReleaseResources() { + UnregisterReaderContexts(); + if (desc_tbl_ != nullptr) desc_tbl_->ClosePartitionExprs(this); + if (filter_bank_ != nullptr) filter_bank_->Close(); + if (resource_pool_ != 
nullptr) { + ExecEnv::GetInstance()->thread_mgr()->UnregisterPool(resource_pool_); + } +} } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/runtime/runtime-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index 7003e59..de3ab94 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -62,8 +62,10 @@ typedef std::map<std::string, TInsertStats> PartitionInsertStats; /// deleted. typedef std::map<std::string, std::string> FileMoveMap; -/// A collection of items that are part of the global state of a -/// query and shared across all execution nodes of that query. +/// A collection of items that are part of the global state of a query and shared across +/// all execution nodes of that query. After initialisation, callers must call +/// ReleaseResources() to ensure that all resources are correctly freed before +/// destruction. class RuntimeState { public: RuntimeState(const TExecPlanFragmentParams& fragment_params, ExecEnv* exec_env); @@ -311,6 +313,9 @@ class RuntimeState { /// TODO: Fix IMPALA-4233 Status CodegenScalarFns(); + /// Release resources and prepare this object for destruction. The descriptor table, filter bank and resource pool are skipped if null, e.g. if initialisation failed before they were created. + void ReleaseResources(); + private: /// Allow TestEnv to set block_mgr manually for testing. friend class TestEnv; @@ -325,7 +330,7 @@ class RuntimeState { static const int DEFAULT_BATCH_SIZE = 1024; - DescriptorTbl* desc_tbl_; + DescriptorTbl* desc_tbl_ = nullptr; boost::scoped_ptr<ObjectPool> obj_pool_; /// Lock protecting error_log_ @@ -351,7 +356,7 @@ class RuntimeState { /// Thread resource management object for this fragment's execution. The runtime /// state is responsible for returning this pool to the thread mgr. 
- ThreadResourceMgr::ResourcePool* resource_pool_; + ThreadResourceMgr::ResourcePool* resource_pool_ = nullptr; /// Temporary Hdfs files created, and where they should be moved to ultimately.
/// Mapping a filename to a blank destination causes it to be deleted. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/runtime/test-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc index f0caab7..5f98d13 100644 --- a/be/src/runtime/test-env.cc +++ b/be/src/runtime/test-env.cc @@ -19,6 +19,8 @@ #include "util/disk-info.h" #include "util/impalad-metrics.h" +#include "gutil/strings/substitute.h" + #include <memory> #include "common/names.h" @@ -47,8 +49,7 @@ void TestEnv::InitMetrics() { metrics_.reset(new MetricGroup("test-env-metrics")); } -void TestEnv::InitTmpFileMgr(const std::vector<std::string>& tmp_dirs, - bool one_dir_per_device) { +void TestEnv::InitTmpFileMgr(const vector<string>& tmp_dirs, bool one_dir_per_device) { // Need to recreate metrics to avoid error when registering metric twice. InitMetrics(); tmp_file_mgr_.reset(new TmpFileMgr); @@ -57,29 +58,26 @@ void TestEnv::InitTmpFileMgr(const std::vector<std::string>& tmp_dirs, TestEnv::~TestEnv() { // Queries must be torn down first since they are dependent on global state. - TearDownQueryStates(); + TearDownRuntimeStates(); exec_env_.reset(); io_mgr_tracker_.reset(); tmp_file_mgr_.reset(); metrics_.reset(); } -RuntimeState* TestEnv::CreateRuntimeState(int64_t query_id, - TQueryOptions* query_options) { +Status TestEnv::CreatePerQueryState(int64_t query_id, int max_buffers, int block_size, + RuntimeState** runtime_state, TQueryOptions* query_options) { + // Enforce invariant that each query ID can be registered at most once.
+ if (runtime_states_.find(query_id) != runtime_states_.end()) { + return Status(Substitute("Duplicate query id found: $0", query_id)); + } + TExecPlanFragmentParams plan_params = TExecPlanFragmentParams(); if (query_options != NULL) plan_params.query_ctx.request.query_options = *query_options; plan_params.query_ctx.query_id.hi = 0; plan_params.query_ctx.query_id.lo = query_id; - return new RuntimeState(plan_params, exec_env_.get()); -} - -Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_size, - RuntimeState** runtime_state, TQueryOptions* query_options) { - *runtime_state = CreateRuntimeState(query_id, query_options); - if (*runtime_state == NULL) { - return Status("Unexpected error creating RuntimeState"); - } + *runtime_state = new RuntimeState(plan_params, exec_env_.get()); (*runtime_state)->InitMemTrackers(TUniqueId(), NULL, -1); shared_ptr<BufferedBlockMgr> mgr; @@ -88,24 +86,13 @@ Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_si tmp_file_mgr_.get(), CalculateMemLimit(max_buffers, block_size), block_size, &mgr)); (*runtime_state)->set_block_mgr(mgr); - query_states_.push_back(shared_ptr<RuntimeState>(*runtime_state)); - return Status::OK(); -} - -Status TestEnv::CreateQueryStates(int64_t start_query_id, int num_mgrs, - int buffers_per_mgr, int block_size, - vector<RuntimeState*>* runtime_states) { - for (int i = 0; i < num_mgrs; ++i) { - RuntimeState* runtime_state; - RETURN_IF_ERROR(CreateQueryState(start_query_id + i, buffers_per_mgr, block_size, - &runtime_state)); - runtime_states->push_back(runtime_state); - } + runtime_states_[query_id] = shared_ptr<RuntimeState>(*runtime_state); return Status::OK(); } -void TestEnv::TearDownQueryStates() { - query_states_.clear(); +void TestEnv::TearDownRuntimeStates() { + for (auto& runtime_state : runtime_states_) runtime_state.second->ReleaseResources(); + runtime_states_.clear(); } @@ -117,8 +104,8 @@ int64_t TestEnv::CalculateMemLimit(int max_buffers, 
int block_size) { int64_t TestEnv::TotalQueryMemoryConsumption() { int64_t total = 0; - for (shared_ptr<RuntimeState>& query_state : query_states_) { - total += query_state->query_mem_tracker()->consumption(); + for (const auto& runtime_state : runtime_states_) { + total += runtime_state.second->query_mem_tracker()->consumption(); } return total; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/runtime/test-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h index d30424d..6399110 100644 --- a/be/src/runtime/test-env.h +++ b/be/src/runtime/test-env.h @@ -38,17 +38,13 @@ class TestEnv { void InitTmpFileMgr(const std::vector<std::string>& tmp_dirs, bool one_dir_per_device); /// Create a RuntimeState for a query with a new block manager and the given query - /// options. The RuntimeState is owned by the TestEnv. - Status CreateQueryState(int64_t query_id, int max_buffers, int block_size, + /// options. The RuntimeState is owned by the TestEnv. Returns an error if + /// CreatePerQueryState() has been called with the same query ID already. + Status CreatePerQueryState(int64_t query_id, int max_buffers, int block_size, RuntimeState** runtime_state, TQueryOptions* query_options = NULL); - /// Create multiple separate RuntimeStates with associated block managers, e.g. as if - /// multiple queries were executing. The RuntimeStates are owned by TestEnv. - Status CreateQueryStates(int64_t start_query_id, int num_mgrs, int buffers_per_mgr, - int block_size, std::vector<RuntimeState*>* runtime_states); - /// Destroy all RuntimeStates and block managers created by this TestEnv. - void TearDownQueryStates(); + void TearDownRuntimeStates(); /// Calculate memory limit accounting for overflow and negative values. /// If max_buffers is -1, no memory limit will apply.
@@ -63,14 +59,9 @@ class TestEnv { TmpFileMgr* tmp_file_mgr() { return tmp_file_mgr_.get(); } private: - /// Recreate global metric groups. void InitMetrics(); - /// Create a new RuntimeState sharing global environment with given query options - RuntimeState* CreateRuntimeState(int64_t query_id, - TQueryOptions* query_options = NULL); - /// Global state for test environment. static boost::scoped_ptr<MetricGroup> static_metrics_; boost::scoped_ptr<ExecEnv> exec_env_; @@ -78,8 +69,9 @@ class TestEnv { boost::scoped_ptr<MetricGroup> metrics_; boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_; - /// Per-query states with associated block managers. - vector<std::shared_ptr<RuntimeState>> query_states_; + /// Per-query states with associated block managers. Key is the integer query ID passed + /// to CreatePerQueryState(). + std::unordered_map<int64_t, std::shared_ptr<RuntimeState>> runtime_states_; }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/service/fe-support.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc index b33cee1..ef951d6 100644 --- a/be/src/service/fe-support.cc +++ b/be/src/service/fe-support.cc @@ -26,25 +26,26 @@ #include "common/logging.h" #include "common/status.h" #include "exec/catalog-op-executor.h" -#include "exprs/expr.h" #include "exprs/expr-context.h" +#include "exprs/expr.h" +#include "gen-cpp/Data_types.h" +#include "gen-cpp/Frontend_types.h" +#include "rpc/jni-thrift-util.h" +#include "rpc/thrift-server.h" +#include "runtime/client-cache.h" #include "runtime/exec-env.h" -#include "runtime/runtime-state.h" #include "runtime/hdfs-fs-cache.h" #include "runtime/lib-cache.h" -#include "runtime/client-cache.h" +#include "runtime/runtime-state.h" #include "service/impala-server.h" #include "util/cpu-info.h" +#include "util/debug-util.h" #include "util/disk-info.h" #include "util/dynamic-util.h" #include "util/jni-util.h"
#include "util/mem-info.h" +#include "util/scope-exit-trigger.h" #include "util/symbols-util.h" -#include "rpc/jni-thrift-util.h" -#include "rpc/thrift-server.h" -#include "util/debug-util.h" -#include "gen-cpp/Data_types.h" -#include "gen-cpp/Frontend_types.h" #include "common/names.h" @@ -94,6 +95,8 @@ Java_org_apache_impala_service_FeSupport_NativeEvalConstExprs( // Java exception. query_ctx.request.query_options.max_errors = 1; RuntimeState state(query_ctx); + // Make sure to release the runtime state's resources no matter how this scope is exited. Declared after 'state', so it runs before 'state' is destroyed. + ScopeExitTrigger close_runtime_state([&state]() { state.ReleaseResources(); }); THROW_IF_ERROR_RET(jni_frame.push(env), env, JniUtil::internal_exc_class(), result_bytes); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/707f71b6/be/src/util/scope-exit-trigger.h ---------------------------------------------------------------------- diff --git a/be/src/util/scope-exit-trigger.h b/be/src/util/scope-exit-trigger.h new file mode 100644 index 0000000..42b808e --- /dev/null +++ b/be/src/util/scope-exit-trigger.h @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+
+#ifndef IMPALA_UTIL_SCOPE_EXIT_TRIGGER_H
+#define IMPALA_UTIL_SCOPE_EXIT_TRIGGER_H
+
+#include <functional>
+
+namespace impala {
+
+/// Utility class that calls a client-supplied function when it is destroyed. It is not
+/// copyable, so the function runs only once, and an empty function is never called.
+/// Use judiciously - scope exits can be hard to reason about, and this class should not
+/// act as proxy for work-performing d'tors, which we try to avoid.
+class ScopeExitTrigger {
+ public:
+  explicit ScopeExitTrigger(const std::function<void()>& trigger) : trigger_(trigger) {}
+  ScopeExitTrigger(const ScopeExitTrigger&) = delete;
+  ScopeExitTrigger& operator=(const ScopeExitTrigger&) = delete;
+  ~ScopeExitTrigger() { if (trigger_) trigger_(); }
+ private:
+  std::function<void()> trigger_;
+};
+}
+
+#endif
