nehemiascr commented on issue #32375:
URL: https://github.com/apache/airflow/issues/32375#issuecomment-1630675549

   Jens, thank you for your response.
   I tried with Airflow 2.6.2, triggering through the UI, and got the same error.
   I noticed that the error occurs after reading from an external database: I obtain a MySQL connection from an Airflow Connection, query that database, and then the error happens. Below is the code that reproduces the issue.
   I created a simple DAG that can run tasks both in parallel and serially. When I run it with 20 parallel tasks and 20 serial tasks but without reading the external database, updating the `pod_override` configuration of the tasks succeeds; when I run the same DAG with 1 parallel task and 1 serial task (effectively 2 tasks in sequence) while reading my database, it fails.
   I am also trying 3 different query methods; they all fail.
   
   **airflow_local_settings.py:**
   ``` python
   import logging
   import socket
   from typing import Dict
   from airflow.models import TaskInstance, Connection
   from airflow.exceptions import AirflowNotFoundException
   from sqlalchemy import create_engine, exc
   
   logger = logging.getLogger(__name__)
   
    def get_customer_database_size_method_0(db_id: int) -> Dict:
        """
        Without querying the database; made-up size of 123.45
        """
        result = {"database": f"mycompany_cdb_{db_id:010d}", "size_mb": 123.45}
        logger.info(f"size of database method_0 {result['database']} is {result['size_mb']}")
        return result
   
    def get_customer_database_size_method_1(db_id: int) -> Dict:
        """
        Instantiate a SQLAlchemy engine from the Airflow Connection URI,
        create a connection, run the query, close the connection.
        """
        connection_name = "mysql_default"

        try:
            conn = Connection.get_connection_from_secrets(connection_name)
        except AirflowNotFoundException as e:
            logger.exception(e)
            return {"database": f"mycompany_cdb_{db_id:010d}", "size_mb": 0}

        engine = create_engine(conn.get_uri())
        stmt = f"SELECT table_schema AS 'database', ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'size_mb' FROM information_schema.tables WHERE table_schema = 'mycompany_cdb_{db_id:010d}'"

        try:
            c = engine.connect()
            result = dict(c.execute(stmt).first())
            c.close()
        except exc.DBAPIError as e:
            if e.connection_invalidated:
                logger.warning("Connection was invalidated!")
            # fall back to a zero size so `result` is always defined
            return {"database": f"mycompany_cdb_{db_id:010d}", "size_mb": 0}

        logger.info(f"size of database method_1 {result['database']} is {result['size_mb']}")
        return result
   
    def get_customer_database_size_method_2(db_id: int) -> Dict:
        """
        Instantiate SQLAlchemy engine from Airflow Connection uri,
        query engine directly
        """
        connection_name = "mysql_default"

        try:
            conn = Connection.get_connection_from_secrets(connection_name)
        except AirflowNotFoundException as e:
            logger.exception(e)
            return {"database": f"mycompany_cdb_{db_id:010d}", "size_mb": 0}

        engine = create_engine(conn.get_uri())
        stmt = f"SELECT table_schema AS 'database', ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'size_mb' FROM information_schema.tables WHERE table_schema = 'mycompany_cdb_{db_id:010d}'"
        result = dict(engine.execute(stmt).first())
        logger.info(f"size of database method_2 {result['database']} is {result['size_mb']}")
        return result
   
    def get_customer_database_size_method_3(db_id: int) -> Dict:
        """
        MyCompanyDatabaseManager
        """
        from mycompany_core.db_engine import MyCompanyDatabaseManager, get_engine
        from mycompanyet.mysql_engines import DRIVER_OVERRIDE, QUERY_OVERRIDE
        from airflow_configs.hooks import get_mysql_hook

        db_manager = MyCompanyDatabaseManager(
            get_engine(get_mysql_hook().get_sqlalchemy_engine().url),
            drivername_override=DRIVER_OVERRIDE,
            query_override=QUERY_OVERRIDE,
        )
        engine = db_manager.get_engine()
        stmt = f"SELECT table_schema AS 'database', ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'size_mb' FROM information_schema.tables WHERE table_schema = 'mycompany_cdb_{db_id:010d}'"
        result = dict(engine.execute(stmt).first())

        logger.info(f"size of database method_3 {result['database']} is {result['size_mb']}")
        return result
   
    def task_instance_mutation_hook(ti: TaskInstance):

        logger.info(f"on task_instance_mutation_hook {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
        hostname = socket.gethostname()
        logger.info(f"hostname {hostname} {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")

        if not ti.executor_config:
            logger.info(f"no ti.executor_config :::: dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
            return

        if not ti.executor_config.get('pod_override', None):
            logger.info(f"no ti.executor_config['pod_override'] :::: dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
            return

        if ti.dag_run:
            logger.info(f"dag_run for dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
            if ti.dag_run.conf:
                dag_run_conf = ti.dag_run.conf
                logger.info(f"dag_run_conf dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run} dag_run_conf {dag_run_conf}")
                database_id = dag_run_conf["database_id"]
                logger.info(f"dag_run_conf database_id {database_id} dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run} dag_run_conf {dag_run_conf}")
            else:
                database_id = ti.run_id.split('-')[-1]
                logger.info(f"dag_run but no dag_run.conf database_id {database_id} dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
        else:
            database_id = ti.run_id.split('-')[-1]
            logger.info(f"no dag_run database_id {database_id} dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")

        if ti.dag_id.startswith('my_dag'):
            if ti.task_id.startswith("my_task"):
                database_id = int(database_id)
                logger.info(f"database_id {database_id}")
                logger.info(f"pod_override before update dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run} {ti.executor_config['pod_override']}")

                # depending on the dag, we try a different database query method
                my_dag_number = int(ti.dag_id.split('_')[-1])
                if my_dag_number == 0:
                    database_size = get_customer_database_size_method_0(database_id)  # fake query
                elif my_dag_number == 1:
                    database_size = get_customer_database_size_method_1(database_id)
                elif my_dag_number == 2:
                    database_size = get_customer_database_size_method_2(database_id)
                elif my_dag_number == 3:
                    database_size = get_customer_database_size_method_3(database_id)
                else:
                    database_size = {"database": f"mycompany_cdb_{database_id:010d}", "size_mb": 9.9}

                logger.info(f"database_size {database_size} dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
                database_size = float(database_size["size_mb"] or 0)

                logger.info(f"database_size {database_size} dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}")
                extra_annotations = {
                    "database_id": database_id,
                    "size_in_mb": float(database_size),
                    "karpenter.sh/tag": "mySchedulingAnnotation",
                    "hostname_of_task_instance_mutation_hook": hostname,
                }
                logger.info(f"extra_annotations dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}  {extra_annotations}")
                ti.executor_config["pod_override"].metadata.annotations.update(extra_annotations)
                # logger.info(f"pod_override after update dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run} {ti.executor_config['pod_override']}")
        else:
            extra_annotations = {
                "hostname_of_task_instance_mutation_hook": hostname,
                "database_id": database_id,
            }
            logger.info(f"extra_annotations dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run}  {extra_annotations}")
            ti.executor_config["pod_override"].metadata.annotations.update(extra_annotations)
            # logger.info(f"pod_override after update dag_id {ti.dag_id} task_id {ti.task_id} dag_run {ti.dag_run} {ti.executor_config['pod_override']}")
   ```
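
   For completeness, the query that all three methods run boils down to the following standalone sketch (outside Airflow, with a short-lived engine, a context-managed connection, and a bound parameter instead of string interpolation; `MYSQL_URI` and `get_size_mb` are placeholder names, not part of the code above):
    ``` python
    # Standalone sketch of the same size query (not part of the hook above).
    # MYSQL_URI is a placeholder for the URI returned by conn.get_uri().
    from sqlalchemy import create_engine, text

    MYSQL_URI = "mysql://user:pass@mysql-host:3306/information_schema"  # placeholder

    def get_size_mb(db_id: int) -> float:
        engine = create_engine(MYSQL_URI)
        stmt = text(
            "SELECT ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) "
            "FROM information_schema.tables WHERE table_schema = :schema"
        )
        try:
            with engine.connect() as c:  # connection is released on exit
                row = c.execute(stmt, {"schema": f"mycompany_cdb_{db_id:010d}"}).first()
            return float(row[0] or 0)
        finally:
            engine.dispose()  # drop the pool so nothing outlives this call
    ```
   The failure does not seem to depend on how the connection is managed, since all three methods above fail the same way.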
   
   **my_dag_0.py**:
   ``` python
   import json
   import logging
   import socket
   
   import airflow
   from airflow.models import DAG
   from airflow.operators.empty import EmptyOperator
   from airflow.operators.python import PythonOperator
   from airflow.utils.trigger_rule import TriggerRule
   from kubernetes.client import models as k8s
   
   
   log = logging.getLogger(__name__)
   
   
    def get_node_config():
        # Use kubernetes.client.models.V1Affinity to define node affinity
        executor_config = {
            "pod_override": k8s.V1Pod(
                metadata=k8s.V1ObjectMeta(
                    annotations={"karpenter.sh/do-not-evict": "true"}
                ),
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(limits={"memory": " 4G"}),
                        )
                    ],
                    affinity=k8s.V1Affinity(
                        node_affinity=k8s.V1NodeAffinity(
                            required_during_scheduling_ignored_during_execution=k8s.V1NodeSelector(
                                node_selector_terms=[
                                    k8s.V1NodeSelectorTerm(
                                        match_expressions=[
                                            k8s.V1NodeSelectorRequirement(
                                                key="size", operator="In", values=["standard"]
                                            ),
                                        ]
                                    )
                                ]
                            )
                        )
                    ),
                ),
            ),
        }
        return executor_config
   
   
   def my_task_callable(**kwargs):
       log.info(f"hostname: {socket.gethostname()}")
       config = kwargs["dag_run"].conf
       log.info(f"config {config}")
       airflow_dag_run_id = kwargs["dag_run"].run_id
       log.info(f"airflow_dag_run_id {airflow_dag_run_id}")
       
       log.info(f"addding 2 + 2")
       result = 2 + 2
       log.info(f"which results in {result}")
       
   
    default_args = {"owner": "airflow", "start_date": airflow.utils.dates.days_ago(1), "retries": 3}
   
   """
   {
       "database_id": 9
   }
   """
   if __name__.startswith("unusual_prefix"):
       with DAG(
           dag_id="my_dag_0",
           catchup=False,
           default_args=default_args,
           schedule_interval=None,
       ) as dag:
            begin_my_dag_op = EmptyOperator(task_id="begin_my_dag_op")
            end_my_task_parallel_op = EmptyOperator(task_id="end_my_task_parallel_op", trigger_rule=TriggerRule.NONE_FAILED)
            end_my_task_serial_op = EmptyOperator(task_id="end_my_task_serial_op", trigger_rule=TriggerRule.NONE_FAILED)
            end_my_dag_op = EmptyOperator(task_id="end_my_dag_op")

            my_task_parallel_op = dict()
            num_of_jobs = 20
            for job_num in range(1, num_of_jobs + 1):
                my_task_parallel_op[job_num] = PythonOperator(
                    task_id=f"my_task_parallel_{job_num}",
                    python_callable=my_task_callable,
                    provide_context=True,
                    executor_config=get_node_config(),
                )
                begin_my_dag_op >> my_task_parallel_op[job_num] >> end_my_task_parallel_op

            last_serial_task = end_my_task_parallel_op
            my_task_serial_op = dict()
            num_of_jobs = 20
            for job_num in range(1, num_of_jobs + 1):
                my_task_serial_op[job_num] = PythonOperator(
                    task_id=f"my_task_serial_{job_num}",
                    python_callable=my_task_callable,
                    provide_context=True,
                    executor_config=get_node_config(),
                )
                last_serial_task >> my_task_serial_op[job_num] >> end_my_task_serial_op
                last_serial_task = my_task_serial_op[job_num]

            end_my_task_serial_op >> end_my_dag_op
   ```
   <img width="1335" alt="my_dag_0" 
src="https://github.com/apache/airflow/assets/1874087/50d8cd4a-dc85-41b2-a8ba-4161b38cf4e8";>
   
   **my_dag_1.py**, **my_dag_2.py** and **my_dag_3.py**:
   ``` python
   ...
           num_of_jobs = 1
   ...
   ```
   <img width="1122" alt="my_dag_1-my_dag_2-my_dag_3" 
src="https://github.com/apache/airflow/assets/1874087/11c10f11-50ba-4dff-8546-f995ab613197";>
   
   The only difference between my_dag_0 and the others is the `num_of_jobs` variable, which is set to 20 for my_dag_0 and 1 for the others. The point of my_dag_0 is to show that, without reading the database, updating the `pod_override` configuration of many tasks succeeds.
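
   (For reference, the same runs can also be triggered from the CLI, e.g. `airflow dags trigger my_dag_3 --conf '{"database_id": 9}'`; the trigger path through the webserver is what fails below.)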
   
   Webserver log excerpt from when it fails:
   ``` Log
    10.10.10.60 - - [10/Jul/2023:16:31:49 +0000] "GET /health HTTP/1.1" 200 243 "-" "kube-probe/1.22"
    10.10.10.60 - - [10/Jul/2023:16:31:49 +0000] "GET /health HTTP/1.1" 200 243 "-" "kube-probe/1.22"

    [2023-07-10T16:31:57.588+0000] {airflow_local_settings.py:85} INFO - on task_instance_mutation_hook my_dag_3 task_id begin_my_dag_op dag_run None
    [2023-07-10T16:31:57.589+0000] {airflow_local_settings.py:87} INFO - hostname airflow-web-56554b8989-h2pq9 my_dag_3 task_id begin_my_dag_op dag_run None
    [2023-07-10T16:31:57.589+0000] {airflow_local_settings.py:90} INFO - no ti.executor_config :::: dag_id my_dag_3 task_id begin_my_dag_op dag_run None
    [2023-07-10T16:31:57.590+0000] {airflow_local_settings.py:85} INFO - on task_instance_mutation_hook my_dag_3 task_id end_my_task_parallel_op dag_run None
    [2023-07-10T16:31:57.590+0000] {airflow_local_settings.py:87} INFO - hostname airflow-web-56554b8989-h2pq9 my_dag_3 task_id end_my_task_parallel_op dag_run None
    [2023-07-10T16:31:57.590+0000] {airflow_local_settings.py:90} INFO - no ti.executor_config :::: dag_id my_dag_3 task_id end_my_task_parallel_op dag_run None
    [2023-07-10T16:31:57.590+0000] {airflow_local_settings.py:85} INFO - on task_instance_mutation_hook my_dag_3 task_id end_my_task_serial_op dag_run None
    [2023-07-10T16:31:57.591+0000] {airflow_local_settings.py:87} INFO - hostname airflow-web-56554b8989-h2pq9 my_dag_3 task_id end_my_task_serial_op dag_run None
    [2023-07-10T16:31:57.591+0000] {airflow_local_settings.py:90} INFO - no ti.executor_config :::: dag_id my_dag_3 task_id end_my_task_serial_op dag_run None
    [2023-07-10T16:31:57.591+0000] {airflow_local_settings.py:85} INFO - on task_instance_mutation_hook my_dag_3 task_id end_my_dag_op dag_run None
    [2023-07-10T16:31:57.592+0000] {airflow_local_settings.py:87} INFO - hostname airflow-web-56554b8989-h2pq9 my_dag_3 task_id end_my_dag_op dag_run None
    [2023-07-10T16:31:57.592+0000] {airflow_local_settings.py:90} INFO - no ti.executor_config :::: dag_id my_dag_3 task_id end_my_dag_op dag_run None
    [2023-07-10T16:31:57.592+0000] {airflow_local_settings.py:85} INFO - on task_instance_mutation_hook my_dag_3 task_id my_task_parallel_1 dag_run None
    [2023-07-10T16:31:57.592+0000] {airflow_local_settings.py:87} INFO - hostname airflow-web-56554b8989-h2pq9 my_dag_3 task_id my_task_parallel_1 dag_run None
    [2023-07-10T16:31:57.592+0000] {airflow_local_settings.py:109} INFO - no dag_run database_id 9 dag_id my_dag_3 task_id my_task_parallel_1 dag_run None
    [2023-07-10T16:31:57.593+0000] {airflow_local_settings.py:114} INFO - database_id 9
    [2023-07-10T16:31:57.596+0000] {airflow_local_settings.py:115} INFO - pod_override before update dag_id my_dag_3 task_id my_task_parallel_1 dag_run None {'api_version': None,
    'kind': None,
    'metadata': {'annotations': {'karpenter.sh/do-not-evict': 'true'},
                 'cluster_name': None,
                 'creation_timestamp': None,
                 'deletion_grace_period_seconds': None,
                 'deletion_timestamp': None,
                 'finalizers': None,
                 'generate_name': None,
                 'generation': None,
                 'labels': None,
                 'managed_fields': None,
                 'name': None,
                 'namespace': None,
                 'owner_references': None,
                 'resource_version': None,
                 'self_link': None,
                 'uid': None},
     'spec': {'active_deadline_seconds': None,
              'affinity': {'node_affinity': {'preferred_during_scheduling_ignored_during_execution': None,
                                             'required_during_scheduling_ignored_during_execution': {'node_selector_terms': [{'match_expressions': [{'key': 'size', 'operator': 'In', 'values': ['standard']}], 'match_fields': None}]}},
                           'pod_affinity': None,
                           'pod_anti_affinity': None},
             'automount_service_account_token': None,
             'containers': [{'args': None,
                             'command': None,
                             'env': None,
                             'env_from': None,
                             'image': None,
                             'image_pull_policy': None,
                             'lifecycle': None,
                             'liveness_probe': None,
                             'name': 'base',
                             'ports': None,
                             'readiness_probe': None,
                             'resources': {'limits': {'memory': ' 4G'},
                                           'requests': None},
                             'security_context': None,
                             'startup_probe': None,
                             'stdin': None,
                             'stdin_once': None,
                             'termination_message_path': None,
                             'termination_message_policy': None,
                             'tty': None,
                             'volume_devices': None,
                             'volume_mounts': None,
                             'working_dir': None}],
             'dns_config': None,
             'dns_policy': None,
             'enable_service_links': None,
             'ephemeral_containers': None,
             'host_aliases': None,
             'host_ipc': None,
             'host_network': None,
             'host_pid': None,
             'hostname': None,
             'image_pull_secrets': None,
             'init_containers': None,
             'node_name': None,
             'node_selector': None,
             'os': None,
             'overhead': None,
             'preemption_policy': None,
             'priority': None,
             'priority_class_name': None,
             'readiness_gates': None,
             'restart_policy': None,
             'runtime_class_name': None,
             'scheduler_name': None,
             'security_context': None,
             'service_account': None,
             'service_account_name': None,
             'set_hostname_as_fqdn': None,
             'share_process_namespace': None,
             'subdomain': None,
             'termination_grace_period_seconds': None,
             'tolerations': None,
             'topology_spread_constraints': None,
             'volumes': None},
    'status': None}
    [2023-07-10T16:31:58.076+0000] {environment.py:107} INFO - No environment configuration found.
    [2023-07-10T16:31:58.083+0000] {managed_identity.py:94} INFO - ManagedIdentityCredential will use IMDS
    [2023-07-10T16:31:58.397+0000] {selector_events.py:54} DEBUG - Using selector: EpollSelector
    [2023-07-10T16:31:58.619+0000] {base.py:73} INFO - Using connection ID 'mysql_default' for task execution.
    [2023-07-10T16:31:58.635+0000] {base.py:73} INFO - Using connection ID 'mysql_default' for task execution.
    [2023-07-10T16:31:58.679+0000] {airflow_local_settings.py:80} INFO - size of database method_3 mycompany_cdb_0000000009 is 171.16
    [2023-07-10T16:31:58.679+0000] {airflow_local_settings.py:130} INFO - database_size {'database': 'mycompany_cdb_0000000009', 'size_mb': Decimal('171.16')} dag_id my_dag_3 task_id my_task_parallel_1 dag_run None
    [2023-07-10T16:31:58.680+0000] {airflow_local_settings.py:133} INFO - database_size 171.16 dag_id my_dag_3 task_id my_task_parallel_1 dag_run None
    [2023-07-10T16:31:58.680+0000] {airflow_local_settings.py:140} INFO - extra_annotations dag_id my_dag_3 task_id my_task_parallel_1 dag_run None  {'database_id': 9, 'size_in_mb': 171.16, 'karpenter.sh/tag': 'mySchedulingAnnotation', 'hostname_of_task_instance_mutation_hook': 'airflow-web-56554b8989-h2pq9'}
    [2023-07-10T16:31:58.681+0000] {airflow_local_settings.py:85} INFO - on task_instance_mutation_hook my_dag_3 task_id my_task_serial_1 dag_run None
    [2023-07-10T16:31:58.681+0000] {airflow_local_settings.py:87} INFO - hostname airflow-web-56554b8989-h2pq9 my_dag_3 task_id my_task_serial_1 dag_run None
    [2023-07-10T16:31:58.681+0000] {airflow_local_settings.py:109} INFO - no dag_run database_id 9 dag_id my_dag_3 task_id my_task_serial_1 dag_run None
    [2023-07-10T16:31:58.681+0000] {airflow_local_settings.py:114} INFO - database_id 9
    [2023-07-10T16:31:58.685+0000] {airflow_local_settings.py:115} INFO - pod_override before update dag_id my_dag_3 task_id my_task_serial_1 dag_run None {...identical pod_override dump as shown above...}
    [2023-07-10T16:31:58.692+0000] {base.py:73} INFO - Using connection ID 'mysql_default' for task execution.
    [2023-07-10T16:31:58.710+0000] {airflow_local_settings.py:80} INFO - size of database method_3 mycompany_cdb_0000000009 is 171.16
    [2023-07-10T16:31:58.711+0000] {airflow_local_settings.py:130} INFO - database_size {'database': 'mycompany_cdb_0000000009', 'size_mb': Decimal('171.16')} dag_id my_dag_3 task_id my_task_serial_1 dag_run None
    [2023-07-10T16:31:58.711+0000] {airflow_local_settings.py:133} INFO - database_size 171.16 dag_id my_dag_3 task_id my_task_serial_1 dag_run None
    [2023-07-10T16:31:58.711+0000] {airflow_local_settings.py:140} INFO - extra_annotations dag_id my_dag_3 task_id my_task_serial_1 dag_run None  {'database_id': 9, 'size_in_mb': 171.16, 'karpenter.sh/tag': 'mySchedulingAnnotation', 'hostname_of_task_instance_mutation_hook': 'airflow-web-56554b8989-h2pq9'}
    [2023-07-10T16:31:58.712+0000] {app.py:1744} ERROR - Exception on /trigger [POST]
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3799, in _bulk_save_mappings
        persistence._bulk_insert(
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 73, in _bulk_insert
        connection = session_transaction.connection(base_mapper)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 624, in connection
        self._assert_active()
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 617, in _assert_active
        raise sa_exc.ResourceClosedError(closed_msg)
    sqlalchemy.exc.ResourceClosedError: This transaction is closed
   
   During handling of the above exception, another exception occurred:
   
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 2529, in wsgi_app
        response = self.full_dispatch_request()
      File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1825, in full_dispatch_request
        rv = self.handle_user_exception(e)
      File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1823, in full_dispatch_request
        rv = self.dispatch_request()
      File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1799, in dispatch_request
        return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/auth.py", line 47, in decorated
        return func(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/decorators.py", line 125, in wrapper
        return f(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 76, in wrapper
        return func(*args, session=session, **kwargs)
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/views.py", line 2052, in trigger
        dag.create_dagrun(
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 76, in wrapper
        return func(*args, session=session, **kwargs)
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 2687, in create_dagrun
        run.verify_integrity(session=session)
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 73, in wrapper
        return func(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 962, in verify_integrity
        self._create_task_instances(self.dag_id, tis_to_create, created_counts, hook_is_noop, session=session)
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 1150, in _create_task_instances
        session.bulk_save_objects(tasks)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3595, in bulk_save_objects
        self._bulk_save_mappings(
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3811, in _bulk_save_mappings
        transaction.rollback(_capture_exception=True)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 84, in __exit__
        compat.raise_(value, with_traceback=traceback)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
        raise exception
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3811, in _bulk_save_mappings
        transaction.rollback(_capture_exception=True)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 851, in rollback
        self._assert_active(prepared_ok=True, rollback_ok=True)
      File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 617, in _assert_active
        raise sa_exc.ResourceClosedError(closed_msg)
    sqlalchemy.exc.ResourceClosedError: This transaction is closed
    10.10.10.60 - - [10/Jul/2023:16:31:58 +0000] "POST /trigger?dag_id=my_dag_3&origin=%2Fdags%2Fmy_dag_3%2Fgrid HTTP/1.1" 500 1538 "http://10.10.10.60/trigger?dag_id=my_dag_3&origin=%2Fdags%2Fmy_dag_3%2Fgrid" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
    10.10.10.60 - - [10/Jul/2023:16:31:59 +0000] "GET /health HTTP/1.1" 200 243 "-" "kube-probe/1.22"
    10.10.10.60 - - [10/Jul/2023:16:31:59 +0000] "GET /health HTTP/1.1" 200 243 "-" "kube-probe/1.22"
   ```
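
   Reading the traceback, the 500 happens while the webserver is creating the DAG run: `trigger` → `dag.create_dagrun` → `DagRun.verify_integrity` → `_create_task_instances` → `session.bulk_save_objects(tasks)`. The session's transaction is already closed by the time the task instances are bulk-saved, i.e. immediately after `task_instance_mutation_hook` has run and queried the external MySQL database.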

