MatrixManAtYrService commented on issue #25586:
URL: https://github.com/apache/airflow/issues/25586#issuecomment-1208938356

   I can't be 100% sure it's the same issue, but I'm seeing similar behavior: the scheduler just stops without explanation when running this (much simpler) DAG:
   
   ```python3
   from airflow import Dataset, DAG
   from airflow.operators.python import PythonOperator
   from datetime import datetime, timedelta
   import pause  # third-party package (pip install pause), provides pause.until()
   
   def wait_until_twenty_sec():
       # Sleep until the next :00/:20/:40 second mark that is more than 5 s away.
       now = datetime.now()
       minutes = 0
       if now.second < 15:
           seconds = 20
       elif now.second < 35:
           seconds = 40
       elif now.second < 55:
           seconds = 0
           minutes += 1
       else:
           seconds = 20
           minutes += 1
       go_time = now + timedelta(minutes=minutes)
       go_time = datetime(
           year=go_time.year,
           month=go_time.month,
           day=go_time.day,
           hour=go_time.hour,
           minute=go_time.minute,
           second=seconds,
       )
       print(f"waiting until {go_time}")
       pause.until(go_time)
   
   a3_a2 = Dataset("a3_a2")
   a1_a2 = Dataset("a1_a2")
   a1_a3 = Dataset("a1_a3")
   
   with DAG(dag_id="start_a1",
            start_date=datetime(1970, 1, 1),schedule_interval=None,
   ) as start_a1:
   
       PythonOperator(task_id='a1_a2', python_callable=wait_until_twenty_sec, outlets=[a1_a2])
       PythonOperator(task_id='a1_a3', python_callable=wait_until_twenty_sec, outlets=[a1_a3])
   
   with DAG(dag_id="a2",
            start_date=datetime(1970, 1, 1),schedule_on=[a1_a2, a3_a2],
   ) as a2:
   
       PythonOperator(task_id='no_outlets', python_callable=wait_until_twenty_sec)
   
   with DAG(dag_id="a3",
            start_date=datetime(1970, 1, 1),schedule_on=[a1_a3],
   ) as a3:
   
       PythonOperator(task_id='a3_a2', python_callable=wait_until_twenty_sec, outlets=[a3_a2])
   ```
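
For what it's worth, the callable above only sleeps until a 20-second boundary, so the tasks themselves shouldn't be what hangs. Here is a rough sketch of just the target-time computation as a pure function, so it can be checked without actually sleeping. The name `next_go_time` is hypothetical (it is not in the DAG file), and I'm assuming it mirrors the branches above exactly:

```python
from datetime import datetime, timedelta


def next_go_time(now: datetime) -> datetime:
    """Return the time wait_until_twenty_sec would sleep until (hypothetical helper)."""
    if now.second < 15:
        minutes, seconds = 0, 20
    elif now.second < 35:
        minutes, seconds = 0, 40
    elif now.second < 55:
        minutes, seconds = 1, 0
    else:
        minutes, seconds = 1, 20
    # Shift by whole minutes first so hour/day rollover is handled by timedelta,
    # then pin the seconds and drop the microseconds, as the original does.
    target = now + timedelta(minutes=minutes)
    return target.replace(second=seconds, microsecond=0)


# A task that starts at 05:32:00.297901 should wait until 05:32:20.
print(next_go_time(datetime(2022, 8, 9, 5, 32, 0, 297901)))  # 2022-08-09 05:32:20
```

That matches the roughly 20-second task durations the scheduler reports for `start_a1`.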
   
   In this case it gets stuck with one task queued:
   
   
![stuck](https://user-images.githubusercontent.com/5834582/183572594-c8916b68-f7e5-4208-baa4-199283df86e2.png)
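
To spell out what I expected to happen instead, here is a minimal plain-Python model of the dataset wiring (no Airflow imports). It assumes a dataset-scheduled DAG runs once every dataset it is scheduled on has been updated; the dictionaries and the `expected_run_order` helper are illustrative only:

```python
# Which datasets each DAG's tasks list as outlets.
produces = {
    "start_a1": {"a1_a2", "a1_a3"},
    "a2": set(),
    "a3": {"a3_a2"},
}
# Which datasets each DAG is scheduled on.
consumes = {
    "a2": {"a1_a2", "a3_a2"},
    "a3": {"a1_a3"},
}


def expected_run_order(trigger):
    """Walk the graph from a manually triggered DAG (hypothetical helper)."""
    updated, order, pending = set(), [trigger], [trigger]
    while pending:
        dag = pending.pop(0)
        updated |= produces[dag]
        for consumer, needs in consumes.items():
            # A consumer becomes runnable once all of its datasets are updated.
            if consumer not in order and needs <= updated:
                order.append(consumer)
                pending.append(consumer)
    return order


print(expected_run_order("start_a1"))  # ['start_a1', 'a3', 'a2']
```

So after triggering `start_a1`, I'd expect `a3` to run next and then `a2`.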
   
   Again, scheduler logs are uninteresting:
   ```
   venv ❯ airflow scheduler
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   {scheduler_job.py:692} INFO - Starting the scheduler
   {scheduler_job.py:697} INFO - Processing each file at most -1 times
   [51831] [INFO] Starting gunicorn 20.1.0
   [51831] [INFO] Listening at: http://[::]:8793 (51831)
   [51831] [INFO] Using worker: sync
   {executor_loader.py:105} INFO - Loaded executor: LocalExecutor
   [51833] [INFO] Booting worker with pid: 51833
   [51843] [INFO] Booting worker with pid: 51843
   {manager.py:160} INFO - Launched DagFileProcessorManager with pid: 51969
   {scheduler_job.py:1233} INFO - Resetting orphaned tasks for active dag runs
   {settings.py:55} INFO - Configured default timezone Timezone('UTC')
   {scheduler_job.py:341} INFO - 2 tasks up for execution:
        <TaskInstance: start_a1.a1_a2 manual__2022-08-09T05:31:45.907115+00:00 [scheduled]>
        <TaskInstance: start_a1.a1_a3 manual__2022-08-09T05:31:45.907115+00:00 [scheduled]>
   {scheduler_job.py:406} INFO - DAG start_a1 has 0/16 running and queued tasks
   {scheduler_job.py:406} INFO - DAG start_a1 has 1/16 running and queued tasks
   {scheduler_job.py:492} INFO - Setting the following tasks to queued state:
        <TaskInstance: start_a1.a1_a2 manual__2022-08-09T05:31:45.907115+00:00 [scheduled]>
        <TaskInstance: start_a1.a1_a3 manual__2022-08-09T05:31:45.907115+00:00 [scheduled]>
   {scheduler_job.py:531} INFO - Sending TaskInstanceKey(dag_id='start_a1', task_id='a1_a2', run_id='manual__2022-08-09T05:31:45.907115+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
   {base_executor.py:92} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'start_a1', 'a1_a2', 'manual__2022-08-09T05:31:45.907115+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {scheduler_job.py:531} INFO - Sending TaskInstanceKey(dag_id='start_a1', task_id='a1_a3', run_id='manual__2022-08-09T05:31:45.907115+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
   {base_executor.py:92} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'start_a1', 'a1_a3', 'manual__2022-08-09T05:31:45.907115+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {local_executor.py:79} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'start_a1', 'a1_a2', 'manual__2022-08-09T05:31:45.907115+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {local_executor.py:79} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'start_a1', 'a1_a3', 'manual__2022-08-09T05:31:45.907115+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {task_command.py:378} INFO - Running <TaskInstance: start_a1.a1_a2 manual__2022-08-09T05:31:45.907115+00:00 [queued]> on host ChoedanKal
   {task_command.py:378} INFO - Running <TaskInstance: start_a1.a1_a3 manual__2022-08-09T05:31:45.907115+00:00 [queued]> on host ChoedanKal
   {dagrun.py:567} INFO - Marking run <DagRun start_a1 @ 2022-08-09 05:31:45.907115+00:00: manual__2022-08-09T05:31:45.907115+00:00, state:running, queued_at: 2022-08-09 05:31:46.059843+00:00. externally triggered: True> successful
   {dagrun.py:612} INFO - DagRun Finished: dag_id=start_a1, execution_date=2022-08-09 05:31:45.907115+00:00, run_id=manual__2022-08-09T05:31:45.907115+00:00, run_start_date=2022-08-09 05:31:58.934238+00:00, run_end_date=2022-08-09 05:32:20.411708+00:00, run_duration=21.47747, state=success, external_trigger=True, run_type=manual, data_interval_start=2022-08-09 05:31:45.907115+00:00, data_interval_end=2022-08-09 05:31:45.907115+00:00, dag_hash=cf8aacf64e7f829b69d2b2baf4ba97fb
   {dag.py:3178} INFO - Setting next_dagrun for start_a1 to None, run_after=None
   {scheduler_job.py:583} INFO - Executor reports execution of start_a1.a1_a3 run_id=manual__2022-08-09T05:31:45.907115+00:00 exited with status success for try_number 1
   {scheduler_job.py:583} INFO - Executor reports execution of start_a1.a1_a2 run_id=manual__2022-08-09T05:31:45.907115+00:00 exited with status success for try_number 1
   {scheduler_job.py:626} INFO - TaskInstance Finished: dag_id=start_a1, task_id=a1_a2, run_id=manual__2022-08-09T05:31:45.907115+00:00, map_index=-1, run_start_date=2022-08-09 05:32:00.297901+00:00, run_end_date=2022-08-09 05:32:20.156646+00:00, run_duration=19.858745, state=success, executor_state=success, try_number=1, max_tries=0, job_id=3, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2022-08-09 05:31:59.210223+00:00, queued_by_job_id=1, pid=52002
   {scheduler_job.py:626} INFO - TaskInstance Finished: dag_id=start_a1, task_id=a1_a3, run_id=manual__2022-08-09T05:31:45.907115+00:00, map_index=-1, run_start_date=2022-08-09 05:32:00.479911+00:00, run_end_date=2022-08-09 05:32:20.058977+00:00, run_duration=19.579066, state=success, executor_state=success, try_number=1, max_tries=0, job_id=2, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2022-08-09 
   ```


