This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 31f14d34a2b1ca8211ded9ea3491f50b0fb6193f
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jun 21 02:55:04 2023 +0800

    Catch the exception that triggerer initialization failed (#31999)
    
    * fix(jobs/triggerer_job_runner): catch the exception that triggerer 
initialization failed
    
    * test(jobs/triggerer_job_runner): add test case for checking 
TriggerRunner.update_trigger catch triggerer change exception
    
    * refactor(jobs/triggerer_job_runner): catch multiple exception cases in 
the same try catch block when initializing trigger
    
    * refactor(jobs/triggerer_job_runner): remove unnecessary TypeError catching
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    
    * Revert "refactor(jobs/triggerer_job_runner): remove unnecessary TypeError 
catching"
    
    This reverts commit ac561a2416f16a8f9a3b211715fd4a7e012b47ad.
    
    * Revert "refactor(jobs/triggerer_job_runner): catch multiple exception 
cases in the same try catch block when initializing trigger"
    
    This reverts commit 4f3b24b8a7698973fe319618b6ff9f459dc0155a.
    
    ---------
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    (cherry picked from commit a1ba15570219d2fe77466367e84fafa92cbdb24e)
---
 airflow/jobs/triggerer_job_runner.py |  9 ++++++++-
 tests/jobs/test_triggerer_job.py     | 22 +++++++++++++++++++++-
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/triggerer_job_runner.py 
b/airflow/jobs/triggerer_job_runner.py
index 7ca3fc2e94..8cbc17b228 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -688,7 +688,14 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                 # Either the trigger code or the path to it is bad. Fail the 
trigger.
                 self.failed_triggers.append((new_id, e))
                 continue
-            new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
+
+            try:
+                new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
+            except TypeError as err:
+                self.log.error("Trigger failed; message=%s", err)
+                self.failed_triggers.append((new_id, err))
+                continue
+
             self.set_trigger_logging_metadata(new_trigger_orm.task_instance, 
new_id, new_trigger_instance)
             self.to_create.append((new_id, new_trigger_instance))
         # Enqueue orphaned triggers for cancellation
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index d0ce887a57..3e43d8f520 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -37,7 +37,7 @@ from airflow.models.baseoperator import BaseOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
 from airflow.triggers.base import TriggerEvent
-from airflow.triggers.temporal import TimeDeltaTrigger
+from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
 from airflow.triggers.testing import FailureTrigger, SuccessTrigger
 from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import RedirectStdHandler
@@ -281,6 +281,26 @@ class TestTriggerRunner:
             await trigger_runner.run_trigger(1, mock_trigger)
         assert "Trigger cancelled due to timeout" in caplog.text
 
+    @patch("airflow.models.trigger.Trigger.bulk_fetch")
+    @patch(
+        
"airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath",
+        return_value=DateTimeTrigger,
+    )
+    def test_update_trigger_with_triggerer_argument_change(
+        self, mock_bulk_fetch, mock_get_trigger_by_classpath, session, caplog
+    ) -> None:
+        trigger_runner = TriggerRunner()
+        mock_trigger_orm = MagicMock()
+        mock_trigger_orm.kwargs = {"moment": ..., "not_exists_arg": ...}
+        mock_get_trigger_by_classpath.return_value = {1: mock_trigger_orm}
+
+        trigger_runner.update_triggers({1})
+
+        assert (
+            "Trigger failed; message=__init__() got an unexpected keyword 
argument 'not_exists_arg'"
+            in caplog.text
+        )
+
 
 def test_trigger_create_race_condition_18392(session, tmp_path):
     """

Reply via email to