LENS-1202: Add client side iterator for result in python client
Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/acb32d54 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/acb32d54 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/acb32d54 Branch: refs/heads/current-release-line Commit: acb32d5400a61037c575dae44b31a884ff6773a7 Parents: 3780744 Author: Rajat Khandelwal <[email protected]> Authored: Mon Jul 25 15:02:17 2016 +0530 Committer: Rajat Khandelwal <[email protected]> Committed: Mon Jul 25 15:02:17 2016 +0530 ---------------------------------------------------------------------- contrib/clients/python/README.md | 67 ++++++- contrib/clients/python/lens/client/main.py | 23 +-- contrib/clients/python/lens/client/models.py | 6 - contrib/clients/python/lens/client/query.py | 196 ++++++++++++++++++-- contrib/clients/python/lens/client/session.py | 50 +++++ contrib/clients/python/test/test_lensclient.py | 78 +++++--- 6 files changed, 343 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/acb32d54/contrib/clients/python/README.md ---------------------------------------------------------------------- diff --git a/contrib/clients/python/README.md b/contrib/clients/python/README.md index 00525ff..93a19c9 100644 --- a/contrib/clients/python/README.md +++ b/contrib/clients/python/README.md @@ -8,13 +8,60 @@ You can install like this: ## Usage - from lens.client import LensClient - with LensClient("http://lens.server.url/", "user.name", database="db") as client: - handle = client.queries.submit("cube select ...", query_name="My first query") - # Optionally wait for completion. 
- while not client.queries[handle].finished: - time.sleep(20) # sleep 20 seconds - print client.queries[handle].result_set_path - # listing queries: - for handle in client.queries(state='RUNNING'): - print client.queries[handle] + +### Listing queries +```python +with LensClient("http://lens.server.url/", "user.name", database="db") as client: + for handle in client.queries(state='RUNNING'): + print client.queries[handle] +``` + +### Async submission +```python +from lens.client import LensClient +with LensClient("http://lens.server.url/", "user.name", database="db") as client: + # Submit asynchronously + handle = client.queries.submit("cube select ...", query_name="My first query") + # You can wait for completion and get the full query details: + query = client.queries.wait_till_finish(handle, poll_interval=5) # poll every 5 seconds + # This path is only accessible from the lens server machine, so it is of little use to the client + print query.result_set_path + # iterating over result: + for row in query.result: + print row + # Writing result to local csv: + with open('result.csv', 'w') as csvfile: + writer = csv.writer(csvfile) + for row in query.result: # This will fetch the result again from the lens server + writer.writerow(row) + # listing queries: + for handle in client.queries(state='RUNNING'): + print client.queries[handle] +``` + +### Sync submission +```python +from lens.client import LensClient +with LensClient("http://lens.server.url/", "user.name", database="db") as client: + # Half async: the HTTP call returns within 10 seconds; after that, the query may be cancelled (depending on the server's configuration) + query = client.queries.submit("cube select ...", query_name="My first query", timeout=10) # 10 seconds + if query.status.status != 'CANCELED': + result = query.result +``` + +### Async submission with wait +```python +from lens.client import LensClient +with LensClient("http://lens.server.url/", "user.name", database="db") as client: + # Pseudo-sync + query
= client.queries.submit("cube select ...", query_name="My first query", wait=True, poll_interval=5) # submit async and wait till finish, polling every 5 seconds. poll_interval is optional + query.result +``` + +### Fetching just results +```python +from lens.client import LensClient +with LensClient("http://lens.server.url/", "user.name", database="db") as client: + # Direct result. Query handle and other details will be lost. + result = client.queries.submit("cube select ...", query_name="My first query", fetch_result=True, poll_interval=5, delimiter=",", custom_mappings={}) +``` http://git-wip-us.apache.org/repos/asf/lens/blob/acb32d54/contrib/clients/python/lens/client/main.py ---------------------------------------------------------------------- diff --git a/contrib/clients/python/lens/client/main.py b/contrib/clients/python/lens/client/main.py index dc60497..bf5d81e 100644 --- a/contrib/clients/python/lens/client/main.py +++ b/contrib/clients/python/lens/client/main.py @@ -16,11 +16,11 @@ # import os -import requests from six import string_types from .log import LensLogClient +from .session import LensSessionClient from .query import LensQueryClient -from .utils import conf_to_xml, xml_file_to_conf +from .utils import xml_file_to_conf class LensClient(object): @@ -37,25 +37,12 @@ class LensClient(object): self.base_url += "/" username = username or conf.get('lens.client.user.name', "anonymous") database = database or conf.get('lens.client.dbname') - self.open_session(username, password, database, conf) - self.queries = LensQueryClient(self.base_url, self._sessionid) + self.session = LensSessionClient(self.base_url, username, password, database, conf) + self.queries = LensQueryClient(self.base_url, self.session) self.logs = LensLogClient(self.base_url) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): - self.close_session() - - def close_session(self): - if self._sessionid: - requests.delete(self.base_url + "session/", 
params={'sessionid': self._sessionid}) - self._sessionid = None - - def open_session(self, username, password, database, conf): - payload = [('username', username), ('password', password), ('sessionconf', conf_to_xml(conf))] - if database: - payload.append(('database', database)) - r = requests.post(self.base_url + "session/", files=payload, headers={'accept': 'application/xml'}) - r.raise_for_status() - self._sessionid = r.text + self.session.close() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/lens/blob/acb32d54/contrib/clients/python/lens/client/models.py ---------------------------------------------------------------------- diff --git a/contrib/clients/python/lens/client/models.py b/contrib/clients/python/lens/client/models.py index 7451fe9..1860540 100644 --- a/contrib/clients/python/lens/client/models.py +++ b/contrib/clients/python/lens/client/models.py @@ -56,9 +56,3 @@ class WrappedJson(dict): def __eq__(self, other): return super(WrappedJson, self).__eq__(other) or ( self._is_wrapper and other._is_wrapper and str(self) == str(other)) - - -class LensQuery(WrappedJson): - @property - def finished(self): - return self.status.status in ('SUCCESSFUL', 'FAILED', 'CANCELED', 'CLOSED') http://git-wip-us.apache.org/repos/asf/lens/blob/acb32d54/contrib/clients/python/lens/client/query.py ---------------------------------------------------------------------- diff --git a/contrib/clients/python/lens/client/query.py b/contrib/clients/python/lens/client/query.py index 88ce719..f82f0cb 100644 --- a/contrib/clients/python/lens/client/query.py +++ b/contrib/clients/python/lens/client/query.py @@ -14,23 +14,138 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +import codecs +import time +import zipfile import requests -import time -from six import string_types -from .models import WrappedJson, LensQuery +from six import string_types, BytesIO, StringIO, PY2, PY3 +from .models import WrappedJson from .utils import conf_to_xml +import csv + +long_type = int + +if PY3: + from collections.abc import Iterable as Iterable +elif PY2: + from collections import Iterable as Iterable + long_type = long + + +class LensQuery(WrappedJson): + def __init__(self, client, *args, **kwargs): + super(LensQuery, self).__init__(*args, **kwargs) + self.client = client + + @property + def finished(self): + return self.status.status in ('SUCCESSFUL', 'FAILED', 'CANCELED', 'CLOSED') + + def get_result(self, *args, **kwargs): + return self.client.get_result(self, *args, **kwargs) + + result = property(get_result) + + +type_mappings = {'BOOLEAN': bool, + 'TINYINT': int, + 'SMALLINT': int, + 'INT': int, + 'BIGINT': long_type, + 'FLOAT': float, + 'DOUBLE': float, + 'TIMESTAMP': long_type, + 'BINARY': bin, + 'ARRAY': list, + 'MAP': dict, + # 'STRUCT,': str, + # 'UNIONTYPE,': float, + # 3'USER_DEFINED,': float, + 'DECIMAL,': float, + # 'NULL,': float, + # 'DATE,': float, + # 'VARCHAR,': float, + # 'CHAR': float + } +default_mapping = lambda x: x + +class LensQueryResult(Iterable): + def __init__(self, custom_mappings=None): + if custom_mappings is None: + custom_mappings = {} + self.custom_mappings = custom_mappings + + def _mapping(self, type_name): + if type_name in self.custom_mappings: + return self.custom_mappings[type_name] + if type_name in type_mappings: + return type_mappings[type_name] + return default_mapping + + +class LensInMemoryResult(LensQueryResult): + def __init__(self, resp, custom_mappings=None): + super(LensInMemoryResult, self).__init__(custom_mappings) + self.rows = resp.in_memory_query_result.rows + + def __iter__(self): + for row in self.rows: + yield list(self._mapping(value.type)(value.value) if value else None for value in 
row['values']) + +class LensPersistentResult(LensQueryResult): + def __init__(self, header, response, encoding=None, is_header_present=True, delimiter=",", + custom_mappings=None): + super(LensPersistentResult, self).__init__(custom_mappings) + self.response = response + self.is_zipped = 'zip' in self.response.headers['content-disposition'] + self.delimiter = str(delimiter) + self.is_header_present = is_header_present + self.encoding = encoding + self.header = header + + def _parse_line(self, line): + return list(self._mapping(self.header.columns[index].type)(line[index]) for index in range(len(line))) + + def __iter__(self): + if self.is_zipped: + byte_stream = BytesIO(self.response.content) + with zipfile.ZipFile(byte_stream) as self.zipfile: + for name in self.zipfile.namelist(): + with self.zipfile.open(name) as single_file: + if name[-3:] == 'csv': + reader = csv.reader(single_file, delimiter=self.delimiter) + else: + reader = single_file + reader_iterator = iter(reader) + if self.is_header_present: + next(reader_iterator) + for line in reader_iterator: + yield self._parse_line(line) + byte_stream.close() + else: + stream = codecs.iterdecode(self.response.iter_lines(), + self.response.encoding or self.response.apparent_encoding) + reader = csv.reader(stream, delimiter=self.delimiter) + reader_iterator = iter(reader) + if self.is_header_present: + next(reader_iterator) + for line in reader_iterator: + yield self._parse_line(line) + stream.close() class LensQueryClient(object): - def __init__(self, base_url, sessionid): - self._sessionid = sessionid + def __init__(self, base_url, session): + self._session = session self.base_url = base_url + "queryapi/" self.launched_queries = [] self.finished_queries = {} + self.query_confs = {} + self.is_header_present_in_result = self._session['lens.query.output.write.header'].lower() in ['true', '1', 't', 'y', 'yes', 'yeah', 'yup'] def __call__(self, **filters): - filters['sessionid'] = self._sessionid + filters['sessionid'] 
= self._session._sessionid resp = requests.get(self.base_url + "queries/", params=filters, headers={'accept': 'application/json'}) return self.sanitize_response(resp) @@ -38,34 +153,79 @@ class LensQueryClient(object): if isinstance(item, string_types): if item in self.finished_queries: return self.finished_queries[item] - resp = requests.get(self.base_url + "queries/" + item, params={'sessionid': self._sessionid}, - headers={'accept': 'application/json'}) + resp = requests.get(self.base_url + "queries/" + item, params={'sessionid': self._session._sessionid}, + headers={'accept': 'application/json'}) resp.raise_for_status() - query = LensQuery(resp.json(object_hook=WrappedJson)) + query = LensQuery(self, resp.json(object_hook=WrappedJson)) if query.finished: + query.client = self self.finished_queries[item] = query return query + elif isinstance(item, LensQuery): + return self[item.query_handle] elif isinstance(item, WrappedJson): if item._is_wrapper: return self[item._wrapped_value] + if item.query_handle: + return self[item.query_handle] raise Exception("Can't get query: " + str(item)) - def submit(self, query, operation="execute", query_name=None, timeout=None, conf=None): - payload = [('sessionid', self._sessionid), ('query', query), ('operation', operation)] + def submit(self, query, operation=None, query_name=None, timeout=None, conf=None, wait=False, fetch_result=False, + *args, **kwargs): + payload = [('sessionid', self._session._sessionid), ('query', query)] if query_name: payload.append(('queryName', query_name)) if timeout: - payload.append(('timeoutmillis', timeout)) + payload.append(('timeoutmillis', str(int(timeout) * 1000))) + if not operation: + operation = "execute_with_timeout" if timeout else "execute" + payload.append(('operation', operation)) payload.append(('conf', conf_to_xml(conf))) resp = requests.post(self.base_url + "queries/", files=payload, headers={'accept': 'application/json'}) query = self.sanitize_response(resp) - 
self.launched_queries.append(query) + if conf: + self.query_confs[str(query)] = conf + if fetch_result: + # get result and return + return self.get_result(query, *args, **kwargs) # query is handle here + elif wait: + # fetch details and return + return self.wait_till_finish(query, *args, **kwargs) + # just return handle. This would be the async case. Or execute with timeout, without wait return query - def wait_till_finish(self, handle): - while not self[handle].finished: - time.sleep(1) - return self[handle] + def wait_till_finish(self, handle_or_query, poll_interval=5, *args, **kwargs): + while not self[handle_or_query].finished: + time.sleep(poll_interval) + return self[handle_or_query] + + def get_result(self, handle_or_query, *args, **kwargs): + query = self.wait_till_finish(handle_or_query, *args, **kwargs) + handle = str(query.query_handle) + if query.status.status == 'SUCCESSFUL' and query.status.is_result_set_available: + resp = requests.get(self.base_url + "queries/" + handle + "/resultsetmetadata", + params={'sessionid': self._session._sessionid}, headers={'accept': 'application/json'}) + metadata = self.sanitize_response(resp) + # Try getting the result through http result + resp = requests.get(self.base_url + "queries/" + handle + "/httpresultset", + params={'sessionid': self._session._sessionid}, stream=True) + if resp.ok: + is_header_present = self.is_header_present_in_result + if handle in self.query_confs and 'lens.query.output.write.header' in self.query_confs[handle]: + is_header_present = bool(self.query_confs[handle]['lens.query.output.write.header']) + return LensPersistentResult(metadata, resp, is_header_present=is_header_present, *args, **kwargs) + else: + response = requests.get(self.base_url + "queries/" + handle + "/resultset", + params={'sessionid': self._session._sessionid}, headers={'accept': 'application/json'}) + resp = self.sanitize_response(response) + # If it has in memory result, return inmemory result iterator + if 
resp._is_wrapper and resp._wrapped_key == u'inMemoryQueryResult': + return LensInMemoryResult(resp) + # Else, return whatever you got + return resp + + else: + raise Exception("Result set not available") def sanitize_response(self, resp): resp.raise_for_status() @@ -85,5 +245,3 @@ class LensQueryClient(object): except: resp_json = resp.json() return resp_json - - http://git-wip-us.apache.org/repos/asf/lens/blob/acb32d54/contrib/clients/python/lens/client/session.py ---------------------------------------------------------------------- diff --git a/contrib/clients/python/lens/client/session.py b/contrib/clients/python/lens/client/session.py new file mode 100644 index 0000000..a1ccc4b --- /dev/null +++ b/contrib/clients/python/lens/client/session.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import requests + +from .models import WrappedJson +from .utils import conf_to_xml + + +class LensSessionClient(object): + def __init__(self, base_url, username, password, database, conf): + self.base_url = base_url + "session/" + self.open(username, password, database, conf) + + def __getitem__(self, key): + resp = requests.get(self.base_url + "params", + params={'sessionid': self._sessionid, 'key': key}, + headers={'accept': 'application/json'}) + if resp.ok: + params = resp.json(object_hook=WrappedJson) + text = params.elements[0] + if key in text: + text = text[len(key)+1:] + return text + + def open(self, username, password, database, conf): + payload = [('username', username), ('password', password), ('sessionconf', conf_to_xml(conf))] + if database: + payload.append(('database', database)) + r = requests.post(self.base_url, files=payload, headers={'accept': 'application/xml'}) + r.raise_for_status() + self._sessionid = r.text + + def close(self): + requests.delete(self.base_url, params={'sessionid': self._sessionid}) + http://git-wip-us.apache.org/repos/asf/lens/blob/acb32d54/contrib/clients/python/test/test_lensclient.py ---------------------------------------------------------------------- diff --git a/contrib/clients/python/test/test_lensclient.py b/contrib/clients/python/test/test_lensclient.py index d49c55b..e9b0bdb 100644 --- a/contrib/clients/python/test/test_lensclient.py +++ b/contrib/clients/python/test/test_lensclient.py @@ -16,26 +16,28 @@ # from __future__ import print_function +import glob +import os import random import string -from contextlib import contextmanager -from requests.exceptions import HTTPError - +import subprocess import time +from contextlib import contextmanager import pytest -from lens.client import LensClient from lens.client.models import WrappedJson -import subprocess -import os -import glob +from requests.exceptions import HTTPError + +from lens.client import LensClient + def check_output(command): output = 
subprocess.check_output(command.split()) - if isinstance(output, bytes): # For Python 3. Python 2 directly gives string + if isinstance(output, bytes): # For Python 3. Python 2 directly gives string output = output.decode("utf-8") return output + @contextmanager def cwd(dir): cur_dir = os.getcwd() @@ -43,26 +45,34 @@ def cwd(dir): yield os.chdir(cur_dir) + def time_sorted_ls(path): mtime = lambda f: os.stat(os.path.join(path, f)).st_mtime return list(sorted(os.listdir(path), key=mtime)) + def has_error(msg): return any(x in msg for x in ('Error', 'error', 'Exception', 'exception')) + def get_error(): latest_out_file = list(name for name in time_sorted_ls('logs') if 'lensserver.out' in name)[-1] - print (latest_out_file) + print(latest_out_file) with open(os.path.join('logs', latest_out_file)) as f: return f.read() + def select_query(path): with open(path) as f: for line in f: if 'cube select' in line and 'sample_cube' in line: return line + class TestLensClient(object): + query = "cube select dim1, measure2 from sample_cube where time_range_in(dt, '2014-06-24-23', '2014-06-25-00')" + expected_result = [[21, 100], [22, 200], [23, 300], [24, 400], [25, 500], [26, 600], [27, 700], [28, 800]] + @classmethod def setup_class(cls): cls.db = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10)) @@ -109,6 +119,7 @@ class TestLensClient(object): populate_output = check_output('bin/run-examples.sh populate-metastore -db ' + cls.db) if has_error(populate_output): raise Exception("Couldn't populate sample metastore: " + populate_output) + @classmethod def teardown_class(cls): # TODO: drop database @@ -123,18 +134,10 @@ class TestLensClient(object): with cwd('server'): stop_output = check_output('bin/lens-ctl stop') if has_error(stop_output): - raise("Error stopping server: " + stop_output) + raise ("Error stopping server: " + stop_output) def get_client(self): - return LensClient(database = self.db, conf=os.path.join(self.base_path, 'client', 
'conf')) - - def test_auto_close_session(self): - with self.get_client() as client: - pass - with pytest.raises(HTTPError) as e: - # Now any api should give 410 - client.queries(state='RUNNING') - assert e.value.response.status_code == 410 + return LensClient(database=self.db, conf=os.path.join(self.base_path, 'client', 'conf')) def test_wrong_query(self): with self.get_client() as client: @@ -146,16 +149,43 @@ class TestLensClient(object): def test_submit_query(self): with self.get_client() as client: handle = client.queries.submit(self.candidate_query) - # session not closed - assert client.queries[handle] - client.queries.wait_till_finish(handle) - client.close_session() + with pytest.raises(HTTPError) as e: + # Either of these can give 410 + client.queries.wait_till_finish(handle) + client.queries.submit(self.candidate_query) + assert e.value.response.status_code == 410 def test_list_query(self): with self.get_client() as client: handle = client.queries.submit(self.candidate_query, query_name="Candidate Query") finished_query = client.queries.wait_till_finish(handle) assert client.queries[handle] == finished_query - queries = client.queries(state='SUCCESSFUL', fromDate=finished_query.submission_time - 1, toDate=finished_query.submission_time + 1) + queries = client.queries(state='SUCCESSFUL', fromDate=finished_query.submission_time - 1, + toDate=finished_query.submission_time + 1) assert handle in queries + def test_non_persisted_result(self): + with self.get_client() as client: + result = client.queries.submit(self.query, fetch_result=True) + assert str(result)[:30] == 'file:/tmp/lensreports/hdfsout/' + + def test_persisted_result(self): + with self.get_client() as client: + result = client.queries.submit(self.query, conf={'lens.query.enable.persistent.resultset': True}, + delimiter=u'\x01', fetch_result=True) + assert list(iter(result)) == self.expected_result + + def test_persistent_result_with_header(self): + with self.get_client() as client: + result = 
client.queries.submit(self.query, + conf={'lens.query.enable.persistent.resultset': True, + 'lens.query.output.write.header': True}, + delimiter=u'\x01', fetch_result=True) + assert list(iter(result)) == self.expected_result + + def test_inmemory_result(self): + with self.get_client() as client: + result = client.queries.submit(self.query, + conf={'lens.query.enable.persistent.resultset.indriver': False}, + fetch_result=True) + assert list(iter(result)) == self.expected_result \ No newline at end of file
