This is an automated email from the ASF dual-hosted git repository.
dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new f05eac647 IMPALA-12602: Unregister queries on idle timeout
f05eac647 is described below
commit f05eac647647b5e03c3aafc35f785c73d07e2658
Author: Michael Smith <[email protected]>
AuthorDate: Fri Feb 23 15:37:23 2024 -0800
IMPALA-12602: Unregister queries on idle timeout
Queries cancelled due to idle_query_timeout/QUERY_TIMEOUT_S are now also
unregistered to free any remaining memory, as you cannot fetch results
from a cancelled query.
Adds a new structure - idle_query_statuses_ - to retain Status messages
for queries closed this way so that we can continue to return a clear
error message if the client returns and requests query status or
attempts to fetch results. This structure must be global because HS2
server can only identify a session ID from a query handle, and the query
handle no longer exists. SessionState tracks queries added to
idle_query_statuses_ so they can be cleared when the session is closed.
Also ensures MarkInactive is called in ClientRequestState when Wait()
completes. Previously WaitInternal would only MarkInactive on success,
leaving any failed requests in an active state until explicitly closed
or the session ended.
The beeswax get_log RPC will not return the preserved error message or
any warnings for these queries. It's also possible the summary and
profile are rotated out of query log as the query is no longer inflight.
This is an acceptable outcome as a client will likely not look for a
log/summary/profile after it times out.
Testing:
- updates test_query_expiration to verify number of waiting queries is
only non-zero for queries cancelled by EXEC_TIME_LIMIT_S and not yet
closed as an idle query
- modified test_retry_query_timeout to use exec_time_limit_s because
queries closed by idle_timeout_s don't work with get_exec_summary
Change-Id: Iacfc285ed3587892c7ec6f7df3b5f71c9e41baf0
Reviewed-on: http://gerrit.cloudera.org:8080/21074
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/service/client-request-state.cc | 9 ++-
be/src/service/impala-beeswax-server.cc | 9 ++-
be/src/service/impala-server.cc | 88 +++++++++++++++++++--------
be/src/service/impala-server.h | 24 ++++++--
docs/topics/impala_timeouts.xml | 6 +-
tests/custom_cluster/test_query_expiration.py | 52 ++++++++++++----
tests/custom_cluster/test_query_retries.py | 4 +-
7 files changed, 140 insertions(+), 52 deletions(-)
diff --git a/be/src/service/client-request-state.cc
b/be/src/service/client-request-state.cc
index 44aa3b824..99c70e088 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1155,6 +1155,10 @@ bool ClientRequestState::BlockOnWait(int64_t timeout_us,
int64_t* block_on_wait_
void ClientRequestState::Wait() {
// block until results are ready
Status status = WaitInternal();
+ // Rows are available now (for SELECT statement), so start the 'wait' timer
that tracks
+ // how long Impala waits for the client to fetch rows. For other statements,
track the
+ // time until a Close() is received.
+ MarkInactive();
{
lock_guard<mutex> l(lock_);
if (returns_result_set()) {
@@ -1191,7 +1195,6 @@ void ClientRequestState::Wait() {
Status ClientRequestState::WaitInternal() {
// Explain requests have already populated the result set. Nothing to do
here.
if (exec_request().stmt_type == TStmtType::EXPLAIN) {
- MarkInactive();
return Status::OK();
}
@@ -1240,10 +1243,6 @@ Status ClientRequestState::WaitInternal() {
} else if (isCTAS) {
SetCreateTableAsSelectResultSet();
}
- // Rows are available now (for SELECT statement), so start the 'wait' timer
that tracks
- // how long Impala waits for the client to fetch rows. For other statements,
track the
- // time until a Close() is received.
- MarkInactive();
return Status::OK();
}
diff --git a/be/src/service/impala-beeswax-server.cc
b/be/src/service/impala-beeswax-server.cc
index 2b10e8c17..5bf658188 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -292,7 +292,14 @@ beeswax::QueryState::type ImpalaServer::get_state(
VLOG_ROW << "get_state(): query_id=" << PrintId(query_id);
QueryHandle query_handle;
- RAISE_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle),
SQLSTATE_GENERAL_ERROR);
+ Status status = GetActiveQueryHandle(query_id, &query_handle);
+ // GetActiveQueryHandle may return the query's status from being cancelled.
If not an
+ // invalid query handle, we can assume that error statuses reflect a query
in the
+ // EXCEPTION state.
+ if (!status.ok() && status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
+ return beeswax::QueryState::EXCEPTION;
+ }
+ RAISE_IF_ERROR(status, SQLSTATE_GENERAL_ERROR);
// Validate that query can be accessed by user.
RAISE_IF_ERROR(CheckClientRequestSession(session.get(),
query_handle->effective_user(),
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 963960661..029ae61dc 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1428,6 +1428,16 @@ Status ImpalaServer::RegisterQuery(const TUniqueId&
query_id,
return Status::OK();
}
+static inline int32_t GetIdleTimeout(const TQueryOptions& query_options) {
+ int32_t idle_timeout_s = query_options.query_timeout_s;
+ if (FLAGS_idle_query_timeout > 0 && idle_timeout_s > 0) {
+ return min(FLAGS_idle_query_timeout, idle_timeout_s);
+ } else {
+ // Use a non-zero timeout, if one exists
+ return max(FLAGS_idle_query_timeout, idle_timeout_s);
+ }
+}
+
Status ImpalaServer::SetQueryInflight(
shared_ptr<SessionState> session_state, const QueryHandle& query_handle) {
DebugActionNoFail(query_handle->query_options(), "SET_QUERY_INFLIGHT");
@@ -1459,13 +1469,7 @@ Status ImpalaServer::SetQueryInflight(
}
// If the query has a timeout or time limit, schedule checks.
- int32_t idle_timeout_s = query_handle->query_options().query_timeout_s;
- if (FLAGS_idle_query_timeout > 0 && idle_timeout_s > 0) {
- idle_timeout_s = min(FLAGS_idle_query_timeout, idle_timeout_s);
- } else {
- // Use a non-zero timeout, if one exists
- idle_timeout_s = max(FLAGS_idle_query_timeout, idle_timeout_s);
- }
+ int32_t idle_timeout_s = GetIdleTimeout(query_handle->query_options());
int32_t exec_time_limit_s = query_handle->query_options().exec_time_limit_s;
int64_t cpu_limit_s = query_handle->query_options().cpu_limit_s;
int64_t scan_bytes_limit = query_handle->query_options().scan_bytes_limit;
@@ -1701,6 +1705,14 @@ Status ImpalaServer::GetActiveQueryHandle(
DCHECK(query_handle != nullptr);
shared_ptr<QueryDriver> query_driver = GetQueryDriver(query_id);
if (UNLIKELY(query_driver == nullptr)) {
+ {
+ lock_guard<mutex> l(idle_query_statuses_lock_);
+ auto it = idle_query_statuses_.find(query_id);
+ if (it != idle_query_statuses_.end()) {
+ return it->second;
+ }
+ }
+
Status err = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE,
PrintId(query_id));
VLOG(1) << err.GetDetail();
return err;
@@ -1797,6 +1809,7 @@ Status ImpalaServer::CloseSessionInternal(const
TUniqueId& session_id,
DecrementSessionCount(session_state->connected_user);
}
unordered_set<TUniqueId> inflight_queries;
+ vector<TUniqueId> idled_queries;
{
lock_guard<mutex> l(session_state->lock);
DCHECK(!session_state->closed);
@@ -1804,6 +1817,7 @@ Status ImpalaServer::CloseSessionInternal(const
TUniqueId& session_id,
// Since closed is true, no more queries will be added to the inflight
list.
inflight_queries.insert(session_state->inflight_queries.begin(),
session_state->inflight_queries.end());
+ idled_queries.swap(session_state->idled_queries);
}
// Unregister all open queries from this session.
Status status = Status::Expected("Session closed");
@@ -1811,6 +1825,12 @@ Status ImpalaServer::CloseSessionInternal(const
TUniqueId& session_id,
// TODO: deal with an error status
UnregisterQueryDiscardResult(query_id, false, &status);
}
+ {
+ lock_guard<mutex> l(idle_query_statuses_lock_);
+ for (const TUniqueId& query_id: idled_queries) {
+ idle_query_statuses_.erase(query_id);
+ }
+ }
// Reconfigure the poll period of session_maintenance_thread_ if necessary.
UnregisterSessionTimeout(session_state->session_timeout);
VLOG_QUERY << "Closed session: " << PrintId(session_id)
@@ -2774,8 +2794,9 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t
session_timeout) {
continue;
}
ClientRequestState* crs = query_driver->GetActiveClientRequestState();
- if (crs->is_expired()) {
- // Query was expired already from a previous expiration event.
+ if (crs->is_expired() && expiration_event->kind !=
ExpirationKind::IDLE_TIMEOUT) {
+ // Query was expired already from a previous expiration event. Keep
idle
+ // timeouts as they will additionally unregister the query.
expiration_event = queries_by_timestamp_.erase(expiration_event);
continue;
}
@@ -2812,13 +2833,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t
session_timeout) {
// Now check to see if the idle timeout has expired. We must check the
actual
// expiration time in case the query has updated 'last_active_ms'
since the last
// time we looked.
- int32_t idle_timeout_s = crs->query_options().query_timeout_s;
- if (FLAGS_idle_query_timeout > 0 && idle_timeout_s > 0) {
- idle_timeout_s = min(FLAGS_idle_query_timeout, idle_timeout_s);
- } else {
- // Use a non-zero timeout, if one exists
- idle_timeout_s = max(FLAGS_idle_query_timeout, idle_timeout_s);
- }
+ int32_t idle_timeout_s = GetIdleTimeout(crs->query_options());
int64_t expiration = crs->last_active_ms() + (idle_timeout_s * 1000L);
if (now < expiration) {
// If the real expiration date is in the future we may need to
re-insert the
@@ -2840,10 +2855,29 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t
session_timeout) {
VLOG_QUERY << "Expiring query due to client inactivity: "
<< PrintId(expiration_event->query_id) << ", last
activity was at: "
<< ToStringFromUnixMillis(crs->last_active_ms());
- ExpireQuery(crs,
- Status::Expected(TErrorCode::INACTIVE_QUERY_EXPIRED,
- PrintId(expiration_event->query_id),
- PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S)));
+ const Status status =
Status::Expected(TErrorCode::INACTIVE_QUERY_EXPIRED,
+ PrintId(expiration_event->query_id),
+ PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S));
+
+ // Save status so we can report it for unregistered queries.
+ Status preserved_status;
+ {
+ lock_guard<mutex> l(*crs->lock());
+ preserved_status = crs->query_status();
+ }
+ preserved_status.MergeStatus(status);
+ {
+ shared_ptr<SessionState> session = crs->session();
+ lock_guard<mutex> l(session->lock);
+ if (!session->closed) {
+ lock_guard<mutex> l(idle_query_statuses_lock_);
+ idle_query_statuses_.emplace(
+ expiration_event->query_id, move(preserved_status));
+ session->idled_queries.emplace_back(expiration_event->query_id);
+ }
+ }
+
+ ExpireQuery(crs, status, true);
expiration_event = queries_by_timestamp_.erase(expiration_event);
} else {
// Iterator is moved on in every other branch.
@@ -2972,12 +3006,18 @@ Status
ImpalaServer::CheckResourceLimits(ClientRequestState* crs) {
return Status::OK();
}
-void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status) {
+void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status,
+ bool unregister) {
DCHECK(!status.ok());
cancellation_thread_pool_->Offer(
- CancellationWork::TerminatedByServer(crs->query_id(), status, false));
- ImpaladMetrics::NUM_QUERIES_EXPIRED->Increment(1L);
- crs->set_expired();
+ CancellationWork::TerminatedByServer(crs->query_id(), status,
unregister));
+ if (crs->is_expired()) {
+ // Should only be re-entrant if we're now unregistering the query.
+ DCHECK(unregister);
+ } else {
+ ImpaladMetrics::NUM_QUERIES_EXPIRED->Increment(1L);
+ crs->set_expired();
+ }
}
Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 8024a6133..e3562959d 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -185,10 +185,11 @@ class TQueryExecRequest;
/// 1. session_state_map_lock_
/// 2. SessionState::lock
/// 3. query_expiration_lock_
-/// 4. ClientRequestState::fetch_rows_lock
-/// 5. ClientRequestState::lock
-/// 6. ClientRequestState::expiration_data_lock_
-/// 7. Coordinator::exec_summary_lock
+/// 4. idle_query_statuses_lock_
+/// 5. ClientRequestState::fetch_rows_lock
+/// 6. ClientRequestState::lock
+/// 7. ClientRequestState::expiration_data_lock_
+/// 8. Coordinator::exec_summary_lock
///
/// The following locks are not held in conjunction with other locks:
/// * query_log_lock_
@@ -657,6 +658,9 @@ class ImpalaServer : public ImpalaServiceIf,
/// inflight_queries. In that case we add it to prestopped_queries instead.
std::set<TUniqueId> prestopped_queries;
+ /// Unregistered queries we need to clear from idle_query_statuses_ on
closure.
+ std::vector<TUniqueId> idled_queries;
+
/// Total number of queries run as part of this session.
int64_t total_queries;
@@ -1153,8 +1157,9 @@ class ImpalaServer : public ImpalaServiceIf,
/// check should be rescheduled for a later time.
Status CheckResourceLimits(ClientRequestState* crs);
- /// Expire 'crs' and cancel it with status 'status'.
- void ExpireQuery(ClientRequestState* crs, const Status& status);
+ /// Expire 'crs' and cancel it with status 'status'. Optionally unregisters
the query.
+ void ExpireQuery(ClientRequestState* crs, const Status& status,
+ bool unregister = false);
typedef boost::unordered_map<std::string, boost::unordered_set<std::string>>
AuthorizedProxyMap;
@@ -1423,6 +1428,13 @@ class ImpalaServer : public ImpalaServiceIf,
typedef boost::unordered_map<TUniqueId, std::shared_ptr<SessionState>>
SessionStateMap;
SessionStateMap session_state_map_;
+ /// Protects idle_query_statuses_;
+ std::mutex idle_query_statuses_lock_;
+
+ /// A map of queries that were stopped due to idle timeout and the status
they had when
+ /// unregistered. Used to return a more useful error when looking up
unregistered IDs.
+ std::map<TUniqueId, Status> idle_query_statuses_;
+
/// Protects connection_to_sessions_map_. See "Locking" in the class comment
for lock
/// acquisition order.
std::mutex connection_to_sessions_map_lock_;
diff --git a/docs/topics/impala_timeouts.xml b/docs/topics/impala_timeouts.xml
index 4ee95e64d..713827138 100644
--- a/docs/topics/impala_timeouts.xml
+++ b/docs/topics/impala_timeouts.xml
@@ -126,8 +126,10 @@ Trying to re-register with state-store</codeblock>
<codeph>--idle_query_timeout</codeph> disables query timeouts.
</p>
<p>
- Cancelled queries remain in the open state but use only the
- minimal resources.
+ Cancelled queries are closed, but the client can still fetch their
exception
+ details. If the query was in a good state when cancelled, it will
present an
+ error like "Query 6f49e509bfa5b347:207d8ef900000000 expired due to
client
+ inactivity", otherwise it will show the relevant error.
</p>
</li>
diff --git a/tests/custom_cluster/test_query_expiration.py
b/tests/custom_cluster/test_query_expiration.py
index 7e08dc81c..a1d8dd9f4 100644
--- a/tests/custom_cluster/test_query_expiration.py
+++ b/tests/custom_cluster/test_query_expiration.py
@@ -29,18 +29,21 @@ from tests.common.custom_cluster_test_suite import
CustomClusterTestSuite
class TestQueryExpiration(CustomClusterTestSuite):
"""Tests query expiration logic"""
- def _check_num_executing(self, impalad, expected):
+ def _check_num_executing(self, impalad, expected, expect_waiting=0):
in_flight_queries = impalad.service.get_in_flight_queries()
# Guard against too few in-flight queries.
assert expected <= len(in_flight_queries)
- actual = 0
+ actual = waiting = 0
for query in in_flight_queries:
if query["executing"]:
actual += 1
else:
assert query["waiting"]
- assert actual == expected, '%s out of %s queries with expected (%s)
status' \
+ waiting += 1
+ assert actual == expected, '%s out of %s queries executing (expected %s)' \
% (actual, len(in_flight_queries), expected)
+ assert waiting == expect_waiting, '%s out of %s queries waiting (expected
%s)' \
+ % (waiting, len(in_flight_queries), expect_waiting)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--idle_query_timeout=8 --logbuflevel=-1")
@@ -58,7 +61,7 @@ class TestQueryExpiration(CustomClusterTestSuite):
# This query will hit a lower time limit.
client.execute("SET EXEC_TIME_LIMIT_S=3")
- time_limit_expire_handle = client.execute_async(query1);
+ time_limit_expire_handle = client.execute_async(query1)
handles.append(time_limit_expire_handle)
# This query will hit a lower idle timeout instead of the default timeout
or time
@@ -75,30 +78,46 @@ class TestQueryExpiration(CustomClusterTestSuite):
handles.append(default_timeout_expire_handle2)
self._check_num_executing(impalad, len(handles))
+ # Run a query that fails, and will timeout due to client inactivity.
+ client.execute("SET QUERY_TIMEOUT_S=1")
+ client.execute('SET MEM_LIMIT=1')
+ exception_handle = client.execute_async("select count(*) from
functional.alltypes")
+ client.execute('SET MEM_LIMIT=1g')
+ handles.append(exception_handle)
+
before = time()
sleep(4)
- # Queries with timeout or time limit of 1 should have expired, other
queries should
+ # Queries with timeout or time limit < 4 should have expired, other
queries should
# still be running.
- assert num_expired + 2 == impalad.service.get_metric_value(
+ assert num_expired + 3 == impalad.service.get_metric_value(
'impala-server.num-queries-expired')
assert (client.get_state(short_timeout_expire_handle) ==
client.QUERY_STATES['EXCEPTION'])
assert (client.get_state(time_limit_expire_handle) ==
client.QUERY_STATES['EXCEPTION'])
+ assert (client.get_state(exception_handle) ==
client.QUERY_STATES['EXCEPTION'])
assert (client.get_state(default_timeout_expire_handle) ==
client.QUERY_STATES['FINISHED'])
assert (client.get_state(default_timeout_expire_handle2) ==
client.QUERY_STATES['FINISHED'])
+ # The query cancelled by exec_time_limit_s should be waiting to be closed.
+ self._check_num_executing(impalad, 2, 1)
self.__expect_expired(client, query1, short_timeout_expire_handle,
- "Query [0-9a-f]+:[0-9a-f]+ expired due to client inactivity \(timeout
is 3s000ms\)")
+ r"Query [0-9a-f]+:[0-9a-f]+ expired due to "
+ + r"client inactivity \(timeout is 3s000ms\)")
self.__expect_expired(client, query1, time_limit_expire_handle,
- "Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of
3s000ms")
+ r"Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of
3s000ms")
+ self.__expect_expired(client, query1, exception_handle,
+ r"minimum memory reservation is greater than memory available.*\nQuery
"
+ + r"[0-9a-f]+:[0-9a-f]+ expired due to client inactivity \(timeout is
1s000ms\)")
self._check_num_executing(impalad, 2)
+ # Both queries with query_timeout_s < 4 should generate this message.
self.assert_impalad_log_contains('INFO', "Expiring query due to client
inactivity: "
- "[0-9a-f]+:[0-9a-f]+, last activity was at: \d\d\d\d-\d\d-\d\d
\d\d:\d\d:\d\d")
+ r"[0-9a-f]+:[0-9a-f]+, last activity was at: \d\d\d\d-\d\d-\d\d
\d\d:\d\d:\d\d",
+ 2)
self.assert_impalad_log_contains('INFO',
- "Expiring query [0-9a-f]+:[0-9a-f]+ due to execution time limit of 3s")
+ r"Expiring query [0-9a-f]+:[0-9a-f]+ due to execution time limit of
3s")
# Wait until the remaining queries expire. The time limit query will have
hit
# expirations but only one should be counted.
@@ -124,14 +143,23 @@ class TestQueryExpiration(CustomClusterTestSuite):
# Confirm that no extra expirations happened
assert
impalad.service.get_metric_value('impala-server.num-queries-expired') \
- == len(handles)
+ == num_expired + len(handles)
self._check_num_executing(impalad, 0)
for handle in handles:
try:
client.close_query(handle)
+ assert False, "Close should always throw an exception"
except Exception as e:
# We fetched from some cancelled handles above, which unregistered the
queries.
- assert 'Invalid or unknown query handle' in str(e)
+ # Expired queries always return their exception, so will not be
invalid/unknown.
+ if handle is time_limit_expire_handle:
+ assert 'Invalid or unknown query handle' in str(e)
+ else:
+ if handle is exception_handle:
+ # Error should return original failure and timeout message
+ assert 'minimum memory reservation is greater than memory
available' in str(e)
+ assert re.search(
+ r'Query [0-9a-f]{16}:[0-9a-f]{16} expired due to client
inactivity', str(e))
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--idle_query_timeout=0")
diff --git a/tests/custom_cluster/test_query_retries.py
b/tests/custom_cluster/test_query_retries.py
index c6fe3e74d..8d0629f0c 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -886,7 +886,7 @@ class TestQueryRetries(CustomClusterTestSuite):
self.cluster.impalads[1].kill()
query = self._count_query
handle = self.execute_query_async(query,
- query_options={'retry_failed_queries': 'true', 'query_timeout_s': '1'})
+ query_options={'retry_failed_queries': 'true', 'exec_time_limit_s':
'1'})
self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)
# Validate the live exec summary.
@@ -908,7 +908,7 @@ class TestQueryRetries(CustomClusterTestSuite):
self.client.fetch(query, handle)
assert False
except Exception as e:
- assert "expired due to client inactivity" in str(e)
+ assert "expired due to execution time limit of 1s000ms" in str(e)
# Assert that the impalad metrics show one expired query.
assert
impalad_service.get_metric_value('impala-server.num-queries-expired') == 1