hussein-awala opened a new pull request, #31985: URL: https://github.com/apache/airflow/pull/31985
related: #31703

In #31703, we decided to remove certain return statements on the assumption that they were unnecessary after yielding an event. However, testing shows that in certain cases (specifically when the triggerer service is overloaded) a trigger without a `return` generates a significant number of events. As a result, we continue with unnecessary processing, which further burdens the triggerer.

To test that, I created a new module `airflow/tests.py`:

```python
from __future__ import annotations

from airflow.models import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class TestTrigger(BaseTrigger):
    def serialize(self) -> tuple[str, dict]:
        return ("airflow.tests.TestTrigger", {})

    async def run(self):
        ind = 0
        while True:
            yield TriggerEvent(
                {
                    "message": f"Hello, world! {ind}",
                }
            )
            ind += 1
            # return


class TestOperator(BaseOperator):
    def execute(self, context):
        self.defer(
            trigger=TestTrigger(),
            method_name="execute_complete",
        )

    def execute_complete(self, context: dict | None, event: dict):
        print(event)
```

And this small dag:

```python
from datetime import datetime

from airflow.models import DAG
from airflow.tests import TestOperator

with DAG(
    dag_id="test_trigger",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:
    TestOperator(task_id="test_trigger")
```

which I tested in breeze.
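The difference between the two variants can be sketched without Airflow, assuming plain async generators stand in for `TestTrigger.run`. The names `run_without_return`, `run_with_return`, `collect`, and `count_events` are hypothetical and not part of Airflow; the `limit` cap exists only so the example terminates.

```python
import asyncio


async def run_without_return():
    # Stand-in for the trigger with the `return` commented out:
    # every time the consumer asks for another item, one is produced.
    ind = 0
    while True:
        yield {"message": f"Hello, world! {ind}"}
        ind += 1


async def run_with_return():
    # Stand-in for the trigger with the `return` restored:
    # the generator finishes right after its first event.
    yield {"message": "Hello, world! 0"}
    return


async def collect(agen, limit):
    # Greedy consumer: keeps pulling events until the generator ends
    # or the artificial cap is reached.
    events = []
    async for event in agen:
        events.append(event)
        if len(events) >= limit:
            break
    return events


def count_events(factory, limit=1000):
    return len(asyncio.run(collect(factory(), limit)))
```

With this sketch, `count_events(run_with_return)` yields one event, while `count_events(run_without_return)` stops only because of the cap.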
When I tested it without a `return`, I got:

```
[2023-06-17, 21:33:05 UTC] {taskinstance.py:1135} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:33:05.186752+00:00 [queued]>
[2023-06-17, 21:33:05 UTC] {taskinstance.py:1135} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:33:05.186752+00:00 [queued]>
[2023-06-17, 21:33:05 UTC] {taskinstance.py:1338} INFO - Starting attempt 1 of 1
[2023-06-17, 21:33:05 UTC] {taskinstance.py:1359} INFO - Executing <Task(TestOperator): test_trigger> on 2023-06-17 21:33:05.186752+00:00
[2023-06-17, 21:33:05 UTC] {standard_task_runner.py:57} INFO - Started process 4802 to run task
[2023-06-17, 21:33:05 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_trigger', 'test_trigger', 'manual__2023-06-17T21:33:05.186752+00:00', '--job-id', '137', '--raw', '--subdir', 'DAGS_FOLDER/trigger.py', '--cfg-path', '/tmp/tmppovv3qxe']
[2023-06-17, 21:33:05 UTC] {standard_task_runner.py:85} INFO - Job 137: Subtask test_trigger
[2023-06-17, 21:33:06 UTC] {task_command.py:410} INFO - Running <TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:33:05.186752+00:00 [running]> on host d3e0d78cfce9
[2023-06-17, 21:33:06 UTC] {taskinstance.py:1637} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='test_trigger' AIRFLOW_CTX_TASK_ID='test_trigger' AIRFLOW_CTX_EXECUTION_DATE='2023-06-17T21:33:05.186752+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-17T21:33:05.186752+00:00'
[2023-06-17, 21:33:06 UTC] {taskinstance.py:1505} INFO - Pausing task as DEFERRED. dag_id=test_trigger, task_id=test_trigger, execution_date=20230617T213305, start_date=20230617T213305
[2023-06-17, 21:33:06 UTC] {local_task_job_runner.py:222} INFO - Task exited with return code 100 (task deferral)
[2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 0'}>
[2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 1'}>
[2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 2'}>
[2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 3'}>
[2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 4'}>
[2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 5'}>
[2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 6'}>
...
[2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 26651'}>
[2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 26652'}>
[2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 26653'}>
[2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 26654'}>
[2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 26655'}>
[2023-06-17, 21:33:09 UTC] {logging_mixin.py:152} INFO - {'message': 'Hello, world! 0'}
[2023-06-17, 21:33:09 UTC] {taskinstance.py:1377} INFO - Marking task as SUCCESS. dag_id=test_trigger, task_id=test_trigger, execution_date=20230617T213305, start_date=20230617T213305, end_date=20230617T213309
[2023-06-17, 21:33:09 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-06-17, 21:33:09 UTC] {taskinstance.py:2752} INFO - 0 downstream tasks scheduled from follow-on schedule check
```

In other runs, when the triggerer was overloaded, the count went even higher than 26k.

With a `return` statement, however, I consistently observed a single event being sent. This approach also consumes less memory, since fewer events are added to the `events` deque.
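The memory effect can be illustrated with a toy model. This is not Airflow's triggerer code: it assumes a consumer task that appends every event to a deque, and a cleanup step that cancels that task only after a delay, as a stand-in for an overloaded triggerer. All names here (`noisy_trigger`, `single_shot_trigger`, `pump`, `simulate`, `queued_events`) are hypothetical, and the `asyncio.sleep(0)` in `pump` is an assumption of the model so that other tasks get a turn.

```python
import asyncio
from collections import deque


async def noisy_trigger():
    # No `return`: keeps producing events for as long as it is iterated.
    ind = 0
    while True:
        yield ind
        ind += 1


async def single_shot_trigger():
    # With `return`: the generator ends after its first event.
    yield 0
    return


async def pump(trigger, events):
    # Toy consumer: queue every event, then let other tasks run.
    async for event in trigger:
        events.append(event)
        await asyncio.sleep(0)


async def simulate(trigger_factory, cleanup_delay):
    events = deque()
    task = asyncio.create_task(pump(trigger_factory(), events))
    # Stand-in for an overloaded loop that reacts late to the first event.
    await asyncio.sleep(cleanup_delay)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(events)


def queued_events(trigger_factory, cleanup_delay=0.05):
    return asyncio.run(simulate(trigger_factory, cleanup_delay))
```

In this model `queued_events(single_shot_trigger)` queues exactly one event regardless of the delay, whereas `queued_events(noisy_trigger)` grows with `cleanup_delay`, which mirrors the direction of the effect described above without reproducing its magnitude.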
With the `return` in place, the log was:

```
[2023-06-17, 21:32:45 UTC] {taskinstance.py:1135} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [queued]>
[2023-06-17, 21:32:45 UTC] {taskinstance.py:1135} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [queued]>
[2023-06-17, 21:32:45 UTC] {taskinstance.py:1338} INFO - Starting attempt 1 of 1
[2023-06-17, 21:32:45 UTC] {taskinstance.py:1359} INFO - Executing <Task(TestOperator): test_trigger> on 2023-06-17 21:32:45.026487+00:00
[2023-06-17, 21:32:45 UTC] {standard_task_runner.py:57} INFO - Started process 4788 to run task
[2023-06-17, 21:32:45 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_trigger', 'test_trigger', 'manual__2023-06-17T21:32:45.026487+00:00', '--job-id', '131', '--raw', '--subdir', 'DAGS_FOLDER/trigger.py', '--cfg-path', '/tmp/tmpikm55djk']
[2023-06-17, 21:32:45 UTC] {standard_task_runner.py:85} INFO - Job 131: Subtask test_trigger
[2023-06-17, 21:32:46 UTC] {task_command.py:410} INFO - Running <TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [running]> on host d3e0d78cfce9
[2023-06-17, 21:32:46 UTC] {taskinstance.py:1637} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='test_trigger' AIRFLOW_CTX_TASK_ID='test_trigger' AIRFLOW_CTX_EXECUTION_DATE='2023-06-17T21:32:45.026487+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-17T21:32:45.026487+00:00'
[2023-06-17, 21:32:46 UTC] {taskinstance.py:1505} INFO - Pausing task as DEFERRED. dag_id=test_trigger, task_id=test_trigger, execution_date=20230617T213245, start_date=20230617T213245
[2023-06-17, 21:32:46 UTC] {local_task_job_runner.py:222} INFO - Task exited with return code 100 (task deferral)
[2023-06-17, 21:32:47 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:32:45.026487+00:00/test_trigger/-1/1 (ID 63) fired: TriggerEvent<{'message': 'Hello, world! 0'}>
[2023-06-17, 21:32:48 UTC] {taskinstance.py:1135} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [queued]>
[2023-06-17, 21:32:48 UTC] {taskinstance.py:1135} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [queued]>
[2023-06-17, 21:32:48 UTC] {taskinstance.py:1336} INFO - Resuming after deferral
[2023-06-17, 21:32:48 UTC] {taskinstance.py:1359} INFO - Executing <Task(TestOperator): test_trigger> on 2023-06-17 21:32:45.026487+00:00
[2023-06-17, 21:32:48 UTC] {standard_task_runner.py:57} INFO - Started process 4796 to run task
[2023-06-17, 21:32:48 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_trigger', 'test_trigger', 'manual__2023-06-17T21:32:45.026487+00:00', '--job-id', '134', '--raw', '--subdir', 'DAGS_FOLDER/trigger.py', '--cfg-path', '/tmp/tmpbtzk4o5s']
[2023-06-17, 21:32:48 UTC] {standard_task_runner.py:85} INFO - Job 134: Subtask test_trigger
[2023-06-17, 21:32:48 UTC] {task_command.py:410} INFO - Running <TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [running]> on host d3e0d78cfce9
[2023-06-17, 21:32:48 UTC] {logging_mixin.py:152} INFO - {'message': 'Hello, world! 0'}
[2023-06-17, 21:32:48 UTC] {taskinstance.py:1377} INFO - Marking task as SUCCESS. dag_id=test_trigger, task_id=test_trigger, execution_date=20230617T213245, start_date=20230617T213245, end_date=20230617T213248
[2023-06-17, 21:32:48 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-06-17, 21:32:48 UTC] {taskinstance.py:2752} INFO - 0 downstream tasks scheduled from follow-on schedule check
```
