This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 876536ea3c Fix dataset_event_manager resolution (#25943)
876536ea3c is described below
commit 876536ea3c45d5f15fcfbe81eda3ee01a101faa3
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Aug 25 15:31:18 2022 -0700
Fix dataset_event_manager resolution (#25943)
Appears `__init__` is not invoked as part of `_run_raw_task` due to the way
TI is refreshed from db. Centralize dataset manager instantiation instead.
---
airflow/config_templates/config.yml | 12 ++++++++++++
airflow/config_templates/default_airflow.cfg | 8 ++++++++
airflow/datasets/manager.py | 29 ++++++++++++++++++++++++++--
airflow/models/taskinstance.py | 7 ++-----
4 files changed, 49 insertions(+), 7 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index c3970dc018..a1807e5bb1 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -386,6 +386,18 @@
type: string
default: "0o077"
example: ~
+ - name: dataset_event_manager_class
+ description: Class to use as dataset event manager.
+ version_added: 2.4.0
+ type: string
+ default: ~
+ example: 'airflow.datasets.manager.DatasetEventManager'
+ - name: dataset_event_manager_kwargs
+ description: Kwargs to supply to dataset event manager.
+ version_added: 2.4.0
+ type: string
+ default: ~
+ example: '{"some_param": "some_value"}'
- name: database
description: ~
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 90fa6e0df3..4944d32f0e 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -220,6 +220,14 @@ max_map_length = 1024
# This value is treated as an octal-integer.
daemon_umask = 0o077
+# Class to use as dataset event manager.
+# Example: dataset_event_manager_class = airflow.datasets.manager.DatasetEventManager
+# dataset_event_manager_class =
+
+# Kwargs to supply to dataset event manager.
+# Example: dataset_event_manager_kwargs = {{"some_param": "some_value"}}
+# dataset_event_manager_kwargs =
+
[database]
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines.
diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index a8be553052..e16e392515 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -15,13 +15,18 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from typing import TYPE_CHECKING
+
from sqlalchemy.orm.session import Session
+from airflow.configuration import conf
from airflow.datasets import Dataset
from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
-from airflow.models.taskinstance import TaskInstance
from airflow.utils.log.logging_mixin import LoggingMixin
+if TYPE_CHECKING:
+ from airflow.models.taskinstance import TaskInstance
+
class DatasetEventManager(LoggingMixin):
"""
@@ -31,8 +36,11 @@ class DatasetEventManager(LoggingMixin):
Airflow deployments can use plugins that broadcast dataset events to each
other.
"""
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+
def register_dataset_change(
- self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
+ self, *, task_instance: "TaskInstance", dataset: Dataset, extra=None, session: Session, **kwargs
) -> None:
"""
For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
@@ -59,3 +67,20 @@ class DatasetEventManager(LoggingMixin):
self.log.debug("consuming dag ids %s", consuming_dag_ids)
for dag_id in consuming_dag_ids:
session.merge(DatasetDagRunQueue(dataset_id=dataset.id, target_dag_id=dag_id))
+
+
+def resolve_dataset_event_manager():
+ _dataset_event_manager_class = conf.getimport(
+ section='core',
+ key='dataset_event_manager_class',
+ fallback='airflow.datasets.manager.DatasetEventManager',
+ )
+ _dataset_event_manager_kwargs = conf.getjson(
+ section='core',
+ key='dataset_event_manager_kwargs',
+ fallback={},
+ )
+ return _dataset_event_manager_class(**_dataset_event_manager_kwargs)
+
+
+dataset_event_manager = resolve_dataset_event_manager()
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 5912094c25..c810f5e5ff 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -81,6 +81,7 @@ from airflow import settings
from airflow.compat.functools import cache
from airflow.configuration import conf
from airflow.datasets import Dataset
+from airflow.datasets.manager import dataset_event_manager
from airflow.exceptions import (
AirflowException,
AirflowFailException,
@@ -585,10 +586,6 @@ class TaskInstance(Base, LoggingMixin):
# can be changed when calling 'run'
self.test_mode = False
- self.dataset_event_manager = conf.getimport(
- 'core', 'dataset_event_manager_class', fallback='airflow.datasets.manager.DatasetEventManager'
- )()
-
@staticmethod
def insert_mapping(run_id: str, task: "Operator", map_index: int) -> dict:
""":meta private:"""
@@ -1538,7 +1535,7 @@ class TaskInstance(Base, LoggingMixin):
self.log.debug("outlet obj %s", obj)
# Lineage can have other types of objects besides datasets
if isinstance(obj, Dataset):
- self.dataset_event_manager.register_dataset_change(
+ dataset_event_manager.register_dataset_change(
task_instance=self,
dataset=obj,
session=session,