This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d8b969522a Fix AwaitMessageSensor to accept timeout and soft_fail parameters (#57863) (#58070)
1d8b969522a is described below

commit 1d8b969522a3a47a4821dead6dc13baa62c92cb9
Author: Ashir Alam <[email protected]>
AuthorDate: Fri Nov 14 15:47:15 2025 -0500

    Fix AwaitMessageSensor to accept timeout and soft_fail parameters (#57863) (#58070)
    
    - Changed AwaitMessageSensor and AwaitMessageTriggerFunctionSensor to inherit from BaseSensorOperator instead of BaseOperator
    - This enables both sensors to accept standard sensor parameters like timeout and soft_fail
    - Added comprehensive test coverage for timeout and soft_fail parameters
    - Updated documentation to reflect the new parameters
    
    Fixes #57863
---
 .../providers/apache/kafka/sensors/kafka.py        | 14 ++++--
 .../tests/unit/apache/kafka/sensors/test_kafka.py  | 50 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py b/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
index 5a55530ebd5..42c249b6f58 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
@@ -20,12 +20,12 @@ from collections.abc import Callable, Sequence
 from typing import Any
 
 from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
-from airflow.providers.common.compat.sdk import BaseOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 
 VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
 
 
-class AwaitMessageSensor(BaseOperator):
+class AwaitMessageSensor(BaseSensorOperator):
     """
     An Airflow sensor that defers until a specific message is published to Kafka.
 
@@ -53,6 +53,10 @@ class AwaitMessageSensor(BaseOperator):
     :param poll_interval: How long the kafka consumer should sleep after reaching the end of the Kafka log,
         defaults to 5
     :param xcom_push_key: the name of a key to push the returned message to, defaults to None
+    :param soft_fail: Set to true to mark the task as SKIPPED on failure
+    :param timeout: Time elapsed before the task times out and fails (in seconds)
+    :param poke_interval: This parameter is inherited but not used in this deferrable implementation
+    :param mode: This parameter is inherited but not used in this deferrable implementation
 
 
     """
@@ -111,7 +115,7 @@ class AwaitMessageSensor(BaseOperator):
         return event
 
 
-class AwaitMessageTriggerFunctionSensor(BaseOperator):
+class AwaitMessageTriggerFunctionSensor(BaseSensorOperator):
     """
     Defer until a specific message is published to Kafka, trigger a registered function, then resume waiting.
 
@@ -137,6 +141,10 @@ class AwaitMessageTriggerFunctionSensor(BaseOperator):
         cluster, defaults to 1
     :param poll_interval: How long the kafka consumer should sleep after reaching the end of the Kafka log,
         defaults to 5
+    :param soft_fail: Set to true to mark the task as SKIPPED on failure
+    :param timeout: Time elapsed before the task times out and fails (in seconds)
+    :param poke_interval: This parameter is inherited but not used in this deferrable implementation
+    :param mode: This parameter is inherited but not used in this deferrable implementation
 
 
     """
diff --git a/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_kafka.py b/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_kafka.py
index 817b8bdb2fe..eab4b968d87 100644
--- a/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_kafka.py
+++ b/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_kafka.py
@@ -91,3 +91,53 @@ class TestSensors:
         # task should immediately come out of deferred
         with pytest.raises(TaskDeferred):
             sensor.execute_complete(context={})
+
+    def test_await_message_with_timeout_parameter(self):
+        """Test that AwaitMessageSensor accepts timeout parameter."""
+        sensor = AwaitMessageSensor(
+            kafka_config_id="kafka_d",
+            topics=["test"],
+            task_id="test",
+            apply_function=_return_true,
+            timeout=600,  # This should now work without errors
+        )
+
+        assert sensor.timeout == 600
+
+    def test_await_message_with_soft_fail_parameter(self):
+        """Test that AwaitMessageSensor accepts soft_fail parameter."""
+        sensor = AwaitMessageSensor(
+            kafka_config_id="kafka_d",
+            topics=["test"],
+            task_id="test",
+            apply_function=_return_true,
+            soft_fail=True,  # This should now work without errors
+        )
+
+        assert sensor.soft_fail is True
+
+    def test_await_message_trigger_function_with_timeout_parameter(self):
+        """Test that AwaitMessageTriggerFunctionSensor accepts timeout parameter."""
+        sensor = AwaitMessageTriggerFunctionSensor(
+            kafka_config_id="kafka_d",
+            topics=["test"],
+            task_id="test",
+            apply_function=_return_true,
+            event_triggered_function=_return_true,
+            timeout=600,
+        )
+
+        assert sensor.timeout == 600
+
+    def test_await_message_trigger_function_with_soft_fail_parameter(self):
+        """Test that AwaitMessageTriggerFunctionSensor accepts soft_fail parameter."""
+        sensor = AwaitMessageTriggerFunctionSensor(
+            kafka_config_id="kafka_d",
+            topics=["test"],
+            task_id="test",
+            apply_function=_return_true,
+            event_triggered_function=_return_true,
+            soft_fail=True,
+        )
+
+        assert sensor.soft_fail is True

Reply via email to