Reuse session for executing queries (Hive on Spark)

Change-Id: I06c798dc311d63eb0a875450fd26d06db4e84a03
Reviewed-on: http://gerrit.cloudera.org:8080/2374
Reviewed-by: Taras Bobrovytsky <[email protected]>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e1c5959b Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e1c5959b Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e1c5959b Branch: refs/heads/master Commit: e1c5959b4dfbd13c60048d705d9a147ffb40cd04 Parents: 3093054 Author: Kapil Rastogi <[email protected]> Authored: Tue Feb 2 14:14:14 2016 -0800 Committer: Tim Armstrong <[email protected]> Committed: Thu May 12 14:17:54 2016 -0700 ---------------------------------------------------------------------- tests/performance/query_exec_functions.py | 8 ++++++-- tests/performance/scheduler.py | 4 ++++ 2 files changed, 10 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e1c5959b/tests/performance/query_exec_functions.py ---------------------------------------------------------------------- diff --git a/tests/performance/query_exec_functions.py b/tests/performance/query_exec_functions.py index 910c563..0b3307b 100644 --- a/tests/performance/query_exec_functions.py +++ b/tests/performance/query_exec_functions.py @@ -30,6 +30,7 @@ from tests.performance.query_executor import ( ) from tests.util.shell_util import exec_process from time import time +import threading DEFAULT_BEESWAX_PORT = 21000 DEFAULT_HS2_PORT = 21050 @@ -59,11 +60,15 @@ def get_hs2_hive_cursor(hiveserver, user=None, use_kerberos=False, def execute_using_hive_hs2(query, query_config): exec_result = HiveQueryResult(query, query_config=query_config) plugin_runner = query_config.plugin_runner - cursor = get_hs2_hive_cursor(query_config.hiveserver, + cursor = getattr(threading.current_thread(), 'cursor', None) + if cursor is None: + cursor = get_hs2_hive_cursor(query_config.hiveserver, user=query_config.user, database=query.db, use_kerberos=query_config.use_kerberos, 
execOptions=query_config.exec_options) + threading.current_thread().cursor = cursor + if cursor is None: return exec_result if plugin_runner: plugin_runner.run_plugins_pre(scope="Query") @@ -77,7 +82,6 @@ def execute_using_hive_hs2(query, query_config): LOG.error(str(e)) exec_result.query_error = str(e) finally: - cursor.close() if plugin_runner: plugin_runner.run_plugins_post(scope="Query") return exec_result http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e1c5959b/tests/performance/scheduler.py ---------------------------------------------------------------------- diff --git a/tests/performance/scheduler.py b/tests/performance/scheduler.py index eaf5437..33623f1 100644 --- a/tests/performance/scheduler.py +++ b/tests/performance/scheduler.py @@ -24,6 +24,7 @@ from copy import deepcopy from random import shuffle from sys import exit from threading import Lock, Thread, Event +import threading logging.basicConfig(level=logging.INFO, format='%(name)s %(threadName)s: %(message)s') LOG = logging.getLogger('scheduler') @@ -123,6 +124,9 @@ class Scheduler(object): workload_time_sec += query_executor.result.time_taken if self.query_iterations == 1: LOG.info("Workload iteration %d finished in %s seconds" % (j+1, workload_time_sec)) + cursor = getattr(threading.current_thread(), 'cursor', None) + if cursor is not None: + cursor.close() def run(self): """Run the query pipelines concurrently"""
