This is an automated email from the ASF dual-hosted git repository.
maximebeauchemin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git
The following commit(s) were added to refs/heads/master by this push:
new 7d37442 [Bugfix] _add_filters_from_pre_query doesn't handle dim specs
(#3974)
7d37442 is described below
commit 7d374428d375349a9966a39f979d51c72202b969
Author: Jeff Niu <[email protected]>
AuthorDate: Mon Dec 11 12:35:25 2017 -0800
[Bugfix] _add_filters_from_pre_query doesn't handle dim specs (#3974)
* Fixed _add_filters_from_pre_query for dimension specs
* add_filters_from_pre_query ignores extraction functions
---
superset/connectors/druid/models.py | 98 +++++++++++++++++++++++++------------
superset/connectors/druid/views.py | 22 ++++++++-
tests/druid_func_tests.py | 16 +++---
3 files changed, 96 insertions(+), 40 deletions(-)
diff --git a/superset/connectors/druid/models.py
b/superset/connectors/druid/models.py
index acb1951..a666253 100644
--- a/superset/connectors/druid/models.py
+++ b/superset/connectors/druid/models.py
@@ -908,6 +908,9 @@ class DruidDatasource(Model, BaseDatasource):
column_name,
limit=10000):
"""Retrieve some values for the given column"""
+ logging.info(
+ 'Getting values for columns [{}] limited to [{}]'
+ .format(column_name, limit))
# TODO: Use Lexicographic TopNMetricSpec once supported by PyDruid
if self.fetch_values_from:
from_dttm = utils.parse_human_datetime(self.fetch_values_from)
@@ -954,6 +957,37 @@ class DruidDatasource(Model, BaseDatasource):
ret = Filter(type='and', fields=[ff, dim_filter])
return ret
+ def get_aggregations(self, all_metrics):
+ aggregations = OrderedDict()
+ for m in self.metrics:
+ if m.metric_name in all_metrics:
+ aggregations[m.metric_name] = m.json_obj
+ return aggregations
+
+ def check_restricted_metrics(self, aggregations):
+ rejected_metrics = [
+ m.metric_name for m in self.metrics
+ if m.is_restricted and
+ m.metric_name in aggregations.keys() and
+ not sm.has_access('metric_access', m.perm)
+ ]
+ if rejected_metrics:
+ raise MetricPermException(
+ 'Access to the metrics denied: ' + ', '.join(rejected_metrics),
+ )
+
+ def get_dimensions(self, groupby, columns_dict):
+ dimensions = []
+ groupby = [gb for gb in groupby if gb in columns_dict]
+ for column_name in groupby:
+ col = columns_dict.get(column_name)
+ dim_spec = col.dimension_spec if col else None
+ if dim_spec:
+ dimensions.append(dim_spec)
+ else:
+ dimensions.append(column_name)
+ return dimensions
+
def run_query( # noqa / druid
self,
groupby, metrics,
@@ -987,40 +1021,17 @@ class DruidDatasource(Model, BaseDatasource):
query_str = ''
metrics_dict = {m.metric_name: m for m in self.metrics}
-
columns_dict = {c.column_name: c for c in self.columns}
all_metrics, post_aggs = DruidDatasource.metrics_and_post_aggs(
metrics,
metrics_dict)
- aggregations = OrderedDict()
- for m in self.metrics:
- if m.metric_name in all_metrics:
- aggregations[m.metric_name] = m.json_obj
-
- rejected_metrics = [
- m.metric_name for m in self.metrics
- if m.is_restricted and
- m.metric_name in aggregations.keys() and
- not sm.has_access('metric_access', m.perm)
- ]
-
- if rejected_metrics:
- raise MetricPermException(
- 'Access to the metrics denied: ' + ', '.join(rejected_metrics),
- )
+ aggregations = self.get_aggregations(all_metrics)
+ self.check_restricted_metrics(aggregations)
# the dimensions list with dimensionSpecs expanded
- dimensions = []
- groupby = [gb for gb in groupby if gb in columns_dict]
- for column_name in groupby:
- col = columns_dict.get(column_name)
- dim_spec = col.dimension_spec
- if dim_spec:
- dimensions.append(dim_spec)
- else:
- dimensions.append(column_name)
+ dimensions = self.get_dimensions(groupby, columns_dict)
extras = extras or {}
qry = dict(
datasource=self.datasource_name,
@@ -1042,17 +1053,20 @@ class DruidDatasource(Model, BaseDatasource):
having_filters = self.get_having_filters(extras.get('having_druid'))
if having_filters:
qry['having'] = having_filters
+
order_direction = 'descending' if order_desc else 'ascending'
+
if len(groupby) == 0 and not having_filters:
+ logging.info('Running timeseries query for no groupby values')
del qry['dimensions']
client.timeseries(**qry)
elif (
not having_filters and
len(groupby) == 1 and
- order_desc and
- not isinstance(list(qry.get('dimensions'))[0], dict)
+ order_desc
):
dim = list(qry.get('dimensions'))[0]
+ logging.info('Running two-phase topn query for dimension
[{}]'.format(dim))
if timeseries_limit_metric:
order_by = timeseries_limit_metric
else:
@@ -1063,9 +1077,14 @@ class DruidDatasource(Model, BaseDatasource):
pre_qry['threshold'] = min(row_limit,
timeseries_limit or row_limit)
pre_qry['metric'] = order_by
- pre_qry['dimension'] = dim
+ if isinstance(dim, dict):
+ if 'dimension' in dim:
+ pre_qry['dimension'] = dim['dimension']
+ else:
+ pre_qry['dimension'] = dim
del pre_qry['dimensions']
client.topn(**pre_qry)
+ logging.info('Phase 1 Complete')
query_str += '// Two phase query\n// Phase 1\n'
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2)
@@ -1077,19 +1096,22 @@ class DruidDatasource(Model, BaseDatasource):
df = client.export_pandas()
qry['filter'] = self._add_filter_from_pre_query_data(
df,
- qry['dimensions'], filters)
+ [pre_qry['dimension']],
+ filters)
qry['threshold'] = timeseries_limit or 1000
if row_limit and granularity == 'all':
qry['threshold'] = row_limit
- qry['dimension'] = list(qry.get('dimensions'))[0]
qry['dimension'] = dim
del qry['dimensions']
qry['metric'] = list(qry['aggregations'].keys())[0]
client.topn(**qry)
+ logging.info('Phase 2 Complete')
elif len(groupby) > 0:
# If grouping on multiple fields or using a having filter
# we have to force a groupby query
+ logging.info('Running groupby query for dimensions
[{}]'.format(dimensions))
if timeseries_limit and is_timeseries:
+ logging.info('Running two-phase query for timeseries')
order_by = metrics[0] if metrics else self.metrics[0]
if timeseries_limit_metric:
order_by = timeseries_limit_metric
@@ -1107,7 +1129,18 @@ class DruidDatasource(Model, BaseDatasource):
'direction': order_direction,
}],
}
+ pre_qry_dims = []
+ # Replace dimensions specs with their `dimension`
+ # values, and ignore those without
+ for dim in qry['dimensions']:
+ if isinstance(dim, dict):
+ if 'dimension' in dim:
+ pre_qry_dims.append(dim['dimension'])
+ else:
+ pre_qry_dims.append(dim)
+ pre_qry['dimensions'] = list(set(pre_qry_dims))
client.groupby(**pre_qry)
+ logging.info('Phase 1 Complete')
query_str += '// Two phase query\n// Phase 1\n'
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2)
@@ -1119,7 +1152,7 @@ class DruidDatasource(Model, BaseDatasource):
df = client.export_pandas()
qry['filter'] = self._add_filter_from_pre_query_data(
df,
- qry['dimensions'],
+ pre_qry['dimensions'],
filters,
)
qry['limit_spec'] = None
@@ -1134,6 +1167,7 @@ class DruidDatasource(Model, BaseDatasource):
}],
}
client.groupby(**qry)
+ logging.info('Query Complete')
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2)
return query_str
diff --git a/superset/connectors/druid/views.py
b/superset/connectors/druid/views.py
index ad3664b..66b3bc5 100644
--- a/superset/connectors/druid/views.py
+++ b/superset/connectors/druid/views.py
@@ -1,4 +1,5 @@
from datetime import datetime
+import json
import logging
from flask import flash, Markup, redirect
@@ -61,9 +62,28 @@ class DruidColumnInlineView(CompactCRUDMixin,
SupersetModelView): # noqa
True),
}
+ def pre_update(self, col):
+ # If a dimension spec JSON is given, ensure that it is
+ # valid JSON and that `outputName` is specified
+ if col.dimension_spec_json:
+ try:
+ dimension_spec = json.loads(col.dimension_spec_json)
+ except ValueError as e:
+ raise ValueError('Invalid Dimension Spec JSON: ' + str(e))
+ if not isinstance(dimension_spec, dict):
+ raise ValueError('Dimension Spec must be a JSON object')
+ if 'outputName' not in dimension_spec:
+ raise ValueError('Dimension Spec does not contain
`outputName`')
+ if 'dimension' not in dimension_spec:
+ raise ValueError('Dimension Spec is missing `dimension`')
+ # `outputName` should be the same as the `column_name`
+ if dimension_spec['outputName'] != col.column_name:
+ raise ValueError(
+ '`outputName` [{}] unequal to `column_name` [{}]'
+ .format(dimension_spec['outputName'], col.column_name))
+
def post_update(self, col):
col.generate_metrics()
- utils.validate_json(col.dimension_spec_json)
def post_add(self, col):
self.post_update(col)
diff --git a/tests/druid_func_tests.py b/tests/druid_func_tests.py
index 4c047df..74da486 100644
--- a/tests/druid_func_tests.py
+++ b/tests/druid_func_tests.py
@@ -226,7 +226,8 @@ class DruidFuncTestCase(unittest.TestCase):
self.assertIn('dimensions', client.groupby.call_args_list[0][1])
self.assertEqual(['col1'],
client.groupby.call_args_list[0][1]['dimensions'])
# order_desc but timeseries and dimension spec
- spec = {'spec': 1}
+ # calls topn with single dimension spec 'dimension'
+ spec = {'outputName': 'hello', 'dimension': 'matcho'}
spec_json = json.dumps(spec)
col3 = DruidColumn(column_name='col3', dimension_spec_json=spec_json)
ds.columns.append(col3)
@@ -238,13 +239,14 @@ class DruidFuncTestCase(unittest.TestCase):
client=client, order_desc=True, timeseries_limit=5,
filter=[], row_limit=100,
)
- self.assertEqual(0, len(client.topn.call_args_list))
- self.assertEqual(2, len(client.groupby.call_args_list))
+ self.assertEqual(2, len(client.topn.call_args_list))
+ self.assertEqual(0, len(client.groupby.call_args_list))
self.assertEqual(0, len(client.timeseries.call_args_list))
- self.assertIn('dimensions', client.groupby.call_args_list[0][1])
- self.assertIn('dimensions', client.groupby.call_args_list[1][1])
- self.assertEqual([spec],
client.groupby.call_args_list[0][1]['dimensions'])
- self.assertEqual([spec],
client.groupby.call_args_list[1][1]['dimensions'])
+ self.assertIn('dimension', client.topn.call_args_list[0][1])
+ self.assertIn('dimension', client.topn.call_args_list[1][1])
+ # uses dimension for pre query and full spec for final query
+ self.assertEqual('matcho',
client.topn.call_args_list[0][1]['dimension'])
+ self.assertEqual(spec, client.topn.call_args_list[1][1]['dimension'])
def test_run_query_multiple_groupby(self):
client = Mock()
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].