nehemiascr opened a new issue, #32375:
URL: https://github.com/apache/airflow/issues/32375
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
Airflow 2.5.1 on Kubernetes, deployed from chart airflow:8.7.1.
I wrote a `task_instance_mutation_hook` function and added it to
`airflow_local_settings.py`. The function is picked up and executed, and it
changes the `pod_override` property of the task instance's `executor_config`.
As soon as the updated config is written to the database, I get
`sqlalchemy.exc.ResourceClosedError: This transaction is closed`. I don't
know how to solve this error.
# airflow_local_settings.py
``` python
import logging

from airflow.models import TaskInstance

log = logging.getLogger(__name__)


def task_instance_mutation_hook(ti: TaskInstance):
    if ti.executor_config.get('pod_override', None):
        log.info(f"pod_override before update {ti.executor_config['pod_override']}")
        ti.executor_config["pod_override"].metadata.annotations["some_annotation"] = "myAnnotation"
        log.info(f"pod_override after update {ti.executor_config['pod_override']}")
    else:
        log.info("no ti.executor_config['pod_override']")
    # NOTE: dag_run_conf is not defined anywhere in this excerpt.
    log.info(f"dag_run_conf {dag_run_conf}")
```
Log trace from the webserver:
``` Log
[2023-07-05 13:50:38,780] {airflow_local_settings.py:40} INFO - pod_override
before update {'api_version': None,
'kind': None,
'metadata': {'annotations': {'some_annotation': 'myAnnotation',
'karpenter.sh/do-not-evict': 'true'},
'cluster_name': None,
'creation_timestamp': None,
'deletion_grace_period_seconds': None,
'deletion_timestamp': None,
'finalizers': None,
'generate_name': None,
'generation': None,
'labels': None,
'managed_fields': None,
'name': None,
'namespace': None,
'owner_references': None,
'resource_version': None,
'self_link': None,
'uid': None},
'spec': None,
'status': None}
[2023-07-05 13:50:38,780] {airflow_local_settings.py:42} INFO - pod_override
after update {'api_version': None,
'kind': None,
'metadata': {'annotations': {'some_annotation': 'myAnnotation',
'karpenter.sh/do-not-evict': 'true'},
'cluster_name': None,
'creation_timestamp': None,
'deletion_grace_period_seconds': None,
'deletion_timestamp': None,
'finalizers': None,
'generate_name': None,
'generation': None,
'labels': None,
'managed_fields': None,
'name': None,
'namespace': None,
'owner_references': None,
'resource_version': None,
'self_link': None,
'uid': None},
'spec': None,
'status': None}
[2023-07-05 13:50:38,782] {airflow_local_settings.py:47} INFO - dag_run_conf
<DagRun import_pipeline @ 2023-07-05 13:50:38.630131+00:00:
api_2023-07-05T13:50:38.229515-db-9, state:running, queued_at: 2023-07-05
13:50:38.673636+00:00. externally triggered: True>
[2023-07-05 13:50:38,783] {airflow_local_settings.py:44} INFO - no
ti.executor_config['pod_override']
[2023-07-05 13:50:38,786] {airflow_local_settings.py:47} INFO - dag_run_conf
<DagRun import_pipeline @ 2023-07-05 13:50:38.630131+00:00:
api_2023-07-05T13:50:38.229515-db-9, state:running, queued_at: 2023-07-05
13:50:38.673636+00:00. externally triggered: True>
[2023-07-05 13:50:38,786] {app.py:1741} ERROR - Exception on
/api/v1/dags/import_pipeline/dagRuns [POST]
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
line 3799, in _bulk_save_mappings
persistence._bulk_insert(
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py",
line 73, in _bulk_insert
connection = session_transaction.connection(base_mapper)
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
line 624, in connection
self._assert_active()
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
line 617, in _assert_active
raise sa_exc.ResourceClosedError(closed_msg)
sqlalchemy.exc.ResourceClosedError: This transaction is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line
2525, in wsgi_app
response = self.full_dispatch_request()
File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line
1822, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line
1820, in full_dispatch_request
rv = self.dispatch_request()
File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line
1796, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
File
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/decorator.py",
line 68, in wrapper
response = function(request)
File
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/uri_parsing.py",
line 149, in wrapper
response = function(request)
File
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/validation.py",
line 196, in wrapper
response = function(request)
File
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/validation.py",
line 399, in wrapper
return function(request)
File
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/response.py",
line 112, in wrapper
response = function(request)
File
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/parameter.py",
line 120, in wrapper
return function(**kwargs)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/api_connexion/security.py",
line 51, in decorated
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py",
line 75, in wrapper
return func(*args, session=session, **kwargs)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/api_connexion/endpoints/dag_run_endpoint.py",
line 311, in post_dag_run
dag_run = dag.create_dagrun(
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py",
line 72, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line
2651, in create_dagrun
run.verify_integrity(session=session)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py",
line 72, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagrun.py",
line 932, in verify_integrity
self._create_task_instances(self.dag_id, tis_to_create, created_counts,
hook_is_noop, session=session)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagrun.py",
line 1120, in _create_task_instances
session.bulk_save_objects(tasks)
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
line 3595, in bulk_save_objects
self._bulk_save_mappings(
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
line 3811, in _bulk_save_mappings
transaction.rollback(_capture_exception=True)
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py",
line 84, in __exit__
compat.raise_(value, with_traceback=traceback)
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py",
line 207, in raise_
raise exception
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
line 3811, in _bulk_save_mappings
transaction.rollback(_capture_exception=True)
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
line 851, in rollback
self._assert_active(prepared_ok=True, rollback_ok=True)
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
line 617, in _assert_active
raise sa_exc.ResourceClosedError(closed_msg)
sqlalchemy.exc.ResourceClosedError: This transaction is closed
10.10.10.60 - airflowdags [05/Jul/2023:13:50:38 +0000] "POST
/api/v1/dags/import_pipeline/dagRuns HTTP/1.1" 500 1560 "-"
"python-requests/2.26.0"
[2023-07-05 13:50:38,805] {flask_api.py:251} DEBUG - Getting data and status
code
[2023-07-05 13:50:38,805] {validation.py:168} DEBUG -
http://10.10.10.60:32700/api/v1/dags/import_pipeline/dagRuns validating
schema...
[2023-07-05 13:50:38,805] {validation.py:365} DEBUG -
http://10.10.10.60:32700/api/v1/dags/import_pipeline/dagRuns validating
parameters...
[2023-07-05 13:50:38,805] {parameter.py:85} DEBUG - Function Arguments:
['dag_id', 'session']
[2023-07-05 13:50:39,025] {manager.py:226} INFO - Updated user admin admin
[2023-07-05 13:50:39,042] {abstract.py:280} DEBUG - Getting data and status
code
[2023-07-05 13:50:39,043] {abstract.py:423} DEBUG - Prepared body and status
code (409)
[2023-07-05 13:50:39,043] {abstract.py:292} DEBUG - Got framework response
10.10.10.60 - airflowdags [05/Jul/2023:13:50:39 +0000] "POST
/api/v1/dags/import_pipeline/dagRuns HTTP/1.1" 409 284 "-"
"python-requests/2.26.0"
[2023-07-05 13:50:39,076] {flask_api.py:251} DEBUG - Getting data and status
code
[2023-07-05 13:50:39,077] {validation.py:168} DEBUG -
http://10.10.10.60:32700/api/v1/dags/import_pipeline/dagRuns validating
schema...
[2023-07-05 13:50:39,078] {validation.py:365} DEBUG -
http://10.10.10.60:32700/api/v1/dags/import_pipeline/dagRuns validating
parameters...
[2023-07-05 13:50:39,078] {parameter.py:85} DEBUG - Function Arguments:
['dag_id', 'session']
[2023-07-05 13:50:39,313] {manager.py:226} INFO - Updated user admin admin
[2023-07-05 13:50:39,338] {abstract.py:280} DEBUG - Getting data and status
code
[2023-07-05 13:50:39,338] {abstract.py:423} DEBUG - Prepared body and status
code (409)
[2023-07-05 13:50:39,338] {abstract.py:292} DEBUG - Got framework response
10.10.10.60 - airflowdags [05/Jul/2023:13:50:39 +0000] "POST
/api/v1/dags/import_pipeline/dagRuns HTTP/1.1" 409 284 "-"
"python-requests/2.26.0"
```
### What you think should happen instead
The `executor_config` of the task instance gets updated and the scheduling of
the task continues.
### How to reproduce
Update the `executor_config` of a task instance from a
`task_instance_mutation_hook`, then trigger a DAG run through
`POST /api/v1/dags/<dag_id>/dagRuns`.
### Operating System
PRETTY_NAME="CentOS Stream 8"
### Versions of Apache Airflow Providers
2.5.1
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
chart airflow:8.7.1
### Anything else
My goal is to add annotations and labels to the k8s executor config
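
For illustration, here is a minimal sketch of what I am trying to achieve, written so that it does not mutate the existing `pod_override` object in place. The "before update" log above already contains `some_annotation`, which suggests the object may be shared between calls. The helper name `add_pod_metadata` and the `team` label are hypothetical, and the `SimpleNamespace` objects are only stand-ins for the Kubernetes pod object so the snippet runs without the client library. I have not verified whether this avoids the `ResourceClosedError`.

```python
import copy
from types import SimpleNamespace


def add_pod_metadata(executor_config, annotations=None, labels=None):
    """Return a new executor_config with extra annotations/labels on pod_override.

    Hypothetical helper: the input config and its pod object are left untouched.
    """
    pod = (executor_config or {}).get("pod_override")
    if pod is None or getattr(pod, "metadata", None) is None:
        # Nothing to annotate; hand the config back unchanged.
        return executor_config

    new_config = dict(executor_config)
    new_pod = copy.deepcopy(pod)
    meta = new_pod.metadata
    # annotations/labels can be None on the metadata object, so default to {}.
    if annotations:
        meta.annotations = {**(meta.annotations or {}), **annotations}
    if labels:
        meta.labels = {**(meta.labels or {}), **labels}
    new_config["pod_override"] = new_pod
    return new_config


# Stand-in for a pod object (assumption: only .metadata.annotations/.labels matter here).
pod = SimpleNamespace(
    metadata=SimpleNamespace(
        annotations={"karpenter.sh/do-not-evict": "true"}, labels=None
    )
)
original = {"pod_override": pod}
updated = add_pod_metadata(
    original,
    annotations={"some_annotation": "myAnnotation"},
    labels={"team": "data"},
)
```

Inside the hook this would be used as `ti.executor_config = add_pod_metadata(ti.executor_config, annotations={...})` instead of editing `metadata.annotations` directly.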
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)