This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-6-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 31f14d34a2b1ca8211ded9ea3491f50b0fb6193f Author: Wei Lee <[email protected]> AuthorDate: Wed Jun 21 02:55:04 2023 +0800 Catch the exception that triggerer initialization failed (#31999) * fix(jobs/triggerer_job_runner): catch the exception that triggerer initialization failed * test(jobs/triggerer_job_runner): add test case for checking TriggerRunner.update_trigger catch triggerer change exception * refactor(jobs/triggerer_job_runner): catch multiple exception cases in the same try catch block when initialing trigger * refactor(jobs/triggerer_job_runner): remove unnecessary TypeError catching Co-authored-by: Tzu-ping Chung <[email protected]> * Revert "refactor(jobs/triggerer_job_runner): remove unnecessary TypeError catching" This reverts commit ac561a2416f16a8f9a3b211715fd4a7e012b47ad. * Revert "refactor(jobs/triggerer_job_runner): catch multiple exception cases in the same try catch block when initialing trigger" This reverts commit 4f3b24b8a7698973fe319618b6ff9f459dc0155a. --------- Co-authored-by: Tzu-ping Chung <[email protected]> (cherry picked from commit a1ba15570219d2fe77466367e84fafa92cbdb24e) --- airflow/jobs/triggerer_job_runner.py | 9 ++++++++- tests/jobs/test_triggerer_job.py | 22 +++++++++++++++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index 7ca3fc2e94..8cbc17b228 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -688,7 +688,14 @@ class TriggerRunner(threading.Thread, LoggingMixin): # Either the trigger code or the path to it is bad. Fail the trigger. self.failed_triggers.append((new_id, e)) continue - new_trigger_instance = trigger_class(**new_trigger_orm.kwargs) + + try: + new_trigger_instance = trigger_class(**new_trigger_orm.kwargs) + except TypeError as err: + self.log.error("Trigger failed; message=%s", err) + self.failed_triggers.append((new_id, err)) + continue + self.set_trigger_logging_metadata(new_trigger_orm.task_instance, new_id, new_trigger_instance) self.to_create.append((new_id, new_trigger_instance)) # Enqueue orphaned triggers for cancellation diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py index d0ce887a57..3e43d8f520 100644 --- a/tests/jobs/test_triggerer_job.py +++ b/tests/jobs/test_triggerer_job.py @@ -37,7 +37,7 @@ from airflow.models.baseoperator import BaseOperator from airflow.operators.empty import EmptyOperator from airflow.operators.python import PythonOperator from airflow.triggers.base import TriggerEvent -from airflow.triggers.temporal import TimeDeltaTrigger +from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.triggers.testing import FailureTrigger, SuccessTrigger from airflow.utils import timezone from airflow.utils.log.logging_mixin import RedirectStdHandler @@ -281,6 +281,26 @@ class TestTriggerRunner: await trigger_runner.run_trigger(1, mock_trigger) assert "Trigger cancelled due to timeout" in caplog.text + @patch("airflow.models.trigger.Trigger.bulk_fetch") + @patch( + "airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath", + return_value=DateTimeTrigger, + ) + def test_update_trigger_with_triggerer_argument_change( + self, mock_bulk_fetch, mock_get_trigger_by_classpath, session, caplog + ) -> None: + trigger_runner = TriggerRunner() + mock_trigger_orm = MagicMock() + mock_trigger_orm.kwargs = {"moment": ..., "not_exists_arg": ...} + mock_get_trigger_by_classpath.return_value = {1: mock_trigger_orm} + + trigger_runner.update_triggers({1}) + + assert ( + "Trigger failed; message=__init__() got an unexpected keyword argument 'not_exists_arg'" + in caplog.text + ) + def test_trigger_create_race_condition_18392(session, tmp_path): """
