tirkarthi opened a new issue, #32150: URL: https://github.com/apache/airflow/issues/32150
### Apache Airflow version main (development) ### What happened While loading a Mapped TriggerDagRun operator inside the webserver the trigger_dag_id is set as `None` and `operator` also doesn't have `partial_kwargs` set to fetch it. This is part of `partial_kwargs` during serialization. So None value causes the link to return 500 error as per below traceback. https://github.com/apache/airflow/blob/016ce9948625a556093b0182439aa50314c651da/airflow/operators/trigger_dagrun.py#L64 I tried the below patch but I am not sure if it's a good idea. ```diff diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py index 66b75923fd..1fc8ce2134 100644 --- a/airflow/models/mappedoperator.py +++ b/airflow/models/mappedoperator.py @@ -636,6 +636,10 @@ class MappedOperator(AbstractOperator): op = SerializedBaseOperator(task_id=self.task_id, params=self.params, _airflow_from_mapped=True) SerializedBaseOperator.populate_operator(op, self.operator_class) + + for k, v in getattr(self, 'partial_kwargs', {}).items(): + setattr(op, k, v) + return op def _get_specified_expand_input(self) -> ExpandInput: ``` ```python import datetime from airflow.decorators import task from airflow.models.dag import dag from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow import DAG with DAG( catchup=False, start_date=datetime.datetime(2022, 1, 2), schedule_interval=datetime.timedelta(seconds=3600), dag_id="my_dag_trigger", dagrun_timeout=datetime.timedelta(seconds=3600), default_args={ "retry_delay": datetime.timedelta(seconds=3600), } ) as dag: t1 = TriggerDagRunOperator.partial( task_id="my_task", trigger_dag_id="example_bash_operator" ).expand(conf=[{"params": {"name": "foo!!!"}}, {"params": {"name": "foo!!!"}}]) t2 = TriggerDagRunOperator( task_id="my_task_1", trigger_dag_id="example_bash_operator" ) t1 >> t2 ``` ``` Traceback (most recent call last): File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 
2552, in __call__ return self.wsgi_app(environ, start_response) File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 2532, in wsgi_app response = self.handle_exception(e) File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 2529, in wsgi_app response = self.full_dispatch_request() File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 1825, in full_dispatch_request rv = self.handle_user_exception(e) File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 1823, in full_dispatch_request rv = self.dispatch_request() File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 1799, in dispatch_request return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args) File "/home/karthikeyan/stuff/python/airflow/airflow/www/auth.py", line 48, in decorated return func(*args, **kwargs) File "/home/karthikeyan/stuff/python/airflow/airflow/www/decorators.py", line 125, in wrapper return f(*args, **kwargs) File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 76, in wrapper return func(*args, session=session, **kwargs) File "/home/karthikeyan/stuff/python/airflow/airflow/www/views.py", line 3700, in extra_links url = task.get_extra_links(ti, link_name) File "/home/karthikeyan/stuff/python/airflow/airflow/models/abstractoperator.py", line 390, in get_extra_links return link.get_link(self.unmap(None), ti_key=ti.key) File "/home/karthikeyan/stuff/python/airflow/airflow/operators/trigger_dagrun.py", line 65, in get_link return build_airflow_url_with_query(query) File "/home/karthikeyan/stuff/python/airflow/airflow/utils/helpers.py", line 253, in build_airflow_url_with_query return flask.url_for(f"Airflow.{view}", **query) File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/helpers.py", line 256, in url_for 
return current_app.url_for( File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 2034, in url_for return self.handle_url_build_error(error, endpoint, values) File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 2023, in url_for rv = url_adapter.build( # type: ignore[union-attr] File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/werkzeug/routing/map.py", line 917, in build raise BuildError(endpoint, values, method, self) werkzeug.routing.exceptions.BuildError: Could not build url for endpoint 'Airflow.grid' with values ['base_date']. Did you forget to specify values ['dag_id']? ``` ### What you think should happen instead The appropriate extra link should be returned. ### How to reproduce 1. Run the attached dag. 2. Try accessing the extra link url from graph view. Test case ```diff diff --git a/tests/www/views/test_views_extra_links.py b/tests/www/views/test_views_extra_links.py index db5d832ef7..2e04d1cd68 100644 --- a/tests/www/views/test_views_extra_links.py +++ b/tests/www/views/test_views_extra_links.py @@ -26,6 +26,8 @@ import pytest from airflow.models import DAG from airflow.models.baseoperator import BaseOperator, BaseOperatorLink +from airflow.models.serialized_dag import SerializedDagModel +from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.state import DagRunState, TaskInstanceState @@ -115,6 +117,18 @@ def task_3(dag): return Dummy3TestOperator(task_id="some_dummy_task_3", dag=dag) +@pytest.fixture(scope="module", autouse=True) +def trigger_task(dag): + TRIGGERED_DAG_ID = "dag" + + return TriggerDagRunOperator.partial( + task_id="test_trigger_expand", + trigger_dag_id=TRIGGERED_DAG_ID, + reset_dag_run=True, + dag=dag, + ).expand(execution_date=[DEFAULT_DATE]) + + @pytest.fixture(scope="module", autouse=True) def 
init_blank_task_instances(): """Make sure there are no runs before we test anything. @@ -290,3 +304,18 @@ def test_operator_extra_link_multiple_operators(dag_run, task_2, task_3, viewer_ if isinstance(response.data, bytes): response_str = response_str.decode() assert json.loads(response_str) == {"url": "https://www.google.com", "error": None} + + +def test_trigger_dagrun_expand_view(app, trigger_task, dag_run, viewer_client): + """Test TriggerDagRunOperator with expand.""" + with mock.patch.object(app, "dag_bag") as mock_dag_bag: + SerializedDagModel.write_dag(trigger_task.dag) + mock_dag_bag.get_dag.return_value = SerializedDagModel.get_dag(trigger_task.dag.dag_id) + + response = viewer_client.get( + f"{ENDPOINT}?dag_id={trigger_task.dag_id}&task_id={trigger_task.task_id}" + f"&execution_date={DEFAULT_DATE}&link_name=Triggered DAG&map_index=0", + follow_redirects=True, + ) + breakpoint() + assert response.status_code == 200 ``` ### Operating System Ubuntu ### Versions of Apache Airflow Providers _No response_ ### Deployment Virtualenv installation ### Deployment details _No response_ ### Anything else _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
