This is an automated email from the ASF dual-hosted git repository.

johnbodley pushed a commit to branch feature--embeddable-charts-pilot
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git
commit 3f400831d680b77f996c194f1311aa560b0706fe
Author: Conglei Shi <[email protected]>
AuthorDate: Mon Nov 12 12:34:15 2018 -0800

    added get_data
---
 superset/common/query_context.py | 221 ++++++++++++++++++++++++++++++++++++++-
 superset/common/query_object.py  |   6 ++
 2 files changed, 224 insertions(+), 3 deletions(-)

diff --git a/superset/common/query_context.py b/superset/common/query_context.py
index 21b0dac..f17f39c 100644
--- a/superset/common/query_context.py
+++ b/superset/common/query_context.py
@@ -1,9 +1,25 @@
 # pylint: disable=R
-from typing import Dict, List
+import logging
+import pickle as pkl
+import traceback
+from datetime import datetime, timedelta
+from typing import Dict, List
 
+import numpy as np
+import pandas as pd
+
 from superset import db
 from superset.connectors.connector_registry import ConnectorRegistry
+from superset import app, cache
+from superset.utils import core as utils
+from superset.utils.core import (
+    DTTM_ALIAS,
+    JS_MAX_INTEGER,
+)
 from .query_object import QueryObject
 
+config = app.config
+stats_logger = config.get('STATS_LOGGER')
+
 
 class QueryContext:
@@ -11,17 +27,216 @@ class QueryContext:
     The query context contains the query object and additional fields necessary
     to retrieve the data payload for a given viz.
     """
+
+    default_fillna = 0
+    cache_type = 'df'
+    enforce_numerical_metrics = True
+
     # TODO: Type datasource and query_object dictionary with TypedDict when it becomes
     # a vanilla python type https://github.com/python/mypy/issues/5288
     def __init__(
             self,
             datasource: Dict,
             queries: List[Dict],
+            force: bool = False,
     ):
         self.datasource = ConnectorRegistry.get_datasource(datasource.get('type'),
                                                            int(datasource.get('id')),
                                                            db.session)
         self.queries = list(map(lambda query_obj: QueryObject(**query_obj), queries))
+        self.force = force
+
+        self.query_details = []
+
+        # The attributes below are read in get_payload() / get_df_payload();
+        # initialize them so a fresh, uncached run cannot raise AttributeError.
+        self.status = None
+        self.error_message = None
+        self.query = ''
+        self.form_data = {}
+        self.cache_timeout = (self.datasource.cache_timeout or
+                              config.get('CACHE_DEFAULT_TIMEOUT'))
+        self._any_cache_key = None
+        self._any_cached_dttm = None
 
-    def get_data(self):
-        raise NotImplementedError()
+    def get_df(self, query_object):
+        """Returns a pandas dataframe based on the query object"""
+
+        # Here, we assume that all the queries will use the same
+        # datasource, which is a valid assumption for the current
+        # setting. In the long term, we may or may not support
+        # multiple queries from different data sources.
+
+        timestamp_format = None
+        if self.datasource.type == 'table':
+            dttm_col = self.datasource.get_col(query_object.granularity)
+            if dttm_col:
+                timestamp_format = dttm_col.python_date_format
+
+        # The datasource can be backed by different engines, but
+        # the query interface is common.
+        result = self.datasource.query(query_object.to_dict())
+        query_detail = {
+            'raw_query': result.query,
+            'status': result.status,
+            'error_message': result.error_message,
+        }
+        self.query_details.append(query_detail)
+        # Keep the latest query string/status/error for the payload.
+        self.status = result.status
+        self.error_message = result.error_message
+        self.query = result.query
+
+        df = result.df
+        # Transform the timestamp we received from the database into
+        # a pandas-supported datetime format. If no python_date_format
+        # is specified, the pattern is treated as the default ISO
+        # date format; unix (epoch) formats use dedicated parsing.
+        if df is not None and not df.empty:
+            if DTTM_ALIAS in df.columns:
+                if timestamp_format in ('epoch_s', 'epoch_ms'):
+                    # Column has already been formatted as a timestamp.
+                    df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(pd.Timestamp)
+                else:
+                    df[DTTM_ALIAS] = pd.to_datetime(
+                        df[DTTM_ALIAS], utc=False, format=timestamp_format)
+                if self.datasource.offset:
+                    df[DTTM_ALIAS] += timedelta(hours=self.datasource.offset)
+                df[DTTM_ALIAS] += query_object.time_shift
+
+            if self.enforce_numerical_metrics:
+                self.df_metrics_to_num(df, query_object)
+
+            df = df.replace([np.inf, -np.inf], np.nan)
+            df = self.handle_nulls(df)
+        return df
+
+    def df_metrics_to_num(self, df, query_object):
+        """Converts metrics to numeric when pandas.read_sql cannot"""
+        metrics = query_object.get_metric_labels()
+        for col, dtype in df.dtypes.items():
+            if dtype.type == np.object_ and col in metrics:
+                df[col] = pd.to_numeric(df[col], errors='coerce')
+
+    def handle_nulls(self, df):
+        fillna = self.get_fillna_for_columns(df.columns)
+        return df.fillna(fillna)
+
+    def get_fillna_for_col(self, col):
+        """Returns the value to use as filler for a specific Column.type"""
+        if col:
+            if col.is_string:
+                return ' NULL'
+        return self.default_fillna
+
+    def get_fillna_for_columns(self, columns=None):
+        """Returns a dict or scalar that can be passed to DataFrame.fillna"""
+        if columns is None:
+            return self.default_fillna
+        columns_dict = {col.column_name: col for col in self.datasource.columns}
+        fillna = {
+            c: self.get_fillna_for_col(columns_dict.get(c))
+            for c in columns
+        }
+        return fillna
+
+    def get_data(self, df):
+        return df.to_dict(orient='records')
+
+    def get_payload(self, query_obj):
+        """Returns a payload of metadata and data"""
+        payload = self.get_df_payload(query_obj)
+        df = payload.get('df')
+        if self.status != utils.QueryStatus.FAILED:
+            if df is not None and df.empty:
+                payload['error'] = 'No data'
+            else:
+                payload['data'] = self.get_data(df)
+        if 'df' in payload:
+            del payload['df']
+        return payload
+
+    def get_payloads(self):
+        """Returns a payload for every query object"""
+        return [self.get_payload(query_object) for query_object in self.queries]
+
+    def get_df_payload(self, query_obj):
+        """Handles caching around the df payload retrieval"""
+        cache_key = query_obj.cache_key() if query_obj else None
+        logging.info('Cache key: {}'.format(cache_key))
+        is_loaded = False
+        stacktrace = None
+        df = None
+        cached_dttm = datetime.utcnow().isoformat().split('.')[0]
+        if cache_key and cache and not self.force:
+            cache_value = cache.get(cache_key)
+            if cache_value:
+                stats_logger.incr('loaded_from_cache')
+                try:
+                    cache_value = pkl.loads(cache_value)
+                    df = cache_value['df']
+                    self.query = cache_value['query']
+                    self._any_cached_dttm = cache_value['dttm']
+                    self._any_cache_key = cache_key
+                    self.status = utils.QueryStatus.SUCCESS
+                    is_loaded = True
+                except Exception as e:
+                    logging.exception(e)
+                    logging.error('Error reading cache: ' +
+                                  utils.error_msg_from_exception(e))
+                logging.info('Serving from cache')
+
+        if query_obj and not is_loaded:
+            try:
+                df = self.get_df(query_obj)
+                if self.status != utils.QueryStatus.FAILED:
+                    stats_logger.incr('loaded_from_source')
+                    is_loaded = True
+            except Exception as e:
+                logging.exception(e)
+                if not self.error_message:
+                    self.error_message = '{}'.format(e)
+                self.status = utils.QueryStatus.FAILED
+                stacktrace = traceback.format_exc()
+
+        if (
+                is_loaded and
+                cache_key and
+                cache and
+                self.status != utils.QueryStatus.FAILED):
+            try:
+                cache_value = dict(
+                    dttm=cached_dttm,
+                    df=df,
+                    query=self.query,
+                )
+                cache_value = pkl.dumps(
+                    cache_value, protocol=pkl.HIGHEST_PROTOCOL)
+
+                logging.info('Caching {} bytes at key {}'.format(
+                    len(cache_value), cache_key))
+
+                stats_logger.incr('set_cache_key')
+                cache.set(
+                    cache_key,
+                    cache_value,
+                    timeout=self.cache_timeout)
+            except Exception as e:
+                # cache.set call can fail if the backend is down or if
+                # the key is too large or for other reasons
+                logging.warning('Could not cache key {}'.format(cache_key))
+                logging.exception(e)
+                cache.delete(cache_key)
+        return {
+            'cache_key': self._any_cache_key,
+            'cached_dttm': self._any_cached_dttm,
+            'cache_timeout': self.cache_timeout,
+            'df': df,
+            'error': self.error_message,
+            'form_data': self.form_data,
+            'is_cached': self._any_cache_key is not None,
+            'query': self.query,
+            'status': self.status,
+            'stacktrace': stacktrace,
+            'rowcount': len(df.index) if df is not None else 0,
+        }
diff --git a/superset/common/query_object.py b/superset/common/query_object.py
index 8116d26..dfac02c 100644
--- a/superset/common/query_object.py
+++ b/superset/common/query_object.py
@@ -45,3 +45,9 @@ class QueryObject:
 
     def to_dict(self):
         raise NotImplementedError()
+
+    def get_metric_labels(self):
+        raise NotImplementedError()
+
+    def cache_key(self):
+        raise NotImplementedError()
\ No newline at end of file
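For reviewers, a minimal sketch of how this new API is intended to be driven
end to end. The datasource type/id and the query fields below are hypothetical,
it assumes a Flask app context with a matching registered datasource, and note
that get_metric_labels() and cache_key() still raise NotImplementedError in
this commit, so a concrete QueryObject implementation is needed before this
actually runs:

    # Hypothetical usage sketch; ids and query fields are made up, and
    # QueryObject must implement to_dict(), get_metric_labels() and
    # cache_key() for get_payloads() to succeed.
    from superset.common.query_context import QueryContext

    query_context = QueryContext(
        datasource={'type': 'table', 'id': 1},
        queries=[{
            'granularity': 'ds',
            'metrics': ['count'],
            'groupby': ['gender'],
        }],
        force=False,  # True bypasses the dataframe cache
    )

    # One payload per query object: rows under 'data', plus cache
    # metadata, status and error information.
    for payload in query_context.get_payloads():
        print(payload['status'], payload['rowcount'])

Since get_payloads() maps get_payload() over self.queries, the response shape
mirrors the queries array one-to-one, which is what a multi-query chart client
would consume.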
