This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 274d9c3508 Avoid logging sensitive information in triggerer job log (#30110)
274d9c3508 is described below

commit 274d9c3508179ae8b0f705d9787e8200be7718e1
Author: Hussein Awala <[email protected]>
AuthorDate: Thu Apr 6 13:59:17 2023 +0200

    Avoid logging sensitive information in triggerer job log (#30110)
    
    * Change trigger name to task id instead of repr(trigger) to avoid logging sensitive information
---
 airflow/jobs/triggerer_job.py    |  8 +++-
 tests/jobs/test_triggerer_job.py | 82 ++++++++++++++++++++++++++++++++++------
 2 files changed, 77 insertions(+), 13 deletions(-)

diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py
index 3120ac5cde..2875906f16 100644
--- a/airflow/jobs/triggerer_job.py
+++ b/airflow/jobs/triggerer_job.py
@@ -484,9 +484,15 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         while self.to_create:
             trigger_id, trigger_instance = self.to_create.popleft()
             if trigger_id not in self.triggers:
+                task_instance: TaskInstance = trigger_instance.task_instance
+                dag_id = task_instance.dag_id
+                run_id = task_instance.run_id
+                task_id = task_instance.task_id
+                map_index = task_instance.map_index
+                try_number = task_instance.try_number
                 self.triggers[trigger_id] = {
                     "task": asyncio.create_task(self.run_trigger(trigger_id, 
trigger_instance)),
-                    "name": f"{trigger_instance!r} (ID {trigger_id})",
+                    "name": 
f"{dag_id}/{run_id}/{task_id}/{map_index}/{try_number} (ID {trigger_id})",
                     "events": 0,
                 }
             else:
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index f560caeed1..8b32d4d1fe 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -24,12 +24,15 @@ import time
 from threading import Thread
 from unittest.mock import patch
 
+import pendulum
 import pytest
 
+from airflow import DAG
 from airflow.config_templates import airflow_local_settings
 from airflow.jobs.triggerer_job import TriggererJob, TriggerRunner, 
setup_queue_listener
 from airflow.logging_config import configure_logging
 from airflow.models import DagModel, DagRun, TaskInstance, Trigger
+from airflow.models.baseoperator import BaseOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
 from airflow.triggers.base import TriggerEvent
@@ -40,6 +43,7 @@ from airflow.utils.log.logging_mixin import RedirectStdHandler
 from airflow.utils.log.trigger_handler import LocalQueueHandler
 from airflow.utils.session import create_session
 from airflow.utils.state import State, TaskInstanceState
+from airflow.utils.types import DagRunType
 from tests.core.test_logging_config import reset_logging
 from tests.test_utils.db import clear_db_dags, clear_db_runs
 
@@ -80,6 +84,69 @@ def session():
         yield session
 
 
+def create_trigger_in_db(session, trigger, operator=None):
+    dag_model = DagModel(dag_id="test_dag")
+    dag = DAG(dag_id=dag_model.dag_id, start_date=pendulum.datetime(2023, 1, 
1))
+    run = DagRun(
+        dag_id=dag_model.dag_id,
+        run_id="test_run",
+        execution_date=pendulum.datetime(2023, 1, 1),
+        run_type=DagRunType.MANUAL,
+    )
+    trigger_orm = Trigger.from_object(trigger)
+    trigger_orm.id = 1
+    if operator:
+        operator.dag = dag
+    else:
+        operator = BaseOperator(task_id="test_ti", dag=dag)
+    task_instance = TaskInstance(operator, execution_date=run.execution_date, 
run_id=run.run_id)
+    task_instance.trigger_id = trigger_orm.id
+    session.add(dag_model)
+    session.add(run)
+    session.add(trigger_orm)
+    session.add(task_instance)
+    session.commit()
+    return dag_model, run, trigger_orm, task_instance
+
+
+def test_trigger_logging_sensitive_info(session, capsys):
+    """
+    Checks that when a trigger fires, it doesn't log any sensitive
+    information from arguments
+    """
+
+    class SensitiveArgOperator(BaseOperator):
+        def __init__(self, password, **kwargs):
+            self.password = password
+            super().__init__(**kwargs)
+
+    # Use a trigger that will immediately succeed
+    trigger = SuccessTrigger()
+    op = SensitiveArgOperator(task_id="sensitive_arg_task", 
password="some_password")
+    create_trigger_in_db(session, trigger, operator=op)
+    # Make a TriggererJob and have it retrieve DB tasks
+    job = TriggererJob()
+    job.load_triggers()
+    # Now, start TriggerRunner up (and set it as a daemon thread during tests)
+    job.runner.daemon = True
+    job.runner.start()
+    try:
+        # Wait for up to 3 seconds for it to fire and appear in the event queue
+        for _ in range(30):
+            if job.runner.events:
+                assert list(job.runner.events) == [(1, TriggerEvent(True))]
+                break
+            time.sleep(0.1)
+        else:
+            pytest.fail("TriggerRunner never sent the trigger event out")
+    finally:
+        # We always have to stop the runner
+        job.runner.stop = True
+    stdout = capsys.readouterr().out
+    assert "test_dag/test_run/sensitive_arg_task/-1/1 (ID 1) starting" in 
stdout
+    assert "some_password" not in stdout
+
+
 def test_is_alive():
     """Checks the heartbeat logic"""
     # Current time
@@ -148,10 +215,7 @@ def test_trigger_lifecycle(session):
     # Use a trigger that will not fire for the lifetime of the test
     # (we want to avoid it firing and deleting itself)
     trigger = TimeDeltaTrigger(datetime.timedelta(days=7))
-    trigger_orm = Trigger.from_object(trigger)
-    trigger_orm.id = 1
-    session.add(trigger_orm)
-    session.commit()
+    dag_model, run, trigger_orm, task_instance = create_trigger_in_db(session, 
trigger)
     # Make a TriggererJob and have it retrieve DB tasks
     job = TriggererJob()
     job.load_triggers()
@@ -343,10 +407,7 @@ def test_trigger_firing(session):
     """
     # Use a trigger that will immediately succeed
     trigger = SuccessTrigger()
-    trigger_orm = Trigger.from_object(trigger)
-    trigger_orm.id = 1
-    session.add(trigger_orm)
-    session.commit()
+    create_trigger_in_db(session, trigger)
     # Make a TriggererJob and have it retrieve DB tasks
     job = TriggererJob()
     job.load_triggers()
@@ -374,10 +435,7 @@ def test_trigger_failing(session):
     """
     # Use a trigger that will immediately fail
     trigger = FailureTrigger()
-    trigger_orm = Trigger.from_object(trigger)
-    trigger_orm.id = 1
-    session.add(trigger_orm)
-    session.commit()
+    create_trigger_in_db(session, trigger)
     # Make a TriggererJob and have it retrieve DB tasks
     job = TriggererJob()
     job.load_triggers()

Reply via email to