This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d356560baa Rename `DatasetEventManager` to `DatasetManager` (#26054)
d356560baa is described below
commit d356560baa5a41d4bda87e4010ea6d90855d25f3
Author: Jed Cunningham <[email protected]>
AuthorDate: Tue Aug 30 08:51:28 2022 -0700
Rename `DatasetEventManager` to `DatasetManager` (#26054)
This manager class will end up dealing with more than just dataset
events over time, so rename it to be more general.
---
airflow/config_templates/config.yml | 10 +++++-----
airflow/config_templates/default_airflow.cfg | 12 ++++++------
airflow/datasets/manager.py | 22 +++++++++++-----------
airflow/models/taskinstance.py | 4 ++--
tests/datasets/test_manager.py | 8 ++++----
5 files changed, 28 insertions(+), 28 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index a1807e5bb1..13b299d1fe 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -386,14 +386,14 @@
type: string
default: "0o077"
example: ~
- - name: dataset_event_manager_class
- description: Class to use as dataset event manager.
+ - name: dataset_manager_class
+ description: Class to use as dataset manager.
version_added: 2.4.0
type: string
default: ~
- example: 'airflow.datasets.manager.DatasetEventManager'
- - name: dataset_event_manager_kwargs
- description: Kwargs to supply to dataset event manager.
+ example: 'airflow.datasets.manager.DatasetManager'
+ - name: dataset_manager_kwargs
+ description: Kwargs to supply to dataset manager.
version_added: 2.4.0
type: string
default: ~
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 4944d32f0e..119873d9f2 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -220,13 +220,13 @@ max_map_length = 1024
# This value is treated as an octal-integer.
daemon_umask = 0o077
-# Class to use as dataset event manager.
-# Example: dataset_event_manager_class = airflow.datasets.manager.DatasetEventManager
-# dataset_event_manager_class =
+# Class to use as dataset manager.
+# Example: dataset_manager_class = airflow.datasets.manager.DatasetManager
+# dataset_manager_class =
-# Kwargs to supply to dataset event manager.
-# Example: dataset_event_manager_kwargs = {{"some_param": "some_value"}}
-# dataset_event_manager_kwargs =
+# Kwargs to supply to dataset manager.
+# Example: dataset_manager_kwargs = {{"some_param": "some_value"}}
+# dataset_manager_kwargs =
[database]
# The SqlAlchemy connection string to the metadata database.
diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index e16e392515..7becb630f7 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -28,11 +28,11 @@ if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
-class DatasetEventManager(LoggingMixin):
+class DatasetManager(LoggingMixin):
"""
- A pluggable class that manages operations for dataset events.
+ A pluggable class that manages operations for datasets.
- The intent is to have one place to handle all DatasetEvent-related operations, so different
+ The intent is to have one place to handle all Dataset-related operations, so different
Airflow deployments can use plugins that broadcast dataset events to each other.
"""
@@ -69,18 +69,18 @@ class DatasetEventManager(LoggingMixin):
session.merge(DatasetDagRunQueue(dataset_id=dataset.id, target_dag_id=dag_id))
-def resolve_dataset_event_manager():
- _dataset_event_manager_class = conf.getimport(
+def resolve_dataset_manager():
+ _dataset_manager_class = conf.getimport(
section='core',
- key='dataset_event_manager_class',
- fallback='airflow.datasets.manager.DatasetEventManager',
+ key='dataset_manager_class',
+ fallback='airflow.datasets.manager.DatasetManager',
)
- _dataset_event_manager_kwargs = conf.getjson(
+ _dataset_manager_kwargs = conf.getjson(
section='core',
- key='dataset_event_manager_kwargs',
+ key='dataset_manager_kwargs',
fallback={},
)
- return _dataset_event_manager_class(**_dataset_event_manager_kwargs)
+ return _dataset_manager_class(**_dataset_manager_kwargs)
-dataset_event_manager = resolve_dataset_event_manager()
+dataset_manager = resolve_dataset_manager()
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index c810f5e5ff..2d93898f19 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -81,7 +81,7 @@ from airflow import settings
from airflow.compat.functools import cache
from airflow.configuration import conf
from airflow.datasets import Dataset
-from airflow.datasets.manager import dataset_event_manager
+from airflow.datasets.manager import dataset_manager
from airflow.exceptions import (
AirflowException,
AirflowFailException,
@@ -1535,7 +1535,7 @@ class TaskInstance(Base, LoggingMixin):
self.log.debug("outlet obj %s", obj)
# Lineage can have other types of objects besides datasets
if isinstance(obj, Dataset):
- dataset_event_manager.register_dataset_change(
+ dataset_manager.register_dataset_change(
task_instance=self,
dataset=obj,
session=session,
diff --git a/tests/datasets/test_manager.py b/tests/datasets/test_manager.py
index 4b467ba224..e81f1a5172 100644
--- a/tests/datasets/test_manager.py
+++ b/tests/datasets/test_manager.py
@@ -21,7 +21,7 @@ from unittest import mock
import pytest
from airflow.datasets import Dataset
-from airflow.datasets.manager import DatasetEventManager
+from airflow.datasets.manager import DatasetManager
from airflow.models.dataset import DatasetModel
@@ -44,9 +44,9 @@ def create_mock_dag():
yield mock_dag
-class TestDatasetEventManager:
+class TestDatasetManager:
def test_register_dataset_change_dataset_doesnt_exist(self, mock_task_instance):
- dsem = DatasetEventManager()
+ dsem = DatasetManager()
dataset = Dataset(uri="dataset_doesnt_exist")
@@ -62,7 +62,7 @@ class TestDatasetEventManager:
mock_session.merge.assert_not_called()
def test_register_dataset_change(self, mock_task_instance):
- dsem = DatasetEventManager()
+ dsem = DatasetManager()
mock_dag_1 = mock.MagicMock()
mock_dag_1.dag_id = 1