nehemiascr commented on issue #32375:
URL: https://github.com/apache/airflow/issues/32375#issuecomment-1630675549
Jens, thank you for your response.
I tried with Airflow 2.6.2, triggering through the UI, and got the same error.
I noticed that the error occurs after reading from an external database: I obtain a MySQL connection from an Airflow Connection, query that database, and then the error happens. Below is the code that reproduces the issue.
I created a simple DAG that runs tasks both in parallel and serially. When I run the DAG with 20 parallel tasks and 20 serial tasks but without reading the external database, updating the `pod_override` configuration of the tasks succeeds; when I run the same DAG with 1 parallel task and 1 serial task (2 tasks in sequence) while reading my database, it fails.
I am also trying 3 different query methods; they all fail.
**airflow_local_settings.py:**
``` python
import logging
import socket
from typing import Dict

from airflow.models import TaskInstance, Connection
from airflow.exceptions import AirflowNotFoundException
from sqlalchemy import create_engine, exc

logger = logging.getLogger(__name__)


def get_customer_database_size_method_0(db_id: int) -> Dict:
    """
    Without querying the database; made-up size of 123.45.
    """
    result = {"database": f"mycompany_cdb_{db_id:010d}", "size_mb": 123.45}
    logger.info(f"size of database method_0 {result['database']} is {result['size_mb']}")
    return result


def get_customer_database_size_method_1(db_id: int) -> Dict:
    """
    Instantiate a SQLAlchemy engine from the Airflow Connection URI,
    create a connection, run the query, close the connection.
    """
    connection_name = "mysql_default"
    try:
        conn = Connection.get_connection_from_secrets(connection_name)
    except AirflowNotFoundException as e:
        logger.exception(e)
        return {"database": f"mycompany_cdb_{db_id:010d}", "size_mb": 0}
    engine = create_engine(conn.get_uri())
    stmt = (
        f"SELECT table_schema AS 'database', "
        f"ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'size_mb' "
        f"FROM information_schema.tables "
        f"WHERE table_schema = 'mycompany_cdb_{db_id:010d}'"
    )
    try:
        c = engine.connect()
        result = dict(c.execute(stmt).first())
        c.close()
    except exc.DBAPIError as e:
        if e.connection_invalidated:
            print("Connection was invalidated!")
    logger.info(f"size of database method_1 {result['database']} is {result['size_mb']}")
    return result


def get_customer_database_size_method_2(db_id: int) -> Dict:
    """
    Instantiate a SQLAlchemy engine from the Airflow Connection URI,
    query the engine directly.
    """
    connection_name = "mysql_default"
    try:
        conn = Connection.get_connection_from_secrets(connection_name)
    except AirflowNotFoundException as e:
        logger.exception(e)
        return {"database": f"mycompany_cdb_{db_id:010d}", "size_mb": 0}
    engine = create_engine(conn.get_uri())
    stmt = (
        f"SELECT table_schema AS 'database', "
        f"ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'size_mb' "
        f"FROM information_schema.tables "
        f"WHERE table_schema = 'mycompany_cdb_{db_id:010d}'"
    )
    result = dict(engine.execute(stmt).first())
    logger.info(f"size of database method_2 {result['database']} is {result['size_mb']}")
    return result


def get_customer_database_size_method_3(db_id: int) -> Dict:
    """
    MyCompanyDatabaseManager (our internal engine wrapper).
    """
    from mycompany_core.db_engine import MyCompanyDatabaseManager, get_engine
    from mycompanyet.mysql_engines import DRIVER_OVERRIDE, QUERY_OVERRIDE
    from airflow_configs.hooks import get_mysql_hook

    db_manager = MyCompanyDatabaseManager(
        get_engine(get_mysql_hook().get_sqlalchemy_engine().url),
        drivername_override=DRIVER_OVERRIDE,
        query_override=QUERY_OVERRIDE,
    )
    engine = db_manager.get_engine()
    stmt = (
        f"SELECT table_schema AS 'database', "
        f"ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'size_mb' "
        f"FROM information_schema.tables "
        f"WHERE table_schema = 'mycompany_cdb_{db_id:010d}'"
    )
    result = dict(engine.execute(stmt).first())
    logger.info(f"size of database method_3 {result['database']} is {result['size_mb']}")
    return result


def task_instance_mutation_hook(ti: TaskInstance):
    logger.info(f"on task_instance_mutation_hook {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
    hostname = socket.gethostname()
    logger.info(f"hostname {hostname} {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
    if not ti.executor_config:
        logger.info(f"no ti.executor_config :::: dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
        return
    if not ti.executor_config.get('pod_override', None):
        logger.info(f"no ti.executor_config['pod_override'] :::: dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
        return
    if ti.dag_run:
        logger.info(f"dag_run for dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
        if ti.dag_run.conf:
            dag_run_conf = ti.dag_run.conf
            logger.info(f"dag_run_conf dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run} dag_run_conf {dag_run_conf}")
            database_id = dag_run_conf["database_id"]
            logger.info(f"dag_run_conf database_id {database_id} dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run} dag_run_conf {dag_run_conf}")
        else:
            database_id = ti.run_id.split('-')[-1]
            logger.info(f"dag_run but no dag_run.conf database_id {database_id} dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
    else:
        database_id = ti.run_id.split('-')[-1]
        logger.info(f"no dag_run database_id {database_id} dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
    if ti.dag_id.startswith('my_dag'):
        if ti.task_id.startswith("my_task"):
            database_id = int(database_id)
            logger.info(f"database_id {database_id}")
            logger.info(f"pod_override before update dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run} {ti.executor_config['pod_override']}")
            # depending on the dag, we try a different database query method
            my_dag_number = int(ti.dag_id.split('_')[-1])
            if my_dag_number == 0:
                database_size = get_customer_database_size_method_0(database_id)  # fake query
            elif my_dag_number == 1:
                database_size = get_customer_database_size_method_1(database_id)
            elif my_dag_number == 2:
                database_size = get_customer_database_size_method_2(database_id)
            elif my_dag_number == 3:
                database_size = get_customer_database_size_method_3(database_id)
            else:
                database_size = {"database": f"mycompany_cdb_{database_id:010d}", "size_mb": 9.9}
            logger.info(f"database_size {database_size} dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
            database_size = float(database_size["size_mb"] or 0)
            logger.info(f"database_size {database_size} dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
            extra_annotations = {
                "database_id": database_id,
                "size_in_mb": float(database_size),
                "karpenter.sh/tag": "mySchedulingAnnotation",
                "hostname_of_task_instance_mutation_hook": hostname,
            }
            logger.info(f"extra_annotations dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run} {extra_annotations}")
            ti.executor_config["pod_override"].metadata.annotations.update(extra_annotations)
            # logger.info(f"pod_override after update dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run} {ti.executor_config['pod_override']}")
        else:
            extra_annotations = {
                "hostname_of_task_instance_mutation_hook": hostname,
                "database_id": database_id,
            }
            logger.info(f"extra_annotations dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run} {extra_annotations}")
            ti.executor_config["pod_override"].metadata.annotations.update(extra_annotations)
            # logger.info(f"pod_override after update dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run} {ti.executor_config['pod_override']}")
```
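In case it helps narrow things down, here is a fourth variant I could try (a sketch only, not yet verified; `get_customer_database_size_method_4` is a made-up name following the pattern above): it keeps the throwaway engine on SQLAlchemy's `NullPool` and disposes it before returning, so the external query should not leave any pooled connection or transaction state behind.
``` python
from typing import Dict

from airflow.models import Connection
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool


def get_customer_database_size_method_4(db_id: int) -> Dict:
    """Sketch: keep the external engine fully isolated from shared state."""
    conn = Connection.get_connection_from_secrets("mysql_default")
    # NullPool: no connection is kept around after it is closed.
    engine = create_engine(conn.get_uri(), poolclass=NullPool)
    stmt = (
        f"SELECT table_schema AS 'database', "
        f"ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'size_mb' "
        f"FROM information_schema.tables "
        f"WHERE table_schema = 'mycompany_cdb_{db_id:010d}'"
    )
    try:
        with engine.connect() as c:
            result = dict(c.execute(stmt).first())
    finally:
        engine.dispose()  # drop the engine's connections immediately
    return result
```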
**my_dag_0.py**:
``` python
import json
import logging
import socket

import airflow
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from kubernetes.client import models as k8s

log = logging.getLogger(__name__)


def get_node_config():
    # Use kubernetes.client.models.V1Affinity to define node affinity
    executor_config = {
        "pod_override": k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(
                annotations={"karpenter.sh/do-not-evict": "true"}
            ),
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(limits={"memory": " 4G"}),
                    )
                ],
                affinity=k8s.V1Affinity(
                    node_affinity=k8s.V1NodeAffinity(
                        required_during_scheduling_ignored_during_execution=k8s.V1NodeSelector(
                            node_selector_terms=[
                                k8s.V1NodeSelectorTerm(
                                    match_expressions=[
                                        k8s.V1NodeSelectorRequirement(
                                            key="size", operator="In", values=["standard"]
                                        ),
                                    ]
                                )
                            ]
                        )
                    )
                ),
            ),
        ),
    }
    return executor_config


def my_task_callable(**kwargs):
    log.info(f"hostname: {socket.gethostname()}")
    config = kwargs["dag_run"].conf
    log.info(f"config {config}")
    airflow_dag_run_id = kwargs["dag_run"].run_id
    log.info(f"airflow_dag_run_id {airflow_dag_run_id}")
    log.info("adding 2 + 2")
    result = 2 + 2
    log.info(f"which results in {result}")


default_args = {"owner": "airflow", "start_date": airflow.utils.dates.days_ago(1), "retries": 3}

# Example dag_run conf:
"""
{
    "database_id": 9
}
"""

if __name__.startswith("unusual_prefix"):
    with DAG(
        dag_id="my_dag_0",
        catchup=False,
        default_args=default_args,
        schedule_interval=None,
    ) as dag:
        begin_my_dag_op = EmptyOperator(task_id="begin_my_dag_op")
        end_my_task_parallel_op = EmptyOperator(task_id="end_my_task_parallel_op", trigger_rule=TriggerRule.NONE_FAILED)
        end_my_task_serial_op = EmptyOperator(task_id="end_my_task_serial_op", trigger_rule=TriggerRule.NONE_FAILED)
        end_my_dag_op = EmptyOperator(task_id="end_my_dag_op")

        my_task_parallel_op = dict()
        num_of_jobs = 20
        for job_num in range(1, num_of_jobs + 1):
            my_task_parallel_op[job_num] = PythonOperator(
                task_id=f"my_task_parallel_{job_num}",
                python_callable=my_task_callable,
                provide_context=True,
                executor_config=get_node_config(),
            )
            begin_my_dag_op >> my_task_parallel_op[job_num] >> end_my_task_parallel_op

        last_serial_task = end_my_task_parallel_op
        my_task_serial_op = dict()
        num_of_jobs = 20
        for job_num in range(1, num_of_jobs + 1):
            my_task_serial_op[job_num] = PythonOperator(
                task_id=f"my_task_serial_{job_num}",
                python_callable=my_task_callable,
                provide_context=True,
                executor_config=get_node_config(),
            )
            last_serial_task >> my_task_serial_op[job_num] >> end_my_task_serial_op
            last_serial_task = my_task_serial_op[job_num]

        end_my_task_serial_op >> end_my_dag_op
```
<img width="1335" alt="my_dag_0"
src="https://github.com/apache/airflow/assets/1874087/50d8cd4a-dc85-41b2-a8ba-4161b38cf4e8">
**my_dag_1.py**, **my_dag_2.py** and **my_dag_3.py**:
``` python
...
num_of_jobs = 1
...
```
<img width="1122" alt="my_dag_1-my_dag_2-my_dag_3"
src="https://github.com/apache/airflow/assets/1874087/11c10f11-50ba-4dff-8546-f995ab613197">
The only difference between my_dag_0 and the others is the `num_of_jobs` variable, which is set to 20 for my_dag_0 and 1 for the others. The point of my_dag_0 is to show that, without reading the database, updating the `pod_override` configuration of many tasks succeeds.
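For reference, the same trigger can also be issued outside the UI. Here is a minimal sketch using the stable REST API (the base URL and basic-auth credentials are placeholders for my setup, and this assumes the `basic_auth` API backend is enabled); I have not yet checked whether this path fails the same way as the UI's `/trigger` endpoint:
``` python
# Sketch: trigger my_dag_3 with the same run conf via the stable REST API.
# Base URL and credentials below are placeholders.
import requests

resp = requests.post(
    "http://localhost:8080/api/v1/dags/my_dag_3/dagRuns",
    auth=("admin", "admin"),
    json={"conf": {"database_id": 9}},
)
resp.raise_for_status()  # a 500 here would mirror the UI failure below
print(resp.json()["dag_run_id"])
```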
Log excerpt of when it fails:
``` Log
10.10.10.60 - - [10/Jul/2023:16:31:49 +0000] "GET /health HTTP/1.1" 200 243 "-" "kube-probe/1.22"
10.10.10.60 - - [10/Jul/2023:16:31:49 +0000] "GET /health HTTP/1.1" 200 243 "-" "kube-probe/1.22"
[2023-07-10T16:31:57.588+0000] {airflow_local_settings.py:85} INFO - on task_instance_mutation_hook my_dag_3 task_id begin_my_dag_op dag_run None
[2023-07-10T16:31:57.589+0000] {airflow_local_settings.py:87} INFO - hostname airflow-web-56554b8989-h2pq9 my_dag_3 task_id begin_my_dag_op dag_run None
[2023-07-10T16:31:57.589+0000] {airflow_local_settings.py:90} INFO - no ti.executor_config :::: dag_id my_dag_3 task_id begin_my_dag_op dag_run None
[2023-07-10T16:31:57.590+0000] {airflow_local_settings.py:85} INFO - on task_instance_mutation_hook my_dag_3 task_id end_my_task_parallel_op dag_run None
[2023-07-10T16:31:57.590+0000] {airflow_local_settings.py:87} INFO - hostname airflow-web-56554b8989-h2pq9 my_dag_3 task_id end_my_task_parallel_op dag_run None
[2023-07-10T16:31:57.590+0000] {airflow_local_settings.py:90} INFO - no ti.executor_config :::: dag_id my_dag_3 task_id end_my_task_parallel_op dag_run None
[2023-07-10T16:31:57.590+0000] {airflow_local_settings.py:85} INFO - on task_instance_mutation_hook my_dag_3 task_id end_my_task_serial_op dag_run None
[2023-07-10T16:31:57.591+0000] {airflow_local_settings.py:87} INFO - hostname airflow-web-56554b8989-h2pq9 my_dag_3 task_id end_my_task_serial_op dag_run None
[2023-07-10T16:31:57.591+0000] {airflow_local_settings.py:90} INFO - no ti.executor_config :::: dag_id my_dag_3 task_id end_my_task_serial_op dag_run None
[2023-07-10T16:31:57.591+0000] {airflow_local_settings.py:85} INFO - on task_instance_mutation_hook my_dag_3 task_id end_my_dag_op dag_run None
[2023-07-10T16:31:57.592+0000] {airflow_local_settings.py:87} INFO - hostname airflow-web-56554b8989-h2pq9 my_dag_3 task_id end_my_dag_op dag_run None
[2023-07-10T16:31:57.592+0000] {airflow_local_settings.py:90} INFO - no ti.executor_config :::: dag_id my_dag_3 task_id end_my_dag_op dag_run None
[2023-07-10T16:31:57.592+0000] {airflow_local_settings.py:85} INFO - on task_instance_mutation_hook my_dag_3 task_id my_task_parallel_1 dag_run None
[2023-07-10T16:31:57.592+0000] {airflow_local_settings.py:87} INFO - hostname airflow-web-56554b8989-h2pq9 my_dag_3 task_id my_task_parallel_1 dag_run None
[2023-07-10T16:31:57.592+0000] {airflow_local_settings.py:109} INFO - no dag_run database_id 9 dag_id my_dag_3 task_id my_task_parallel_1 dag_run None
[2023-07-10T16:31:57.593+0000] {airflow_local_settings.py:114} INFO - database_id 9
[2023-07-10T16:31:57.596+0000] {airflow_local_settings.py:115} INFO - pod_override before update dag_id my_dag_3 task_id my_task_parallel_1 dag_run None {'api_version': None,
'kind': None,
'metadata': {'annotations': {'karpenter.sh/do-not-evict': 'true'},
'cluster_name': None,
'creation_timestamp': None,
'deletion_grace_period_seconds': None,
'deletion_timestamp': None,
'finalizers': None,
'generate_name': None,
'generation': None,
'labels': None,
'managed_fields': None,
'name': None,
'namespace': None,
'owner_references': None,
'resource_version': None,
'self_link': None,
'uid': None},
 'spec': {'active_deadline_seconds': None,
 'affinity': {'node_affinity': {'preferred_during_scheduling_ignored_during_execution': None,
 'required_during_scheduling_ignored_during_execution': {'node_selector_terms': [{'match_expressions': [{'key': 'size', 'operator': 'In', 'values': ['standard']}],
 'match_fields': None}]}},
 'pod_affinity': None,
 'pod_anti_affinity': None},
'automount_service_account_token': None,
'containers': [{'args': None,
'command': None,
'env': None,
'env_from': None,
'image': None,
'image_pull_policy': None,
'lifecycle': None,
'liveness_probe': None,
'name': 'base',
'ports': None,
'readiness_probe': None,
'resources': {'limits': {'memory': ' 4G'},
'requests': None},
'security_context': None,
'startup_probe': None,
'stdin': None,
'stdin_once': None,
'termination_message_path': None,
'termination_message_policy': None,
'tty': None,
'volume_devices': None,
'volume_mounts': None,
'working_dir': None}],
'dns_config': None,
'dns_policy': None,
'enable_service_links': None,
'ephemeral_containers': None,
'host_aliases': None,
'host_ipc': None,
'host_network': None,
'host_pid': None,
'hostname': None,
'image_pull_secrets': None,
'init_containers': None,
'node_name': None,
'node_selector': None,
'os': None,
'overhead': None,
'preemption_policy': None,
'priority': None,
'priority_class_name': None,
'readiness_gates': None,
'restart_policy': None,
'runtime_class_name': None,
'scheduler_name': None,
'security_context': None,
'service_account': None,
'service_account_name': None,
'set_hostname_as_fqdn': None,
'share_process_namespace': None,
'subdomain': None,
'termination_grace_period_seconds': None,
'tolerations': None,
'topology_spread_constraints': None,
'volumes': None},
'status': None}
[2023-07-10T16:31:58.076+0000] {environment.py:107} INFO - No environment configuration found.
[2023-07-10T16:31:58.083+0000] {managed_identity.py:94} INFO - ManagedIdentityCredential will use IMDS
[2023-07-10T16:31:58.397+0000] {selector_events.py:54} DEBUG - Using selector: EpollSelector
[2023-07-10T16:31:58.619+0000] {base.py:73} INFO - Using connection ID 'mysql_default' for task execution.
[2023-07-10T16:31:58.635+0000] {base.py:73} INFO - Using connection ID 'mysql_default' for task execution.
[2023-07-10T16:31:58.679+0000] {airflow_local_settings.py:80} INFO - size of database method_3 mycompany_cdb_0000000009 is 171.16
[2023-07-10T16:31:58.679+0000] {airflow_local_settings.py:130} INFO - database_size {'database': 'mycompany_cdb_0000000009', 'size_mb': Decimal('171.16')} dag_id my_dag_3 task_id my_task_parallel_1 dag_run None
[2023-07-10T16:31:58.680+0000] {airflow_local_settings.py:133} INFO - database_size 171.16 dag_id my_dag_3 task_id my_task_parallel_1 dag_run None
[2023-07-10T16:31:58.680+0000] {airflow_local_settings.py:140} INFO - extra_annotations dag_id my_dag_3 task_id my_task_parallel_1 dag_run None {'database_id': 9, 'size_in_mb': 171.16, 'karpenter.sh/tag': 'mySchedulingAnnotation', 'hostname_of_task_instance_mutation_hook': 'airflow-web-56554b8989-h2pq9'}
[2023-07-10T16:31:58.681+0000] {airflow_local_settings.py:85} INFO - on task_instance_mutation_hook my_dag_3 task_id my_task_serial_1 dag_run None
[2023-07-10T16:31:58.681+0000] {airflow_local_settings.py:87} INFO - hostname airflow-web-56554b8989-h2pq9 my_dag_3 task_id my_task_serial_1 dag_run None
[2023-07-10T16:31:58.681+0000] {airflow_local_settings.py:109} INFO - no dag_run database_id 9 dag_id my_dag_3 task_id my_task_serial_1 dag_run None
[2023-07-10T16:31:58.681+0000] {airflow_local_settings.py:114} INFO - database_id 9
[2023-07-10T16:31:58.685+0000] {airflow_local_settings.py:115} INFO - pod_override before update dag_id my_dag_3 task_id my_task_serial_1 dag_run None {'api_version': None,
'kind': None,
'metadata': {'annotations': {'karpenter.sh/do-not-evict': 'true'},
'cluster_name': None,
'creation_timestamp': None,
'deletion_grace_period_seconds': None,
'deletion_timestamp': None,
'finalizers': None,
'generate_name': None,
'generation': None,
'labels': None,
'managed_fields': None,
'name': None,
'namespace': None,
'owner_references': None,
'resource_version': None,
'self_link': None,
'uid': None},
 'spec': {'active_deadline_seconds': None,
 'affinity': {'node_affinity': {'preferred_during_scheduling_ignored_during_execution': None,
 'required_during_scheduling_ignored_during_execution': {'node_selector_terms': [{'match_expressions': [{'key': 'size', 'operator': 'In', 'values': ['standard']}],
 'match_fields': None}]}},
 'pod_affinity': None,
 'pod_anti_affinity': None},
'automount_service_account_token': None,
'containers': [{'args': None,
'command': None,
'env': None,
'env_from': None,
'image': None,
'image_pull_policy': None,
'lifecycle': None,
'liveness_probe': None,
'name': 'base',
'ports': None,
'readiness_probe': None,
'resources': {'limits': {'memory': ' 4G'},
'requests': None},
'security_context': None,
'startup_probe': None,
'stdin': None,
'stdin_once': None,
'termination_message_path': None,
'termination_message_policy': None,
'tty': None,
'volume_devices': None,
'volume_mounts': None,
'working_dir': None}],
'dns_config': None,
'dns_policy': None,
'enable_service_links': None,
'ephemeral_containers': None,
'host_aliases': None,
'host_ipc': None,
'host_network': None,
'host_pid': None,
'hostname': None,
'image_pull_secrets': None,
'init_containers': None,
'node_name': None,
'node_selector': None,
'os': None,
'overhead': None,
'preemption_policy': None,
'priority': None,
'priority_class_name': None,
'readiness_gates': None,
'restart_policy': None,
'runtime_class_name': None,
'scheduler_name': None,
'security_context': None,
'service_account': None,
'service_account_name': None,
'set_hostname_as_fqdn': None,
'share_process_namespace': None,
'subdomain': None,
'termination_grace_period_seconds': None,
'tolerations': None,
'topology_spread_constraints': None,
'volumes': None},
'status': None}
[2023-07-10T16:31:58.692+0000] {base.py:73} INFO - Using connection ID 'mysql_default' for task execution.
[2023-07-10T16:31:58.710+0000] {airflow_local_settings.py:80} INFO - size of database method_3 mycompany_cdb_0000000009 is 171.16
[2023-07-10T16:31:58.711+0000] {airflow_local_settings.py:130} INFO - database_size {'database': 'mycompany_cdb_0000000009', 'size_mb': Decimal('171.16')} dag_id my_dag_3 task_id my_task_serial_1 dag_run None
[2023-07-10T16:31:58.711+0000] {airflow_local_settings.py:133} INFO - database_size 171.16 dag_id my_dag_3 task_id my_task_serial_1 dag_run None
[2023-07-10T16:31:58.711+0000] {airflow_local_settings.py:140} INFO - extra_annotations dag_id my_dag_3 task_id my_task_serial_1 dag_run None {'database_id': 9, 'size_in_mb': 171.16, 'karpenter.sh/tag': 'mySchedulingAnnotation', 'hostname_of_task_instance_mutation_hook': 'airflow-web-56554b8989-h2pq9'}
[2023-07-10T16:31:58.712+0000] {app.py:1744} ERROR - Exception on /trigger [POST]
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3799, in _bulk_save_mappings
    persistence._bulk_insert(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 73, in _bulk_insert
    connection = session_transaction.connection(base_mapper)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 624, in connection
    self._assert_active()
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 617, in _assert_active
    raise sa_exc.ResourceClosedError(closed_msg)
sqlalchemy.exc.ResourceClosedError: This transaction is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1825, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/auth.py", line 47, in decorated
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/decorators.py", line 125, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/views.py", line 2052, in trigger
    dag.create_dagrun(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 2687, in create_dagrun
    run.verify_integrity(session=session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 962, in verify_integrity
    self._create_task_instances(self.dag_id, tis_to_create, created_counts, hook_is_noop, session=session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 1150, in _create_task_instances
    session.bulk_save_objects(tasks)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3595, in bulk_save_objects
    self._bulk_save_mappings(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3811, in _bulk_save_mappings
    transaction.rollback(_capture_exception=True)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 84, in __exit__
    compat.raise_(value, with_traceback=traceback)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3811, in _bulk_save_mappings
    transaction.rollback(_capture_exception=True)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 851, in rollback
    self._assert_active(prepared_ok=True, rollback_ok=True)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 617, in _assert_active
    raise sa_exc.ResourceClosedError(closed_msg)
sqlalchemy.exc.ResourceClosedError: This transaction is closed
10.10.10.60 - - [10/Jul/2023:16:31:58 +0000] "POST /trigger?dag_id=my_dag_3&origin=%2Fdags%2Fmy_dag_3%2Fgrid HTTP/1.1" 500 1538 "http://10.10.10.60/trigger?dag_id=my_dag_3&origin=%2Fdags%2Fmy_dag_3%2Fgrid" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
10.10.10.60 - - [10/Jul/2023:16:31:59 +0000] "GET /health HTTP/1.1" 200 243 "-" "kube-probe/1.22"
10.10.10.60 - - [10/Jul/2023:16:31:59 +0000] "GET /health HTTP/1.1" 200 243 "-" "kube-probe/1.22"
```