This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8560e9a8e3 Fix Example DAGs Test by adding one more internal API method for DatasetAlias resolution (#41233)
8560e9a8e3 is described below
commit 8560e9a8e3efdee2c31596766130b833f9ed3c04
Author: Jens Scheffler <[email protected]>
AuthorDate: Sat Aug 3 16:27:52 2024 +0200
Fix Example DAGs Test by adding one more internal API method for DatasetAlias resolution (#41233)
---
airflow/api_internal/endpoints/rpc_api_endpoint.py | 2 ++
airflow/datasets/__init__.py | 2 ++
2 files changed, 4 insertions(+)
diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index 6dc2982e42..4a5fdb0276 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -55,6 +55,7 @@ def initialize_method_map() -> dict[str, Callable]:
from airflow.cli.commands.task_command import _get_ti_db_access
from airflow.dag_processing.manager import DagFileProcessorManager
from airflow.dag_processing.processor import DagFileProcessor
+ from airflow.datasets import expand_alias_to_datasets
from airflow.datasets.manager import DatasetManager
from airflow.models import Trigger, Variable, XCom
from airflow.models.dag import DAG, DagModel
@@ -106,6 +107,7 @@ def initialize_method_map() -> dict[str, Callable]:
DagFileProcessorManager.clear_nonexistent_import_errors,
DagFileProcessorManager.deactivate_stale_dags,
DagWarning.purge_inactive_dag_warnings,
+ expand_alias_to_datasets,
DatasetManager.register_dataset_change,
FileTaskHandler._render_filename_db_access,
Job._add_to_db,
diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index 873d314136..0e99188b28 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Any, Callable, ClassVar, Iterable, Iterator, c
import attr
from sqlalchemy import select
+from airflow.api_internal.internal_api_call import internal_api_call
from airflow.serialization.dag_dependency import DagDependency
from airflow.typing_compat import TypedDict
from airflow.utils.session import NEW_SESSION, provide_session
@@ -138,6 +139,7 @@ def extract_event_key(value: str | Dataset | DatasetAlias) -> str:
return _sanitize_uri(str(value))
+@internal_api_call
@provide_session
def expand_alias_to_datasets(
alias: str | DatasetAlias, *, session: Session = NEW_SESSION