This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 722a719769 Rename `created_at` to `timestamp` in DatasetEvent (#25292)
722a719769 is described below
commit 722a7197693583e8c0fbc191cdee33f3556baa06
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Jul 25 21:04:37 2022 +0100
Rename `created_at` to `timestamp` in DatasetEvent (#25292)
Timestamp seems more appropriate
---
airflow/api_connexion/endpoints/dag_run_endpoint.py | 6 +++---
airflow/api_connexion/endpoints/dataset_endpoint.py | 4 ++--
airflow/api_connexion/openapi/v1.yaml | 2 +-
airflow/api_connexion/schemas/dataset_schema.py | 2 +-
.../versions/0114_2_4_0_add_dataset_model.py | 4 ++--
airflow/models/dataset.py | 7 ++++---
airflow/www/static/js/datasets/Details.tsx | 4 ++--
airflow/www/static/js/types/api-generated.ts | 2 +-
.../api_connexion/endpoints/test_dag_run_endpoint.py | 20 +++++++++-----------
.../api_connexion/endpoints/test_dataset_endpoint.py | 18 +++++++++---------
tests/api_connexion/schemas/test_dataset_schema.py | 10 +++++-----
11 files changed, 39 insertions(+), 40 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index be6feb23f3..30e01eedec 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -139,15 +139,15 @@ def _get_upstream_dataset_events(*, dag_run: DagRun, session: Session) -> List["
dataset_event_filters = [
DatasetDagRef.dag_id == dag_run.dag_id,
- DatasetEvent.created_at <= dag_run.execution_date,
+ DatasetEvent.timestamp <= dag_run.execution_date,
]
if previous_dag_run:
- dataset_event_filters.append(DatasetEvent.created_at > previous_dag_run.execution_date)
+ dataset_event_filters.append(DatasetEvent.timestamp > previous_dag_run.execution_date)
dataset_events = (
session.query(DatasetEvent)
.join(DatasetDagRef, DatasetEvent.dataset_id == DatasetDagRef.dataset_id)
.filter(*dataset_event_filters)
- .order_by(DatasetEvent.created_at)
+ .order_by(DatasetEvent.timestamp)
.all()
)
return dataset_events
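For orientation, a minimal sketch (illustrative only, not part of the patch) of the query pattern this hunk produces; it assumes DatasetEvent and DatasetDagRef import from airflow.models.dataset, so adjust the import to match your checkout:

from typing import List

from sqlalchemy.orm import Session

from airflow.models.dataset import DatasetDagRef, DatasetEvent


def events_for_run(session: Session, dag_id: str, upper, lower=None) -> List[DatasetEvent]:
    """Events at or before `upper`; strictly after `lower` when a prior run exists."""
    filters = [
        DatasetDagRef.dag_id == dag_id,
        DatasetEvent.timestamp <= upper,  # formerly DatasetEvent.created_at
    ]
    if lower is not None:
        filters.append(DatasetEvent.timestamp > lower)
    return (
        session.query(DatasetEvent)
        .join(DatasetDagRef, DatasetEvent.dataset_id == DatasetDagRef.dataset_id)
        .filter(*filters)
        .order_by(DatasetEvent.timestamp)  # the ordering column is renamed as well
        .all()
    )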
diff --git a/airflow/api_connexion/endpoints/dataset_endpoint.py b/airflow/api_connexion/endpoints/dataset_endpoint.py
index 5c9e7606fc..0239063ca0 100644
--- a/airflow/api_connexion/endpoints/dataset_endpoint.py
+++ b/airflow/api_connexion/endpoints/dataset_endpoint.py
@@ -72,7 +72,7 @@ def get_dataset_events(
*,
limit: int,
offset: int = 0,
- order_by: str = "created_at",
+ order_by: str = "timestamp",
dataset_id: Optional[int] = None,
source_dag_id: Optional[str] = None,
source_task_id: Optional[str] = None,
@@ -81,7 +81,7 @@ def get_dataset_events(
session: Session = NEW_SESSION,
) -> APIResponse:
"""Get dataset events"""
- allowed_attrs = ['source_dag_id', 'source_task_id', 'source_run_id', 'source_map_index', 'created_at']
+ allowed_attrs = ['source_dag_id', 'source_task_id', 'source_run_id', 'source_map_index', 'timestamp']
query = session.query(DatasetEvent)
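A hedged, standalone sketch of how such an allow-list is typically enforced; the real endpoint delegates to Airflow's shared sorting helper, so this validate_order_by function is hypothetical and exists only for illustration:

# Hypothetical helper, not Airflow API: shows the allow-list check only.
ALLOWED_ATTRS = ['source_dag_id', 'source_task_id', 'source_run_id', 'source_map_index', 'timestamp']

def validate_order_by(order_by: str = "timestamp") -> str:
    """Accept 'attr' or '-attr' (descending) only for allow-listed attributes."""
    if order_by.lstrip("-") not in ALLOWED_ATTRS:
        raise ValueError(f"Ordering with '{order_by}' is not allowed")
    return order_by

assert validate_order_by("-timestamp") == "-timestamp"  # was '-created_at'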
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 7c9d45365e..55c24d871d 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -3555,7 +3555,7 @@ components:
type: integer
description: The task map index that updated the dataset.
nullable: true
- created_at:
+ timestamp:
type: string
description: The dataset event creation time
nullable: false
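From an API consumer's side, the rename means sorting and reading the new field name. A hypothetical client snippet follows; the base URL, endpoint path, and basic-auth credentials are assumptions, not taken from this commit:

# Hypothetical REST client sketch; verify the path against your v1.yaml.
import requests

BASE = "http://localhost:8080/api/v1"  # assumed local webserver

resp = requests.get(
    f"{BASE}/datasets/events",
    params={"order_by": "-timestamp", "limit": 10},  # was "-created_at"
    auth=("admin", "admin"),  # assumed credentials
)
resp.raise_for_status()
for event in resp.json()["dataset_events"]:
    print(event["timestamp"], event["dataset_uri"])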
diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index e63f6ea7eb..06c1dc866d 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -72,7 +72,7 @@ class DatasetEventSchema(SQLAlchemySchema):
source_dag_id = auto_field()
source_run_id = auto_field()
source_map_index = auto_field()
- created_at = auto_field()
+ timestamp = auto_field()
class DatasetEventCollection(NamedTuple):
diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
index deb6c3c33f..8cc5d9dc2e 100644
--- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
+++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
@@ -128,10 +128,10 @@ def _create_dataset_event_table():
sa.Column('source_dag_id', String(250), nullable=True),
sa.Column('source_run_id', String(250), nullable=True),
sa.Column('source_map_index', sa.Integer(), nullable=True, server_default='-1'),
- sa.Column('created_at', TIMESTAMP, nullable=False),
+ sa.Column('timestamp', TIMESTAMP, nullable=False),
sqlite_autoincrement=True, # ensures PK values not reused
)
- op.create_index('idx_dataset_id_created_at', 'dataset_event', ['dataset_id', 'created_at'])
+ op.create_index('idx_dataset_id_timestamp', 'dataset_event', ['dataset_id', 'timestamp'])
def upgrade():
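Because dataset_event is introduced in this same 2.4.0 migration, the column is simply created under its final name and no rename migration ships with this commit. For a dev database built from an older main checkout, a hypothetical in-place rename (not shipped here) would look roughly like:

# Hypothetical Alembic snippet, NOT part of this commit.
import sqlalchemy as sa
from alembic import op

def rename_created_at_to_timestamp():
    op.drop_index('idx_dataset_id_created_at', table_name='dataset_event')
    op.alter_column(
        'dataset_event',
        'created_at',
        new_column_name='timestamp',
        existing_type=sa.TIMESTAMP(timezone=True),
        existing_nullable=False,
    )
    op.create_index('idx_dataset_id_timestamp', 'dataset_event', ['dataset_id', 'timestamp'])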
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index c16f157f8c..21373106b5 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -211,6 +211,7 @@ class DatasetEvent(Base):
:param source_dag_id: the dag_id of the TI which updated the dataset
:param source_run_id: the run_id of the TI which updated the dataset
:param source_map_index: the map_index of the TI which updated the dataset
+ :param timestamp: the time the event was logged
We use relationships instead of foreign keys so that dataset events are not deleted even
if the foreign key object is.
@@ -223,11 +224,11 @@ class DatasetEvent(Base):
source_dag_id = Column(StringID(), nullable=True)
source_run_id = Column(StringID(), nullable=True)
source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
- created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
__tablename__ = "dataset_event"
__table_args__ = (
- Index('idx_dataset_id_created_at', dataset_id, created_at),
+ Index('idx_dataset_id_timestamp', dataset_id, timestamp),
{'sqlite_autoincrement': True}, # ensures PK values not reused
)
@@ -267,7 +268,7 @@ class DatasetEvent(Base):
def __eq__(self, other) -> bool:
if isinstance(other, self.__class__):
- return self.dataset_id == other.dataset_id and self.created_at == other.created_at
+ return self.dataset_id == other.dataset_id and self.timestamp == other.timestamp
else:
return NotImplemented
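A small usage sketch of the renamed keyword and the __eq__ semantics above; it assumes a dev environment where the model imports, and mirrors the constructor calls in the tests further down:

import pendulum

from airflow.models.dataset import DatasetEvent

ts = pendulum.datetime(2022, 1, 1, tz="UTC")
a = DatasetEvent(dataset_id=1, timestamp=ts)  # keyword was created_at
b = DatasetEvent(dataset_id=1, timestamp=ts)
assert a == b  # equal: same dataset_id and timestamp
assert a != DatasetEvent(dataset_id=1, timestamp=ts.add(seconds=1))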
diff --git a/airflow/www/static/js/datasets/Details.tsx b/airflow/www/static/js/datasets/Details.tsx
index 3bbee2a818..a99efdd42f 100644
--- a/airflow/www/static/js/datasets/Details.tsx
+++ b/airflow/www/static/js/datasets/Details.tsx
@@ -67,8 +67,8 @@ const DatasetDetails = ({ datasetId, onBack }: Props) => {
const columns = useMemo(
() => [
{
- Header: 'Created At',
- accessor: 'createdAt',
+ Header: 'Timestamp',
+ accessor: 'timestamp',
Cell: TimeCell,
},
{
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index c798721eb1..bccbd79a23 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -1499,7 +1499,7 @@ export interface components {
/** @description The task map index that updated the dataset. */
source_map_index?: number | null;
/** @description The dataset event creation time */
- created_at?: string;
+ timestamp?: string;
};
/**
* @description A collection of dataset events.
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 1a25475df1..3b65450b99 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1543,8 +1543,8 @@ def test__get_upstream_dataset_events_with_prior(configured_app):
first_timestamp = pendulum.datetime(2022, 1, 1, tz='UTC')
session.add_all(
[
- DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp),
- DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp),
+ DatasetEvent(dataset_id=dataset1a.id, timestamp=first_timestamp),
+ DatasetEvent(dataset_id=dataset1b.id, timestamp=first_timestamp),
]
)
dr1 = DagRun(
@@ -1557,9 +1557,9 @@ def test__get_upstream_dataset_events_with_prior(configured_app):
session.add(dr1)
session.add_all(
[
- DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=2000)),
- DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=3000)),
- DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=4000)),
+ DatasetEvent(dataset_id=dataset1a.id, timestamp=first_timestamp.add(microseconds=2000)),
+ DatasetEvent(dataset_id=dataset1b.id, timestamp=first_timestamp.add(microseconds=3000)),
+ DatasetEvent(dataset_id=dataset1b.id, timestamp=first_timestamp.add(microseconds=4000)),
]
)
dr2 = DagRun( # this dag run should be ignored
@@ -1578,15 +1578,13 @@ def test__get_upstream_dataset_events_with_prior(configured_app):
)
dr3.dag = dag2
session.add(dr3)
- session.add_all(
- [DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=5000))]
- )
+ session.add_all([DatasetEvent(dataset_id=dataset1a.id, timestamp=first_timestamp.add(microseconds=5000))])
session.commit()
session.expunge_all()
events = _get_upstream_dataset_events(dag_run=dr3, session=session)
- event_times = [x.created_at for x in events]
+ event_times = [x.timestamp for x in events]
assert event_times == [
first_timestamp.add(microseconds=2000),
first_timestamp.add(microseconds=3000),
@@ -1612,7 +1610,7 @@ class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
assert len(result) == 1
created_at = pendulum.now('UTC')
# make sure whatever is returned by this func is what comes out in response.
- d = DatasetEvent(dataset_id=1, created_at=created_at)
+ d = DatasetEvent(dataset_id=1, timestamp=created_at)
d.dataset = Dataset(id=1, uri='hello', created_at=created_at, updated_at=created_at)
mock_get_events.return_value = [d]
response = self.client.get(
@@ -1623,7 +1621,7 @@ class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
expected_response = {
'dataset_events': [
{
- 'created_at': str(created_at),
+ 'timestamp': str(created_at),
'dataset_id': 1,
'dataset_uri': d.dataset.uri,
'extra': None,
diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py b/tests/api_connexion/endpoints/test_dataset_endpoint.py
index a06142dcbb..0d025d3c6c 100644
--- a/tests/api_connexion/endpoints/test_dataset_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py
@@ -269,7 +269,7 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
"source_map_index": -1,
}
- events = [DatasetEvent(id=i, created_at=timezone.parse(self.default_time), **common) for i in [1, 2]]
+ events = [DatasetEvent(id=i, timestamp=timezone.parse(self.default_time), **common) for i in [1, 2]]
session.add_all(events)
session.commit()
assert session.query(DatasetEvent).count() == 2
@@ -282,13 +282,13 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
"dataset_events": [
{
"id": 1,
- "created_at": self.default_time,
+ "timestamp": self.default_time,
**common,
"dataset_uri": d.uri,
},
{
"id": 2,
- "created_at": self.default_time,
+ "timestamp": self.default_time,
**common,
"dataset_uri": d.uri,
},
@@ -328,7 +328,7 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
source_task_id=f"task{i}",
source_run_id=f"run{i}",
source_map_index=i,
- created_at=timezone.parse(self.default_time),
+ timestamp=timezone.parse(self.default_time),
)
for i in [1, 2, 3]
]
@@ -353,7 +353,7 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
"source_task_id": "task2",
"source_run_id": "run2",
"source_map_index": 2,
- "created_at": self.default_time,
+ "timestamp": self.default_time,
}
],
"total_entries": 1,
@@ -369,7 +369,7 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
source_task_id="bar",
source_run_id="custom",
source_map_index=-1,
- created_at=timezone.parse(self.default_time),
+ timestamp=timezone.parse(self.default_time),
)
for i in [1, 2]
]
@@ -425,7 +425,7 @@ class TestGetDatasetEventsEndpointPagination(TestDatasetEndpoint):
source_task_id="bar",
source_run_id=f"run{i}",
source_map_index=-1,
- created_at=timezone.parse(self.default_time),
+ timestamp=timezone.parse(self.default_time),
)
for i in range(1, 10)
]
@@ -447,7 +447,7 @@ class TestGetDatasetEventsEndpointPagination(TestDatasetEndpoint):
source_task_id="bar",
source_run_id=f"run{i}",
source_map_index=-1,
- created_at=timezone.parse(self.default_time),
+ timestamp=timezone.parse(self.default_time),
)
for i in range(1, 110)
]
@@ -469,7 +469,7 @@ class TestGetDatasetEventsEndpointPagination(TestDatasetEndpoint):
source_task_id="bar",
source_run_id=f"run{i}",
source_map_index=-1,
- created_at=timezone.parse(self.default_time),
+ timestamp=timezone.parse(self.default_time),
)
for i in range(1, 200)
]
diff --git a/tests/api_connexion/schemas/test_dataset_schema.py b/tests/api_connexion/schemas/test_dataset_schema.py
index f6ed25b85c..46e2732f2e 100644
--- a/tests/api_connexion/schemas/test_dataset_schema.py
+++ b/tests/api_connexion/schemas/test_dataset_schema.py
@@ -111,7 +111,7 @@ class TestDatasetEventSchema(TestDatasetSchemaBase):
source_task_id="bar",
source_run_id="custom",
source_map_index=-1,
- created_at=timezone.parse(self.timestamp),
+ timestamp=timezone.parse(self.timestamp),
)
session.add(event)
session.flush()
@@ -125,7 +125,7 @@ class TestDatasetEventSchema(TestDatasetSchemaBase):
"source_task_id": "bar",
"source_run_id": "custom",
"source_map_index": -1,
- "created_at": self.timestamp,
+ "timestamp": self.timestamp,
}
@@ -140,7 +140,7 @@ class TestDatasetEventCollectionSchema(TestDatasetSchemaBase):
"source_map_index": -1,
}
- events = [DatasetEvent(id=i, created_at=timezone.parse(self.timestamp), **common) for i in [1, 2]]
+ events = [DatasetEvent(id=i, timestamp=timezone.parse(self.timestamp), **common) for i in [1, 2]]
session.add_all(events)
session.flush()
serialized_data = dataset_event_collection_schema.dump(
@@ -148,8 +148,8 @@ class TestDatasetEventCollectionSchema(TestDatasetSchemaBase):
)
assert serialized_data == {
"dataset_events": [
- {"id": 1, "created_at": self.timestamp, **common},
- {"id": 2, "created_at": self.timestamp, **common},
+ {"id": 1, "timestamp": self.timestamp, **common},
+ {"id": 2, "timestamp": self.timestamp, **common},
],
"total_entries": 2,
}