Lee-W commented on PR #38674:
URL: https://github.com/apache/airflow/pull/38674#issuecomment-2047289208

   In the current design, I introduced `start_trigger` and `next_method` to the 
`BaseOperator`. When `start_trigger` is not `None`, the scheduler will call the 
`defer_task` method of the task instance object, and everything should work as 
it does when `TaskDeferred` is raised.
   
   Developers who want to implement an operator that starts the execution in a 
triggerer instead of a worker can write something like the following.
   
   ```python
   from airflow.models.baseoperator import BaseOperator
   from airflow.triggers.testing import SuccessTrigger
   
   
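   class AsyncOperator(BaseOperator):
       # Trigger that runs in the triggerer instead of calling execute() on a worker.
       start_trigger = SuccessTrigger()
       # Method called on a worker once the trigger fires.
       next_method = "execute_complete"
   
       def execute_complete(self, context, event=None) -> None:
           self.log.info("execute complete")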
   ```
   
   However, it raises a few issues.
   
   1. The DAG developer can override the behavior of the whole operator by 
passing `start_trigger` and `next_method`, as in the following example. Imagine 
a user using `RdsCreateDbSnapshotOperator` but passing a `GCSBlobTrigger`. We 
probably don't want DAG developers to change the behavior of an operator this 
way, so maybe we can try something like 
[inherits_from_empty_operator](https://github.com/apache/airflow/blob/768e1169b1946fe536c02ee968a95594d43ebba2/airflow/models/baseoperator.py#L1687).
   
   ```python
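   from airflow import DAG
   from airflow.triggers.testing import SuccessTrigger
   
   with DAG(...) as dag:
       # SomeOperator stands for any operator; these two kwargs replace the
       # trigger and callback its author intended.
       task = SomeOperator(
           task_id="task",
           start_trigger=SuccessTrigger(),
           next_method="execute_complete",
       )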
   ```
   
   2. If we do something like what point 1 suggests and prevent DAG developers 
from changing the operator's behavior, should we still let them decide whether 
to run `op.execute` or the `start_trigger` implemented by the operator author? 
In the current implementation, if `start_trigger` is not `None`, `op.execute` 
is ignored. This raises another issue: what if the Airflow deployment does not 
have a triggerer running? Should we try something like 
[_run_inline_trigger](https://github.com/apache/airflow/blob/0af5d923d99591576b3758ab3c694d02dbe152bf/airflow/models/dag.py#L4171), 
or should we just fall back to `op.execute`? Or maybe we should add back the 
flag (introduced a few commits ago in this PR) that decides whether to start 
the execution from the triggerer.
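
One possible answer to both points is sketched below under explicit assumptions. For point 1, `start_trigger` and `next_method` are accepted only as class attributes set by the operator author and rejected as constructor arguments, similar in spirit to the linked `inherits_from_empty_operator`. For point 2, execution falls back to `execute` when no triggerer is available. `GuardedOperator` and `choose_start` are hypothetical, as is the `triggerer_available` flag. None of them are Airflow APIs. How the scheduler would actually detect a live triggerer is left out, and the sketch simply takes it as a boolean.

```python
from __future__ import annotations

from typing import Any

AUTHOR_ONLY = {"start_trigger", "next_method"}


class GuardedOperator:
    """Hypothetical base class where start_trigger/next_method are author-only."""

    start_trigger: Any = None
    next_method: str | None = None

    def __init__(self, task_id: str, **kwargs: Any) -> None:
        # Reject DAG authors trying to swap in a different trigger or callback.
        forbidden = kwargs.keys() & AUTHOR_ONLY
        if forbidden:
            raise TypeError(
                f"{sorted(forbidden)} can only be set by the operator author "
                "as class attributes"
            )
        if kwargs:
            raise TypeError(f"unexpected arguments: {sorted(kwargs)}")
        self.task_id = task_id

    def execute(self, context: dict[str, Any]) -> str:
        return "execute"


def choose_start(op: GuardedOperator, triggerer_available: bool) -> str:
    """Decide where execution starts, falling back to execute() without a triggerer."""
    # Read from the class, not the instance, so only the author's value counts.
    trigger = type(op).start_trigger
    if trigger is None:
        return "worker"
    if not triggerer_available:
        # The fallback option from point 2; running the trigger inline, as
        # _run_inline_trigger does, would be the other alternative.
        return "worker-fallback"
    return "triggerer"
```

Reading `type(op).start_trigger` rather than `op.start_trigger` is the design choice that makes the class attribute authoritative, even if something later sets the attribute on an instance.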


-- 
This is an automated message from the Apache Git Service.