karakanb commented on issue #24466:
URL: https://github.com/apache/airflow/issues/24466#issuecomment-1159160046

   I keep experiencing the same issue. I have limited logs since I don't have a log collector in place yet, but I see no harm in sharing whatever I can find here in case it helps someone. This will be a bit long, so apologies in advance.
   
   ### Task 1 - Example
   
   Here's a task that experienced the issue recently:
   ```
   *** Reading local file: 
/opt/airflow/logs/dag_id=my-company/run_id=scheduled__2022-06-15T01:00:00+00:00/task_id=my-task-id/attempt=1.log
   [2022-06-16, 02:14:50 UTC] {taskinstance.py:1159} INFO - Dependencies all 
met for <TaskInstance: my-company.my-task-id 
scheduled__2022-06-15T01:00:00+00:00 [queued]>
   [2022-06-16, 02:14:50 UTC] {taskinstance.py:1159} INFO - Dependencies all 
met for <TaskInstance: my-company.my-task-id 
scheduled__2022-06-15T01:00:00+00:00 [queued]>
   [2022-06-16, 02:14:50 UTC] {taskinstance.py:1356} INFO - 
   
--------------------------------------------------------------------------------
   [2022-06-16, 02:14:50 UTC] {taskinstance.py:1357} INFO - Starting attempt 1 
of 4
   [2022-06-16, 02:14:50 UTC] {taskinstance.py:1358} INFO - 
   
--------------------------------------------------------------------------------
   [2022-06-16, 02:14:50 UTC] {taskinstance.py:1377} INFO - Executing 
<Task(SnowflakeOperator): my-task-id> on 2022-06-15 01:00:00+00:00
   [2022-06-16, 02:14:50 UTC] {standard_task_runner.py:52} INFO - Started 
process 472 to run task
   [2022-06-16, 02:14:50 UTC] {standard_task_runner.py:79} INFO - Running: 
['airflow', 'tasks', 'run', 'my-company', 'my-task-id', 
'scheduled__2022-06-15T01:00:00+00:00', '--job-id', '22342', '--raw', 
'--subdir', 'DAGS_FOLDER/dag_v3.py', '--cfg-path', '/tmp/tmpe6oklcxp', 
'--error-file', '/tmp/tmp7eje3qad']
   [2022-06-16, 02:14:50 UTC] {standard_task_runner.py:80} INFO - Job 22342: 
Subtask my-task-id
   [2022-06-16, 02:14:51 UTC] {task_command.py:370} INFO - Running 
<TaskInstance: my-company.my-task-id scheduled__2022-06-15T01:00:00+00:00 
[running]> on host 
airflow-v2-worker-5.airflow-v2-worker.airflow.svc.cluster.local
   [2022-06-16, 02:14:51 UTC] {taskinstance.py:1569} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=my-company
   AIRFLOW_CTX_TASK_ID=my-task-id
   AIRFLOW_CTX_EXECUTION_DATE=2022-06-15T01:00:00+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-06-15T01:00:00+00:00
   [2022-06-16, 02:14:51 UTC] {snowflake.py:118} INFO - Executing: <some sql 
statement here>
   [2022-06-16, 02:14:51 UTC] {base.py:68} INFO - Using connection ID 
'my-company-connection-abcd' for task execution.
   [2022-06-16, 02:14:51 UTC] {connection.py:257} INFO - Snowflake Connector 
for Python Version: 2.7.8, Python Version: 3.8.13, Platform: 
Linux-5.10.0-0.bpo.9-amd64-x86_64-with-glibc2.2.5
   [2022-06-16, 02:14:51 UTC] {connection.py:876} INFO - This connection is in 
OCSP Fail Open Mode. TLS Certificates would be checked for validity and 
revocation status. Any other Certificate Revocation related exceptions or OCSP 
Responder failures would be disregarded in favor of connectivity.
   [2022-06-16, 02:14:51 UTC] {connection.py:894} INFO - Setting 
use_openssl_only mode to False
   [2022-06-16, 02:14:52 UTC] {cursor.py:710} INFO - query: [<some sql 
statement here>]
   [2022-06-16, 02:14:52 UTC] {cursor.py:734} INFO - query execution done
   [2022-06-16, 02:14:52 UTC] {snowflake.py:324} INFO - Running statement: 
<some sql statement here>
   [2022-06-16, 02:14:52 UTC] {cursor.py:710} INFO - query: [<some sql 
statement here>]
   [2022-06-16, 02:14:53 UTC] {cursor.py:734} INFO - query execution done
   [2022-06-16, 02:14:53 UTC] {snowflake.py:334} INFO - Statement execution 
info - {'status': 'Statement executed successfully.'}
   [2022-06-16, 02:14:53 UTC] {snowflake.py:338} INFO - Rows affected: 1
   [2022-06-16, 02:14:53 UTC] {snowflake.py:339} INFO - Snowflake query id: 
<some uuid here>
   [2022-06-16, 02:14:53 UTC] {snowflake.py:324} INFO - Running statement: 
<some sql statement here>
   [2022-06-16, 02:14:53 UTC] {cursor.py:710} INFO - query: [<some sql 
statement here>]
   [2022-06-16, 02:53:25 UTC] {taskinstance.py:1149} INFO - Dependencies not 
met for <TaskInstance: my-company.my-task-id 
scheduled__2022-06-15T01:00:00+00:00 [running]>, dependency 'Task Instance Not 
Running' FAILED: Task is in the running state
   [2022-06-16, 02:53:25 UTC] {taskinstance.py:1149} INFO - Dependencies not 
met for <TaskInstance: my-company.my-task-id 
scheduled__2022-06-15T01:00:00+00:00 [running]>, dependency 'Task Instance 
State' FAILED: Task is in the 'running' state.
   [2022-06-16, 02:53:25 UTC] {local_task_job.py:101} INFO - Task is not able 
to be run
   ```
   
   Unfortunately, I don't have scheduler logs from the start of the task execution because the pods have restarted and those logs are gone, but my available logs start somewhere in the middle, and I see this:
   ```
   [2022-06-16 03:46:03,405] {scheduler_job.py:353} INFO - 16 tasks up for 
execution:
   ... some other tasks listed here
   [2022-06-16 03:46:03,405] {scheduler_job.py:353}     <TaskInstance: 
my-company.my-task-id scheduled__2022-06-15T01:00:00+00:00 [scheduled]>
   [2022-06-16 03:46:03,409] {scheduler_job.py:418} INFO - DAG my-company has 
15/15 running and queued tasks
   [2022-06-16 03:46:03,409] {scheduler_job.py:425} INFO - Not executing 
<TaskInstance: my-company.my-task-id scheduled__2022-06-15T01:00:00+00:00 
[scheduled]> since the number of tasks running or queued from DAG my-company is 
>= to the DAG's max_active_tasks limit of 15
   ```
   
   As you can see in the first few lines, the same task is already among the tasks up for execution, and the only reason it is not being executed is that `max_active_tasks` prevents the DAG from having more than 15 tasks running or queued.
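   
   For context, this is roughly where that limit lives (a minimal sketch, not my actual DAG; the dag id, task id, connection id, schedule, and retry count come from the logs above, everything else is illustrative):
   ```
   # Minimal sketch, not the real DAG: just where max_active_tasks and the retries
   # ("attempt 1 of 4" => retries=3) from the logs above would be configured.
   from datetime import datetime
   
   from airflow import DAG
   from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   
   with DAG(
       dag_id="my-company",
       start_date=datetime(2022, 6, 1),   # illustrative
       schedule_interval="0 1 * * *",     # the DAG runs once a day at 01:00 UTC
       max_active_tasks=15,               # the limit the scheduler log refers to
       catchup=False,
   ) as dag:
       SnowflakeOperator(
           task_id="my-task-id",
           snowflake_conn_id="my-company-connection-abcd",
           sql="<some sql statement here>",
           retries=3,
       )
   ```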
   
   As for my worker logs, I don't have them from the beginning either, but I do have them from when the first attempt ends, around 02:53, and check this out:
   ```
   [2022-06-16 02:53:24,246] INFO/ForkPoolWorker-57 Running <TaskInstance: 
my-company.my-task-id scheduled__2022-06-15T01:00:00+00:00 [running]> on host 
airflow-v2-worker-1.airflow-v2-worker.airflow.svc.cluster.local
   ```
   
   Right at the time the first attempt ended, there is a log line in the worker showing that another instance of the same task is being picked up.
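   
   In case it helps anyone digging through their own Celery worker logs, a quick scan along these lines makes duplicate pickups stand out (the log directory and glob pattern here are assumptions about the setup, adjust to yours):
   ```
   # Rough helper, independent of any Airflow API: print every worker log line that
   # mentions this exact task instance, so duplicate "Running <TaskInstance: ...>"
   # pickups on different workers/timestamps become visible at a glance.
   import glob
   
   NEEDLE = "Running <TaskInstance: my-company.my-task-id scheduled__2022-06-15T01:00:00+00:00"
   
   for path in sorted(glob.glob("/var/log/airflow/workers/*.log")):  # assumed path
       with open(path, errors="replace") as f:
           for line in f:
               if NEEDLE in line:
                   print(f"{path}: {line.rstrip()}")
   ```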
   
   The curious thing is that, according to my task logs for the **2nd attempt**, that attempt didn't start until 04:00 UTC:
   ```
   *** Reading local file: 
/opt/airflow/logs/dag_id=my-company/run_id=scheduled__2022-06-15T01:00:00+00:00/task_id=my-task-id/attempt=2.log
   [2022-06-16, 04:00:42 UTC] {taskinstance.py:1159} INFO - Dependencies all 
met for <TaskInstance: my-company.my-task-id 
scheduled__2022-06-15T01:00:00+00:00 [queued]>
   [2022-06-16, 04:00:42 UTC] {taskinstance.py:1159} INFO - Dependencies all 
met for <TaskInstance: my-company.my-task-id 
scheduled__2022-06-15T01:00:00+00:00 [queued]>
   [2022-06-16, 04:00:42 UTC] {taskinstance.py:1356} INFO - 
   
--------------------------------------------------------------------------------
   [2022-06-16, 04:00:42 UTC] {taskinstance.py:1357} INFO - Starting attempt 2 
of 4
   [2022-06-16, 04:00:42 UTC] {taskinstance.py:1358} INFO - 
   
--------------------------------------------------------------------------------
   [2022-06-16, 04:00:42 UTC] {taskinstance.py:1377} INFO - Executing 
<Task(SnowflakeOperator): my-task-id> on 2022-06-15 01:00:00+00:00
   [2022-06-16, 04:00:42 UTC] {standard_task_runner.py:52} INFO - Started 
process 15625 to run task
   ...
   ```
   
   #### Facts
   - This DAG has 251 tasks, and the operator distribution (see the counting sketch after this list) is:
     - SnowflakeOperators: 224
     - PythonOperators: 13
     - S3KeySensors: 12
     - SlackWebhookOperator: 1
     - DummyOperator: 1
   - The DAG runs once a day. 
   - So far, I have been able to observe this issue only with 
`SnowflakeOperator` tasks after the upgrade.
     - In v2.2.5, I was able to see this issue with different operators as well.
     - After upgrading to v2.3.2, I see this issue only with SnowflakeOperator.
     - This is my biggest DAG and it is full of Snowflake operators, so it might be that I only see the issue with SnowflakeOperator because of the DAG's size, or for some other reason entirely; I don't want to claim that "this issue is specific to SnowflakeOperator".
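   
   For reference, the counts above can be reproduced with something along these lines (assuming the DAG is importable from the default DAGS_FOLDER):
   ```
   # Hedged sketch: count tasks per operator class for this DAG to get the
   # distribution listed above. Assumes the DAG parses from the default DAGS_FOLDER.
   from collections import Counter
   
   from airflow.models import DagBag
   
   dag = DagBag().get_dag("my-company")
   print(len(dag.tasks))                                # expect 251
   print(Counter(type(t).__name__ for t in dag.tasks))  # SnowflakeOperator: 224, ...
   ```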
   
   #### Hypothesis
   
   Here's my hypothesis about what might have happened:
   - the first attempt starts at 02:14:50 UTC.
   - sometime between 02:14:50 and 02:53:24, the scheduler queues another instance of this task, and that instance manages to start at 02:53:24 UTC.
     - it could only have started if, at that moment, fewer than 15 tasks were running for this DAG, 15 being the DAG's `max_active_tasks` limit.
   - when this second instance of the first attempt starts, the original task is killed with `Task is not able to be run`.
     - the original task is killed at `02:53:25 UTC`, and the worker logs the pickup of this task at `02:53:24,246`, so it could well be that this ghost attempt starting in the background is what caused the original one to be killed.
   - For some reason, there are no logs for this ghost task anywhere (a metadata DB check like the one sketched below might still surface a trace of it).
   - At `04:00:42 UTC`, the scheduler starts the actual second attempt of this task.
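   
   Since the ghost attempt left no task log behind, the metadata DB might still hold a trace of it. This is the kind of query I have in mind to check for overlapping `LocalTaskJob` rows (table and column names are from the stock Airflow 2.3 `job` model; the time window comes from the logs above):
   ```
   # Hedged sketch: list the LocalTaskJob rows for this DAG around the incident.
   # Two rows alive at the same time around 02:53 on different hosts
   # (airflow-v2-worker-5 vs airflow-v2-worker-1) would back the ghost-attempt idea;
   # the job table has no task_id column, so timestamps/hostnames are the handle here.
   from sqlalchemy import text
   
   from airflow.utils.session import create_session
   
   QUERY = text("""
       SELECT id, state, hostname, start_date, end_date, latest_heartbeat
       FROM job
       WHERE dag_id = 'my-company'
         AND job_type = 'LocalTaskJob'
         AND start_date BETWEEN '2022-06-16 02:00:00' AND '2022-06-16 05:00:00'
       ORDER BY start_date
   """)
   
   with create_session() as session:
       for row in session.execute(QUERY):
           # job 22342 (attempt 1, from the task log above) should be one of the rows
           print(row)
   ```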
   
   ---------
   
   I hope these are helpful; I'll try to collect more logs.
   
   
   

