hussein-awala opened a new pull request, #31985:
URL: https://github.com/apache/airflow/pull/31985

   related: #31703
   
   In #31703, we decided to remove certain `return` statements on the
assumption that they were unnecessary after yielding an event. However, testing
shows that in some cases (specifically when the triggerer service is
overloaded) a trigger without a `return` keeps running and generates a large
number of events. This unnecessary processing further burdens the triggerer.
   
   To test that, I have created a new module `airflow/tests.py`:
   ```python
   from __future__ import annotations
   
   from airflow.models import BaseOperator
   from airflow.triggers.base import BaseTrigger, TriggerEvent
   
   
   class TestTrigger(BaseTrigger):
       def serialize(self) -> tuple[str, dict]:
           return ("airflow.tests.TestTrigger", {})
   
       async def run(self):
           ind = 0
           while True:
               yield TriggerEvent(
                   {
                       "message": f"Hello, world! {ind}",
                   }
               )
               ind += 1
               # return  # uncomment to stop after the first event
   
   
   class TestOperator(BaseOperator):
       def execute(self, context):
           self.defer(
               trigger=TestTrigger(),
               method_name="execute_complete",
           )
   
       def execute_complete(self, context: dict | None, event: dict):
           print(event)
   ```
   And this small DAG:
   ```python
   from datetime import datetime
   
   from airflow.models import DAG
   from airflow.tests import TestOperator
   
   with DAG(
       dag_id="test_trigger",
       start_date=datetime(2021, 1, 1),
       schedule_interval=None,
   ) as dag:
       TestOperator(task_id="test_trigger")
   ```
   which I tested in breeze.
   
   When I tested it without a `return`, I got:
   ```
   [2023-06-17, 21:33:05 UTC] {taskinstance.py:1135} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
test_trigger.test_trigger manual__2023-06-17T21:33:05.186752+00:00 [queued]>
   [2023-06-17, 21:33:05 UTC] {taskinstance.py:1135} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: 
test_trigger.test_trigger manual__2023-06-17T21:33:05.186752+00:00 [queued]>
   [2023-06-17, 21:33:05 UTC] {taskinstance.py:1338} INFO - Starting attempt 1 
of 1
   [2023-06-17, 21:33:05 UTC] {taskinstance.py:1359} INFO - Executing 
<Task(TestOperator): test_trigger> on 2023-06-17 21:33:05.186752+00:00
   [2023-06-17, 21:33:05 UTC] {standard_task_runner.py:57} INFO - Started 
process 4802 to run task
   [2023-06-17, 21:33:05 UTC] {standard_task_runner.py:84} INFO - Running: 
['***', 'tasks', 'run', 'test_trigger', 'test_trigger', 
'manual__2023-06-17T21:33:05.186752+00:00', '--job-id', '137', '--raw', 
'--subdir', 'DAGS_FOLDER/trigger.py', '--cfg-path', '/tmp/tmppovv3qxe']
   [2023-06-17, 21:33:05 UTC] {standard_task_runner.py:85} INFO - Job 137: 
Subtask test_trigger
   [2023-06-17, 21:33:06 UTC] {task_command.py:410} INFO - Running 
<TaskInstance: test_trigger.test_trigger 
manual__2023-06-17T21:33:05.186752+00:00 [running]> on host d3e0d78cfce9
   [2023-06-17, 21:33:06 UTC] {taskinstance.py:1637} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='test_trigger' 
AIRFLOW_CTX_TASK_ID='test_trigger' 
AIRFLOW_CTX_EXECUTION_DATE='2023-06-17T21:33:05.186752+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-17T21:33:05.186752+00:00'
   [2023-06-17, 21:33:06 UTC] {taskinstance.py:1505} INFO - Pausing task as 
DEFERRED. dag_id=test_trigger, task_id=test_trigger, 
execution_date=20230617T213305, start_date=20230617T213305
   [2023-06-17, 21:33:06 UTC] {local_task_job_runner.py:222} INFO - Task exited 
with return code 100 (task deferral)
   [2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger 
test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) 
fired: TriggerEvent<{'message': 'Hello, world! 0'}>
   [2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger 
test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) 
fired: TriggerEvent<{'message': 'Hello, world! 1'}>
   [2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger 
test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) 
fired: TriggerEvent<{'message': 'Hello, world! 2'}>
   [2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger 
test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) 
fired: TriggerEvent<{'message': 'Hello, world! 3'}>
   [2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger 
test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) 
fired: TriggerEvent<{'message': 'Hello, world! 4'}>
   [2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger 
test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) 
fired: TriggerEvent<{'message': 'Hello, world! 5'}>
   [2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger 
test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) 
fired: TriggerEvent<{'message': 'Hello, world! 6'}>
   ...
   [2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger 
test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) 
fired: TriggerEvent<{'message': 'Hello, world! 26651'}>
   [2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger 
test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) 
fired: TriggerEvent<{'message': 'Hello, world! 26652'}>
   [2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger 
test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) 
fired: TriggerEvent<{'message': 'Hello, world! 26653'}>
   [2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger 
test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) 
fired: TriggerEvent<{'message': 'Hello, world! 26654'}>
   [2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger 
test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) 
fired: TriggerEvent<{'message': 'Hello, world! 26655'}>
   [2023-06-17, 21:33:09 UTC] {logging_mixin.py:152} INFO - {'message': 'Hello, 
world! 0'}
   [2023-06-17, 21:33:09 UTC] {taskinstance.py:1377} INFO - Marking task as 
SUCCESS. dag_id=test_trigger, task_id=test_trigger, 
execution_date=20230617T213305, start_date=20230617T213305, 
end_date=20230617T213309
   [2023-06-17, 21:33:09 UTC] {local_task_job_runner.py:225} INFO - Task exited 
with return code 0
   [2023-06-17, 21:33:09 UTC] {taskinstance.py:2752} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
   Other runs produced even more than 26k events when the triggerer was
overloaded.
   
   However, with a `return` statement, I consistently observed a single event
being sent. This approach also consumes less memory, since fewer events are
added to the `events` deque.
   
   ```
   [2023-06-17, 21:32:45 UTC] {taskinstance.py:1135} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [queued]>
   [2023-06-17, 21:32:45 UTC] {taskinstance.py:1135} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: 
test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [queued]>
   [2023-06-17, 21:32:45 UTC] {taskinstance.py:1338} INFO - Starting attempt 1 
of 1
   [2023-06-17, 21:32:45 UTC] {taskinstance.py:1359} INFO - Executing 
<Task(TestOperator): test_trigger> on 2023-06-17 21:32:45.026487+00:00
   [2023-06-17, 21:32:45 UTC] {standard_task_runner.py:57} INFO - Started 
process 4788 to run task
   [2023-06-17, 21:32:45 UTC] {standard_task_runner.py:84} INFO - Running: 
['***', 'tasks', 'run', 'test_trigger', 'test_trigger', 
'manual__2023-06-17T21:32:45.026487+00:00', '--job-id', '131', '--raw', 
'--subdir', 'DAGS_FOLDER/trigger.py', '--cfg-path', '/tmp/tmpikm55djk']
   [2023-06-17, 21:32:45 UTC] {standard_task_runner.py:85} INFO - Job 131: 
Subtask test_trigger
   [2023-06-17, 21:32:46 UTC] {task_command.py:410} INFO - Running 
<TaskInstance: test_trigger.test_trigger 
manual__2023-06-17T21:32:45.026487+00:00 [running]> on host d3e0d78cfce9
   [2023-06-17, 21:32:46 UTC] {taskinstance.py:1637} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='test_trigger' 
AIRFLOW_CTX_TASK_ID='test_trigger' 
AIRFLOW_CTX_EXECUTION_DATE='2023-06-17T21:32:45.026487+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-17T21:32:45.026487+00:00'
   [2023-06-17, 21:32:46 UTC] {taskinstance.py:1505} INFO - Pausing task as 
DEFERRED. dag_id=test_trigger, task_id=test_trigger, 
execution_date=20230617T213245, start_date=20230617T213245
   [2023-06-17, 21:32:46 UTC] {local_task_job_runner.py:222} INFO - Task exited 
with return code 100 (task deferral)
   [2023-06-17, 21:32:47 UTC] {triggerer_job_runner.py:608} INFO - Trigger 
test_trigger/manual__2023-06-17T21:32:45.026487+00:00/test_trigger/-1/1 (ID 63) 
fired: TriggerEvent<{'message': 'Hello, world! 0'}>
   [2023-06-17, 21:32:48 UTC] {taskinstance.py:1135} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [queued]>
   [2023-06-17, 21:32:48 UTC] {taskinstance.py:1135} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: 
test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [queued]>
   [2023-06-17, 21:32:48 UTC] {taskinstance.py:1336} INFO - Resuming after 
deferral
   [2023-06-17, 21:32:48 UTC] {taskinstance.py:1359} INFO - Executing 
<Task(TestOperator): test_trigger> on 2023-06-17 21:32:45.026487+00:00
   [2023-06-17, 21:32:48 UTC] {standard_task_runner.py:57} INFO - Started 
process 4796 to run task
   [2023-06-17, 21:32:48 UTC] {standard_task_runner.py:84} INFO - Running: 
['***', 'tasks', 'run', 'test_trigger', 'test_trigger', 
'manual__2023-06-17T21:32:45.026487+00:00', '--job-id', '134', '--raw', 
'--subdir', 'DAGS_FOLDER/trigger.py', '--cfg-path', '/tmp/tmpbtzk4o5s']
   [2023-06-17, 21:32:48 UTC] {standard_task_runner.py:85} INFO - Job 134: 
Subtask test_trigger
   [2023-06-17, 21:32:48 UTC] {task_command.py:410} INFO - Running 
<TaskInstance: test_trigger.test_trigger 
manual__2023-06-17T21:32:45.026487+00:00 [running]> on host d3e0d78cfce9
   [2023-06-17, 21:32:48 UTC] {logging_mixin.py:152} INFO - {'message': 'Hello, 
world! 0'}
   [2023-06-17, 21:32:48 UTC] {taskinstance.py:1377} INFO - Marking task as 
SUCCESS. dag_id=test_trigger, task_id=test_trigger, 
execution_date=20230617T213245, start_date=20230617T213245, 
end_date=20230617T213248
   [2023-06-17, 21:32:48 UTC] {local_task_job_runner.py:225} INFO - Task exited 
with return code 0
   [2023-06-17, 21:32:48 UTC] {taskinstance.py:2752} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
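
The difference can be reproduced outside Airflow with plain `asyncio`. The following is a minimal sketch, not the triggerer's actual code: `drain`, `count_events` and the `cap` parameter are hypothetical names, and `cap` only stands in for the delay before the triggerer gets around to cancelling a trigger that has already fired.

```python
import asyncio
from collections import deque


async def run_without_return():
    # Mirrors TestTrigger.run with the `return` left out: it never stops yielding.
    ind = 0
    while True:
        yield {"message": f"Hello, world! {ind}"}
        ind += 1


async def run_with_return():
    # Yields a single event, then ends the async generator.
    yield {"message": "Hello, world! 0"}
    return


async def drain(trigger_run, cap):
    # Hypothetical consumer: collect events until the generator ends on its own
    # or `cap` events have piled up (assumed stand-in for a late cancellation).
    events = deque()
    gen = trigger_run()
    try:
        async for event in gen:
            events.append(event)
            if len(events) >= cap:
                break
    finally:
        # Close the generator explicitly so it is not left suspended.
        await gen.aclose()
    return events


def count_events(trigger_run, cap=1000):
    return len(asyncio.run(drain(trigger_run, cap)))


if __name__ == "__main__":
    print("with return:", count_events(run_with_return))
    print("without return:", count_events(run_without_return))
```

In this sketch the generator with `return` always contributes exactly one event, while the one without it fills the deque up to whatever limit the consumer imposes.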
   

