This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5923470562c Fix AwaitMessageSensor not honoring timeout (#62104)
5923470562c is described below
commit 5923470562ccd9bb047fac499348935a64b28390
Author: Pranay Kumar Karvi <[email protected]>
AuthorDate: Thu Mar 12 23:31:14 2026 +0530
Fix AwaitMessageSensor not honoring timeout (#62104)
---
.../providers/apache/kafka/sensors/kafka.py | 6 ++++
.../apache/kafka/sensors/test_await_message.py | 38 ++++++++++++++++++++++
2 files changed, 44 insertions(+)
diff --git
a/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
b/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
index 74bb12804f8..35b35454a67 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
@@ -17,6 +17,7 @@
from __future__ import annotations
from collections.abc import Callable, Sequence
+from datetime import timedelta
from typing import Any
from airflow.providers.apache.kafka.triggers.await_message import
AwaitMessageTrigger
@@ -103,6 +104,10 @@ class AwaitMessageSensor(BaseSensorOperator):
self.commit_offset = commit_offset
def execute(self, context) -> Any:
+ if isinstance(self.timeout, (int, float)):
+ timeout = timedelta(seconds=self.timeout)
+ else:
+ timeout = self.timeout
self.defer(
trigger=AwaitMessageTrigger(
topics=self.topics,
@@ -115,6 +120,7 @@ class AwaitMessageSensor(BaseSensorOperator):
commit_offset=self.commit_offset,
),
method_name="execute_complete",
+ timeout=timeout,
)
def execute_complete(self, context, event=None):
diff --git
a/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_await_message.py
b/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_await_message.py
new file mode 100644
index 00000000000..00412d1ce12
--- /dev/null
+++
b/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_await_message.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import timedelta
+
+from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
+
+
+def test_await_message_sensor_passes_timeout(mocker):
+ """Regression test for #62097: user-provided timeout must be passed to
defer()."""
+ sensor = AwaitMessageSensor(
+ task_id="test",
+ topics=["test"],
+ apply_function="builtins.print",
+ timeout=timedelta(seconds=30),
+ )
+
+ defer_mock = mocker.patch.object(sensor, "defer")
+
+ sensor.execute({})
+
+ defer_mock.assert_called_once()
+ assert defer_mock.call_args.kwargs["timeout"] == timedelta(seconds=30)