Repository: incubator-impala Updated Branches: refs/heads/master 756231880 -> 6d15f0377
IMPALA-4689: Fix computation of last active time The last active time in impala-server.cc#L1806 is in milliseconds, but the TimestampValue constructor expects seconds. This change also renames some variables to make their meaning more explicit, aiming to prevent similar bugs in the future. This change also fixes a bug that occurred when the operating system PIDs wrapped around during startup of the local minicluster. In that case the first impalad would not be the one with the smallest PID, and ImpalaCluster.get_first_impalad() would return the wrong one. I ran git-clang-format on the change. Change-Id: I283564c8d8e145d44d9493f4201555d3a1087edf Reviewed-on: http://gerrit.cloudera.org:8080/5546 Reviewed-by: Thomas Tauber-Marshall <[email protected]> Reviewed-by: Marcel Kornacker <[email protected]> Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/25ebf586 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/25ebf586 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/25ebf586 Branch: refs/heads/master Commit: 25ebf586e00adc058412fd083b5c4768fa16cc7d Parents: 7562318 Author: Lars Volker <[email protected]> Authored: Fri Dec 16 17:04:04 2016 -0800 Committer: Internal Jenkins <[email protected]> Committed: Wed Jan 4 12:12:04 2017 +0000 ---------------------------------------------------------------------- be/src/service/impala-http-handler.cc | 2 +- be/src/service/impala-server.cc | 11 ++++++----- be/src/service/impala-server.h | 2 +- be/src/service/query-exec-state.cc | 14 +++++++------- be/src/service/query-exec-state.h | 18 ++++++++++-------- tests/common/impala_cluster.py | 6 +++++- tests/custom_cluster/test_query_expiration.py | 8 +++++--- 7 files changed, 35 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/impala-http-handler.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc index a324b1f..2fb52e7 100644 --- a/be/src/service/impala-http-handler.cc +++ b/be/src/service/impala-http-handler.cc @@ -346,7 +346,7 @@ void ImpalaHttpHandler::QueryStateToJson(const ImpalaServer::QueryStateRecord& r value->AddMember("waiting", waiting, document->GetAllocator()); value->AddMember("executing", !waiting, document->GetAllocator()); - int64_t waiting_time = impala::UnixMillis() - record.last_active_time; + int64_t waiting_time = impala::UnixMillis() - record.last_active_time_ms; string waiting_time_str = ""; if (waiting_time > 0) { waiting_time_str = PrettyPrinter::Print(waiting_time, TUnit::TIME_MS); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index e159fa2..722baca 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -1604,7 +1604,7 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_stat plan_exec_info.fragments.begin(), plan_exec_info.fragments.end()); } all_rows_returned = exec_state.eos(); - last_active_time = exec_state.last_active(); + last_active_time_ms = exec_state.last_active_ms(); request_pool = exec_state.request_pool(); } @@ -1784,7 +1784,7 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) { // Use a non-zero timeout, if one exists timeout_s = max(FLAGS_idle_query_timeout, timeout_s); } - int64_t expiration = query_state->last_active() + (timeout_s * 1000L); + int64_t expiration = query_state->last_active_ms() + (timeout_s * 1000L); if (now < expiration) { // If the real expiration date 
is in the future we may need to re-insert the // query's expiration event at its correct location. @@ -1801,9 +1801,10 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) { } } else if (!query_state->is_active()) { // Otherwise time to expire this query - VLOG_QUERY << "Expiring query due to client inactivity: " - << expiration_event->second << ", last activity was at: " - << TimestampValue(query_state->last_active()).DebugString(); + VLOG_QUERY + << "Expiring query due to client inactivity: " << expiration_event->second + << ", last activity was at: " + << TimestampValue(query_state->last_active_ms() / 1000).DebugString(); const string& err_msg = Substitute( "Query $0 expired due to client inactivity (timeout is $1)", PrintId(expiration_event->second), http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/impala-server.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 0697357..588a5c3 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -494,7 +494,7 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, bool all_rows_returned; // The most recent time this query was actively being processed, in Unix milliseconds. - int64_t last_active_time; + int64_t last_active_time_ms; /// Request pool to which the request was submitted for admission, or an empty string /// if this request doesn't have a pool. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/query-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc index 6fa6d4a..4a9d8f6 100644 --- a/be/src/service/query-exec-state.cc +++ b/be/src/service/query-exec-state.cc @@ -61,11 +61,11 @@ static const string PER_HOST_VCORES_KEY = "Estimated Per-Host VCores"; static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats"; static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats"; -ImpalaServer::QueryExecState::QueryExecState( - const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend, - ImpalaServer* server, shared_ptr<SessionState> session) +ImpalaServer::QueryExecState::QueryExecState(const TQueryCtx& query_ctx, + ExecEnv* exec_env, Frontend* frontend, ImpalaServer* server, + shared_ptr<SessionState> session) : query_ctx_(query_ctx), - last_active_time_(numeric_limits<int64_t>::max()), + last_active_time_ms_(numeric_limits<int64_t>::max()), ref_count_(0L), child_query_executor_(new ChildQueryExecutor), exec_env_(exec_env), @@ -74,7 +74,7 @@ ImpalaServer::QueryExecState::QueryExecState( schedule_(NULL), coord_(NULL), result_cache_max_size_(-1), - profile_(&profile_pool_, "Query"), // assign name w/ id after planning + profile_(&profile_pool_, "Query"), // assign name w/ id after planning server_profile_(&profile_pool_, "ImpalaServer"), summary_profile_(&profile_pool_, "Summary"), is_cancelled_(false), @@ -994,7 +994,7 @@ void ImpalaServer::QueryExecState::SetCreateTableAsSelectResultSet() { void ImpalaServer::QueryExecState::MarkInactive() { client_wait_sw_.Start(); lock_guard<mutex> l(expiration_data_lock_); - last_active_time_ = UnixMillis(); + last_active_time_ms_ = UnixMillis(); DCHECK(ref_count_ > 0) << "Invalid MarkInactive()"; --ref_count_; } @@ -1004,7 +1004,7 @@ void ImpalaServer::QueryExecState::MarkActive() { 
int64_t elapsed_time = client_wait_sw_.ElapsedTime(); client_wait_timer_->Set(elapsed_time); lock_guard<mutex> l(expiration_data_lock_); - last_active_time_ = UnixMillis(); + last_active_time_ms_ = UnixMillis(); ++ref_count_; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/query-exec-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.h b/be/src/service/query-exec-state.h index 20cd7bf..8ac066e 100644 --- a/be/src/service/query-exec-state.h +++ b/be/src/service/query-exec-state.h @@ -195,9 +195,9 @@ class ImpalaServer::QueryExecState { return exec_request_.analysis_warnings; } - inline int64_t last_active() const { + inline int64_t last_active_ms() const { boost::lock_guard<boost::mutex> l(expiration_data_lock_); - return last_active_time_; + return last_active_time_ms_; } /// Returns true if Impala is actively processing this query. @@ -219,10 +219,12 @@ class ImpalaServer::QueryExecState { /// See "Locking" in the class comment for lock acquisition order. boost::mutex fetch_rows_lock_; - /// Protects last_active_time_ and ref_count_. Only held during short function calls - - /// no other locks should be acquired while holding this lock. + /// Protects last_active_time_ms_ and ref_count_. Only held during short function calls + /// - no other locks should be acquired while holding this lock. mutable boost::mutex expiration_data_lock_; - int64_t last_active_time_; + + /// Stores the last time that the query was actively doing work, in Unix milliseconds. + int64_t last_active_time_ms_; /// ref_count_ > 0 if Impala is currently performing work on this query's behalf. Every /// time a client instructs Impala to do work on behalf of this query, the ref count is @@ -335,11 +337,11 @@ class ImpalaServer::QueryExecState { /// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements. 
Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op); - /// Updates last_active_time_ and ref_count_ to reflect that query is currently not doing - /// any work. Takes expiration_data_lock_. + /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently not + /// doing any work. Takes expiration_data_lock_. void MarkInactive(); - /// Updates last_active_time_ and ref_count_ to reflect that query is currently being + /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently being /// actively processed. Takes expiration_data_lock_. void MarkActive(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/tests/common/impala_cluster.py ---------------------------------------------------------------------- diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py index a5e13b3..d27b89a 100644 --- a/tests/common/impala_cluster.py +++ b/tests/common/impala_cluster.py @@ -119,6 +119,10 @@ class ImpalaCluster(object): # A process from get_pid_list() no longer exists, continue. LOG.info(e) continue + # If the operating system PIDs wrap around during startup of the local minicluster, + # the order of the impalads is incorrect. We order them by their backend port, so that + # get_first_impalad() always returns the first one. + impalads.sort(key = lambda i: i.service.be_port) return impalads, statestored, catalogd # Represents a process running on a machine and common actions that can be performed @@ -130,7 +134,7 @@ class Process(object): 'Process object must be created with valid command line argument list' def get_pid(self): - """Gets the pid of the process. Returns None if the PID cannot be determined""" + """Gets the PID of the process. 
Returns None if the PID cannot be determined""" LOG.info("Attempting to find PID for %s" % ' '.join(self.cmd)) for pid in psutil.get_pid_list(): try: http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/tests/custom_cluster/test_query_expiration.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_query_expiration.py b/tests/custom_cluster/test_query_expiration.py index 8757116..cbe29ff 100644 --- a/tests/custom_cluster/test_query_expiration.py +++ b/tests/custom_cluster/test_query_expiration.py @@ -37,10 +37,10 @@ class TestQueryExpiration(CustomClusterTestSuite): assert actual == expected @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args("--idle_query_timeout=6") - def test_query_expiration(self, vector): + @CustomClusterTestSuite.with_args("--idle_query_timeout=6 --logbuflevel=-1") + def test_query_expiration_test(self, vector): """Confirm that single queries expire if not fetched""" - impalad = self.cluster.get_any_impalad() + impalad = self.cluster.get_first_impalad() client = impalad.service.create_beeswax_client() num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired') handle = client.execute_async("SELECT SLEEP(1000000)") @@ -59,6 +59,8 @@ class TestQueryExpiration(CustomClusterTestSuite): assert num_expired + 1 == impalad.service.get_metric_value( 'impala-server.num-queries-expired') self._check_num_executing(impalad, 2) + self.assert_impalad_log_contains('INFO', "Expiring query due to client inactivity: " + "[0-9a-f]+:[0-9a-f]+, last activity was at: \d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d") impalad.service.wait_for_metric_value('impala-server.num-queries-expired', num_expired + 3)
