amoghrajesh commented on PR #54449: URL: https://github.com/apache/airflow/pull/54449#issuecomment-3209421767
Some peace of mind tests that I ran to gain confidence in the PR: DAG used for testing some cases: ``` from __future__ import annotations import logging from airflow import DAG from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.operators.python import PythonOperator from airflow.sdk import Variable, Connection def my_function() -> None: conn = Connection.get("my_connection") logging.getLogger(__name__).info(conn.password) print("via print", conn.password) with DAG("subprocess_dag") as dag: start = EmptyOperator(task_id="start") py_func = PythonOperator(task_id="py_func", python_callable=my_function) end = EmptyOperator(task_id="end") start >> py_func >> end ``` Env set: ``` export AIRFLOW_CONN_MY_CONNECTION="mysql://testuser:testpassword123@localhost:3306/testdb ``` 1. Running a task through CLI: `airflow tasks test subprocess_dag py_fu` ``` [2025-08-21T07:52:11.216+0000] {dag.py:2329} INFO - created dagrun <DagRun subprocess_dag @ None: __airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__, state:running, queued_at: None. run_type: manual> [2025-08-21T07:52:11.222+0000] {dag.py:1293} INFO - [DAG TEST] starting task_id=py_func map_index=-1 [2025-08-21T07:52:11.680+0000] {dag.py:1296} INFO - [DAG TEST] running task <TaskInstance: subprocess_dag.py_func __airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__ [None]> 2025-08-21 07:52:12 [debug ] Starting task instance run hostname=e2cc5b7dc023 pid=450 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 unixname=root 2025-08-21 07:52:12 [debug ] Retrieved task instance details dag_id=subprocess_dag state=queued task_id=py_func ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 2025-08-21 07:52:12 [info ] Task started hostname=e2cc5b7dc023 previous_state=queued ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 2025-08-21 07:52:12 [info ] Task instance state updated rows_affected=1 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 [2025-08-21T07:52:12.497+0000] {_client.py:1025} INFO - HTTP Request: PATCH http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/run "HTTP/1.1 200 OK" 2025-08-21 07:52:12 [debug ] Sending request msg=SetRenderedFields(rendered_fields={'templates_dict': None, 'op_args': [], 'op_kwargs': {}}, type='SetRenderedFields') 2025-08-21 07:52:12 [debug ] Received message from task runner [supervisor] msg=SetRenderedFields(rendered_fields={'templates_dict': None, 'op_args': [], 'op_kwargs': {}}, type='SetRenderedFields') 2025-08-21 07:52:12 [info ] Updating RenderedTaskInstanceFields field_count=3 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 2025-08-21 07:52:12 [debug ] RenderedTaskInstanceFields updated successfully ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 [2025-08-21T07:52:12.509+0000] {_client.py:1025} INFO - HTTP Request: PUT http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/rtif "HTTP/1.1 201 Created" 2025-08-21 07:52:12 [debug ] Sending request msg=MaskSecret(value='***', name=None, type='MaskSecret') 2025-08-21 07:52:12 [debug ] Received message from task runner (body omitted) [supervisor] msg=<class 'airflow.sdk.execution_time.comms.MaskSecret'> [2025-08-21T07:52:12.513+0000] {subprocess_mask_var.py:13} INFO - *** via print *** [2025-08-21T07:52:12.513+0000] {python.py:218} INFO - Done. Returned value was: None 2025-08-21 07:52:12 [debug ] Sending request msg=SucceedTask(state='success', end_date=datetime.datetime(2025, 8, 21, 7, 52, 12, 513546, tzinfo=datetime.timezone.utc), task_outlets=[], outlet_events=[], rendered_map_index=None, type='SucceedTask') 2025-08-21 07:52:12 [debug ] Received message from task runner [supervisor] msg=SucceedTask(state='success', end_date=datetime.datetime(2025, 8, 21, 7, 52, 12, 513546, tzinfo=datetime.timezone.utc), task_outlets=[], outlet_events=[], rendered_map_index=None, type='SucceedTask') 2025-08-21 07:52:12 [debug ] Updating task instance state new_state=success ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 2025-08-21 07:52:12 [debug ] Retrieved current task instance state max_tries=0 previous_state=running ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 try_number=0 2025-08-21 07:52:12 [info ] Task instance state updated new_state=success rows_affected=1 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 [2025-08-21T07:52:12.522+0000] {_client.py:1025} INFO - HTTP Request: PATCH http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/state "HTTP/1.1 204 No Content" 2025-08-21 07:52:12 [debug ] Running finalizers [task] ti=RuntimeTaskInstance(id=UUID('0198cb9d-28d2-7800-82b0-7161dc72f022'), task_id='py_func', dag_id='subprocess_dag', run_id='__airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__', try_number=0, dag_version_id=UUID('0198cb9b-d3d6-74f3-a561-083d897ed8cd'), map_index=-1, hostname='e2cc5b7dc023', context_carrier=None, task=<Task(PythonOperator): py_func>, max_tries=0, start_date=datetime.datetime(2025, 8, 21, 7, 52, 11, 880733, tzinfo=datetime.timezone.utc), end_date=datetime.datetime(2025, 8, 21, 7, 52, 12, 513546, tzinfo=datetime.timezone.utc), state=<TaskInstanceState.SUCCESS: 'success'>, is_mapped=False, rendered_map_index=None, log_url=None) ``` This is specifically to test the recent issue: https://github.com/apache/airflow/commit/1f4c55c0e38fae6f46b71255f0a4dae2157f0991 2. Running a trigger with a dag that will log using dag processor, root logger, structlog etc Trigger: ``` from __future__ import annotations import asyncio import logging import structlog from airflow.sdk import Connection, Variable from airflow.sdk.log import mask_secret from airflow.triggers.base import BaseTrigger, TriggerEvent print(f"{__file__=} loaded") class CustomTrigger(BaseTrigger): async def run(self, **args): from asgiref.sync import sync_to_async await sync_to_async(Variable.set)("my_api_key", "password1") x = await sync_to_async(Variable.get)("my_api_key") logging.getLogger(__name__).info("my_api_key=%s", x) secret = "some-secret-value" await sync_to_async(mask_secret)(secret) await sync_to_async(mask_secret)("some-secret-value") logging.getLogger(__name__).info("after manual mask %s", secret) await structlog.get_logger().ainfo("Testing structlog", val=secret, api_key="abcdef", x=x) yield TriggerEvent({"Hi": "from trigger"}) def serialize(self): return ( f"{type(self).__module__}.{type(self).__qualname__}", {}, ) ``` DAG: ``` from airflow.exceptions import AirflowRescheduleException, TaskDeferred from airflow.sdk import dag, task from airflow.sdk import Variable import logging x = Variable.get("toplevel_api_key", default="secret_api") print(f"{x=}") logging.root.info("toplevel=%s", x) @dag() def trigger_a_gag(): @task def trigger(event=None) -> None: if event: print(event) else: from triggera import CustomTrigger raise TaskDeferred(trigger=CustomTrigger(), method_name="execute") trigger() trigger_a_gag() ``` <img width="1704" height="247" alt="image" src="https://github.com/user-attachments/assets/2842464e-becd-44f6-abee-5bf67c8071f7" /> Logs: ``` [2025-08-21, 13:24:55] INFO - __file__='/files/plugins/triggera.py' loaded: chan="stdout": source="task" [2025-08-21, 13:24:55] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager" [2025-08-21, 13:24:55] INFO - Filling up the DagBag from /files/dags/reschedule-dag.py: source="airflow.models.dagbag.DagBag" [2025-08-21, 13:24:55] INFO - toplevel=***: source="root" [2025-08-21, 13:24:55] INFO - x='***': chan="stdout": source="task" [2025-08-21, 13:24:55] INFO - Pausing task as DEFERRED. : dag_id="trigger_a_gag": task_id="trigger": run_id="manual__2025-08-21T07:54:54.321189+00:00": source="task" [2025-08-21, 13:24:57] INFO - trigger trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID 1) starting [2025-08-21, 13:24:59] INFO - my_api_key=***: source="triggera" [2025-08-21, 13:24:59] INFO - after manual mask ***: source="triggera" [2025-08-21, 13:24:59] INFO - Testing structlog: val="***": api_key="***": x="***" [2025-08-21, 13:24:59] INFO - Trigger fired event: name="trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID 1)": result="TriggerEvent<{'Hi': 'from trigger'}>" [2025-08-21, 13:24:59] INFO - trigger completed: name="trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID 1)" [2025-08-21, 13:25:00] INFO - __file__='/files/plugins/triggera.py' loaded: chan="stdout": source="task" [2025-08-21, 13:25:00] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager" [2025-08-21, 13:25:00] INFO - Filling up the DagBag from /files/dags/reschedule-dag.py: source="airflow.models.dagbag.DagBag" [2025-08-21, 13:25:00] INFO - toplevel=***: source="root" [2025-08-21, 13:25:00] INFO - x='***': chan="stdout": source="task" [2025-08-21, 13:25:00] ERROR - Task failed with exception: source="task" ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org