potiuk opened a new issue, #24466:
URL: https://github.com/apache/airflow/issues/24466

   ### Discussed in https://github.com/apache/airflow/discussions/24462
   
   UPDATED with logs from @karakanb after the 2.3.2 migration
   
   
   <div type='discussions-op-text'>
   
   <sup>Originally posted by **karakanb** May 26, 2022</sup>
   ### Apache Airflow version
   
   2.3.2
   
   ### What happened
   
   Hi there, this one is a bit of a weird one to reproduce, but I'll try my best to give as much information as possible.
   
   
   ### General Information
   
   First of all, here's some general information:
   - Airflow version: v2.2.5
   - Deployed on k8s with the user-community helm chart:
     - 2 scheduler pods
     - 5 worker pods
     - 1 flower pod
     - 2 web pods
     - Using managed Redis from DigitalOcean
   - Executor: CeleryKubernetesExecutor
   - Deployed on DigitalOcean Managed Kubernetes
   - Uses DigitalOcean Managed Postgres
   - I am using the official Airflow Docker images
   - There are no spikes in the DB metrics, Kubernetes cluster, or anything 
else that I could find.
   
   These are my relevant env variables:
   ```
   AIRFLOW__CELERY__WORKER_AUTOSCALE=8,4
   AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT=64800
   AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags/repo/
   AIRFLOW__CORE__EXECUTOR=CeleryKubernetesExecutor
   AIRFLOW__CORE__LOAD_EXAMPLES=False
   AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=1
   AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=15
   AIRFLOW__CORE__PARALLELISM=30
   AIRFLOW__CORE__SECURE_MODE=True
   AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH=repo
   AIRFLOW__KUBERNETES__DELETE_WORKER_PODS=True
   AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM=airflow-v2-logs
   AIRFLOW__KUBERNETES__NAMESPACE=airflow
   AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE=/opt/airflow/pod_templates/pod_template.yaml
   AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE=20
   AIRFLOW__LOGGING__BASE_LOG_FOLDER=/opt/airflow/logs
   AIRFLOW__LOGGING__DAG_PROCESSOR_MANAGER_LOG_LOCATION=/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
   AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY=/opt/airflow/logs/scheduler
   AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=120
   AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=30
   AIRFLOW__WEBSERVER__EXPOSE_CONFIG=False
   AIRFLOW__WEBSERVER__RBAC=True
   AIRFLOW__WEBSERVER__WORKER_CLASS=gevent
   AIRFLOW_HOME=/opt/airflow
   AIRFLOW_INSTALLATION_METHOD=apache-airflow
   AIRFLOW_PIP_VERSION=21.3.1
   AIRFLOW_USER_HOME_DIR=/home/airflow
   AIRFLOW_VERSION=2.2.5
   ```
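
   For reference, Airflow maps environment variables of the form `AIRFLOW__{SECTION}__{KEY}` to `[section] key` entries in `airflow.cfg`. As a quick sanity check, a few of the settings above would look like this in config-file form (same values, section names as I understand the mapping):
   ```
   [core]
   executor = CeleryKubernetesExecutor
   parallelism = 30
   max_active_runs_per_dag = 1
   max_active_tasks_per_dag = 15

   [celery]
   worker_autoscale = 8,4

   [celery_broker_transport_options]
   visibility_timeout = 64800
   ```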
   
   The issue that I will be describing here started happening a week ago, after I moved from KubernetesExecutor to CeleryKubernetesExecutor, so it must have something to do with that change.
   
   ### Problem Statement
   I have some DAGs with long-running tasks: sensors that take hours to complete, or large SQL queries that take a very long time. Because the sensors often wait for hours, we run them in `reschedule` mode; the long-running SQL queries, however, cannot be executed that way, so those tasks stay open for the whole duration of the query.
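
   The actual DAG code isn't included here, but to make the shape concrete, a minimal sketch could look like the following (the DAG id, schedule, sensor type, connection id and SQL are placeholders, not the real production code):
   ```
   from datetime import datetime, timedelta

   from airflow import DAG
   from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   from airflow.sensors.external_task import ExternalTaskSensor

   with DAG(
       dag_id="mycompany",                      # placeholder DAG id
       start_date=datetime(2022, 5, 1),
       schedule_interval="0 1 * * *",
       catchup=False,
       default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
   ) as dag:
       # Sensors wait for hours, so they run in "reschedule" mode and release
       # their worker slot between pokes.
       wait_for_upstream = ExternalTaskSensor(
           task_id="wait_for_upstream",         # placeholder; any long-waiting sensor
           external_dag_id="upstream_dag",      # placeholder upstream DAG
           mode="reschedule",
           poke_interval=10 * 60,
           timeout=12 * 60 * 60,
       )

       # The long SQL queries cannot be rescheduled, so this task occupies its
       # worker slot for the whole duration of the query - this is the case
       # where the failures described below show up.
       long_query = SnowflakeOperator(
           task_id="task1",                     # placeholder task id
           snowflake_conn_id="my-connection-id",
           sql="CREATE OR REPLACE TABLE table1 AS WITH users AS (SELECT ...)",  # placeholder SQL
       )

       wait_for_upstream >> long_query
   ```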
   
   Here's a sample log to show how the logs look when a query is executed 
successfully:
   ```
   [2022-05-26, 05:25:41 ] {cursor.py:705} INFO - query: [SELECT * FROM users 
WHERE...]
   [2022-05-26, 05:57:22 ] {cursor.py:729} INFO - query execution done
   ```
   
   Here's a sample log for a task that started at `2022-05-26, 05:25:37`, which demonstrates the problem when the task runs for a longer time:
   ```
   [2022-05-26, 05:57:22 ] {cursor.py:705} INFO - query: [----- CREATE OR 
REPLACE TABLE table1 AS WITH users AS ( ...]
   [2022-05-26, 06:59:41 ] {taskinstance.py:1033} INFO - Dependencies not met 
for <TaskInstance: mycompany.task1 scheduled__2022-05-25T01:00:00+00:00 
[running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' 
state.
   [2022-05-26, 06:59:41 ] {taskinstance.py:1033} INFO - Dependencies not met 
for <TaskInstance: mycompany.task1 scheduled__2022-05-25T01:00:00+00:00 
[running]>, dependency 'Task Instance Not Running' FAILED: Task is in the 
running state
   [2022-05-26, 06:59:41 ] {local_task_job.py:99} INFO - Task is not able to be 
run
   ```
   
   Apparently, when a task runs for a longer time, it gets killed. This is not happening with just a single task instance but with many of them, so it is not an operator-specific issue. There are no timeouts and no additional configuration defined on the individual tasks.
   
   Some additional interesting observations:
   - For all of the tasks that are killed, I am seeing the same log: `Task is not able to be run`
   - For these tasks, the retry counts are going above the `retries` set for the DAG as well (a small sketch for checking the actual attempt counts follows this list).
     - The DAG has 3 retries configured, and there will usually be 4 instances running.
     - This smells like a race condition somewhere, but I am not sure.
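
   As a side note on those retry counts, one way to double-check the actual attempt numbers is to read them straight from the metadata DB; a minimal sketch (with placeholder dag/task ids) would be:
   ```
   # Compare the recorded try_number against max_tries (= configured retries)
   # for every run of the task; dag_id/task_id below are placeholders.
   from airflow.models import TaskInstance
   from airflow.utils.session import create_session

   with create_session() as session:
       tis = (
           session.query(TaskInstance)
           .filter(TaskInstance.dag_id == "mycompany", TaskInstance.task_id == "task1")
           .all()
       )
       for ti in tis:
           print(ti.run_id, ti.try_number, ti.max_tries, ti.state)
   ```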
   
   Unfortunately, I don't have the scheduler logs, but I am on the lookout for 
them. 
   
   As I have mentioned, this only started happening after I switched to `CeleryKubernetesExecutor`. I'd love to investigate this further. It is causing a lot of pain right now, so I might need to go back to KubernetesExecutor, but I really don't want to, given that KubernetesExecutor is much slower than `CeleryKubernetesExecutor` due to a `git clone` happening for every task.
   
   Let me know if I can provide additional information. I am trying to find more patterns and details around this so that we can fix this issue, so any leads on what should be looked at are much appreciated.
   
   ## More info from the discussion: 
   
   @pingzh I don't have the zombies_killed metric in my /metrics endpoint, so I'm not sure.
   
   @MattiaGallegati thanks a lot for the information. I haven't observed the issue for the past 3 days after the upgrade; I'll keep observing and report back here.
   
   I am seeing the issue much more rarely than before, but it still happens after the upgrade. Here's one task that failed:
   
   ```
   *** Reading local file: 
/opt/airflow/logs/dag_id=company/run_id=scheduled__2022-06-13T01:00:00+00:00/task_id=my_task_id/attempt=1.log
   [2022-06-14, 04:10:00 ] {taskinstance.py:1159} INFO - Dependencies all met 
for <TaskInstance: company.my_task_id scheduled__2022-06-13T01:00:00+00:00 
[queued]>
   [2022-06-14, 04:10:00 ] {taskinstance.py:1159} INFO - Dependencies all met 
for <TaskInstance: company.my_task_id scheduled__2022-06-13T01:00:00+00:00 
[queued]>
   [2022-06-14, 04:10:00 ] {taskinstance.py:1356} INFO - 
   
--------------------------------------------------------------------------------
   [2022-06-14, 04:10:00 ] {taskinstance.py:1357} INFO - Starting attempt 1 of 4
   [2022-06-14, 04:10:00 ] {taskinstance.py:1358} INFO - 
   
--------------------------------------------------------------------------------
   [2022-06-14, 04:10:00 ] {taskinstance.py:1377} INFO - Executing 
<Task(SnowflakeOperator): my_task_id> on 2022-06-13 01:00:00+00:00
   [2022-06-14, 04:10:00 ] {standard_task_runner.py:52} INFO - Started process 
982 to run task
   [2022-06-14, 04:10:00 ] {standard_task_runner.py:79} INFO - Running: 
['airflow', 'tasks', 'run', 'company', 'my_task_id', 
'scheduled__2022-06-13T01:00:00+00:00', '--job-id', '182516', '--raw', 
'--subdir', 'DAGS_FOLDER/dag_v3.py', '--cfg-path', '/tmp/tmpckf3rysy', 
'--error-file', '/tmp/tmpzqc4fc0m']
   [2022-06-14, 04:10:00 ] {standard_task_runner.py:80} INFO - Job 182516: 
Subtask my_task_id
   [2022-06-14, 04:10:00 ] {warnings.py:109} WARNING - 
/home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:525: 
DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the 
sql_alchemy_conn option in [database] - the old setting has been used, but 
please update your config.
     option = self._get_environment_variables(deprecated_key, 
deprecated_section, key, section)
   
   [2022-06-14, 04:10:01 ] {warnings.py:109} WARNING - 
/home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:525: 
DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the 
sql_alchemy_conn option in [database] - the old setting has been used, but 
please update your config.
     option = self._get_environment_variables(deprecated_key, 
deprecated_section, key, section)
   
   [2022-06-14, 04:10:01 ] {task_command.py:370} INFO - Running <TaskInstance: 
company.my_task_id scheduled__2022-06-13T01:00:00+00:00 [running]> on host 
airflow-v2-worker-5.airflow-v2-worker.airflow.svc.cluster.local
   [2022-06-14, 04:10:01 ] {taskinstance.py:1569} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=company
   AIRFLOW_CTX_TASK_ID=my_task_id
   AIRFLOW_CTX_EXECUTION_DATE=2022-06-13T01:00:00+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-06-13T01:00:00+00:00
   [2022-06-14, 04:10:01 ] {snowflake.py:118} INFO - Executing: <some sql 
statement here>
   [2022-06-14, 04:10:01 ] {base.py:68} INFO - Using connection ID 
'my-connection-id' for task execution.
   [2022-06-14, 04:10:01 ] {connection.py:257} INFO - Snowflake Connector for 
Python Version: 2.7.8, Python Version: 3.8.13, Platform: 
Linux-5.10.0-0.bpo.9-amd64-x86_64-with-glibc2.2.5
   [2022-06-14, 04:10:01 ] {connection.py:876} INFO - This connection is in 
OCSP Fail Open Mode. TLS Certificates would be checked for validity and 
revocation status. Any other Certificate Revocation related exceptions or OCSP 
Responder failures would be disregarded in favor of connectivity.
   [2022-06-14, 04:10:01 ] {connection.py:894} INFO - Setting use_openssl_only 
mode to False
   [2022-06-14, 04:10:02 ] {cursor.py:710} INFO - query: [<some sql statement 
here>]
   [2022-06-14, 04:10:02 ] {cursor.py:734} INFO - query execution done
   [2022-06-14, 04:10:02 ] {snowflake.py:324} INFO - Running statement: <some 
sql statement here>
   [2022-06-14, 04:10:02 ] {cursor.py:710} INFO - query: [<some sql statement 
here>]
   [2022-06-14, 04:10:02 ] {cursor.py:734} INFO - query execution done
   [2022-06-14, 04:10:02 ] {snowflake.py:334} INFO - Statement execution info - 
{'status': 'Statement executed successfully.'}
   [2022-06-14, 04:10:02 ] {snowflake.py:338} INFO - Rows affected: 1
   [2022-06-14, 04:10:02 ] {snowflake.py:339} INFO - Snowflake query id: 
<some-uuid-here>
   [2022-06-14, 04:10:02 ] {snowflake.py:324} INFO - Running statement: <some 
sql statement here>
   [2022-06-14, 04:10:02 ] {cursor.py:710} INFO - query: [<some sql statement 
here>]
   [2022-06-14, 04:10:03 ] {cursor.py:734} INFO - query execution done
   [2022-06-14, 04:10:03 ] {snowflake.py:334} INFO - Statement execution info - 
{'status': 'some_table already exists, statement succeeded.'}
   [2022-06-14, 04:10:03 ] {snowflake.py:338} INFO - Rows affected: 1
   [2022-06-14, 04:10:03 ] {snowflake.py:339} INFO - Snowflake query id:  
<some-uuid-here>
   [2022-06-14, 04:10:03 ] {snowflake.py:324} INFO - Running statement: <some 
sql statement here>
   [2022-06-14, 04:10:03 ] {cursor.py:710} INFO - query: [<some sql statement 
here>]
   [2022-06-14, 04:10:08 ] {cursor.py:734} INFO - query execution done
   [2022-06-14, 04:10:08 ] {snowflake.py:334} INFO - Statement execution info - 
{'number of rows deleted': 562}
   [2022-06-14, 04:10:08 ] {snowflake.py:338} INFO - Rows affected: 562
   [2022-06-14, 04:10:08 ] {snowflake.py:339} INFO - Snowflake query id: 
<some-uuid-here>
   [2022-06-14, 04:10:08 ] {snowflake.py:324} INFO - Running statement: <some 
sql statement here>
   [2022-06-14, 04:10:08 ] {cursor.py:710} INFO - query: [<some sql statement 
here>]
   [2022-06-14, 04:16:29 ] {taskinstance.py:1149} INFO - Dependencies not met 
for <TaskInstance: company.my_task_id scheduled__2022-06-13T01:00:00+00:00 
[running]>, dependency 'Task Instance Not Running' FAILED: Task is in the 
running state
   [2022-06-14, 04:16:29 ] {taskinstance.py:1149} INFO - Dependencies not met 
for <TaskInstance: company.my_task_id scheduled__2022-06-13T01:00:00+00:00 
[running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' 
state.
   [2022-06-14, 04:16:29 ] {local_task_job.py:101} INFO - Task is not able to be run
   ```
   
   
   
   ### What you think should happen instead
   
   The tasks should keep running until they are finished.
   
   ### How to reproduce
   
   I really don't know, sorry. I have tried my best to explain the situation 
above.
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-amazon==3.2.0
   apache-airflow-providers-celery==2.1.3
   apache-airflow-providers-cncf-kubernetes==3.0.0
   apache-airflow-providers-docker==2.5.2
   apache-airflow-providers-elasticsearch==2.2.0
   apache-airflow-providers-ftp==2.1.2
   apache-airflow-providers-google==6.7.0
   apache-airflow-providers-grpc==2.0.4
   apache-airflow-providers-hashicorp==2.1.4
   apache-airflow-providers-http==2.1.2
   apache-airflow-providers-imap==2.2.3
   apache-airflow-providers-microsoft-azure==3.7.2
   apache-airflow-providers-microsoft-mssql==2.0.1
   apache-airflow-providers-mysql==2.2.3
   apache-airflow-providers-odbc==2.0.4
   apache-airflow-providers-postgres==4.1.0
   apache-airflow-providers-redis==2.0.4
   apache-airflow-providers-sendgrid==2.0.4
   apache-airflow-providers-sftp==2.5.2
   apache-airflow-providers-slack==4.2.3
   apache-airflow-providers-snowflake==2.3.0
   apache-airflow-providers-sqlite==2.1.3
   apache-airflow-providers-ssh==2.3.0
   apache-airflow-providers-tableau==2.1.2
   ```
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   </div>

