This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 274d9c3508 Avoid logging sensitive information in triggerer job log (#30110)
274d9c3508 is described below
commit 274d9c3508179ae8b0f705d9787e8200be7718e1
Author: Hussein Awala <[email protected]>
AuthorDate: Thu Apr 6 13:59:17 2023 +0200
Avoid logging sensitive information in triggerer job log (#30110)
* Name each trigger after its task instance (dag_id/run_id/task_id/map_index/try_number)
  instead of repr(trigger), to avoid logging sensitive information
---
airflow/jobs/triggerer_job.py | 8 +++-
tests/jobs/test_triggerer_job.py | 82 ++++++++++++++++++++++++++++++++++------
2 files changed, 77 insertions(+), 13 deletions(-)
diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py
index 3120ac5cde..2875906f16 100644
--- a/airflow/jobs/triggerer_job.py
+++ b/airflow/jobs/triggerer_job.py
@@ -484,9 +484,15 @@ class TriggerRunner(threading.Thread, LoggingMixin):
while self.to_create:
trigger_id, trigger_instance = self.to_create.popleft()
if trigger_id not in self.triggers:
+ task_instance: TaskInstance = trigger_instance.task_instance
+ dag_id = task_instance.dag_id
+ run_id = task_instance.run_id
+ task_id = task_instance.task_id
+ map_index = task_instance.map_index
+ try_number = task_instance.try_number
self.triggers[trigger_id] = {
"task": asyncio.create_task(self.run_trigger(trigger_id,
trigger_instance)),
- "name": f"{trigger_instance!r} (ID {trigger_id})",
+ "name":
f"{dag_id}/{run_id}/{task_id}/{map_index}/{try_number} (ID {trigger_id})",
"events": 0,
}
else:
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index f560caeed1..8b32d4d1fe 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -24,12 +24,15 @@ import time
from threading import Thread
from unittest.mock import patch
+import pendulum
import pytest
+from airflow import DAG
from airflow.config_templates import airflow_local_settings
from airflow.jobs.triggerer_job import TriggererJob, TriggerRunner,
setup_queue_listener
from airflow.logging_config import configure_logging
from airflow.models import DagModel, DagRun, TaskInstance, Trigger
+from airflow.models.baseoperator import BaseOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.triggers.base import TriggerEvent
@@ -40,6 +43,7 @@ from airflow.utils.log.logging_mixin import RedirectStdHandler
from airflow.utils.log.trigger_handler import LocalQueueHandler
from airflow.utils.session import create_session
from airflow.utils.state import State, TaskInstanceState
+from airflow.utils.types import DagRunType
from tests.core.test_logging_config import reset_logging
from tests.test_utils.db import clear_db_dags, clear_db_runs
@@ -80,6 +84,69 @@ def session():
yield session
+def create_trigger_in_db(session, trigger, operator=None):
+    """
+    Persist ``trigger`` together with a DAG, a DAG run and a task instance linked to it.
+
+    The rows use fixed identifiers (dag ``test_dag``, run ``test_run``, trigger ID 1),
+    so tests can assert on the exact trigger name and event tuples.
+
+    :param session: SQLAlchemy session used to add and commit all rows.
+    :param trigger: trigger object to store, via ``Trigger.from_object``.
+    :param operator: optional operator for the task instance. It is attached to the
+        ``test_dag`` DAG here. When omitted, a ``BaseOperator`` with task_id
+        ``test_ti`` is created.
+    :return: tuple ``(dag_model, run, trigger_orm, task_instance)``.
+    """
+    dag_model = DagModel(dag_id="test_dag")
+    dag = DAG(dag_id=dag_model.dag_id, start_date=pendulum.datetime(2023, 1, 1))
+    run = DagRun(
+        dag_id=dag_model.dag_id,
+        run_id="test_run",
+        execution_date=pendulum.datetime(2023, 1, 1),
+        run_type=DagRunType.MANUAL,
+    )
+    trigger_orm = Trigger.from_object(trigger)
+    # Fixed ID so callers can assert on events such as (1, TriggerEvent(True)).
+    trigger_orm.id = 1
+    if operator:
+        # Attach the caller's operator to this DAG before building the TaskInstance
+        # from it (presumably TaskInstance takes its dag_id from the operator).
+        operator.dag = dag
+    else:
+        operator = BaseOperator(task_id="test_ti", dag=dag)
+    task_instance = TaskInstance(operator, execution_date=run.execution_date, run_id=run.run_id)
+    # Link the task instance to the trigger. Presumably this is what lets the
+    # triggerer resolve trigger_instance.task_instance when naming the trigger.
+    task_instance.trigger_id = trigger_orm.id
+    session.add(dag_model)
+    session.add(run)
+    session.add(trigger_orm)
+    session.add(task_instance)
+    session.commit()
+    return dag_model, run, trigger_orm, task_instance
+
+
+def test_trigger_logging_sensitive_info(session, capsys):
+    """
+    Check that the triggerer names a running trigger by its task-instance identity.
+
+    The expected name is ``dag_id/run_id/task_id/map_index/try_number (ID n)``.
+    A secret value supplied to the task must not appear in the captured stdout.
+    """
+
+    # Operator that carries a secret as a plain attribute.
+    class SensitiveArgOperator(BaseOperator):
+        def __init__(self, password, **kwargs):
+            self.password = password
+            super().__init__(**kwargs)
+
+    # Use a trigger that will immediately succeed
+    trigger = SuccessTrigger()
+    # NOTE(review): the password is only set on the operator; SuccessTrigger() gets no
+    # arguments. As far as this test shows, the secret never reaches the trigger, so
+    # the `"some_password" not in stdout` check below would also pass without the fix.
+    # The log-name assertion is what actually pins the new behavior. Consider passing
+    # the secret through a trigger's kwargs to exercise the real leak path.
+    op = SensitiveArgOperator(task_id="sensitive_arg_task", password="some_password")
+    create_trigger_in_db(session, trigger, operator=op)
+    # Make a TriggererJob and have it retrieve DB tasks
+    job = TriggererJob()
+    job.load_triggers()
+    # Now, start TriggerRunner up (and set it as a daemon thread during tests)
+    job.runner.daemon = True
+    job.runner.start()
+    try:
+        # Wait for up to 3 seconds for it to fire and appear in the event queue
+        for _ in range(30):
+            if job.runner.events:
+                assert list(job.runner.events) == [(1, TriggerEvent(True))]
+                break
+            time.sleep(0.1)
+        else:
+            pytest.fail("TriggerRunner never sent the trigger event out")
+    finally:
+        # We always have to stop the runner (the thread is signalled, not joined)
+        job.runner.stop = True
+    stdout = capsys.readouterr().out
+    # The runner is expected to log the trigger under the new name format. "-1" and
+    # "1" are presumably the default map_index and try_number of an unmapped
+    # first-try task.
+    assert "test_dag/test_run/sensitive_arg_task/-1/1 (ID 1) starting" in stdout
+    assert "some_password" not in stdout
+
+
def test_is_alive():
"""Checks the heartbeat logic"""
# Current time
@@ -148,10 +215,7 @@ def test_trigger_lifecycle(session):
# Use a trigger that will not fire for the lifetime of the test
# (we want to avoid it firing and deleting itself)
trigger = TimeDeltaTrigger(datetime.timedelta(days=7))
- trigger_orm = Trigger.from_object(trigger)
- trigger_orm.id = 1
- session.add(trigger_orm)
- session.commit()
+ dag_model, run, trigger_orm, task_instance = create_trigger_in_db(session,
trigger)
# Make a TriggererJob and have it retrieve DB tasks
job = TriggererJob()
job.load_triggers()
@@ -343,10 +407,7 @@ def test_trigger_firing(session):
"""
# Use a trigger that will immediately succeed
trigger = SuccessTrigger()
- trigger_orm = Trigger.from_object(trigger)
- trigger_orm.id = 1
- session.add(trigger_orm)
- session.commit()
+ create_trigger_in_db(session, trigger)
# Make a TriggererJob and have it retrieve DB tasks
job = TriggererJob()
job.load_triggers()
@@ -374,10 +435,7 @@ def test_trigger_failing(session):
"""
# Use a trigger that will immediately fail
trigger = FailureTrigger()
- trigger_orm = Trigger.from_object(trigger)
- trigger_orm.id = 1
- session.add(trigger_orm)
- session.commit()
+ create_trigger_in_db(session, trigger)
# Make a TriggererJob and have it retrieve DB tasks
job = TriggererJob()
job.load_triggers()