This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3813ff9eb8 Filtering datasets by recent update events (#26942)
3813ff9eb8 is described below
commit 3813ff9eb8cded5be0cb51cc9e091eace1d156e4
Author: Brent Bovenzi <[email protected]>
AuthorDate: Thu Oct 13 17:00:36 2022 -0400
Filtering datasets by recent update events (#26942)
* Enable filtering on datasets by last updated time
* Add tests for filtering on dataset last updated time
* use updated_after to filter datasets by recent updates
* change button copy and add to url params
* Only count objects in the filtered query not the total number of datasets
* Only count datasets after filter with DatasetEvent.timestamp
* Tweak the filtering for and counting with uri_pattern
* clean up filtering logic
* apply filters to query and count_query
* Update airflow/www/views.py
Co-authored-by: Daniel Standish <[email protected]>
* simplify safe parse
* use outer join if no event filters
* fix case statement
* fix case statement
* fix mypy errors
Co-authored-by: blag <[email protected]>
Co-authored-by: Daniel Standish <[email protected]>
---
airflow/www/static/js/api/useDatasets.ts | 20 ++++++--
airflow/www/static/js/datasets/List.tsx | 57 +++++++++++++++++++++-
airflow/www/views.py | 62 +++++++++++++++++-------
tests/www/views/test_views_dataset.py | 81 ++++++++++++++++++++++++++++++++
4 files changed, 198 insertions(+), 22 deletions(-)
diff --git a/airflow/www/static/js/api/useDatasets.ts b/airflow/www/static/js/api/useDatasets.ts
index ae92ffacbd..6c459a15b4 100644
--- a/airflow/www/static/js/api/useDatasets.ts
+++ b/airflow/www/static/js/api/useDatasets.ts
@@ -22,33 +22,47 @@ import { useQuery } from 'react-query';
import { getMetaValue } from 'src/utils';
import type { DatasetListItem } from 'src/types';
+import type { unitOfTime } from 'moment';
interface DatasetsData {
datasets: DatasetListItem[];
totalEntries: number;
}
+export interface DateOption {
+ count: number;
+ unit: unitOfTime.DurationConstructor;
+}
+
interface Props {
limit?: number;
offset?: number;
order?: string;
uri?: string;
+ updatedAfter?: DateOption;
}
export default function useDatasets({
- limit, offset, order, uri,
+ limit, offset, order, uri, updatedAfter,
}: Props) {
const query = useQuery(
- ['datasets', limit, offset, order, uri],
+ ['datasets', limit, offset, order, uri, updatedAfter],
() => {
const datasetsUrl = getMetaValue('datasets_api');
const orderParam = order ? { order_by: order } : {};
const uriParam = uri ? { uri_pattern: uri } : {};
+      const updatedAfterParam = updatedAfter && updatedAfter.count && updatedAfter.unit
+        ? { updated_after: moment().subtract(updatedAfter.count, updatedAfter.unit).toISOString() }
+        : {};
return axios.get<AxiosResponse, DatasetsData>(
datasetsUrl,
{
params: {
- offset, limit, ...orderParam, ...uriParam,
+ offset,
+ limit,
+ ...orderParam,
+ ...uriParam,
+ ...updatedAfterParam,
},
},
);
diff --git a/airflow/www/static/js/datasets/List.tsx b/airflow/www/static/js/datasets/List.tsx
index e242517710..36604b0963 100644
--- a/airflow/www/static/js/datasets/List.tsx
+++ b/airflow/www/static/js/datasets/List.tsx
@@ -29,6 +29,8 @@ import {
InputLeftElement,
InputRightElement,
IconButton,
+ ButtonGroup,
+ Button,
} from '@chakra-ui/react';
import { snakeCase } from 'lodash';
import type { Row, SortingRule } from 'react-table';
@@ -39,6 +41,7 @@ import { useDatasets } from 'src/api';
import { Table, TimeCell } from 'src/components/Table';
import type { API } from 'src/types';
import { getMetaValue } from 'src/utils';
+import type { DateOption } from 'src/api/useDatasets';
interface Props {
onSelect: (datasetId: string) => void;
@@ -68,14 +71,25 @@ const DetailCell = ({ cell: { row } }: CellProps) => {
};
const SEARCH_PARAM = 'search';
+const DATE_FILTER_PARAM = 'updated_within';
+
+const dateOptions: Record<string, DateOption> = {
+ month: { count: 30, unit: 'days' },
+ week: { count: 7, unit: 'days' },
+ day: { count: 24, unit: 'hours' },
+ hour: { count: 1, unit: 'hour' },
+};
const DatasetsList = ({ onSelect }: Props) => {
const limit = 25;
const [offset, setOffset] = useState(0);
+
const [searchParams, setSearchParams] = useSearchParams();
+
const search = decodeURIComponent(searchParams.get(SEARCH_PARAM) || '');
- const [sortBy, setSortBy] = useState<SortingRule<object>[]>([{ id: 'lastDatasetUpdate', desc: true }]);
+ const dateFilter = searchParams.get(DATE_FILTER_PARAM) || undefined;
+ const [sortBy, setSortBy] = useState<SortingRule<object>[]>([{ id: 'lastDatasetUpdate', desc: true }]);
const sort = sortBy[0];
const order = sort ? `${sort.desc ? '-' : ''}${snakeCase(sort.id)}` : '';
const uri = search.length > 2 ? search : undefined;
@@ -85,6 +99,7 @@ const DatasetsList = ({ onSelect }: Props) => {
offset,
order,
uri,
+ updatedAfter: dateFilter ? dateOptions[dateFilter] : undefined,
});
const columns = useMemo(
@@ -132,7 +147,7 @@ const DatasetsList = ({ onSelect }: Props) => {
Datasets
</Heading>
</Flex>
- {!datasets.length && !isLoading && !search && (
+ {!datasets.length && !isLoading && !search && !dateFilter && (
<Text mb={4} data-testid="no-datasets-msg">
Looks like you do not have any datasets yet. Check out the
{' '}
@@ -141,6 +156,44 @@ const DatasetsList = ({ onSelect }: Props) => {
to learn how to create a dataset.
</Text>
)}
+ <Flex>
+ <Text mr={2}>Filter datasets with updates in the past:</Text>
+ <ButtonGroup size="sm" isAttached variant="outline">
+ <Button
+ onClick={() => {
+ searchParams.delete(DATE_FILTER_PARAM);
+ setSearchParams(searchParams);
+ }}
+ variant={!dateFilter ? 'solid' : 'outline'}
+ fontWeight={!dateFilter ? 'bold' : 'normal'}
+ >
+ All Time
+ </Button>
+ {Object.keys(dateOptions).map((option) => {
+ const filter = dateOptions[option];
+ const isSelected = option === dateFilter;
+ return (
+ <Button
+ key={option}
+ onClick={() => {
+ if (isSelected) {
+ searchParams.delete(DATE_FILTER_PARAM);
+ } else {
+ searchParams.set(DATE_FILTER_PARAM, option);
+ }
+ setSearchParams(searchParams);
+ }}
+ variant={isSelected ? 'solid' : 'outline'}
+ fontWeight={isSelected ? 'bold' : 'normal'}
+ >
+ {filter.count}
+ {' '}
+ {filter.unit}
+ </Button>
+ );
+ })}
+ </ButtonGroup>
+ </Flex>
<InputGroup my={2} px={1}>
<InputLeftElement pointerEvents="none">
<MdSearch />
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 67f7b327dd..ad7cc5104c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -71,7 +71,7 @@ from pendulum.datetime import DateTime
from pendulum.parsing.exceptions import ParserError
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
-from sqlalchemy import Date, and_, desc, distinct, func, inspect, union_all
+from sqlalchemy import Date, and_, case, desc, func, inspect, union_all
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, joinedload
from wtforms import SelectField, validators
@@ -237,8 +237,15 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
}
-def _safe_parse_datetime(v):
- """Parse datetime and return error message for invalid dates"""
+def _safe_parse_datetime(v, allow_empty=False):
+ """
+ Parse datetime and return error message for invalid dates
+
+ :param v: the string value to be parsed
+ :param allow_empty: Set True to return none if empty str or None
+ """
+ if allow_empty is True and not v:
+ return None
try:
return timezone.parse(v)
except (TypeError, ParserError):
@@ -3521,11 +3528,19 @@ class Airflow(AirflowBaseView):
"""
allowed_attrs = ['uri', 'last_dataset_update']
+ # Grab query parameters
limit = int(request.args.get("limit", 25))
offset = int(request.args.get("offset", 0))
order_by = request.args.get("order_by", "uri")
uri_pattern = request.args.get("uri_pattern", "")
lstripped_orderby = order_by.lstrip('-')
+    updated_after = _safe_parse_datetime(request.args.get("updated_after"), allow_empty=True)
+    updated_before = _safe_parse_datetime(request.args.get("updated_before"), allow_empty=True)
+
+ # Check and clean up query parameters
+ limit = 50 if limit > 50 else limit
+
+ uri_pattern = uri_pattern[:4000]
if lstripped_orderby not in allowed_attrs:
return {
@@ -3535,8 +3550,6 @@ class Airflow(AirflowBaseView):
)
}, 400
- limit = 50 if limit > 50 else limit
-
with create_session() as session:
if lstripped_orderby == "uri":
if order_by[0] == "-":
@@ -3559,28 +3572,43 @@ class Airflow(AirflowBaseView):
if session.bind.dialect.name == "postgresql":
order_by = (order_by[0].nulls_first(), *order_by[1:])
- total_entries = session.query(func.count(DatasetModel.id)).scalar()
+ count_query = session.query(func.count(DatasetModel.id))
+
+ has_event_filters = bool(updated_before or updated_after)
- datasets = [
- dict(dataset)
- for dataset in session.query(
+ query = (
+ session.query(
DatasetModel.id,
DatasetModel.uri,
func.max(DatasetEvent.timestamp).label("last_dataset_update"),
-            func.count(distinct(DatasetEvent.id)).label("total_updates"),
+            func.sum(case((DatasetEvent.id.is_not(None), 1), else_=0)).label("total_updates"),
)
-    .outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)
+    .join(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id, isouter=not has_event_filters)
.group_by(
DatasetModel.id,
DatasetModel.uri,
)
- .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
.order_by(*order_by)
- .offset(offset)
- .limit(limit)
- .all()
- ]
- data = {"datasets": datasets, "total_entries": total_entries}
+ )
+
+ if has_event_filters:
+        count_query = count_query.join(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)
+
+ filters = []
+ if uri_pattern:
+ filters.append(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+ if updated_after:
+ filters.append(DatasetEvent.timestamp >= updated_after)
+ if updated_before:
+ filters.append(DatasetEvent.timestamp <= updated_before)
+
+ query = query.filter(*filters)
+ count_query = count_query.filter(*filters)
+
+ query = query.offset(offset).limit(limit)
+
+ datasets = [dict(dataset) for dataset in query.all()]
+ data = {"datasets": datasets, "total_entries":
count_query.scalar()}
return (
htmlsafe_json_dumps(data, separators=(',', ':'),
cls=utils_json.AirflowJsonEncoder),
diff --git a/tests/www/views/test_views_dataset.py b/tests/www/views/test_views_dataset.py
index a5eb6e23f4..855b2618c9 100644
--- a/tests/www/views/test_views_dataset.py
+++ b/tests/www/views/test_views_dataset.py
@@ -89,6 +89,64 @@ class TestGetDatasets(TestDatasetEndpoint):
msg = "Ordering with 'fake' is disallowed or the attribute does not
exist on the model"
assert response.json['detail'] == msg
+ def test_order_by_raises_400_for_invalid_datetimes(self, admin_client, session):
+ datasets = [
+ DatasetModel(
+ uri=f"s3://bucket/key/{i}",
+ )
+ for i in [1, 2]
+ ]
+ session.add_all(datasets)
+ session.commit()
+ assert session.query(DatasetModel).count() == 2
+
+        response = admin_client.get("/object/datasets_summary?updated_before=null")
+
+ assert response.status_code == 400
+ assert "Invalid datetime:" in response.text
+
+        response = admin_client.get("/object/datasets_summary?updated_after=null")
+
+ assert response.status_code == 400
+ assert "Invalid datetime:" in response.text
+
+ def test_filter_by_datetimes(self, admin_client, session):
+ today = pendulum.today('UTC')
+
+ datasets = [
+ DatasetModel(
+ id=i,
+ uri=f"s3://bucket/key/{i}",
+ )
+ for i in range(1, 4)
+ ]
+ session.add_all(datasets)
+        # Update datasets, one per day, starting with datasets[0], ending with datasets[2]
+ dataset_events = [
+ DatasetEvent(
+ dataset_id=datasets[i].id,
+ timestamp=today.add(days=-len(datasets) + i + 1),
+ )
+ for i in range(len(datasets))
+ ]
+ session.add_all(dataset_events)
+ session.commit()
+ assert session.query(DatasetModel).count() == len(datasets)
+
+ cutoff = today.add(days=-1).add(minutes=-5).to_iso8601_string()
+        response = admin_client.get(f"/object/datasets_summary?updated_after={cutoff}")
+
+ assert response.status_code == 200
+ assert response.json['total_entries'] == 2
+        assert [json_dict['id'] for json_dict in response.json['datasets']] == [2, 3]
+
+ cutoff = today.add(days=-1).add(minutes=5).to_iso8601_string()
+        response = admin_client.get(f"/object/datasets_summary?updated_before={cutoff}")
+
+ assert response.status_code == 200
+ assert response.json['total_entries'] == 2
+        assert [json_dict['id'] for json_dict in response.json['datasets']] == [1, 2]
+
@pytest.mark.parametrize(
"order_by, ordered_dataset_ids",
[
@@ -157,6 +215,29 @@ class TestGetDatasets(TestDatasetEndpoint):
"total_updates": 0,
},
],
+ "total_entries": 1,
+ }
+
+ uri_pattern = 's3://bucket/key_'
+        response = admin_client.get(f"/object/datasets_summary?uri_pattern={uri_pattern}")
+
+ assert response.status_code == 200
+ response_data = response.json
+ assert response_data == {
+ "datasets": [
+ {
+ "id": 1,
+ "uri": "s3://bucket/key_1",
+ "last_dataset_update": None,
+ "total_updates": 0,
+ },
+ {
+ "id": 2,
+ "uri": "s3://bucket/key_2",
+ "last_dataset_update": None,
+ "total_updates": 0,
+ },
+ ],
"total_entries": 2,
}