amoghrajesh commented on PR #54449:
URL: https://github.com/apache/airflow/pull/54449#issuecomment-3209421767

   Some peace of mind tests that I ran to gain confidence in the PR:
   
   DAG used for testing some cases:
   ```
   from __future__ import annotations
   
   import logging
   
   from airflow import DAG
   from airflow.providers.standard.operators.empty import EmptyOperator
   from airflow.providers.standard.operators.python import PythonOperator
   from airflow.sdk import Variable, Connection
   
   
   def my_function() -> None:
       conn = Connection.get("my_connection")
       logging.getLogger(__name__).info(conn.password)
       print("via print", conn.password)
   
   
   with DAG("subprocess_dag") as dag:
       start = EmptyOperator(task_id="start")
   
       py_func = PythonOperator(task_id="py_func", python_callable=my_function)
   
       end = EmptyOperator(task_id="end")
   
       start >> py_func >> end
   
   ```
   
   Env set:
   ```
   export 
AIRFLOW_CONN_MY_CONNECTION="mysql://testuser:testpassword123@localhost:3306/testdb
   ```
   
   1. Running a task through CLI: `airflow tasks test subprocess_dag py_fu`
   ```
   [2025-08-21T07:52:11.216+0000] {dag.py:2329} INFO - created dagrun <DagRun 
subprocess_dag @ None: 
__airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__, state:running, 
queued_at: None. run_type: manual>
   [2025-08-21T07:52:11.222+0000] {dag.py:1293} INFO - [DAG TEST] starting 
task_id=py_func map_index=-1
   [2025-08-21T07:52:11.680+0000] {dag.py:1296} INFO - [DAG TEST] running task 
<TaskInstance: subprocess_dag.py_func 
__airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__ [None]>
   2025-08-21 07:52:12 [debug    ] Starting task instance run     
hostname=e2cc5b7dc023 pid=450 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 
unixname=root
   2025-08-21 07:52:12 [debug    ] Retrieved task instance details 
dag_id=subprocess_dag state=queued task_id=py_func 
ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
   2025-08-21 07:52:12 [info     ] Task started                   
hostname=e2cc5b7dc023 previous_state=queued 
ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
   2025-08-21 07:52:12 [info     ] Task instance state updated    
rows_affected=1 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
   [2025-08-21T07:52:12.497+0000] {_client.py:1025} INFO - HTTP Request: PATCH 
http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/run
 "HTTP/1.1 200 OK"
   2025-08-21 07:52:12 [debug    ] Sending request                
msg=SetRenderedFields(rendered_fields={'templates_dict': None, 'op_args': [], 
'op_kwargs': {}}, type='SetRenderedFields')
   2025-08-21 07:52:12 [debug    ] Received message from task runner 
[supervisor] msg=SetRenderedFields(rendered_fields={'templates_dict': None, 
'op_args': [], 'op_kwargs': {}}, type='SetRenderedFields')
   2025-08-21 07:52:12 [info     ] Updating RenderedTaskInstanceFields 
field_count=3 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
   2025-08-21 07:52:12 [debug    ] RenderedTaskInstanceFields updated 
successfully ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
   [2025-08-21T07:52:12.509+0000] {_client.py:1025} INFO - HTTP Request: PUT 
http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/rtif
 "HTTP/1.1 201 Created"
   2025-08-21 07:52:12 [debug    ] Sending request                
msg=MaskSecret(value='***', name=None, type='MaskSecret')
   2025-08-21 07:52:12 [debug    ] Received message from task runner (body 
omitted) [supervisor] msg=<class 'airflow.sdk.execution_time.comms.MaskSecret'>
   [2025-08-21T07:52:12.513+0000] {subprocess_mask_var.py:13} INFO - ***
   via print ***
   [2025-08-21T07:52:12.513+0000] {python.py:218} INFO - Done. Returned value 
was: None
   2025-08-21 07:52:12 [debug    ] Sending request                
msg=SucceedTask(state='success', end_date=datetime.datetime(2025, 8, 21, 7, 52, 
12, 513546, tzinfo=datetime.timezone.utc), task_outlets=[], outlet_events=[], 
rendered_map_index=None, type='SucceedTask')
   2025-08-21 07:52:12 [debug    ] Received message from task runner 
[supervisor] msg=SucceedTask(state='success', end_date=datetime.datetime(2025, 
8, 21, 7, 52, 12, 513546, tzinfo=datetime.timezone.utc), task_outlets=[], 
outlet_events=[], rendered_map_index=None, type='SucceedTask')
   2025-08-21 07:52:12 [debug    ] Updating task instance state   
new_state=success ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
   2025-08-21 07:52:12 [debug    ] Retrieved current task instance state 
max_tries=0 previous_state=running ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 
try_number=0
   2025-08-21 07:52:12 [info     ] Task instance state updated    
new_state=success rows_affected=1 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
   [2025-08-21T07:52:12.522+0000] {_client.py:1025} INFO - HTTP Request: PATCH 
http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/state
 "HTTP/1.1 204 No Content"
   2025-08-21 07:52:12 [debug    ] Running finalizers             [task] 
ti=RuntimeTaskInstance(id=UUID('0198cb9d-28d2-7800-82b0-7161dc72f022'), 
task_id='py_func', dag_id='subprocess_dag', 
run_id='__airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__', 
try_number=0, dag_version_id=UUID('0198cb9b-d3d6-74f3-a561-083d897ed8cd'), 
map_index=-1, hostname='e2cc5b7dc023', context_carrier=None, 
task=<Task(PythonOperator): py_func>, max_tries=0, 
start_date=datetime.datetime(2025, 8, 21, 7, 52, 11, 880733, 
tzinfo=datetime.timezone.utc), end_date=datetime.datetime(2025, 8, 21, 7, 52, 
12, 513546, tzinfo=datetime.timezone.utc), state=<TaskInstanceState.SUCCESS: 
'success'>, is_mapped=False, rendered_map_index=None, log_url=None)
   ```
   
   This is specifically to test the recent issue: 
