raffifu commented on issue #34204:
URL: https://github.com/apache/airflow/issues/34204#issuecomment-1718809331

   @potiuk is the `deferrable` mode should behave the same as `non deferable` 
mode on ExternalTaskSensor? Because i found that the `deferrable` will not fail 
if the `external_task_id` failed. I think it's because there's no checking 
about the state of task or dags on the run method. 
   This error happen when i'm trying to implement the proposed solution from 
@ecodina. But, even if we're using the current code, it will fail because of 
the timeout not because the `external_task` failed  (it will confused the user 
i think)
   ```
     while True:
         delta = utcnow() - self.trigger_start_time
         if delta.total_seconds() < self._timeout_sec:
             if await self.count_running_dags() == 0:  # type: ignore[call-arg]
                 self.log.info("Waiting for DAG to start execution...")
                 await asyncio.sleep(self.poll_interval)
             else:
                 break
         else:
             yield TriggerEvent({"status": "timeout"})
             return
   
     # Wait Task to success
     while True:
         if await self.count_tasks() == len(self.execution_dates):  # type: 
ignore[call-arg]
             yield TriggerEvent({"status": "success"})
             return
   
         self.log.info("Task is still running, sleeping for %s seconds...", 
self.poll_interval)
         await asyncio.sleep(self.poll_interval)
   ```
   
   Current code:
   ```
   while True:
       delta = utcnow() - self.trigger_start_time
       if delta.total_seconds() < self._timeout_sec:
           # mypy confuses typing here
           if await self.count_running_dags() == 0:  # type: ignore[call-arg]
               self.log.info("Waiting for DAG to start execution...")
               await asyncio.sleep(self.poll_interval)
       else:
           yield TriggerEvent({"status": "timeout"}) # <---- This will 
triggered if the running task is suddenly failed
           return
       # mypy confuses typing here
       if await self.count_tasks() == len(self.execution_dates):  # type: 
ignore[call-arg]
           yield TriggerEvent({"status": "success"})
           return
       self.log.info("Task is still running, sleeping for %s seconds...", 
self.poll_interval)
       await asyncio.sleep(self.poll_interval)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to