jason810496 commented on issue #58676:
URL: https://github.com/apache/airflow/issues/58676#issuecomment-3845092376

   > Any insights or examples you can share? Thanks!
   
   I use the same Dag and same setup in Breeze today, but I'm unable to to 
reproduce right now 😥
   
   **Breeze setup**:
   - Add the dag to `${AIRFLOW_REPO}/files/dags`
   - `breeze start-airflow --use-airflow-version 3.1.6 --backend postgres`
   - Add `/files/dags` to `PYTHONPATH` ( or `cp /files/dags/<new-dag>.py 
/opt/airflow` for quick workaround )
   
   **The Dag I use**: 
   ```python
   from __future__ import annotations
   
   import asyncio
   import os
   import signal
   import subprocess
   from typing import Any
   
   from airflow.sdk import dag, task, teardown
   from airflow.sdk.bases.operator import BaseOperator
   from airflow.triggers.base import BaseTrigger, TriggerEvent
   
   
   class ProcessWaitTrigger(BaseTrigger):
       """Trigger that waits for a process to finish."""
   
       def __init__(self, pid: int):
           super().__init__()
           self.pid = pid
   
       def serialize(self) -> tuple[str, dict[str, Any]]:
           return (
               "no_logs_during_deferrable.ProcessWaitTrigger",
               {"pid": self.pid},
           )
   
       async def run(self):
           while True:
               try:
                   # Check if process exists (pid 0 signal does not kill)
                   # os.kill is synchronous, but fast.
                   os.kill(self.pid, 0)
                   self.log.info("Process %d still running", self.pid)
                   await asyncio.sleep(1)
               except OSError:
                   # Process dead
                   yield TriggerEvent({"status": "success"})
                   return
   
   
   class ProcessWaitOperator(BaseOperator):
       """Operator that defers until a process finishes."""
   
       template_fields = ("pid",)
   
       def __init__(self, pid: int, **kwargs):
           super().__init__(**kwargs)
           self.pid = pid
   
       def execute(self, context):
           print(f">> Deferring process {self.pid} wait")
           self.defer(
               trigger=ProcessWaitTrigger(pid=int(self.pid)),
               method_name="execute_complete"
           )
   
       def execute_complete(self, context, event=None):
           print(f"Process {self.pid} finished")
           return
   
   @dag(
       dag_id="no_logs_during_deferrable",
       schedule=None,
       catchup=False,
   )
   def no_logs_during_deferrable():
       @task(task_id="submit_job")
       def submit_job(**context):
           # Run sleep 600 in background
           process = subprocess.Popen(["sleep", "600"], close_fds=True)
           print(f"Submitted process {process.pid}")
           return process.pid
   
       @teardown
       @task(task_id="cancel_job")
       def cancel_job(pid: int):
           print(f"Killing process {pid}")
           try:
               os.kill(pid, signal.SIGTERM)
           except OSError:
               print(f"Process {pid} already gone")
   
       pid = submit_job()
       wait_job_finish = ProcessWaitOperator(task_id="wait_job_finish", pid=pid)
       wait_job_finish >> cancel_job(pid)
   
   no_logs_during_deferrable()
   ```
   
   **Todays Logs**:
   ```
   Log message source details 
sources=["/root/airflow/logs/dag_id=no_logs_during_deferrable/run_id=manual__2026-02-04T03:37:09+00:00/task_id=wait_job_finish/attempt=1.log","/root/airflow/logs/dag_id=no_logs_during_deferrable/run_id=manual__2026-02-04T03:37:09+00:00/task_id=wait_job_finish/attempt=1.log.trigger.3.log","http://630c65abc04d:8794/log/dag_id=no_logs_during_deferrable/run_id=manual__2026-02-04T03:37:09+00:00/task_id=wait_job_finish/attempt=1.log.trigger.3.log";]
   [2026-02-04 11:37:13] INFO - DAG bundles loaded: dags-folder 
source=airflow.dag_processing.bundles.manager.DagBundlesManager 
loc=manager.py:179
   [2026-02-04 11:37:13] INFO - Filling up the DagBag from 
/files/dags/no_logs_during_deferrable.py source=airflow.models.dagbag.DagBag 
loc=dagbag.py:593
   [2026-02-04 11:37:13] INFO - >> Deferring process 942 wait source=task.stdout
   [2026-02-04 11:37:13] INFO - Pausing task as DEFERRED.  
dag_id=no_logs_during_deferrable task_id=wait_job_finish 
run_id=manual__2026-02-04T03:37:09+00:00 source=task loc=task_runner.py:928
   [2026-02-04 11:37:14] INFO - trigger 
no_logs_during_deferrable/manual__2026-02-04T03:37:09+00:00/wait_job_finish/-1/1
 (ID 3) starting loc=triggerer_job_runner.py:1117
   [2026-02-04 11:37:14] INFO - Process 942 still running 
source=no_logs_during_deferrable.ProcessWaitTrigger 
loc=no_logs_during_deferrable.py:33
   [2026-02-04 11:37:15] INFO - Process 942 still running 
source=no_logs_during_deferrable.ProcessWaitTrigger 
loc=no_logs_during_deferrable.py:33
   [2026-02-04 11:37:16] INFO - Process 942 still running 
source=no_logs_during_deferrable.ProcessWaitTrigger 
loc=no_logs_during_deferrable.py:33
   [2026-02-04 11:37:17] INFO - Process 942 still running 
source=no_logs_during_deferrable.ProcessWaitTrigger 
loc=no_logs_during_deferrable.py:33
   [2026-02-04 11:37:18] INFO - Process 942 still running 
source=no_logs_during_deferrable.ProcessWaitTrigger 
loc=no_logs_during_deferrable.py:33
   [2026-02-04 11:37:19] INFO - Process 942 still running 
source=no_logs_during_deferrable.ProcessWaitTrigger 
loc=no_logs_during_deferrable.py:33
   ```
   
   However, all the  `Process xxx still running 
source=no_logs_during_deferrable.ProcessWaitTrigger ` logs are not shown either 
on UI or the local filesystem, I had `cat path/to/trigger/logs` several time, 
and the logs only show `trigger 
{ti.dag_id}/{ti.run_id}/{ti.task_id}/{ti.map_index}/{ti.try_number} (ID 
{trigger_id}) starting loc=triggerer_job_runner.py:1117` yesterday
   
   
https://github.com/apache/airflow/blob/fe0633d729c85131ca96aa41a8c56282a407b7d5/airflow-core/src/airflow/jobs/triggerer_job_runner.py#L1183
   
   Only this exact log line was shown yesterday.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to