https://github.com/apache/airflow/commit/1f4c55c0e38fae6f46b71255f0a4dae2157f0991
   
   
   2. Running a trigger with a dag that will log using dag processor, root 
logger, structlog etc
   
   Trigger:
   ```
   from __future__ import annotations
   
   import asyncio
   import logging
   
   import structlog
   
   from airflow.sdk import Connection, Variable
   from airflow.sdk.log import mask_secret
   from airflow.triggers.base import BaseTrigger, TriggerEvent
   
   
   print(f"{__file__=} loaded")
   
   
   class CustomTrigger(BaseTrigger):
       async def run(self, **args):
           from asgiref.sync import sync_to_async
   
           await sync_to_async(Variable.set)("my_api_key", "password1")
           x = await sync_to_async(Variable.get)("my_api_key")
   
           logging.getLogger(__name__).info("my_api_key=%s", x)
           secret = "some-secret-value"
           await sync_to_async(mask_secret)(secret)
           await sync_to_async(mask_secret)("some-secret-value")
   
           logging.getLogger(__name__).info("after manual mask %s", secret)
   
           await structlog.get_logger().ainfo("Testing structlog", val=secret, 
api_key="abcdef", x=x)
   
           yield TriggerEvent({"Hi": "from trigger"})
   
       def serialize(self):
           return (
               f"{type(self).__module__}.{type(self).__qualname__}",
               {},
           )
   
   ```
   
   
   DAG:
   ```
   from airflow.exceptions import AirflowRescheduleException, TaskDeferred
   from airflow.sdk import dag, task
   from airflow.sdk import Variable
   import logging
   
   x = Variable.get("toplevel_api_key", default="secret_api")
   print(f"{x=}")
   logging.root.info("toplevel=%s", x)
   
   @dag()
   def trigger_a_gag():
       @task
       def trigger(event=None) -> None:
           if event:
               print(event)
           else:
               from triggera import CustomTrigger
               raise TaskDeferred(trigger=CustomTrigger(), 
method_name="execute")
   
       trigger()
   
   
   trigger_a_gag()
   
   ```
   
   <img width="1704" height="247" alt="image" 
src="https://github.com/user-attachments/assets/2842464e-becd-44f6-abee-5bf67c8071f7";
 />
   
   
   Logs:
   ```
   [2025-08-21, 13:24:55] INFO - __file__='/files/plugins/triggera.py' loaded: 
chan="stdout": source="task"
   [2025-08-21, 13:24:55] INFO - DAG bundles loaded: dags-folder: 
source="airflow.dag_processing.bundles.manager.DagBundlesManager"
   [2025-08-21, 13:24:55] INFO - Filling up the DagBag from 
/files/dags/reschedule-dag.py: source="airflow.models.dagbag.DagBag"
   [2025-08-21, 13:24:55] INFO - toplevel=***: source="root"
   [2025-08-21, 13:24:55] INFO - x='***': chan="stdout": source="task"
   [2025-08-21, 13:24:55] INFO - Pausing task as DEFERRED. : 
dag_id="trigger_a_gag": task_id="trigger": 
run_id="manual__2025-08-21T07:54:54.321189+00:00": source="task"
   [2025-08-21, 13:24:57] INFO - trigger 
trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID 1) 
starting
   [2025-08-21, 13:24:59] INFO - my_api_key=***: source="triggera"
   [2025-08-21, 13:24:59] INFO - after manual mask ***: source="triggera"
   [2025-08-21, 13:24:59] INFO - Testing structlog: val="***": api_key="***": 
x="***"
   [2025-08-21, 13:24:59] INFO - Trigger fired event: 
name="trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID 
1)": result="TriggerEvent<{'Hi': 'from trigger'}>"
   [2025-08-21, 13:24:59] INFO - trigger completed: 
name="trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID 
1)"
   [2025-08-21, 13:25:00] INFO - __file__='/files/plugins/triggera.py' loaded: 
chan="stdout": source="task"
   [2025-08-21, 13:25:00] INFO - DAG bundles loaded: dags-folder: 
source="airflow.dag_processing.bundles.manager.DagBundlesManager"
   [2025-08-21, 13:25:00] INFO - Filling up the DagBag from 
/files/dags/reschedule-dag.py: source="airflow.models.dagbag.DagBag"
   [2025-08-21, 13:25:00] INFO - toplevel=***: source="root"
   [2025-08-21, 13:25:00] INFO - x='***': chan="stdout": source="task"
   [2025-08-21, 13:25:00] ERROR - Task failed with exception: source="task"
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to