IMPALA-1575: part 2: yield admission control resources

This change releases admission control resources more eagerly,
once the query has finished actively executing. Some resources
(tracked and untracked) are still consumed by the client request
as long as it remains open, e.g. memory for control structures
and the result cache. However, these resources are relatively
small and should not block admission of new queries.

As in part 1, query execution is considered to be finished
under any of the following conditions:
1. The query encounters an error and fails
2. The query is cancelled due to the idle query timeout
3. The query reaches eos (or the DML completes)
4. The client cancels the query without closing the query

Admission control resources are released in two ways:
1. by calling AdmissionController::ReleaseQuery() on the coordinator
   promptly after query execution finishes, instead of waiting for
   UnregisterQuery(). This means that the query and its memory are
   no longer considered "admitted".
2. by changing the behaviour of MemTracker::GetPoolMemReserved() so
   that it is aware of when a query has finished executing and does not
   consider its entire memory limit to be "reserved".
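
The second mechanism can be illustrated with a small Python model. This is a
simplified sketch, not the Impala code: the QueryTracker class and the
PHYSICAL_MEM constant are hypothetical stand-ins for the per-query MemTracker
and MemInfo::physical_mem(). Only the branching is meant to mirror the change
to MemTracker::GetPoolMemReserved().

```python
from dataclasses import dataclass

# Hypothetical stand-in for MemInfo::physical_mem(); the value is illustrative.
PHYSICAL_MEM = 64 * 1024**3


@dataclass
class QueryTracker:
    """Hypothetical model of a per-query MemTracker."""
    limit: int                   # -1 means no memory limit is set
    consumption: int             # bytes currently consumed
    exec_finished: bool = False  # set once query execution has finished


def pool_mem_reserved(children):
    """Sum the memory 'reserved' by the queries in a resource pool."""
    reserved = 0
    for child in children:
        if child.limit > 0 and not child.exec_finished:
            # Still executing: the query may consume up to its limit.
            reserved += min(child.limit, PHYSICAL_MEM)
        else:
            # No limit, or finished executing: count actual consumption only.
            reserved += child.consumption
    return reserved


GB = 1024**3
running = QueryTracker(limit=2 * GB, consumption=GB // 2)
finished = QueryTracker(limit=2 * GB, consumption=GB // 8, exec_finished=True)
total = pool_mem_reserved([running, finished])
```

In this model the finished query contributes only its remaining consumption
(for example, result cache memory) rather than its full 2 GB limit.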

The preconditions for releasing an admitted query are subtle because the
queries are being admitted to a distributed system, not just the
coordinator.  The comment for ReleaseAdmissionControlResources()
documents the preconditions and rationale. Note that the preconditions
are not weaker than the preconditions of calling UnregisterQuery()
before this patch.
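
Because several end-of-query paths (eos, cancellation, DML completion) can each
trigger the release, the release has to be idempotent. The following Python
sketch models that pattern under stated assumptions: CoordinatorModel and
AdmissionControllerModel are hypothetical classes for illustration, and the
counter is a simplification of the real pool statistics.

```python
import threading


class AdmissionControllerModel:
    """Hypothetical stand-in that only counts admitted queries."""

    def __init__(self):
        self.num_admitted = 0

    def admit(self):
        self.num_admitted += 1

    def release_query(self):
        self.num_admitted -= 1


class CoordinatorModel:
    """Hypothetical model of the coordinator's release logic."""

    def __init__(self, admission_controller):
        self._lock = threading.Lock()
        self._admission_controller = admission_controller
        self._released = False

    def release_admission_control_resources(self):
        """Acquires the lock; may be called from any end-of-query path."""
        with self._lock:
            self._release_locked()

    def _release_locked(self):
        """Caller must hold self._lock. Only the first call has an effect."""
        if self._released:
            return
        if self._admission_controller is not None:
            self._admission_controller.release_query()
        self._released = True


controller = AdmissionControllerModel()
controller.admit()
coord = CoordinatorModel(controller)
coord.release_admission_control_resources()  # e.g. the query reached eos
coord.release_admission_control_resources()  # e.g. a later client cancel: no-op
```

The second call leaves the admitted count unchanged, so a cancel that arrives
after eos cannot release the same query twice.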

Testing:
TestAdmissionController is extended to end queries in four ways:
cancellation by client, idle timeout, the last row being fetched,
and the client closing the query. The test uses a mix of all four.
After the query ends, all clients wait for the test to complete
before closing the query or closing the connection. This ensures
that the admission control decisions are based entirely on the
query end behavior. This test works for both query admission control
and mem_limit admission control and can detect both kinds of admission
control resources ("admitted" and "reserved") not being released
promptly.
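
The mix of end behaviors is assigned round-robin by query number. A minimal
Python sketch of that assignment follows; the behavior names come from the
test, while assign_end_behaviors() and the query count of 10 are hypothetical
and chosen only for illustration.

```python
from collections import Counter

# The different ways that a query thread can end its query (from the test).
QUERY_END_BEHAVIORS = ['EOS', 'CLIENT_CANCEL', 'QUERY_TIMEOUT', 'CLIENT_CLOSE']


def assign_end_behaviors(num_queries):
    """Return the end behavior for each query number, cycling through the list."""
    return [QUERY_END_BEHAVIORS[query_num % len(QUERY_END_BEHAVIORS)]
            for query_num in range(num_queries)]


behaviors = assign_end_behaviors(10)
counts = Counter(behaviors)
```

With any query count of four or more, every end behavior is exercised at least
once in a single test run.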

This is based on an earlier patch by Joe McDonnell.

Change-Id: I80279eb2bda740d7f61420f52db3bfa42a6a51ac
Reviewed-on: http://gerrit.cloudera.org:8080/8323
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/fe90867d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/fe90867d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/fe90867d

Branch: refs/heads/master
Commit: fe90867d890c71bfdcf8ff941f8ec51e36083f25
Parents: 94236ff
Author: Tim Armstrong <[email protected]>
Authored: Tue Oct 17 16:25:24 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Nov 7 05:16:11 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc                   |  29 +++-
 be/src/runtime/coordinator.h                    |  38 ++++-
 be/src/runtime/mem-tracker.cc                   |   6 +-
 be/src/runtime/mem-tracker.h                    |  16 +-
 be/src/runtime/query-state.cc                   |   3 +
 be/src/scheduling/admission-controller.cc       |  13 +-
 be/src/scheduling/admission-controller.h        |  15 +-
 be/src/service/client-request-state.cc          |  11 --
 .../custom_cluster/test_admission_controller.py | 149 ++++++++++++-------
 9 files changed, 189 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index d18d658..94b0bdb 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -37,6 +37,7 @@
 #include "runtime/coordinator-backend-state.h"
 #include "runtime/debug-options.h"
 #include "runtime/query-state.h"
+#include "scheduling/admission-controller.h"
 #include "scheduling/scheduler.h"
 #include "util/bloom-filter.h"
 #include "util/counting-barrier.h"
@@ -79,6 +80,9 @@ Coordinator::Coordinator(
 Coordinator::~Coordinator() {
   DCHECK(released_exec_resources_)
       << "ReleaseExecResources() must be called before Coordinator is destroyed";
+  DCHECK(released_admission_control_resources_)
+      << "ReleaseAdmissionControlResources() must be called before Coordinator is "
+      << "destroyed";
   if (query_state_ != nullptr) {
     ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
   }
@@ -817,15 +821,16 @@ Status Coordinator::Wait() {
   // Execution of query fragments has finished. We don't need to hold onto query execution
   // resources while we finalize the query.
   ReleaseExecResources();
-
   // Query finalization is required only for HDFS table sinks
   if (needs_finalization_) RETURN_IF_ERROR(FinalizeQuery());
+  // Release admission control resources after we'd done the potentially heavyweight
+  // finalization.
+  ReleaseAdmissionControlResources();
 
   query_profile_->AddInfoString(
       "DML Stats", DataSink::OutputDmlStats(per_partition_status_, "\n"));
   // For DML queries, when Wait is done, the query is complete.
   ComputeQuerySummary();
-
   return status;
 }
 
@@ -856,10 +861,11 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
     returned_all_results_ = true;
     // release query execution resources here, since we won't be fetching more result rows
     ReleaseExecResources();
-
     // wait for all backends to complete before computing the summary
     // TODO: relocate this so GetNext() won't have to wait for backends to complete?
     RETURN_IF_ERROR(WaitForBackendCompletion());
+    // Release admission control resources after backends are finished.
+    ReleaseAdmissionControlResources();
     // if the query completed successfully, compute the summary
     if (query_status_.ok()) ComputeQuerySummary();
   }
@@ -898,7 +904,7 @@ void Coordinator::CancelInternal() {
   backend_completion_cv_.notify_all();
 
   ReleaseExecResourcesLocked();
-
+  ReleaseAdmissionControlResourcesLocked();
   // Report the summary with whatever progress the query made before being cancelled.
   ComputeQuerySummary();
 }
@@ -1070,6 +1076,21 @@ void Coordinator::ReleaseExecResourcesLocked() {
   // caching. The query MemTracker will be cleaned up later.
 }
 
+void Coordinator::ReleaseAdmissionControlResources() {
+  lock_guard<mutex> l(lock_);
+  ReleaseAdmissionControlResourcesLocked();
+}
+
+void Coordinator::ReleaseAdmissionControlResourcesLocked() {
+  if (released_admission_control_resources_) return;
+  LOG(INFO) << "Release admssion control resources for query "
+            << PrintId(query_ctx_.query_id);
+  AdmissionController* admission_controller =
+      ExecEnv::GetInstance()->admission_controller();
+  if (admission_controller != nullptr) admission_controller->ReleaseQuery(schedule_);
+  released_admission_control_resources_ = true;
+}
+
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilter() called although runtime filters are disabled";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 611e4ae..3549ae9 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -189,7 +189,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   class FilterState;
   class FragmentStats;
 
-  const QuerySchedule schedule_;
+  /// owned by the ClientRequestState that owns this coordinator
+  const QuerySchedule& schedule_;
 
   /// copied from TQueryExecRequest; constant across all fragments
   TQueryCtx query_ctx_;
@@ -346,6 +347,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// True if and only if ReleaseExecResources() has been called.
   bool released_exec_resources_ = false;
 
+  /// True if and only if ReleaseAdmissionControlResources() has been called.
+  bool released_admission_control_resources_ = false;
+
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
 
@@ -437,6 +441,38 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   /// Same as ReleaseExecResources() except the lock must be held by the caller.
   void ReleaseExecResourcesLocked();
+
+  /// Releases admission control resources for use by other queries.
+  /// This should only be called if one of following preconditions is satisfied for each
+  /// backend on which the query is executing:
+  /// * The backend finished execution.
+  ///   Rationale: the backend isn't consuming resources.
+  //
+  /// * A cancellation RPC was delivered to the backend.
+  ///   Rationale: the backend will be cancelled and release resources soon. By the
+  ///   time a newly admitted query fragment starts up on the backend and starts consuming
+  ///   resources, the resources from this query will probably have been released.
+  //
+  /// * Sending the cancellation RPC to the backend failed
+  ///   Rationale: the backend is either down or will tear itself down when it next tries
+  ///   to send a status RPC to the coordinator. It's possible that the fragment will be
+  ///   slow to tear down and we could overadmit and cause query failures. However, given
+  ///   the communication errors, we need to proceed based on incomplete information about
+  ///   the state of the cluster. We choose to optimistically assume that the backend will
+  ///   tear itself down in a timely manner and admit more queries instead of
+  ///   pessimistically queueing queries while we wait for a response from a backend that
+  ///   may never come.
+  ///
+  /// Calling WaitForBackendCompletion() or CancelInternal() before this function is
+  /// sufficient to satisfy the above preconditions. If the query has an expensive
+  /// finalization step post query execution (e.g. a DML statement), then this should
+  /// be called after that completes to avoid over-admitting queries.
+  ///
+  /// Acquires lock_. Idempotent.
+  void ReleaseAdmissionControlResources();
+
+  /// Same as ReleaseAdmissionControlResources() except lock must be held by caller.
+  void ReleaseAdmissionControlResourcesLocked();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index a8cb37e..4162e8a 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -141,11 +141,13 @@ int64_t MemTracker::GetPoolMemReserved() {
   lock_guard<SpinLock> l(child_trackers_lock_);
   for (MemTracker* child : child_trackers_) {
     int64_t child_limit = child->limit();
-    if (child_limit > 0) {
+    bool query_exec_finished = child->query_exec_finished_.Load() != 0;
+    if (child_limit > 0 && !query_exec_finished) {
       // Make sure we don't overflow if the query limits are set to ridiculous values.
       mem_reserved += std::min(child_limit, MemInfo::physical_mem());
     } else {
-      DCHECK_EQ(child_limit, -1) << child->LogUsage(UNLIMITED_DEPTH);
+      DCHECK(query_exec_finished || child_limit == -1)
+          << child->LogUsage(UNLIMITED_DEPTH);
       mem_reserved += child->consumption();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 1260351..539f973 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -294,8 +294,9 @@ class MemTracker {
 
   /// Returns the memory 'reserved' by this resource pool mem tracker, which is the sum
   /// of the memory reserved by the queries in it (i.e. its child trackers). The mem
-  /// reserved for a query is its limit_, if set (which should be the common case with
-  /// admission control). Otherwise the current consumption is used.
+  /// reserved for a query that is currently executing is its limit_, if set (which
+  /// should be the common case with admission control). Otherwise, if the query has
+  /// no limit or the query is finished executing, the current consumption is used.
   int64_t GetPoolMemReserved();
 
   /// Returns the memory consumed in bytes.
@@ -351,6 +352,11 @@ class MemTracker {
   Status MemLimitExceeded(RuntimeState* state, const std::string& details,
       int64_t failed_allocation = 0) WARN_UNUSED_RESULT;
 
+  void set_query_exec_finished() {
+    DCHECK(is_query_mem_tracker_);
+    query_exec_finished_.Store(1);
+  }
+
   static const std::string COUNTER_NAME;
 
  private:
@@ -386,6 +392,12 @@ class MemTracker {
   /// True if this is a Query MemTracker returned from CreateQueryMemTracker().
   bool is_query_mem_tracker_ = false;
 
+  /// Only used if 'is_query_mem_tracker_' is true.
+  /// 0 if the query is still executing or 1 if it has finished executing. Before
+  /// it has finished executing, the tracker limit is treated as "reserved memory"
+  /// for the purpose of admission control - see GetPoolMemReserved().
+  AtomicInt32 query_exec_finished_{0};
+
   /// Only valid for MemTrackers returned from CreateQueryMemTracker()
   TUniqueId query_id_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 6796c82..fd72f6a 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -82,6 +82,9 @@ void QueryState::ReleaseExecResources() {
   if (initial_reservations_ != nullptr) initial_reservations_->ReleaseResources();
   if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
   if (desc_tbl_ != nullptr) desc_tbl_->ReleaseResources();
+  // Mark the query as finished on the query MemTracker so that admission control will
+  // not consider the whole query memory limit to be "reserved".
+  query_mem_tracker_->set_query_exec_finished();
   // At this point query execution should not be consuming any resources but some tracked
   // memory may still be used by the ClientRequestState for result caching. The query
   // MemTracker will be closed later when this QueryState is torn down.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index c29020c..8994a5b 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -608,20 +608,19 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule) {
   }
 }
 
-Status AdmissionController::ReleaseQuery(QuerySchedule* schedule) {
-  if (!schedule->is_admitted()) return Status::OK(); // No-op if query was not admitted
-  const string& pool_name = schedule->request_pool();
+void AdmissionController::ReleaseQuery(const QuerySchedule& schedule) {
+  if (!schedule.is_admitted()) return; // No-op if query was not admitted
+  const string& pool_name = schedule.request_pool();
   {
     lock_guard<mutex> lock(admission_ctrl_lock_);
     PoolStats* stats = GetPoolStats(pool_name);
-    stats->Release(*schedule);
-    UpdateHostMemAdmitted(*schedule, -schedule->GetPerHostMemoryEstimate());
+    stats->Release(schedule);
+    UpdateHostMemAdmitted(schedule, -schedule.GetPerHostMemoryEstimate());
     pools_for_updates_.insert(pool_name);
-    VLOG_RPC << "Released query id=" << schedule->query_id() << " "
+    VLOG_RPC << "Released query id=" << schedule.query_id() << " "
              << stats->DebugString();
   }
   dequeue_cv_.notify_one();
-  return Status::OK();
 }
 
 // Statestore subscriber callback for IMPALA_REQUEST_QUEUE_TOPIC.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 3f18f6d..0cb9f2a 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -95,12 +95,13 @@ class ExecEnv;
 ///  a) Mem Reserved: the amount of memory that has been reported as reserved by all
 ///     backends, which come from the statestore topic updates. The values that are sent
 ///     come from the pool mem trackers in UpdateMemTrackerStats(), which reflects the
-///     memory reserved by fragments that have begun execution. For queries that have mem
-///     limits, the limit is considered to be its reserved memory, otherwise the current
-///     consumption is used (see MemTracker::GetPoolMemReserved()). The per-pool and
-///     per-host aggregates are computed in UpdateClusterAggregates(). This state, once
-///     all updates are fully distributed and aggregated, provides enough information to
-///     make admission decisions by any impalad. However, this requires waiting for both
+///     memory reserved by fragments that have begun execution. For queries that are
+///     executing and have mem limits, the limit is considered to be its reserved memory
+///     because it may consume up to that limit. Otherwise the query's current consumption
+///     is used (see MemTracker::GetPoolMemReserved()). The per-pool and per-host
+///     aggregates are computed in UpdateClusterAggregates(). This state, once all updates
+///     are fully distributed and aggregated, provides enough information to make
+///     admission decisions by any impalad. However, this requires waiting for both
 ///     admitted requests to start all remote fragments and then for the updated state to
 ///     be distributed via the statestore.
 ///  b) Mem Admitted: the amount of memory required (i.e. the value used in admission,
@@ -195,7 +196,7 @@ class AdmissionController {
   /// been submitted via AdmitQuery(). (If the request was not admitted, this is
   /// a no-op.)
   /// This does not block.
-  Status ReleaseQuery(QuerySchedule* schedule);
+  void ReleaseQuery(const QuerySchedule& schedule);
 
   /// Registers the request queue topic with the statestore.
   Status Init();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 918fc03..523e4ae 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -572,17 +572,6 @@ void ClientRequestState::Done() {
   // Update result set cache metrics, and update mem limit accounting before tearing
   // down the coordinator.
   ClearResultCache();
-
-  if (coord_.get() != NULL) {
-    // Release any reserved resources.
-    if (exec_env_->admission_controller() != nullptr) {
-      Status status = exec_env_->admission_controller()->ReleaseQuery(schedule_.get());
-      if (!status.ok()) {
-        LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id()
-                     << " because of error: " << status.GetDetail();
-      }
-    }
-  }
 }
 
 Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 712ce57..02fd6db 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -40,10 +40,11 @@ from TCLIService import TCLIService
 
 LOG = logging.getLogger('admission_test')
 
-# We set a WAIT debug action so it doesn't complete the execution of this query. The
-# limit is a parameter for debugging purposes; each thread will insert its id so
+# The query used for testing. It is important that this query be able to fetch many
+# rows. This allows a thread to stay active by fetching one row at a time. The
+# where clause is for debugging purposes; each thread will insert its id so
 # that running queries can be correlated with the thread that submitted them.
-QUERY = "select * from alltypes where id != %s"# limit %s"
+QUERY = "select * from alltypes where id != %s"
 
 # Time to sleep (in milliseconds) between issuing queries. The default statestore
 # heartbeat is 500ms, so the lower the delay the more we can submit before the global
@@ -86,6 +87,9 @@ _STATESTORED_ARGS = "-statestore_heartbeat_frequency_ms=%s "\
 # Key in the query profile for the query options.
 PROFILE_QUERY_OPTIONS_KEY = "Query Options (set by configuration): "
 
+# The different ways that a query thread can end its query.
+QUERY_END_BEHAVIORS = ['EOS', 'CLIENT_CANCEL', 'QUERY_TIMEOUT', 'CLIENT_CLOSE']
+
 def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
     proc_mem_limit = None):
   if proc_mem_limit is not None:
@@ -365,12 +369,13 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
   (parameterized) and the ability to submit to one impalad or many in a round-robin
-  fashion. The queries are set with the WAIT debug action so that we have more control
-  over the state that the admission controller uses to make decisions.  Each query is
-  submitted on a separate thread. Depending on the test parameters a varying number of
-  queries will be admitted, queued, and rejected. Once queries are admitted, the query
-  execution blocks and we can cancel the query in order to allow another queued query to
-  be admitted.
+  fashion. Each query is submitted on a separate thread. After admission, the query
+  thread will block with the query open and wait for the main thread to notify it to
+  end its query. The query thread can end its query by fetching to the end, cancelling
+  itself, closing itself, or waiting for the query timeout to take effect. Depending
+  on the test parameters a varying number of queries will be admitted, queued, and
+  rejected. After the queries are admitted, the main thread will request each admitted
+  query thread to end its query and allow queued queries to be admitted.
 
   The test tracks the state of the admission controller using the metrics from each
   impalad to do the following:
@@ -378,13 +383,15 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       queued, and rejected requests should sum to the number of queries and that the
       values are reasonable given the test parameters.
   (2) While there are running queries:
-      * Cancel the currently running queries (they are blocked with the WAIT debug action)
-        and verify the metric for the number of completed queries. The threads that
-        submitted those queries should complete.
+      * Request the currently running queries to end and wait for the queries to end.
+        Verify the metric for the number of completed queries. The threads that
+        submitted those queries will keep their connections open until the entire test
+        completes. This verifies that admission control is tied to the end of the query
+        and does not depend on closing the connection.
       * Check that queued requests are then dequeued and verify using the metric for the
         number of dequeued requests. The threads that were waiting to submit the query
         should then insert themselves into a list of currently running queries and then
-        fetch() the results (which will block).
+        wait for a notification from the main thread.
   (3) After all queries have completed, check that the final number of admitted,
       queued, and rejected requests are reasonable given the test parameters. When
       submitting to a single impalad, we know exactly what the values should be,
@@ -428,6 +435,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     self.executing_threads = list()
 
   def teardown(self):
+    # Set shutdown for all threads (cancel if needed)
     for thread in self.all_threads:
       try:
         thread.lock.acquire()
@@ -442,6 +450,9 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
             client.close()
       finally:
         thread.lock.release()
+
+    # Wait for all threads to exit
+    for thread in self.all_threads:
       thread.join(5)
       LOG.debug("Join thread for query num %s %s", thread.query_num,
           "TIMED OUT" if thread.isAlive() else "")
@@ -537,36 +548,39 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     LOG.debug("Found all %s admitted threads after %s seconds", num_threads,
         round(time() - start_time, 1))
 
-  def cancel_admitted_queries(self, num_queries):
+  def end_admitted_queries(self, num_queries):
     """
-    Cancels queries on threads that are currently blocked on query execution.
+    Requests each admitted query to end its query.
     """
     assert len(self.executing_threads) >= num_queries
-    LOG.debug("Cancelling %s queries", num_queries)
+    LOG.debug("Requesting {0} clients to end queries".format(num_queries))
+
+    # Request admitted clients to end their queries
+    current_executing_queries = []
     for i in xrange(num_queries):
       # pop() is thread-safe, it's OK if another thread is appending concurrently.
       thread = self.executing_threads.pop(0)
       LOG.debug("Cancelling query %s", thread.query_num)
-      # The other thread sets the query_state before appending itself to the list,
-      # and will not change its state until it is cancelled by this thread.
       assert thread.query_state == 'ADMITTED'
-      client = thread.impalad.service.create_beeswax_client()
-      try:
-        cancel_result = client.cancel(thread.query_handle)
-        assert cancel_result.status_code == 0,\
-            'Unexpected status code from cancel request: %s' % cancel_result
-        # Wait for the query to be cancelled and return
-        thread.join(20)
-        LOG.debug("Cancelled admitted query %s %s",
-            thread.query_num, "TIMED OUT" if thread.isAlive() else "")
-        assert not thread.isAlive()
-        assert thread.query_state == 'COMPLETED'
-      finally:
-        client.close()
+      current_executing_queries.append(thread)
+      thread.query_state = 'REQUEST_QUERY_END'
+
+    # Wait for the queries to end
+    start_time = time()
+    while True:
+      all_done = True
+      for thread in self.all_threads:
+        if thread.query_state == 'REQUEST_QUERY_END':
+          all_done = False
+      if all_done:
+        break
+      assert (time() - start_time < STRESS_TIMEOUT),\
+        "Timed out waiting %s seconds for query end" % (STRESS_TIMEOUT,)
+      sleep(1)
 
   class SubmitQueryThread(threading.Thread):
     def __init__(self, impalad, additional_query_options, vector, query_num,
-        executing_threads):
+        query_end_behavior, executing_threads):
       """
       executing_threads must be provided so that this thread can add itself when the
       query is admitted and begins execution.
@@ -576,6 +590,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       self.vector = vector
       self.additional_query_options = additional_query_options
       self.query_num = query_num
+      self.query_end_behavior = query_end_behavior
       self.impalad = impalad
       self.error = None
       # query_state is defined and used only by the test code, not a property exposed by
@@ -599,7 +614,6 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
             return
 
           exec_options = self.vector.get_value('exec_option')
-          exec_options['debug_action'] = '0:GETNEXT:WAIT'
           exec_options.update(self.additional_query_options)
           query = QUERY % (self.query_num,)
           self.query_state = 'SUBMITTING'
@@ -607,6 +621,9 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
           ImpalaTestSuite.change_database(client, self.vector.get_value('table_format'))
           client.set_configuration(exec_options)
 
+          if self.query_end_behavior == 'QUERY_TIMEOUT':
+            client.execute("SET QUERY_TIMEOUT_S=5")
+
           LOG.debug("Submitting query %s", self.query_num)
           self.query_handle = client.execute_async(query)
         except ImpalaBeeswaxException as e:
@@ -627,22 +644,22 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
         # The thread becomes visible to the main thread when it is added to the
         # shared list of executing_threads. append() is atomic and thread-safe.
         self.executing_threads.append(self)
-        try:
-          # fetch() will block until we cancel the query from the main thread
-          # (unless an unexpected error occurs). If an error occurs on the main therad,
-          # it is possible that teardown() cancels this query before we call fetch(). In
-          # that case a different exception is thrown and we handle it gracefully.
-          client.fetch(query, self.query_handle)
-        except ImpalaBeeswaxException as e:
-          if "Cancelled" in str(e):
-            LOG.debug("Query %s completed", self.query_num)
+
+        # Synchronize with the main thread. At this point, the thread is executing a
+        # query. It needs to wait until the main thread requests it to end its query.
+        while not self.shutdown:
+          # The QUERY_TIMEOUT needs to stay active until the main thread requests it
+          # to end. Otherwise, the query may get cancelled early. Fetch a row every
+          # second to avoid going idle.
+          if self.query_end_behavior == 'QUERY_TIMEOUT' and \
+             self.query_state != 'COMPLETED':
+            client.fetch(query, self.query_handle, 1)
+          if self.query_state == 'REQUEST_QUERY_END':
+            self._end_query(client, query)
+            # The query has released admission control resources
             self.query_state = 'COMPLETED'
             self.query_handle = None
-          elif "Invalid or unknown query handle" in str(e):
-            # May happen if the test is being torn down early (i.e. an error 
occurred).
-            LOG.debug("Query %s already cancelled in test shutdown.")
-          else:
-            raise e
+          sleep(1)
       except Exception as e:
         LOG.exception(e)
         # Unknown errors will be raised later
@@ -653,6 +670,27 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
         if client is not None:
           client.close()
 
+    def _end_query(self, client, query):
+      """Bring the query to the appropriate end state defined by self.query_end_behaviour.
+      Returns once the query has reached that state."""
+      if self.query_end_behavior == 'QUERY_TIMEOUT':
+        # Sleep and wait for the query to be cancelled. The cancellation will
+        # set the state to EXCEPTION.
+        start_time = time()
+        while (client.get_state(self.query_handle) != \
+               client.QUERY_STATES['EXCEPTION']):
+          assert (time() - start_time < STRESS_TIMEOUT),\
+            "Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,)
+          sleep(1)
+      elif self.query_end_behavior == 'EOS':
+        # Fetch all rows so we hit eos.
+        client.fetch(query, self.query_handle)
+      elif self.query_end_behavior == 'CLIENT_CANCEL':
+        client.cancel(self.query_handle)
+      else:
+        assert self.query_end_behavior == 'CLIENT_CLOSE'
+        client.close_query(self.query_handle)
+
   def _check_queries_page_resource_pools(self):
     """Checks that all queries in the '/queries' webpage json have the correct resource
     pool (this is called after all queries have been admitted, queued, or rejected, so
@@ -693,14 +731,11 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     initial_metrics = self.get_admission_metrics();
     log_metrics("Initial metrics: ", initial_metrics);
 
-    # Want query_num to start at 1 because this gets used as the limit in the query to
-    # help debugging (we can associate a running query with a thread). If we start at 0,
-    # that query would be evaluated as a constant expression and never hit the WAIT debug
-    # action.
-    for query_num in xrange(1, num_queries + 1):
+    for query_num in xrange(num_queries):
       impalad = self.impalads[query_num % len(self.impalads)]
+      query_end_behavior = QUERY_END_BEHAVIORS[query_num % len(QUERY_END_BEHAVIORS)]
       thread = self.SubmitQueryThread(impalad, additional_query_options, vector,
-          query_num, self.executing_threads)
+          query_num, query_end_behavior, self.executing_threads)
       thread.start()
       self.all_threads.append(thread)
       sleep(submission_delay_ms / 1000.0)
@@ -735,10 +770,10 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     while len(self.executing_threads) > 0:
       curr_metrics = self.get_admission_metrics();
       log_metrics("Main loop, curr_metrics: ", curr_metrics);
-      num_to_cancel = len(self.executing_threads)
-      LOG.debug("Main loop, will cancel %s queries", num_to_cancel)
-      self.cancel_admitted_queries(num_to_cancel)
-      self.wait_for_metric_changes(['released'], curr_metrics, num_to_cancel)
+      num_to_end = len(self.executing_threads)
+      LOG.debug("Main loop, will request %s queries to end", num_to_end)
+      self.end_admitted_queries(num_to_end)
+      self.wait_for_metric_changes(['released'], curr_metrics, num_to_end)
 
       num_queued_remaining =\
           curr_metrics['queued'] - curr_metrics['dequeued'] - curr_metrics['timed-out']
