tirkarthi opened a new issue, #32150:
URL: https://github.com/apache/airflow/issues/32150

   ### Apache Airflow version
   
   main (development)
   
   ### What happened
   
   When a mapped `TriggerDagRunOperator` is loaded in the webserver, `trigger_dag_id` comes back as `None`: the value is stored in `partial_kwargs` during serialization, and the unmapped `operator` does not have `partial_kwargs` set to fetch it from. The `None` value then causes the extra link to return a 500 error, as in the traceback below.
   
   
https://github.com/apache/airflow/blob/016ce9948625a556093b0182439aa50314c651da/airflow/operators/trigger_dagrun.py#L64
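
   The failure mode can be sketched with plain-Python stand-ins (hypothetical classes, not Airflow's real ones): the kwargs passed to `.partial()` live on the mapped wrapper in `partial_kwargs`, but the object produced by `unmap()` never receives them, so the extra link sees `trigger_dag_id` as `None`.

```python
# Plain-Python sketch (hypothetical classes, not Airflow's real ones) of the
# failure mode: .partial() kwargs live on the mapped wrapper, but unmap()
# builds a fresh operator without copying them over.

class FakeSerializedOperator:
    """Stands in for SerializedBaseOperator after populate_operator()."""

    def __init__(self, task_id):
        self.task_id = task_id  # note: no trigger_dag_id attribute


class FakeMappedOperator:
    """Stands in for MappedOperator; .partial() kwargs live in partial_kwargs."""

    def __init__(self, task_id, **partial_kwargs):
        self.task_id = task_id
        self.partial_kwargs = partial_kwargs

    def unmap(self):
        # Mirrors the reported behaviour: partial_kwargs are NOT copied.
        return FakeSerializedOperator(self.task_id)


mapped = FakeMappedOperator("my_task", trigger_dag_id="example_bash_operator")
op = mapped.unmap()
print(getattr(op, "trigger_dag_id", None))  # → None, so the link build fails
```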
   
   I tried the patch below, but I am not sure whether it is a good idea.
   
   ```diff
   diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
   index 66b75923fd..1fc8ce2134 100644
   --- a/airflow/models/mappedoperator.py
   +++ b/airflow/models/mappedoperator.py
   @@ -636,6 +636,10 @@ class MappedOperator(AbstractOperator):
    
            op = SerializedBaseOperator(task_id=self.task_id, params=self.params, _airflow_from_mapped=True)
            SerializedBaseOperator.populate_operator(op, self.operator_class)
   +
   +        for k, v in getattr(self, 'partial_kwargs', {}).items():
   +            setattr(op, k, v)
   +
            return op
    
        def _get_specified_expand_input(self) -> ExpandInput:
   ```
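
   In isolation, the copy loop from the patch behaves as follows (again with throwaway stand-in objects, not Airflow's classes):

```python
# Sketch of the patch's copy loop, using throwaway stand-in objects
# (not Airflow's classes): copying partial_kwargs onto the unmapped
# operator makes trigger_dag_id visible to the extra link again.
import types

# Stand-in for the object returned by unmap(): no trigger_dag_id yet.
op = types.SimpleNamespace(task_id="my_task")

# Stand-in for the MappedOperator holding the .partial() kwargs.
mapped = types.SimpleNamespace(
    task_id="my_task",
    partial_kwargs={"trigger_dag_id": "example_bash_operator"},
)

# The loop from the proposed patch:
for k, v in getattr(mapped, "partial_kwargs", {}).items():
    setattr(op, k, v)

print(op.trigger_dag_id)  # → example_bash_operator
```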
   
   ```python
   import datetime

   from airflow import DAG
   from airflow.operators.trigger_dagrun import TriggerDagRunOperator

   with DAG(
       catchup=False,
       start_date=datetime.datetime(2022, 1, 2),
       schedule_interval=datetime.timedelta(seconds=3600),
       dag_id="my_dag_trigger",
       dagrun_timeout=datetime.timedelta(seconds=3600),
       default_args={
           "retry_delay": datetime.timedelta(seconds=3600),
       },
   ) as dag:
       t1 = TriggerDagRunOperator.partial(
           task_id="my_task", trigger_dag_id="example_bash_operator"
       ).expand(conf=[{"params": {"name": "foo!!!"}}, {"params": {"name": "foo!!!"}}])

       t2 = TriggerDagRunOperator(
           task_id="my_task_1", trigger_dag_id="example_bash_operator"
       )

       t1 >> t2
   ```
   
   ```
   Traceback (most recent call last):
     File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 2552, in __call__
       return self.wsgi_app(environ, start_response)
     File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 2532, in wsgi_app
       response = self.handle_exception(e)
     File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 2529, in wsgi_app
       response = self.full_dispatch_request()
     File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 1825, in full_dispatch_request
       rv = self.handle_user_exception(e)
     File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 1823, in full_dispatch_request
       rv = self.dispatch_request()
     File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 1799, in dispatch_request
       return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
     File "/home/karthikeyan/stuff/python/airflow/airflow/www/auth.py", line 48, in decorated
       return func(*args, **kwargs)
     File "/home/karthikeyan/stuff/python/airflow/airflow/www/decorators.py", line 125, in wrapper
       return f(*args, **kwargs)
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 76, in wrapper
       return func(*args, session=session, **kwargs)
     File "/home/karthikeyan/stuff/python/airflow/airflow/www/views.py", line 3700, in extra_links
       url = task.get_extra_links(ti, link_name)
     File "/home/karthikeyan/stuff/python/airflow/airflow/models/abstractoperator.py", line 390, in get_extra_links
       return link.get_link(self.unmap(None), ti_key=ti.key)
     File "/home/karthikeyan/stuff/python/airflow/airflow/operators/trigger_dagrun.py", line 65, in get_link
       return build_airflow_url_with_query(query)
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/helpers.py", line 253, in build_airflow_url_with_query
       return flask.url_for(f"Airflow.{view}", **query)
     File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/helpers.py", line 256, in url_for
       return current_app.url_for(
     File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 2034, in url_for
       return self.handle_url_build_error(error, endpoint, values)
     File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/flask/app.py", line 2023, in url_for
       rv = url_adapter.build(  # type: ignore[union-attr]
     File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.8/site-packages/werkzeug/routing/map.py", line 917, in build
       raise BuildError(endpoint, values, method, self)
   werkzeug.routing.exceptions.BuildError: Could not build url for endpoint 'Airflow.grid' with values ['base_date']. Did you forget to specify values ['dag_id']?
   ```
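
   The `BuildError` arises because `url_for` receives `dag_id=None`: `None`-valued parameters are dropped before URL building, leaving the required `dag_id` placeholder unfilled. A rough analogy in plain Python (a hypothetical helper, not werkzeug's actual routing code):

```python
# Rough analogy (hypothetical helper, not werkzeug's routing code) for the
# BuildError: None-valued parameters are dropped before URL building, so
# the required dag_id placeholder cannot be filled.
import string


def build_url(template, **values):
    # Like flask.url_for, discard parameters whose value is None.
    values = {k: v for k, v in values.items() if v is not None}
    # Collect the placeholder names the template requires.
    required = {name for _, name, _, _ in string.Formatter().parse(template) if name}
    missing = required - values.keys()
    if missing:
        raise ValueError(f"Could not build url: missing {sorted(missing)}")
    return template.format(**values)


query = {"dag_id": None, "base_date": "2022-01-02"}  # dag_id came back as None
try:
    build_url("/dags/{dag_id}/grid", **query)
except ValueError as exc:
    print(exc)  # → Could not build url: missing ['dag_id']
```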
   
   ### What you think should happen instead
   
   The appropriate extra link should be returned.
   
   ### How to reproduce
   
   1. Run the attached DAG.
   2. Try accessing the extra link URL from the Graph view.
   
   Test case
   
   ```diff
   diff --git a/tests/www/views/test_views_extra_links.py b/tests/www/views/test_views_extra_links.py
   index db5d832ef7..2e04d1cd68 100644
   --- a/tests/www/views/test_views_extra_links.py
   +++ b/tests/www/views/test_views_extra_links.py
   @@ -26,6 +26,8 @@ import pytest
    
    from airflow.models import DAG
    from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
   +from airflow.models.serialized_dag import SerializedDagModel
   +from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    from airflow.utils import timezone
    from airflow.utils.session import create_session
    from airflow.utils.state import DagRunState, TaskInstanceState
   @@ -115,6 +117,18 @@ def task_3(dag):
        return Dummy3TestOperator(task_id="some_dummy_task_3", dag=dag)
    
    
   +@pytest.fixture(scope="module", autouse=True)
   +def trigger_task(dag):
   +    TRIGGERED_DAG_ID = "dag"
   +
   +    return TriggerDagRunOperator.partial(
   +        task_id="test_trigger_expand",
   +        trigger_dag_id=TRIGGERED_DAG_ID,
   +        reset_dag_run=True,
   +        dag=dag,
   +    ).expand(execution_date=[DEFAULT_DATE])
   +
   +
    @pytest.fixture(scope="module", autouse=True)
    def init_blank_task_instances():
        """Make sure there are no runs before we test anything.
   @@ -290,3 +304,17 @@ def test_operator_extra_link_multiple_operators(dag_run, task_2, task_3, viewer_
        if isinstance(response.data, bytes):
            response_str = response_str.decode()
        assert json.loads(response_str) == {"url": "https://www.google.com", "error": None}
   +
   +
   +def test_trigger_dagrun_expand_view(app, trigger_task, dag_run, viewer_client):
   +    """Test TriggerDagRunOperator with expand."""
   +    with mock.patch.object(app, "dag_bag") as mock_dag_bag:
   +        SerializedDagModel.write_dag(trigger_task.dag)
   +        mock_dag_bag.get_dag.return_value = SerializedDagModel.get_dag(trigger_task.dag.dag_id)
   +
   +        response = viewer_client.get(
   +            f"{ENDPOINT}?dag_id={trigger_task.dag_id}&task_id={trigger_task.task_id}"
   +            f"&execution_date={DEFAULT_DATE}&link_name=Triggered DAG&map_index=0",
   +            follow_redirects=True,
   +        )
   +        assert response.status_code == 200
   ```
   
   ### Operating System
   
   Ubuntu
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

