mohittalele opened a new issue, #25603:
URL: https://github.com/apache/airflow/issues/25603

   ### Apache Airflow version
   
   2.3.3
   
   ### What happened
   
   I have a use case where I would like to change the priority of a task based on user-defined conf. For example, if the dag_run conf contains "high_priority", a high weight should be assigned to the task instance.
   
   For that I am using `task_instance_mutation_hook`. Here is my `airflow_local_settings.py`:
   
   ```
   import json

   from airflow.models import TaskInstance


   def task_instance_mutation_hook(task_instance: TaskInstance):
       DAG_ID = "vyper"
       if task_instance.dag_id == DAG_ID:
           print("dag_id is =", task_instance.dag_id)
           # The dag_run conf carries an S3 event notification as a JSON string.
           json_obj = json.loads(task_instance.get_dagrun().conf.get('message'))
           print("json_obj = ", json_obj)
           user_metadata = json_obj["Records"][0]["s3"]["object"]["userMetadata"]
           high_priority_weight = 10
           critical_priority_weight = 20
           if "X-Amz-Meta-priority" in user_metadata:
               if user_metadata["X-Amz-Meta-priority"] == "high":
                   task_instance.priority_weight = high_priority_weight
                   print("User defined metadata has X-Amz-Meta-priority:high value")
   ```
   The mutation hook above throws the error below.
   
   
   ```
   
   [2022-08-08, 13:34:02 CEST] {taskinstance.py:1909} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3805, in _bulk_save_mappings
       render_nulls,
     File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 73, in _bulk_insert
       connection = session_transaction.connection(base_mapper)
     File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 624, in connection
       self._assert_active()
     File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 617, in _assert_active
       raise sa_exc.ResourceClosedError(closed_msg)
   sqlalchemy.exc.ResourceClosedError: This transaction is closed
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/trigger_dagrun.py", line 150, in execute
       replace_microseconds=False,
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/api/common/trigger_dag.py", line 129, in trigger_dag
       replace_microseconds=replace_microseconds,
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/api/common/trigger_dag.py", line 94, in _trigger_dag
       run_after=pendulum.instance(execution_date)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper
       return func(*args, session=session, **kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 2390, in create_dagrun
       run.verify_integrity(session=session)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 853, in verify_integrity
       self._create_task_instances(dag.dag_id, tasks, created_counts, hook_is_noop, session=session)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 1048, in _create_task_instances
       session.bulk_save_objects(tasks)
     File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3602, in bulk_save_objects
       False,
     File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3811, in _bulk_save_mappings
       transaction.rollback(_capture_exception=True)
     File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 84, in __exit__
       compat.raise_(value, with_traceback=traceback)
     File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
       raise exception
     File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3811, in _bulk_save_mappings
       transaction.rollback(_capture_exception=True)
     File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 851, in rollback
       self._assert_active(prepared_ok=True, rollback_ok=True)
     File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 617, in _assert_active
       raise sa_exc.ResourceClosedError(closed_msg)
   sqlalchemy.exc.ResourceClosedError: This transaction is closed
   ```
   
   ### What you think should happen instead
   
   The task instance mutation hook should be able to update properties of the task_instance. [In the documentation](https://airflow.apache.org/docs/apache-airflow/stable/concepts/cluster-policies.html#task-instance-mutation), the task_instance queue is updated. I would expect the mutation hook to be able to update the priority weight of the task instance in the same way.
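
To make the expected mapping explicit, here is a minimal sketch of the conf-to-weight lookup as a pure function that can be tested without a scheduler. The names `priority_from_conf`, `PRIORITY_WEIGHTS` and `METADATA_KEY` are hypothetical, and mapping a `critical` value to weight 20 is an assumption based on the otherwise unused `critical_priority_weight` variable in the hook. It only illustrates the intended result and is not claimed to avoid the error.

```python
import json
from typing import Optional

# Hypothetical mapping. "critical" -> 20 is an assumption derived from the
# unused critical_priority_weight variable in the reported hook.
PRIORITY_WEIGHTS = {"high": 10, "critical": 20}
METADATA_KEY = "X-Amz-Meta-priority"


def priority_from_conf(conf: Optional[dict]) -> Optional[int]:
    """Return the weight requested in the dag_run conf, or None if absent."""
    if not conf or "message" not in conf:
        return None
    try:
        # The message is expected to be an S3 event notification as JSON text.
        records = json.loads(conf["message"])["Records"]
        user_metadata = records[0]["s3"]["object"]["userMetadata"]
        return PRIORITY_WEIGHTS.get(user_metadata.get(METADATA_KEY))
    except (ValueError, KeyError, IndexError, TypeError, AttributeError):
        # Malformed or unexpected payload: leave the priority untouched.
        return None
```

A hook could then assign `task_instance.priority_weight` only when this function returns a value other than `None`.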
   
   ### How to reproduce
   
   - Create a mutation hook in `airflow_local_settings.py`.
   - Try to update the task_instance `priority_weight`.
   - Trigger the DAG (the error occurs for manually triggered as well as automatically scheduled runs).
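
The steps above can be sketched as a reduced `airflow_local_settings.py` that drops the conf lookup and only sets the weight. Whether this reduced form alone triggers the error is not established here; it is a starting point for narrowing the problem down. The `SimpleNamespace` object is a stand-in for a real `TaskInstance` so the snippet runs without Airflow installed.

```python
from types import SimpleNamespace


def task_instance_mutation_hook(task_instance):
    # Unconditionally raise the priority; the dag_run conf is deliberately
    # not consulted, to separate the attribute update from the conf lookup.
    task_instance.priority_weight = 10


# Stand-in for a TaskInstance (assumption: only priority_weight matters here).
ti = SimpleNamespace(dag_id="vyper", priority_weight=1)
task_instance_mutation_hook(ti)
print(ti.priority_weight)
```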
   
   ### Operating System
   
   PRETTY_NAME="Debian GNU/Linux 11 (bullseye)" NAME="Debian GNU/Linux" 
VERSION_ID="11" VERSION="11 (bullseye)" VERSION_CODENAME=bullseye ID=debian 
HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support" 
BUG_REPORT_URL="https://bugs.debian.org/"
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
