This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3813ff9eb8 Filtering datasets by recent update events (#26942)
3813ff9eb8 is described below

commit 3813ff9eb8cded5be0cb51cc9e091eace1d156e4
Author: Brent Bovenzi <[email protected]>
AuthorDate: Thu Oct 13 17:00:36 2022 -0400

    Filtering datasets by recent update events (#26942)
    
    * Enable filtering on datasets by last updated time
    
    * Add tests for filtering on dataset last updated time
    
    * use updated_after to filter datasets by recent updates
    
    * change button copy and add to url params
    
    * Only count objects in the filtered query not the total number of datasets
    
    * Only count datasets after filter with DatasetEvent.timestamp
    
    * Tweak the filtering for and counting with uri_pattern
    
    * clean up filtering logic
    
    * apply filters to query and count_query
    
    * Update airflow/www/views.py
    
    Co-authored-by: Daniel Standish <[email protected]>
    
    * simplify safe parse
    
    * use outer join if no event filters
    
    * fix case statement
    
    * fix case statement
    
    * fix mypy errors
    
    Co-authored-by: blag <[email protected]>
    Co-authored-by: Daniel Standish <[email protected]>
---
 airflow/www/static/js/api/useDatasets.ts | 20 ++++++--
 airflow/www/static/js/datasets/List.tsx  | 57 +++++++++++++++++++++-
 airflow/www/views.py                     | 62 +++++++++++++++++-------
 tests/www/views/test_views_dataset.py    | 81 ++++++++++++++++++++++++++++++++
 4 files changed, 198 insertions(+), 22 deletions(-)

diff --git a/airflow/www/static/js/api/useDatasets.ts b/airflow/www/static/js/api/useDatasets.ts
index ae92ffacbd..6c459a15b4 100644
--- a/airflow/www/static/js/api/useDatasets.ts
+++ b/airflow/www/static/js/api/useDatasets.ts
@@ -22,33 +22,47 @@ import { useQuery } from 'react-query';
 
 import { getMetaValue } from 'src/utils';
 import type { DatasetListItem } from 'src/types';
+import type { unitOfTime } from 'moment';
 
 interface DatasetsData {
   datasets: DatasetListItem[];
   totalEntries: number;
 }
 
+export interface DateOption {
+  count: number;
+  unit: unitOfTime.DurationConstructor;
+}
+
 interface Props {
   limit?: number;
   offset?: number;
   order?: string;
   uri?: string;
+  updatedAfter?: DateOption;
 }
 
 export default function useDatasets({
-  limit, offset, order, uri,
+  limit, offset, order, uri, updatedAfter,
 }: Props) {
   const query = useQuery(
-    ['datasets', limit, offset, order, uri],
+    ['datasets', limit, offset, order, uri, updatedAfter],
     () => {
       const datasetsUrl = getMetaValue('datasets_api');
       const orderParam = order ? { order_by: order } : {};
       const uriParam = uri ? { uri_pattern: uri } : {};
+      const updatedAfterParam = updatedAfter && updatedAfter.count && updatedAfter.unit
+        ? { updated_after: moment().subtract(updatedAfter.count, updatedAfter.unit).toISOString() }
+        : {};
       return axios.get<AxiosResponse, DatasetsData>(
         datasetsUrl,
         {
           params: {
-            offset, limit, ...orderParam, ...uriParam,
+            offset,
+            limit,
+            ...orderParam,
+            ...uriParam,
+            ...updatedAfterParam,
           },
         },
       );
diff --git a/airflow/www/static/js/datasets/List.tsx b/airflow/www/static/js/datasets/List.tsx
index e242517710..36604b0963 100644
--- a/airflow/www/static/js/datasets/List.tsx
+++ b/airflow/www/static/js/datasets/List.tsx
@@ -29,6 +29,8 @@ import {
   InputLeftElement,
   InputRightElement,
   IconButton,
+  ButtonGroup,
+  Button,
 } from '@chakra-ui/react';
 import { snakeCase } from 'lodash';
 import type { Row, SortingRule } from 'react-table';
@@ -39,6 +41,7 @@ import { useDatasets } from 'src/api';
 import { Table, TimeCell } from 'src/components/Table';
 import type { API } from 'src/types';
 import { getMetaValue } from 'src/utils';
+import type { DateOption } from 'src/api/useDatasets';
 
 interface Props {
   onSelect: (datasetId: string) => void;
@@ -68,14 +71,25 @@ const DetailCell = ({ cell: { row } }: CellProps) => {
 };
 
 const SEARCH_PARAM = 'search';
+const DATE_FILTER_PARAM = 'updated_within';
+
+const dateOptions: Record<string, DateOption> = {
+  month: { count: 30, unit: 'days' },
+  week: { count: 7, unit: 'days' },
+  day: { count: 24, unit: 'hours' },
+  hour: { count: 1, unit: 'hour' },
+};
 
 const DatasetsList = ({ onSelect }: Props) => {
   const limit = 25;
   const [offset, setOffset] = useState(0);
+
   const [searchParams, setSearchParams] = useSearchParams();
+
   const search = decodeURIComponent(searchParams.get(SEARCH_PARAM) || '');
-  const [sortBy, setSortBy] = useState<SortingRule<object>[]>([{ id: 'lastDatasetUpdate', desc: true }]);
+  const dateFilter = searchParams.get(DATE_FILTER_PARAM) || undefined;
 
+  const [sortBy, setSortBy] = useState<SortingRule<object>[]>([{ id: 'lastDatasetUpdate', desc: true }]);
   const sort = sortBy[0];
   const order = sort ? `${sort.desc ? '-' : ''}${snakeCase(sort.id)}` : '';
   const uri = search.length > 2 ? search : undefined;
@@ -85,6 +99,7 @@ const DatasetsList = ({ onSelect }: Props) => {
     offset,
     order,
     uri,
+    updatedAfter: dateFilter ? dateOptions[dateFilter] : undefined,
   });
 
   const columns = useMemo(
@@ -132,7 +147,7 @@ const DatasetsList = ({ onSelect }: Props) => {
           Datasets
         </Heading>
       </Flex>
-      {!datasets.length && !isLoading && !search && (
+      {!datasets.length && !isLoading && !search && !dateFilter && (
         <Text mb={4} data-testid="no-datasets-msg">
           Looks like you do not have any datasets yet. Check out the
           {' '}
@@ -141,6 +156,44 @@ const DatasetsList = ({ onSelect }: Props) => {
           to learn how to create a dataset.
         </Text>
       )}
+      <Flex>
+        <Text mr={2}>Filter datasets with updates in the past:</Text>
+        <ButtonGroup size="sm" isAttached variant="outline">
+          <Button
+            onClick={() => {
+              searchParams.delete(DATE_FILTER_PARAM);
+              setSearchParams(searchParams);
+            }}
+            variant={!dateFilter ? 'solid' : 'outline'}
+            fontWeight={!dateFilter ? 'bold' : 'normal'}
+          >
+            All Time
+          </Button>
+          {Object.keys(dateOptions).map((option) => {
+            const filter = dateOptions[option];
+            const isSelected = option === dateFilter;
+            return (
+              <Button
+                key={option}
+                onClick={() => {
+                  if (isSelected) {
+                    searchParams.delete(DATE_FILTER_PARAM);
+                  } else {
+                    searchParams.set(DATE_FILTER_PARAM, option);
+                  }
+                  setSearchParams(searchParams);
+                }}
+                variant={isSelected ? 'solid' : 'outline'}
+                fontWeight={isSelected ? 'bold' : 'normal'}
+              >
+                {filter.count}
+                {' '}
+                {filter.unit}
+              </Button>
+            );
+          })}
+        </ButtonGroup>
+      </Flex>
       <InputGroup my={2} px={1}>
         <InputLeftElement pointerEvents="none">
           <MdSearch />
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 67f7b327dd..ad7cc5104c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -71,7 +71,7 @@ from pendulum.datetime import DateTime
 from pendulum.parsing.exceptions import ParserError
 from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter
-from sqlalchemy import Date, and_, desc, distinct, func, inspect, union_all
+from sqlalchemy import Date, and_, case, desc, func, inspect, union_all
 from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm import Session, joinedload
 from wtforms import SelectField, validators
@@ -237,8 +237,15 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
     }
 
 
-def _safe_parse_datetime(v):
-    """Parse datetime and return error message for invalid dates"""
+def _safe_parse_datetime(v, allow_empty=False):
+    """
+    Parse datetime and return error message for invalid dates
+
+    :param v: the string value to be parsed
+    :param allow_empty: Set True to return none if empty str or None
+    """
+    if allow_empty is True and not v:
+        return None
     try:
         return timezone.parse(v)
     except (TypeError, ParserError):
@@ -3521,11 +3528,19 @@ class Airflow(AirflowBaseView):
         """
         allowed_attrs = ['uri', 'last_dataset_update']
 
+        # Grab query parameters
         limit = int(request.args.get("limit", 25))
         offset = int(request.args.get("offset", 0))
         order_by = request.args.get("order_by", "uri")
         uri_pattern = request.args.get("uri_pattern", "")
         lstripped_orderby = order_by.lstrip('-')
+        updated_after = _safe_parse_datetime(request.args.get("updated_after"), allow_empty=True)
+        updated_before = _safe_parse_datetime(request.args.get("updated_before"), allow_empty=True)
+
+        # Check and clean up query parameters
+        limit = 50 if limit > 50 else limit
+
+        uri_pattern = uri_pattern[:4000]
 
         if lstripped_orderby not in allowed_attrs:
             return {
@@ -3535,8 +3550,6 @@ class Airflow(AirflowBaseView):
                 )
             }, 400
 
-        limit = 50 if limit > 50 else limit
-
         with create_session() as session:
             if lstripped_orderby == "uri":
                 if order_by[0] == "-":
@@ -3559,28 +3572,43 @@ class Airflow(AirflowBaseView):
                     if session.bind.dialect.name == "postgresql":
                         order_by = (order_by[0].nulls_first(), *order_by[1:])
 
-            total_entries = session.query(func.count(DatasetModel.id)).scalar()
+            count_query = session.query(func.count(DatasetModel.id))
+
+            has_event_filters = bool(updated_before or updated_after)
 
-            datasets = [
-                dict(dataset)
-                for dataset in session.query(
+            query = (
+                session.query(
                     DatasetModel.id,
                     DatasetModel.uri,
                     func.max(DatasetEvent.timestamp).label("last_dataset_update"),
-                    func.count(distinct(DatasetEvent.id)).label("total_updates"),
+                    func.sum(case((DatasetEvent.id.is_not(None), 1), else_=0)).label("total_updates"),
                 )
-                .outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)
+                .join(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id, isouter=not has_event_filters)
                 .group_by(
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if has_event_filters:
+                count_query = count_query.join(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)
+
+            filters = []
+            if uri_pattern:
+                filters.append(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+            if updated_after:
+                filters.append(DatasetEvent.timestamp >= updated_after)
+            if updated_before:
+                filters.append(DatasetEvent.timestamp <= updated_before)
+
+            query = query.filter(*filters)
+            count_query = count_query.filter(*filters)
+
+            query = query.offset(offset).limit(limit)
+
+            datasets = [dict(dataset) for dataset in query.all()]
+            data = {"datasets": datasets, "total_entries": count_query.scalar()}
 
             return (
                 htmlsafe_json_dumps(data, separators=(',', ':'), cls=utils_json.AirflowJsonEncoder),
diff --git a/tests/www/views/test_views_dataset.py b/tests/www/views/test_views_dataset.py
index a5eb6e23f4..855b2618c9 100644
--- a/tests/www/views/test_views_dataset.py
+++ b/tests/www/views/test_views_dataset.py
@@ -89,6 +89,64 @@ class TestGetDatasets(TestDatasetEndpoint):
         msg = "Ordering with 'fake' is disallowed or the attribute does not exist on the model"
         assert response.json['detail'] == msg
 
+    def test_order_by_raises_400_for_invalid_datetimes(self, admin_client, session):
+        datasets = [
+            DatasetModel(
+                uri=f"s3://bucket/key/{i}",
+            )
+            for i in [1, 2]
+        ]
+        session.add_all(datasets)
+        session.commit()
+        assert session.query(DatasetModel).count() == 2
+
+        response = admin_client.get("/object/datasets_summary?updated_before=null")
+
+        assert response.status_code == 400
+        assert "Invalid datetime:" in response.text
+
+        response = admin_client.get("/object/datasets_summary?updated_after=null")
+
+        assert response.status_code == 400
+        assert "Invalid datetime:" in response.text
+
+    def test_filter_by_datetimes(self, admin_client, session):
+        today = pendulum.today('UTC')
+
+        datasets = [
+            DatasetModel(
+                id=i,
+                uri=f"s3://bucket/key/{i}",
+            )
+            for i in range(1, 4)
+        ]
+        session.add_all(datasets)
+        # Update datasets, one per day, starting with datasets[0], ending with datasets[2]
+        dataset_events = [
+            DatasetEvent(
+                dataset_id=datasets[i].id,
+                timestamp=today.add(days=-len(datasets) + i + 1),
+            )
+            for i in range(len(datasets))
+        ]
+        session.add_all(dataset_events)
+        session.commit()
+        assert session.query(DatasetModel).count() == len(datasets)
+
+        cutoff = today.add(days=-1).add(minutes=-5).to_iso8601_string()
+        response = admin_client.get(f"/object/datasets_summary?updated_after={cutoff}")
+
+        assert response.status_code == 200
+        assert response.json['total_entries'] == 2
+        assert [json_dict['id'] for json_dict in response.json['datasets']] == [2, 3]
+
+        cutoff = today.add(days=-1).add(minutes=5).to_iso8601_string()
+        response = admin_client.get(f"/object/datasets_summary?updated_before={cutoff}")
+
+        assert response.status_code == 200
+        assert response.json['total_entries'] == 2
+        assert [json_dict['id'] for json_dict in response.json['datasets']] == [1, 2]
+
     @pytest.mark.parametrize(
         "order_by, ordered_dataset_ids",
         [
@@ -157,6 +215,29 @@ class TestGetDatasets(TestDatasetEndpoint):
                     "total_updates": 0,
                 },
             ],
+            "total_entries": 1,
+        }
+
+        uri_pattern = 's3://bucket/key_'
+        response = admin_client.get(f"/object/datasets_summary?uri_pattern={uri_pattern}")
+
+        assert response.status_code == 200
+        response_data = response.json
+        assert response_data == {
+            "datasets": [
+                {
+                    "id": 1,
+                    "uri": "s3://bucket/key_1",
+                    "last_dataset_update": None,
+                    "total_updates": 0,
+                },
+                {
+                    "id": 2,
+                    "uri": "s3://bucket/key_2",
+                    "last_dataset_update": None,
+                    "total_updates": 0,
+                },
+            ],
             "total_entries": 2,
         }
 

Reply via email to