This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1d8b969522a Fix AwaitMessageSensor to accept timeout and soft_fail
parameters (#57863) (#58070)
1d8b969522a is described below
commit 1d8b969522a3a47a4821dead6dc13baa62c92cb9
Author: Ashir Alam <[email protected]>
AuthorDate: Fri Nov 14 15:47:15 2025 -0500
Fix AwaitMessageSensor to accept timeout and soft_fail parameters (#57863)
(#58070)
- Changed AwaitMessageSensor and AwaitMessageTriggerFunctionSensor to
inherit from BaseSensorOperator instead of BaseOperator
- This enables both sensors to accept standard sensor parameters like
timeout and soft_fail
- Added tests verifying that both sensors accept the timeout and soft_fail parameters
- Updated the docstrings of both sensors to document the new parameters
Fixes #57863
---
.../providers/apache/kafka/sensors/kafka.py | 14 ++++--
.../tests/unit/apache/kafka/sensors/test_kafka.py | 50 ++++++++++++++++++++++
2 files changed, 61 insertions(+), 3 deletions(-)
diff --git
a/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
b/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
index 5a55530ebd5..42c249b6f58 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
@@ -20,12 +20,12 @@ from collections.abc import Callable, Sequence
from typing import Any
from airflow.providers.apache.kafka.triggers.await_message import
AwaitMessageTrigger
-from airflow.providers.common.compat.sdk import BaseOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
-class AwaitMessageSensor(BaseOperator):
+class AwaitMessageSensor(BaseSensorOperator):
"""
An Airflow sensor that defers until a specific message is published to
Kafka.
@@ -53,6 +53,10 @@ class AwaitMessageSensor(BaseOperator):
:param poll_interval: How long the kafka consumer should sleep after
reaching the end of the Kafka log,
defaults to 5
:param xcom_push_key: the name of a key to push the returned message to,
defaults to None
+ :param soft_fail: Set to true to mark the task as SKIPPED on failure
+ :param timeout: Time elapsed before the task times out and fails (in
seconds)
+ :param poke_interval: This parameter is inherited but not used in this
deferrable implementation
+ :param mode: This parameter is inherited but not used in this deferrable
implementation
"""
@@ -111,7 +115,7 @@ class AwaitMessageSensor(BaseOperator):
return event
-class AwaitMessageTriggerFunctionSensor(BaseOperator):
+class AwaitMessageTriggerFunctionSensor(BaseSensorOperator):
"""
Defer until a specific message is published to Kafka, trigger a registered
function, then resume waiting.
@@ -137,6 +141,10 @@ class AwaitMessageTriggerFunctionSensor(BaseOperator):
cluster, defaults to 1
:param poll_interval: How long the kafka consumer should sleep after
reaching the end of the Kafka log,
defaults to 5
+ :param soft_fail: Set to true to mark the task as SKIPPED on failure
+ :param timeout: Time elapsed before the task times out and fails (in
seconds)
+ :param poke_interval: This parameter is inherited but not used in this
deferrable implementation
+ :param mode: This parameter is inherited but not used in this deferrable
implementation
"""
diff --git
a/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_kafka.py
b/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_kafka.py
index 817b8bdb2fe..eab4b968d87 100644
--- a/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_kafka.py
+++ b/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_kafka.py
@@ -91,3 +91,53 @@ class TestSensors:
# task should immediately come out of deferred
with pytest.raises(TaskDeferred):
sensor.execute_complete(context={})
+
+ def test_await_message_with_timeout_parameter(self):
+ """Test that AwaitMessageSensor accepts timeout parameter."""
+ sensor = AwaitMessageSensor(
+ kafka_config_id="kafka_d",
+ topics=["test"],
+ task_id="test",
+ apply_function=_return_true,
+ timeout=600, # This should now work without errors
+ )
+
+ assert sensor.timeout == 600
+
+ def test_await_message_with_soft_fail_parameter(self):
+ """Test that AwaitMessageSensor accepts soft_fail parameter."""
+ sensor = AwaitMessageSensor(
+ kafka_config_id="kafka_d",
+ topics=["test"],
+ task_id="test",
+ apply_function=_return_true,
+ soft_fail=True, # This should now work without errors
+ )
+
+ assert sensor.soft_fail is True
+
+ def test_await_message_trigger_function_with_timeout_parameter(self):
+ """Test that AwaitMessageTriggerFunctionSensor accepts timeout
parameter."""
+ sensor = AwaitMessageTriggerFunctionSensor(
+ kafka_config_id="kafka_d",
+ topics=["test"],
+ task_id="test",
+ apply_function=_return_true,
+ event_triggered_function=_return_true,
+ timeout=600,
+ )
+
+ assert sensor.timeout == 600
+
+ def test_await_message_trigger_function_with_soft_fail_parameter(self):
+ """Test that AwaitMessageTriggerFunctionSensor accepts soft_fail
parameter."""
+ sensor = AwaitMessageTriggerFunctionSensor(
+ kafka_config_id="kafka_d",
+ topics=["test"],
+ task_id="test",
+ apply_function=_return_true,
+ event_triggered_function=_return_true,
+ soft_fail=True,
+ )
+
+ assert sensor.soft_fail is True