Repository: incubator-impala
Updated Branches:
  refs/heads/master 756231880 -> 6d15f0377


IMPALA-4689: Fix computation of last active time

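The last active time passed to the TimestampValue constructor in
impala-server.cc#L1806 is in milliseconds, but the constructor expects
seconds. This change divides the value by 1000 before constructing the
TimestampValue. It also renames several variables and accessors to carry
a _ms suffix so that their units are explicit, to help prevent similar
bugs in the future.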

This change also fixes a bug that occurred when the operating system's
PIDs wrapped around during startup of the local minicluster. In that
case the first impalad would not be the one with the smallest PID, so
ImpalaCluster.get_first_impalad() would return the wrong one. The
impalads are now sorted by their backend port instead.

I ran git-clang-format on the change.

Change-Id: I283564c8d8e145d44d9493f4201555d3a1087edf
Reviewed-on: http://gerrit.cloudera.org:8080/5546
Reviewed-by: Thomas Tauber-Marshall <[email protected]>
Reviewed-by: Marcel Kornacker <[email protected]>
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/25ebf586
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/25ebf586
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/25ebf586

Branch: refs/heads/master
Commit: 25ebf586e00adc058412fd083b5c4768fa16cc7d
Parents: 7562318
Author: Lars Volker <[email protected]>
Authored: Fri Dec 16 17:04:04 2016 -0800
Committer: Internal Jenkins <[email protected]>
Committed: Wed Jan 4 12:12:04 2017 +0000

----------------------------------------------------------------------
 be/src/service/impala-http-handler.cc         |  2 +-
 be/src/service/impala-server.cc               | 11 ++++++-----
 be/src/service/impala-server.h                |  2 +-
 be/src/service/query-exec-state.cc            | 14 +++++++-------
 be/src/service/query-exec-state.h             | 18 ++++++++++--------
 tests/common/impala_cluster.py                |  6 +++++-
 tests/custom_cluster/test_query_expiration.py |  8 +++++---
 7 files changed, 35 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc 
b/be/src/service/impala-http-handler.cc
index a324b1f..2fb52e7 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -346,7 +346,7 @@ void ImpalaHttpHandler::QueryStateToJson(const 
ImpalaServer::QueryStateRecord& r
   value->AddMember("waiting", waiting, document->GetAllocator());
   value->AddMember("executing", !waiting, document->GetAllocator());
 
-  int64_t waiting_time = impala::UnixMillis() - record.last_active_time;
+  int64_t waiting_time = impala::UnixMillis() - record.last_active_time_ms;
   string waiting_time_str = "";
   if (waiting_time > 0) {
     waiting_time_str = PrettyPrinter::Print(waiting_time, TUnit::TIME_MS);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index e159fa2..722baca 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1604,7 +1604,7 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const 
QueryExecState& exec_stat
         plan_exec_info.fragments.begin(), plan_exec_info.fragments.end());
   }
   all_rows_returned = exec_state.eos();
-  last_active_time = exec_state.last_active();
+  last_active_time_ms = exec_state.last_active_ms();
   request_pool = exec_state.request_pool();
 }
 
@@ -1784,7 +1784,7 @@ void ImpalaServer::RegisterSessionTimeout(int32_t 
session_timeout) {
           // Use a non-zero timeout, if one exists
           timeout_s = max(FLAGS_idle_query_timeout, timeout_s);
         }
-        int64_t expiration = query_state->last_active() + (timeout_s * 1000L);
+        int64_t expiration = query_state->last_active_ms() + (timeout_s * 
1000L);
         if (now < expiration) {
           // If the real expiration date is in the future we may need to 
re-insert the
           // query's expiration event at its correct location.
@@ -1801,9 +1801,10 @@ void ImpalaServer::RegisterSessionTimeout(int32_t 
session_timeout) {
           }
         } else if (!query_state->is_active()) {
           // Otherwise time to expire this query
-          VLOG_QUERY << "Expiring query due to client inactivity: "
-                     << expiration_event->second << ", last activity was at: "
-                     << 
TimestampValue(query_state->last_active()).DebugString();
+          VLOG_QUERY
+              << "Expiring query due to client inactivity: " << 
expiration_event->second
+              << ", last activity was at: "
+              << TimestampValue(query_state->last_active_ms() / 
1000).DebugString();
           const string& err_msg = Substitute(
               "Query $0 expired due to client inactivity (timeout is $1)",
               PrintId(expiration_event->second),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 0697357..588a5c3 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -494,7 +494,7 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
     bool all_rows_returned;
 
     // The most recent time this query was actively being processed, in Unix 
milliseconds.
-    int64_t last_active_time;
+    int64_t last_active_time_ms;
 
     /// Request pool to which the request was submitted for admission, or an 
empty string
     /// if this request doesn't have a pool.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc 
b/be/src/service/query-exec-state.cc
index 6fa6d4a..4a9d8f6 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -61,11 +61,11 @@ static const string PER_HOST_VCORES_KEY = "Estimated 
Per-Host VCores";
 static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats";
 static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table 
Stats";
 
-ImpalaServer::QueryExecState::QueryExecState(
-    const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend,
-    ImpalaServer* server, shared_ptr<SessionState> session)
+ImpalaServer::QueryExecState::QueryExecState(const TQueryCtx& query_ctx,
+    ExecEnv* exec_env, Frontend* frontend, ImpalaServer* server,
+    shared_ptr<SessionState> session)
   : query_ctx_(query_ctx),
-    last_active_time_(numeric_limits<int64_t>::max()),
+    last_active_time_ms_(numeric_limits<int64_t>::max()),
     ref_count_(0L),
     child_query_executor_(new ChildQueryExecutor),
     exec_env_(exec_env),
@@ -74,7 +74,7 @@ ImpalaServer::QueryExecState::QueryExecState(
     schedule_(NULL),
     coord_(NULL),
     result_cache_max_size_(-1),
-    profile_(&profile_pool_, "Query"),  // assign name w/ id after planning
+    profile_(&profile_pool_, "Query"), // assign name w/ id after planning
     server_profile_(&profile_pool_, "ImpalaServer"),
     summary_profile_(&profile_pool_, "Summary"),
     is_cancelled_(false),
@@ -994,7 +994,7 @@ void 
ImpalaServer::QueryExecState::SetCreateTableAsSelectResultSet() {
 void ImpalaServer::QueryExecState::MarkInactive() {
   client_wait_sw_.Start();
   lock_guard<mutex> l(expiration_data_lock_);
-  last_active_time_ = UnixMillis();
+  last_active_time_ms_ = UnixMillis();
   DCHECK(ref_count_ > 0) << "Invalid MarkInactive()";
   --ref_count_;
 }
@@ -1004,7 +1004,7 @@ void ImpalaServer::QueryExecState::MarkActive() {
   int64_t elapsed_time = client_wait_sw_.ElapsedTime();
   client_wait_timer_->Set(elapsed_time);
   lock_guard<mutex> l(expiration_data_lock_);
-  last_active_time_ = UnixMillis();
+  last_active_time_ms_ = UnixMillis();
   ++ref_count_;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/query-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.h 
b/be/src/service/query-exec-state.h
index 20cd7bf..8ac066e 100644
--- a/be/src/service/query-exec-state.h
+++ b/be/src/service/query-exec-state.h
@@ -195,9 +195,9 @@ class ImpalaServer::QueryExecState {
     return exec_request_.analysis_warnings;
   }
 
-  inline int64_t last_active() const {
+  inline int64_t last_active_ms() const {
     boost::lock_guard<boost::mutex> l(expiration_data_lock_);
-    return last_active_time_;
+    return last_active_time_ms_;
   }
 
   /// Returns true if Impala is actively processing this query.
@@ -219,10 +219,12 @@ class ImpalaServer::QueryExecState {
   /// See "Locking" in the class comment for lock acquisition order.
   boost::mutex fetch_rows_lock_;
 
-  /// Protects last_active_time_ and ref_count_. Only held during short 
function calls -
-  /// no other locks should be acquired while holding this lock.
+  /// Protects last_active_time_ms_ and ref_count_. Only held during short 
function calls
+  /// - no other locks should be acquired while holding this lock.
   mutable boost::mutex expiration_data_lock_;
-  int64_t last_active_time_;
+
+  /// Stores the last time that the query was actively doing work, in Unix 
milliseconds.
+  int64_t last_active_time_ms_;
 
   /// ref_count_ > 0 if Impala is currently performing work on this query's 
behalf. Every
   /// time a client instructs Impala to do work on behalf of this query, the 
ref count is
@@ -335,11 +337,11 @@ class ImpalaServer::QueryExecState {
   /// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN 
statements.
   Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op);
 
-  /// Updates last_active_time_ and ref_count_ to reflect that query is 
currently not doing
-  /// any work. Takes expiration_data_lock_.
+  /// Updates last_active_time_ms_ and ref_count_ to reflect that query is 
currently not
+  /// doing any work. Takes expiration_data_lock_.
   void MarkInactive();
 
-  /// Updates last_active_time_ and ref_count_ to reflect that query is 
currently being
+  /// Updates last_active_time_ms_ and ref_count_ to reflect that query is 
currently being
   /// actively processed. Takes expiration_data_lock_.
   void MarkActive();
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/tests/common/impala_cluster.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index a5e13b3..d27b89a 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -119,6 +119,10 @@ class ImpalaCluster(object):
         # A process from get_pid_list() no longer exists, continue.
         LOG.info(e)
         continue
+    # If the operating system PIDs wrap around during startup of the local 
minicluster,
+    # the order of the impalads is incorrect. We order them by their backend 
port, so that
+    # get_first_impalad() always returns the first one.
+    impalads.sort(key = lambda i: i.service.be_port)
     return impalads, statestored, catalogd
 
 # Represents a process running on a machine and common actions that can be 
performed
@@ -130,7 +134,7 @@ class Process(object):
         'Process object must be created with valid command line argument list'
 
   def get_pid(self):
-    """Gets the pid of the process. Returns None if the PID cannot be 
determined"""
+    """Gets the PID of the process. Returns None if the PID cannot be 
determined"""
     LOG.info("Attempting to find PID for %s" % ' '.join(self.cmd))
     for pid in psutil.get_pid_list():
       try:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/tests/custom_cluster/test_query_expiration.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_query_expiration.py 
b/tests/custom_cluster/test_query_expiration.py
index 8757116..cbe29ff 100644
--- a/tests/custom_cluster/test_query_expiration.py
+++ b/tests/custom_cluster/test_query_expiration.py
@@ -37,10 +37,10 @@ class TestQueryExpiration(CustomClusterTestSuite):
     assert actual == expected
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--idle_query_timeout=6")
-  def test_query_expiration(self, vector):
+  @CustomClusterTestSuite.with_args("--idle_query_timeout=6 --logbuflevel=-1")
+  def test_query_expiration_test(self, vector):
     """Confirm that single queries expire if not fetched"""
-    impalad = self.cluster.get_any_impalad()
+    impalad = self.cluster.get_first_impalad()
     client = impalad.service.create_beeswax_client()
     num_expired = 
impalad.service.get_metric_value('impala-server.num-queries-expired')
     handle = client.execute_async("SELECT SLEEP(1000000)")
@@ -59,6 +59,8 @@ class TestQueryExpiration(CustomClusterTestSuite):
     assert num_expired + 1 == impalad.service.get_metric_value(
       'impala-server.num-queries-expired')
     self._check_num_executing(impalad, 2)
+    self.assert_impalad_log_contains('INFO', "Expiring query due to client 
inactivity: "
+        "[0-9a-f]+:[0-9a-f]+, last activity was at: \d\d\d\d-\d\d-\d\d 
\d\d:\d\d:\d\d")
     impalad.service.wait_for_metric_value('impala-server.num-queries-expired',
                                           num_expired + 3)
 

Reply via email to