This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit f18cfaf0db4c5141d6fc05f7b3feb687ea7eaf8b Author: Riza Suminto <[email protected]> AuthorDate: Fri May 2 22:19:56 2025 -0700 IMPALA-14028: Refactor cancel_query_and_validate_state with HS2 cancel_query_and_validate_state is a helper method used to test query cancellation with concurrent fetch. It is still use beeswax client by default. This patch change the test method to use HS2 protocol by default. The changes include following: 1. Set TGetOperationStatusResp.operationState to TOperationState::ERROR_STATE if returning abnormally. 2. Use separate MinimalHS2Client for (execute_async, fetch, get_runtime_profile) vs cancel vs close. Cancellation through KILL QUERY still instantiate new ImpylaHS2Connection client. 3. Implement required missing methods in MinimalHS2Client. 4. Change MinimalHS2Client logging pattern to match with other clients. Testing: Pass test_cancellation.py and TestResultSpoolingCancellation in core exploration mode. Also fix default_test_protocol to HS2 for these tests. Change-Id: I626a1a06eb3d5dc9737c7d4289720e1f52d2a984 Reviewed-on: http://gerrit.cloudera.org:8080/22853 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Riza Suminto <[email protected]> --- be/src/service/impala-hs2-server.cc | 28 +++-- tests/common/impala_connection.py | 75 +++++++++---- tests/query_test/test_cancellation.py | 34 +++++- tests/query_test/test_result_spooling.py | 13 ++- tests/util/cancel_util.py | 175 ++++++++++++++++++++++--------- 5 files changed, 235 insertions(+), 90 deletions(-) diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index 33366195a..ba35ca395 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -890,25 +890,33 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val, // Secret is inherited from session. TUniqueId query_id; TUniqueId op_secret; - HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( - request.operationHandle.operationId, &query_id, &op_secret), - SQLSTATE_GENERAL_ERROR); + Status status = THandleIdentifierToTUniqueId( + request.operationHandle.operationId, &query_id, &op_secret); + if (!status.ok()) { + return_val.__set_operationState(TOperationState::ERROR_STATE); + HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR); + } VLOG_ROW << "GetOperationStatus(): query_id=" << PrintId(query_id); - // Make query id available to the following HS2_RETURN_IF_ERROR(). + // Make query id available to the following HS2_RETURN_ERROR(). ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id); QueryHandle query_handle; - HS2_RETURN_IF_ERROR( - return_val, GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR); + status = GetActiveQueryHandle(query_id, &query_handle); + if (!status.ok()) { + return_val.__set_operationState(TOperationState::ERROR_STATE); + HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR); + } ScopedSessionState session_handle(this); const TUniqueId session_id = query_handle->session_id(); shared_ptr<SessionState> session; - HS2_RETURN_IF_ERROR(return_val, - session_handle.WithSession( - session_id, SecretArg::Operation(op_secret, query_id), &session), - SQLSTATE_GENERAL_ERROR); + status = session_handle.WithSession( + session_id, SecretArg::Operation(op_secret, query_id), &session); + if (!status.ok()) { + return_val.__set_operationState(TOperationState::ERROR_STATE); + HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR); + } // When using long polling, this waits up to long_polling_time_ms milliseconds for // query completion.polling diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py index 09568bcad..3475df911 100644 --- a/tests/common/impala_connection.py +++ b/tests/common/impala_connection.py @@ -563,10 +563,6 @@ class BeeswaxConnection(ImpalaConnection): query_id = operation_handle.get_handle().id return query_id if query_id else str(operation_handle) - def log_handle(self, operation_handle, message): - handle_id = self.handle_id(operation_handle) - LOG.info(u"{0}: {1}".format(handle_id, message)) - def get_query_id(self, operation_handle): return operation_handle.get_handle().id @@ -579,7 +575,7 @@ class ImpylaHS2Connection(ImpalaConnection): """ # ClientRequestState::TOperationState() - __OPERATION_STATE_TO_EXEC_STATE = { + OPERATION_STATE_TO_EXEC_STATE = { 'INITIALIZED_STATE': INITIALIZED, 'PENDING_STATE': PENDING, 'RUNNING_STATE': RUNNING, @@ -825,7 +821,7 @@ class ImpylaHS2Connection(ImpalaConnection): def get_impala_exec_state(self, operation_handle): try: - return self.__OPERATION_STATE_TO_EXEC_STATE[self.get_state(operation_handle)] + return self.OPERATION_STATE_TO_EXEC_STATE[self.get_state(operation_handle)] except impyla_error.Error: return ERROR except Exception as e: @@ -1051,26 +1047,39 @@ class MinimalHS2Connection(ImpalaConnection): self.__conn = hs2.connect(host, port, auth_mechanism='NOSASL') self.__user = user if user is not None else getpass.getuser() self.__session = self.__conn.open_session(self.__user) + self.__query_options = dict() def connect(self): pass # Do nothing def close(self): - LOG.info("-- closing connection to: %s" % self.__host_port) + self.log_client("closing connection to: %s" % self.__host_port) try: self.__session.close() finally: self.__conn.close() + def __log_execute(self, sql_stmt): + session_id = session_handle_to_session_id(self.__session.handle) + self.log_client( + u"executing at {0}. session: {1} user: {2}\n{3}".format( + self.__host_port, session_id, self.__user, format_sql_for_logging(sql_stmt)) + ) + + def log_client(self, message): + """Log 'message' at INFO level, prefixed wih the protocol name of this connection.""" + LOG.info(u"minimal_{0}: {1}".format(self.get_test_protocol(), message)) + def execute(self, sql_stmt, user=None, fetch_profile_after_close=False, # noqa: U100 fetch_exec_summary=False, # noqa: U100 profile_format=TRuntimeProfileFormat.STRING): # noqa: U100 raise NotImplementedError() def execute_async(self, sql_stmt): - hs2_operation = self.__session.execute(sql_stmt) + self.__log_execute(sql_stmt) + hs2_operation = self.__session.execute(sql_stmt, configuration=self.__query_options) operation_handle = MinimalHS2OperationHandle(hs2_operation.handle, sql_stmt) - LOG.info("Started query {0}".format(operation_handle)) + self.log_handle(operation_handle, "query started") return operation_handle def __get_operation(self, operation_handle): @@ -1081,7 +1090,7 @@ class MinimalHS2Connection(ImpalaConnection): Fetch the results of the query. It will block the current connection if the results are not available yet. """ - LOG.info("-- fetching results from: {0}".format(operation_handle)) + self.log_handle(operation_handle, "fetching results") return self.__get_operation(operation_handle).fetch(max_rows=max_rows) def fetch_error(self, operation_handle): @@ -1111,11 +1120,11 @@ class MinimalHS2Connection(ImpalaConnection): time.sleep(0.1) def cancel(self, operation_handle): - LOG.info("-- canceling operation: {0}".format(operation_handle)) + self.log_handle(operation_handle, "canceling operation") return self.__get_operation(operation_handle).cancel() def close_query(self, operation_handle): - LOG.info("-- closing query for operation handle: {0}".format(operation_handle)) + self.log_handle(operation_handle, "closing query for operation") return self.__get_operation(operation_handle).close() def state_is_finished(self, operation_handle): # noqa: U100 @@ -1124,11 +1133,22 @@ class MinimalHS2Connection(ImpalaConnection): def get_log(self, operation_handle): return self.__get_operation(operation_handle).get_log() - def set_configuration_option(self, name, value): # noqa: U100 - raise NotImplementedError() + def set_configuration_option(self, name, value, is_log_sql=True): + # Only set the option if it's not already set to the same value. + # value must be parsed to string. + name = name.lower() + value = str(value) + if self.__query_options.get(name, "") != value: + self.__query_options[name] = value + if is_log_sql: + self.log_client("\n\nset {0}={1};\n".format(name, value)) + return True + return False def clear_configuration(self): - raise NotImplementedError() + self.__query_options.clear() + if hasattr(tests.common, "current_node"): + self.set_configuration_option("client_identifier", tests.common.current_node) def get_host_port(self): return self.__host_port @@ -1136,21 +1156,36 @@ class MinimalHS2Connection(ImpalaConnection): def get_test_protocol(self): return HS2 - def handle_id(self, operation_handle): # noqa: U100 + def handle_id(self, operation_handle): return str(operation_handle) def get_admission_result(self, operation_handle): # noqa: U100 raise NotImplementedError() - def get_impala_exec_state(self, operation_handle): # noqa: U100 - raise NotImplementedError() + def get_impala_exec_state(self, operation_handle): + try: + return ImpylaHS2Connection.OPERATION_STATE_TO_EXEC_STATE[ + self.get_state(operation_handle)] + except impyla_error.Error: + return ERROR + except Exception as e: + raise e def get_runtime_profile(self, operation_handle, profile_format=TRuntimeProfileFormat.STRING): return self.__get_operation(operation_handle).get_profile(profile_format) - def wait_for_admission_control(self, operation_handle, timeout_s=60): # noqa: U100 - raise NotImplementedError() + def wait_for_admission_control(self, operation_handle, timeout_s=60): + self.log_handle(operation_handle, 'waiting for completion of the admission control') + start_time = time.time() + while time.time() - start_time < timeout_s: + start_rpc_time = time.time() + if self.is_admitted(operation_handle): + return True + rpc_time = time.time() - start_rpc_time + if rpc_time < DEFAULT_SLEEP_INTERVAL: + time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time) + return False def get_exec_summary(self, operation_handle): # noqa: U100 raise NotImplementedError() diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py index efda79e8b..ac2a9f9d4 100644 --- a/tests/query_test/test_cancellation.py +++ b/tests/query_test/test_cancellation.py @@ -29,7 +29,7 @@ from impala_thrift_gen.RuntimeProfile.ttypes import TRuntimeProfileFormat from tests.common.impala_connection import MinimalHS2Connection from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT, ImpalaTestSuite from tests.common.test_dimensions import add_mandatory_exec_option -from tests.common.test_vector import ImpalaTestDimension +from tests.common.test_vector import HS2, ImpalaTestDimension from tests.util.cancel_util import cancel_query_and_validate_state from tests.verifiers.metric_verifier import MetricVerifier @@ -88,6 +88,11 @@ USE_KILL_QUERY_STATEMENT = [False, True] class TestCancellation(ImpalaTestSuite): + + @classmethod + def default_test_protocol(cls): + return HS2 + @classmethod def get_workload(self): return 'tpch' @@ -179,7 +184,8 @@ class TestCancellation(ImpalaTestSuite): # Execute the query multiple times, cancelling it each time. for i in range(vector.get_value('num_cancellation_iterations')): - cancel_query_and_validate_state(self.client, query, + cancel_query_and_validate_state( + query, vector.get_value('exec_option'), vector.get_value('table_format'), vector.get_value('cancel_delay'), vector.get_value('join_before_close'), use_kill_query_statement=vector.get_value('use_kill_query_statement')) @@ -246,7 +252,6 @@ class TestCancellation(ImpalaTestSuite): assert "Invalid or unknown query handle" in str( cancel_client.fetch_error(query_handle)) - def teardown_method(self, method): # For some reason it takes a little while for the query to get completely torn down # when the debug action is WAIT, causing TestValidateMetrics.test_metrics_are_zero to @@ -257,6 +262,11 @@ class TestCancellation(ImpalaTestSuite): class TestCancellationParallel(TestCancellation): + + @classmethod + def default_test_protocol(cls): + return HS2 + @classmethod def add_test_dimensions(cls): super(TestCancellationParallel, cls).add_test_dimensions() @@ -267,6 +277,11 @@ class TestCancellationParallel(TestCancellation): class TestCancellationSerial(TestCancellation): + + @classmethod + def default_test_protocol(cls): + return HS2 + @classmethod def add_test_dimensions(cls): super(TestCancellationSerial, cls).add_test_dimensions() @@ -299,6 +314,11 @@ class TestCancellationSerial(TestCancellation): class TestCancellationFullSort(TestCancellation): + + @classmethod + def default_test_protocol(cls): + return HS2 + @classmethod def add_test_dimensions(cls): super(TestCancellationFullSort, cls).add_test_dimensions() @@ -322,6 +342,11 @@ class TestCancellationFullSort(TestCancellation): class TestCancellationFinalizeDelayed(ImpalaTestSuite): + + @classmethod + def default_test_protocol(cls): + return HS2 + @classmethod def get_workload(self): return 'tpch' @@ -344,5 +369,6 @@ class TestCancellationFinalizeDelayed(ImpalaTestSuite): def test_cancellation(self, vector): query = "select l_returnflag from tpch_parquet.lineitem" cancel_delay = 0 - cancel_query_and_validate_state(self.client, query, + cancel_query_and_validate_state( + query, vector.get_value('exec_option'), vector.get_value('table_format'), cancel_delay) diff --git a/tests/query_test/test_result_spooling.py b/tests/query_test/test_result_spooling.py index ba1c9ba77..52ffdc06e 100644 --- a/tests/query_test/test_result_spooling.py +++ b/tests/query_test/test_result_spooling.py @@ -26,7 +26,7 @@ from tests.common.errors import Timeout from tests.common.impala_connection import FINISHED from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.test_dimensions import create_exec_option_dimension -from tests.common.test_vector import ImpalaTestDimension +from tests.common.test_vector import HS2, ImpalaTestDimension from tests.util.cancel_util import cancel_query_and_validate_state from tests.util.failpoints_util import execute_query_expect_debug_action_failure @@ -325,6 +325,10 @@ class TestResultSpoolingCancellation(ImpalaTestSuite): # Time to sleep between issuing query and canceling. _cancel_delay_in_seconds = [0, 0.01, 0.1, 1, 4] + @classmethod + def default_test_protocol(cls): + return HS2 + @classmethod def get_workload(cls): return 'tpch' @@ -345,7 +349,8 @@ class TestResultSpoolingCancellation(ImpalaTestSuite): def test_cancellation(self, vector): vector.get_value('exec_option')['spool_query_results'] = 'true' - cancel_query_and_validate_state(self.client, vector.get_value('query'), + cancel_query_and_validate_state( + vector.get_value('query'), vector.get_value('exec_option'), vector.get_value('table_format'), vector.get_value('cancel_delay')) @@ -359,9 +364,7 @@ class TestResultSpoolingCancellation(ImpalaTestSuite): handle = self.execute_query_async(vector.get_value('query'), vector.get_value('exec_option')) sleep(vector.get_value('cancel_delay')) - cancel_result = self.client.cancel(handle) - assert cancel_result.status_code == 0,\ - "Unexpected status code from cancel request: {0}".format(cancel_result) + self.client.cancel(handle) finally: if handle: self.client.close_query(handle) diff --git a/tests/util/cancel_util.py b/tests/util/cancel_util.py index d72d3fc8e..c98ffc013 100644 --- a/tests/util/cancel_util.py +++ b/tests/util/cancel_util.py @@ -18,8 +18,16 @@ from __future__ import absolute_import, division, print_function import threading from time import sleep -from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION, create_connection -from tests.common.impala_test_suite import ImpalaTestSuite +import traceback + +from impala_thrift_gen.TCLIService import TCLIService +from tests.common.impala_connection import ( + create_connection, + ERROR, + IMPALA_CONNECTION_EXCEPTION, + MinimalHS2Connection, +) +from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT, ImpalaTestSuite from tests.common.test_result_verifier import error_msg_startswith @@ -88,8 +96,90 @@ def assert_kill_error(client, error_msg, query_id=None, sql=None, user=None): assert error_msg_startswith(str(exc), error_msg) -def cancel_query_and_validate_state(client, query, exec_option, table_format, - cancel_delay, join_before_close=False, use_kill_query_statement=False): +class FetchingThread(threading.Thread): + """Thread that does rows fetching that is subject to cancellation from main thread. + execute_async() must be called before starting the thread. + """ + + def __init__(self, client, query, exec_option, table_format, + use_kill_query_statement=False): + super(FetchingThread, self).__init__(name='FetchingThread') + self.client = client + self.query = query + self.exec_option = exec_option + self.table_format = table_format + self.db_name = ImpalaTestSuite.get_db_name_from_format(table_format) + self.use_kill_query_statement = use_kill_query_statement + self.fetch_results_error = None + self.query_profile = None + self.handle = None + self.is_error = False + + def execute_async(self): + # change database. + use_handle = self.client.execute_async('use ' + self.db_name) + self.client.wait_for(use_handle) + self.client.close_query(use_handle) + self.client.set_configuration(self.exec_option) + # execute the query. + self.handle = self.client.execute_async(self.query) + self.is_error = self.client.get_impala_exec_state(self.handle) == ERROR + + def run(self): + if self.is_error: + self.client.log_client( + 'Not starting fetch_results because query failed to start') + return + self.client.log_client('Start fetching results') + try: + result = True + while result: + result = self.client.fetch(self.query, self.handle) + except IMPALA_CONNECTION_EXCEPTION as e: + self.fetch_results_error = e + except Exception as e: + stack_trace_string = traceback.format_exc() + msg = "Exception in fetch_results: {}\n{}".format( + str(e), stack_trace_string) + self.fetch_results_error = Exception(msg) + finally: + self.client.log_client('Stop fetching results') + + def cancel_query(self): + assert self.handle is not None, \ + "handle is None. Did the query fail to start?" + if self.use_kill_query_statement: + # Run kill query using ImpylaHS2Connection. + with create_connection( + host_port=self.client.get_host_port(), + protocol=self.client.get_test_protocol(), + ) as kill_client: + kill_client.connect() + if self.exec_option: + kill_client.set_configuration(self.exec_option) + assert_kill_ok(kill_client, self.client.handle_id(self.handle)) + else: + # Run cancellation using separate client/connection. + with MinimalHS2Connection(IMPALAD_HS2_HOST_PORT) as cancel_client: + cancel_resp = cancel_client.cancel(self.handle) + assert cancel_resp.status.statusCode == TCLIService.TStatusCode.SUCCESS_STATUS + + def close_query(self): + with MinimalHS2Connection(IMPALAD_HS2_HOST_PORT) as close_client: + close_client.close_query(self.handle) + + def get_runtime_profile(self): + """Get query profile. + Might hit exception if QueryState has been released.""" + try: + return self.client.get_runtime_profile(self.handle) + except Exception: + return None + + +def cancel_query_and_validate_state( + query, exec_option, table_format, cancel_delay, join_before_close=False, + use_kill_query_statement=False): """Runs the given query asynchronously and then cancels it after the specified delay. The query is run with the given 'exec_options' against the specified 'table_format'. A separate async thread is launched to fetch the results of the query. The method @@ -102,48 +192,34 @@ def cancel_query_and_validate_state(client, query, exec_option, table_format, RPCs directly. """ assert table_format is not None - with ImpalaTestSuite.change_database(client, table_format): - __run_cancel_query_and_validate_state( - client, query, exec_option, cancel_delay, join_before_close, - use_kill_query_statement) - - -def __run_cancel_query_and_validate_state(client, query, exec_option, - cancel_delay, join_before_close=False, use_kill_query_statement=False): assert not (join_before_close and use_kill_query_statement) + with MinimalHS2Connection(IMPALAD_HS2_HOST_PORT) as fetch_client: + thread = FetchingThread(fetch_client, query, exec_option, table_format, + use_kill_query_statement=use_kill_query_statement) + __run_cancel_query_and_validate_state( + thread, cancel_delay, join_before_close, use_kill_query_statement) - if exec_option: client.set_configuration(exec_option) - handle = client.execute_async(query) - thread = threading.Thread(target=__fetch_results, args=(query, handle)) +def __run_cancel_query_and_validate_state( + thread, cancel_delay, join_before_close=False, use_kill_query_statement=False): + thread.execute_async() thread.start() sleep(cancel_delay) - if client.is_error(handle): + if thread.is_error: # If some error occurred before trying to cancel the query then we put an error # message together and fail the test. thread.join() - error_msg = "The following query returned an error: %s\n" % query + error_msg = "The following query returned an error: {}\n".format(thread.query) if thread.fetch_results_error is not None: error_msg += str(thread.fetch_results_error) + "\n" - profile_lines = client.get_runtime_profile(handle).splitlines() + profile_lines = thread.get_runtime_profile().splitlines() + thread.close_query() for line in profile_lines: if "Query Status:" in line: error_msg += line assert False, error_msg - if use_kill_query_statement: - with create_connection( - host_port=client.get_host_port(), - protocol=client.get_test_protocol(), - ) as kill_client: - kill_client.connect() - if exec_option: - kill_client.set_configuration(exec_option) - assert_kill_ok(kill_client, client.handle_id(handle)) - else: - cancel_result = client.cancel(handle) - assert cancel_result.status_code == 0, \ - 'Unexpected status code from cancel request: %s' % cancel_result + thread.cancel_query() if join_before_close: thread.join() @@ -152,7 +228,7 @@ def __run_cancel_query_and_validate_state(client, query, exec_option, # The KILL QUERY statement will also close the query. if not use_kill_query_statement: try: - client.close_query(handle) + thread.close_query() except IMPALA_CONNECTION_EXCEPTION as e: close_error = e @@ -166,13 +242,17 @@ def __run_cancel_query_and_validate_state(client, query, exec_option, # need to do this after both close_query() and fetch() have returned to ensure # that the synchronous phase of query unregistration has finished and the profile # is final. - profile = client.get_runtime_profile(handle) - if ("- Completed admission: " in profile - and ("- First row fetched:" in profile or "- Request finished:" in profile)): + profile = thread.get_runtime_profile() + if profile and ( + "- Completed admission: " in profile + and ("- First row fetched:" in profile or "- Request finished:" in profile)): # TotalBytesRead is a sentinel that will only be created if ComputeQuerySummary() # has been run by the cancelling thread. assert "- TotalBytesRead:" in profile, profile + str_close_error = str(close_error) if close_error else '' + str_fetch_error = (str(thread.fetch_results_error) if thread.fetch_results_error + else '') if thread.fetch_results_error is None: # If the fetch rpc didn't result in CANCELLED (and auto-close the query) then # the close rpc should have succeeded. @@ -182,24 +262,17 @@ def __run_cancel_query_and_validate_state(client, query, exec_option, # failed with 'Cancelled' or failed with 'Invalid or unknown query handle' # (if the close rpc occured before the fetch rpc). if thread.fetch_results_error is not None: - assert 'Cancelled' in str(thread.fetch_results_error) or \ - ('Invalid or unknown query handle' in str(thread.fetch_results_error) - and not join_before_close), str(thread.fetch_results_error) + assert 'Cancelled' in str_fetch_error or \ + ('Invalid or unknown query handle' in str_fetch_error + and not join_before_close), str_fetch_error else: # If the close rpc encountered an exception, then it must be due to fetch - # noticing the cancellation and doing the auto-close. - assert 'Invalid or unknown query handle' in str(close_error) - assert 'Cancelled' in str(thread.fetch_results_error) + # noticing the cancellation and doing the auto-close, or cancellation went through + # before fetch. + assert ('Cancelled' in str_close_error + or 'Invalid or unknown query handle' in str_close_error) + assert ('Cancelled' in str_fetch_error + or 'Invalid or unknown query handle' in str_fetch_error) # TODO: Add some additional verification to check to make sure the query was # actually canceled - - -def __fetch_results(query, handle): - threading.current_thread().fetch_results_error = None - threading.current_thread().query_profile = None - try: - new_client = ImpalaTestSuite.create_impala_client() - new_client.fetch(query, handle) - except IMPALA_CONNECTION_EXCEPTION as e: - threading.current_thread().fetch_results_error = e
