potiuk opened a new issue, #24466: URL: https://github.com/apache/airflow/issues/24466
### Discussed in https://github.com/apache/airflow/discussions/24462

UPDATED with logs from @karakanb after the 2.3.2 migration.

<sup>Originally posted by **karakanb** May 26, 2022</sup>

### Apache Airflow version

2.3.2

### What happened

Hi there, this one is a bit of a weird one to reproduce, but I'll try my best to give as much information as possible.

### General Information

First of all, here is some deployment information:

- Airflow version: v2.2.5
- Deployed on k8s with the user-community Helm chart:
  - 2 scheduler pods
  - 5 worker pods
  - 1 flower pod
  - 2 web pods
- Using managed Redis from DigitalOcean
- Executor: CeleryKubernetesExecutor
- Deployed on DigitalOcean Managed Kubernetes
- Uses DigitalOcean Managed Postgres
- I am using the official Airflow Docker images
- There are no spikes in the DB metrics, the Kubernetes cluster, or anything else that I could find

These are my relevant env variables:

```
AIRFLOW__CELERY__WORKER_AUTOSCALE=8,4
AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT=64800
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags/repo/
AIRFLOW__CORE__EXECUTOR=CeleryKubernetesExecutor
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=1
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=15
AIRFLOW__CORE__PARALLELISM=30
AIRFLOW__CORE__SECURE_MODE=True
AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH=repo
AIRFLOW__KUBERNETES__DELETE_WORKER_PODS=True
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM=airflow-v2-logs
AIRFLOW__KUBERNETES__NAMESPACE=airflow
AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE=/opt/airflow/pod_templates/pod_template.yaml
AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE=20
AIRFLOW__LOGGING__BASE_LOG_FOLDER=/opt/airflow/logs
AIRFLOW__LOGGING__DAG_PROCESSOR_MANAGER_LOG_LOCATION=/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY=/opt/airflow/logs/scheduler
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=120
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=30
AIRFLOW__WEBSERVER__EXPOSE_CONFIG=False
AIRFLOW__WEBSERVER__RBAC=True
AIRFLOW__WEBSERVER__WORKER_CLASS=gevent
AIRFLOW_HOME=/opt/airflow
AIRFLOW_INSTALLATION_METHOD=apache-airflow
AIRFLOW_PIP_VERSION=21.3.1
AIRFLOW_USER_HOME_DIR=/home/airflow
AIRFLOW_VERSION=2.2.5
```

The issue I describe here started a week ago, after I moved from KubernetesExecutor to CeleryKubernetesExecutor, so it must have something to do with that change.

### Problem Statement

I have some DAGs with long-running tasks: sensors that take hours to complete, or large SQL queries that take a very long time. Given that the sensors wait for hours in many cases, we use `reschedule` mode for them; the long-running SQL queries unfortunately cannot be executed that way, so those tasks stay open. (A minimal sketch of this pattern is included after the discussion notes below.)

Here's a sample log showing how the logs look when a query is executed successfully:

```
[2022-05-26, 05:25:41 ] {cursor.py:705} INFO - query: [SELECT * FROM users WHERE...]
[2022-05-26, 05:57:22 ] {cursor.py:729} INFO - query execution done
```

And here's a sample log for a task that started at `2022-05-26, 05:25:37`, which demonstrates the problem when the task runs for a longer time:

```
[2022-05-26, 05:57:22 ] {cursor.py:705} INFO - query: [----- CREATE OR REPLACE TABLE table1 AS WITH users AS ( ...]
[2022-05-26, 06:59:41 ] {taskinstance.py:1033} INFO - Dependencies not met for <TaskInstance: mycompany.task1 scheduled__2022-05-25T01:00:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state.
[2022-05-26, 06:59:41 ] {taskinstance.py:1033} INFO - Dependencies not met for <TaskInstance: mycompany.task1 scheduled__2022-05-25T01:00:00+00:00 [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
[2022-05-26, 06:59:41 ] {local_task_job.py:99} INFO - Task is not able to be run
```

Apparently, when a task runs for a longer time, it gets killed. This is not happening with just a single task instance but with many others, so it is not an operator-specific issue. There are no timeouts and no additional configuration defined on the individual tasks.

Some additional interesting observations:

- For all the tasks that are killed, I am seeing the same log: `Task is not able to be run`
- For these tasks, the retry counts go above the `retries` set for the DAG as well.
- The DAG has 3 retries configured, yet there will usually be 4 instances running.
- This smells like a race condition somewhere, but I am not sure.

Unfortunately, I don't have the scheduler logs, but I am on the lookout for them.

As I have mentioned, this only started happening after I switched to `CeleryKubernetesExecutor`. I'd love to investigate this further. It is causing a lot of pain now, so I might need to go back to KubernetesExecutor, but I really don't want to, given that KubernetesExecutor is much slower than `CeleryKubernetesExecutor` due to the `git clone` happening on every task.

Let me know if I can provide additional information. I am trying to find more patterns and details around this so that we can fix the issue, so any leads on what to look at are much appreciated.

## More info from the discussion

@pingzh I don't have the `zombies_killed` metric in my `/metrics` endpoint, so I am not sure.

@MattiaGallegati thanks a lot for the information. I hadn't observed the issue for the past 3 days after the upgrade; I'll keep observing and report here. I am seeing the issue much more rarely than before, but it still happens after the upgrade.
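For reference, here is a minimal sketch of the DAG pattern described in the Problem Statement above. This is not the real DAG: the task names, schedule, SQL, and connection ID are placeholders, but the shape (a rescheduling sensor next to a long-running `SnowflakeOperator`, with `retries=3`) matches my setup:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.sensors.python import PythonSensor

with DAG(
    dag_id="mycompany",  # placeholder
    start_date=datetime(2022, 5, 1),
    schedule_interval="0 1 * * *",  # placeholder
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
) as dag:
    # The sensors wait for hours, so they run in `reschedule` mode and give
    # their worker slot back between pokes.
    wait_for_data = PythonSensor(
        task_id="wait_for_data",
        python_callable=lambda: False,  # placeholder condition
        mode="reschedule",
        poke_interval=600,
    )

    # The long-running SQL cannot be rescheduled: this task occupies its
    # worker slot for the full duration of the query, sometimes for hours.
    task1 = SnowflakeOperator(
        task_id="task1",
        snowflake_conn_id="my-connection-id",
        sql="CREATE OR REPLACE TABLE table1 AS ...",  # placeholder
    )

    wait_for_data >> task1
```

It is the `task1`-style tasks, the ones that hold a slot for hours, that keep getting killed.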
And here is the full log of one that has failed:

```
*** Reading local file: /opt/airflow/logs/dag_id=company/run_id=scheduled__2022-06-13T01:00:00+00:00/task_id=my_task_id/attempt=1.log
[2022-06-14, 04:10:00 ] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: company.my_task_id scheduled__2022-06-13T01:00:00+00:00 [queued]>
[2022-06-14, 04:10:00 ] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: company.my_task_id scheduled__2022-06-13T01:00:00+00:00 [queued]>
[2022-06-14, 04:10:00 ] {taskinstance.py:1356} INFO - --------------------------------------------------------------------------------
[2022-06-14, 04:10:00 ] {taskinstance.py:1357} INFO - Starting attempt 1 of 4
[2022-06-14, 04:10:00 ] {taskinstance.py:1358} INFO - --------------------------------------------------------------------------------
[2022-06-14, 04:10:00 ] {taskinstance.py:1377} INFO - Executing <Task(SnowflakeOperator): my_task_id> on 2022-06-13 01:00:00+00:00
[2022-06-14, 04:10:00 ] {standard_task_runner.py:52} INFO - Started process 982 to run task
[2022-06-14, 04:10:00 ] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'company', 'my_task_id', 'scheduled__2022-06-13T01:00:00+00:00', '--job-id', '182516', '--raw', '--subdir', 'DAGS_FOLDER/dag_v3.py', '--cfg-path', '/tmp/tmpckf3rysy', '--error-file', '/tmp/tmpzqc4fc0m']
[2022-06-14, 04:10:00 ] {standard_task_runner.py:80} INFO - Job 182516: Subtask my_task_id
[2022-06-14, 04:10:00 ] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:525: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
  option = self._get_environment_variables(deprecated_key, deprecated_section, key, section)
[2022-06-14, 04:10:01 ] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:525: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
  option = self._get_environment_variables(deprecated_key, deprecated_section, key, section)
[2022-06-14, 04:10:01 ] {task_command.py:370} INFO - Running <TaskInstance: company.my_task_id scheduled__2022-06-13T01:00:00+00:00 [running]> on host airflow-v2-worker-5.airflow-v2-worker.airflow.svc.cluster.local
[2022-06-14, 04:10:01 ] {taskinstance.py:1569} INFO - Exporting the following env vars: AIRFLOW_CTX_DAG_OWNER=airflow AIRFLOW_CTX_DAG_ID=company AIRFLOW_CTX_TASK_ID=my_task_id AIRFLOW_CTX_EXECUTION_DATE=2022-06-13T01:00:00+00:00 AIRFLOW_CTX_TRY_NUMBER=1 AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-06-13T01:00:00+00:00
[2022-06-14, 04:10:01 ] {snowflake.py:118} INFO - Executing: <some sql statement here>
[2022-06-14, 04:10:01 ] {base.py:68} INFO - Using connection ID 'my-connection-id' for task execution.
[2022-06-14, 04:10:01 ] {connection.py:257} INFO - Snowflake Connector for Python Version: 2.7.8, Python Version: 3.8.13, Platform: Linux-5.10.0-0.bpo.9-amd64-x86_64-with-glibc2.2.5
[2022-06-14, 04:10:01 ] {connection.py:876} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2022-06-14, 04:10:01 ] {connection.py:894} INFO - Setting use_openssl_only mode to False
[2022-06-14, 04:10:02 ] {cursor.py:710} INFO - query: [<some sql statement here>]
[2022-06-14, 04:10:02 ] {cursor.py:734} INFO - query execution done
[2022-06-14, 04:10:02 ] {snowflake.py:324} INFO - Running statement: <some sql statement here>
[2022-06-14, 04:10:02 ] {cursor.py:710} INFO - query: [<some sql statement here>]
[2022-06-14, 04:10:02 ] {cursor.py:734} INFO - query execution done
[2022-06-14, 04:10:02 ] {snowflake.py:334} INFO - Statement execution info - {'status': 'Statement executed successfully.'}
[2022-06-14, 04:10:02 ] {snowflake.py:338} INFO - Rows affected: 1
[2022-06-14, 04:10:02 ] {snowflake.py:339} INFO - Snowflake query id: <some-uuid-here>
[2022-06-14, 04:10:02 ] {snowflake.py:324} INFO - Running statement: <some sql statement here>
[2022-06-14, 04:10:02 ] {cursor.py:710} INFO - query: [<some sql statement here>]
[2022-06-14, 04:10:03 ] {cursor.py:734} INFO - query execution done
[2022-06-14, 04:10:03 ] {snowflake.py:334} INFO - Statement execution info - {'status': 'some_table already exists, statement succeeded.'}
[2022-06-14, 04:10:03 ] {snowflake.py:338} INFO - Rows affected: 1
[2022-06-14, 04:10:03 ] {snowflake.py:339} INFO - Snowflake query id: <some-uuid-here>
[2022-06-14, 04:10:03 ] {snowflake.py:324} INFO - Running statement: <some sql statement here>
[2022-06-14, 04:10:03 ] {cursor.py:710} INFO - query: [<some sql statement here>]
[2022-06-14, 04:10:08 ] {cursor.py:734} INFO - query execution done
[2022-06-14, 04:10:08 ] {snowflake.py:334} INFO - Statement execution info - {'number of rows deleted': 562}
[2022-06-14, 04:10:08 ] {snowflake.py:338} INFO - Rows affected: 562
[2022-06-14, 04:10:08 ] {snowflake.py:339} INFO - Snowflake query id: <some-uuid-here>
[2022-06-14, 04:10:08 ] {snowflake.py:324} INFO - Running statement: <some sql statement here>
[2022-06-14, 04:10:08 ] {cursor.py:710} INFO - query: [<some sql statement here>]
[2022-06-14, 04:16:29 ] {taskinstance.py:1149} INFO - Dependencies not met for <TaskInstance: company.my_task_id scheduled__2022-06-13T01:00:00+00:00 [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
[2022-06-14, 04:16:29 ] {taskinstance.py:1149} INFO - Dependencies not met for <TaskInstance: company.my_task_id scheduled__2022-06-13T01:00:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state.
[2022-06-14, 04:16:29 ] {local_task_job.py:101} INFO - Task is not able to be run
```

### What you think should happen instead

The tasks should keep running until they are finished.

### How to reproduce

I really don't know, sorry. I have tried my best to explain the situation above.
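One thing that might help with diagnosing it, though: the sketch below (assuming Airflow 2.3's ORM models; I have not run this in production) lists the running task instances whose attempt number already exceeds the retry budget, which is the "retry counts going above `retries`" observation from above.

```python
# A diagnostic sketch, assuming Airflow 2.3's metadata models: find running
# task instances that have been attempted more often than `retries` allows.
from airflow.models import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import State

with create_session() as session:
    running = (
        session.query(TaskInstance)
        .filter(TaskInstance.state == State.RUNNING)
        .all()
    )
    for ti in running:
        # try_number counts attempts; max_tries mirrors the task's `retries`,
        # so a task stays within budget while try_number <= max_tries + 1.
        if ti.try_number > ti.max_tries + 1:
            print(ti.dag_id, ti.task_id, ti.run_id, ti.try_number, ti.max_tries)
```

If this returns rows while the first attempt is still alive on a worker, that would support the race-condition theory above.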
### Operating System

Debian GNU/Linux 10 (buster)

### Versions of Apache Airflow Providers

```
apache-airflow-providers-amazon==3.2.0
apache-airflow-providers-celery==2.1.3
apache-airflow-providers-cncf-kubernetes==3.0.0
apache-airflow-providers-docker==2.5.2
apache-airflow-providers-elasticsearch==2.2.0
apache-airflow-providers-ftp==2.1.2
apache-airflow-providers-google==6.7.0
apache-airflow-providers-grpc==2.0.4
apache-airflow-providers-hashicorp==2.1.4
apache-airflow-providers-http==2.1.2
apache-airflow-providers-imap==2.2.3
apache-airflow-providers-microsoft-azure==3.7.2
apache-airflow-providers-microsoft-mssql==2.0.1
apache-airflow-providers-mysql==2.2.3
apache-airflow-providers-odbc==2.0.4
apache-airflow-providers-postgres==4.1.0
apache-airflow-providers-redis==2.0.4
apache-airflow-providers-sendgrid==2.0.4
apache-airflow-providers-sftp==2.5.2
apache-airflow-providers-slack==4.2.3
apache-airflow-providers-snowflake==2.3.0
apache-airflow-providers-sqlite==2.1.3
apache-airflow-providers-ssh==2.3.0
apache-airflow-providers-tableau==2.1.2
```

### Deployment

Other 3rd-party Helm chart

### Deployment details

_No response_

### Anything else

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
