This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5923470562c Fix AwaitMessageSensor not honoring timeout (#62104)
5923470562c is described below

commit 5923470562ccd9bb047fac499348935a64b28390
Author: Pranay Kumar Karvi <[email protected]>
AuthorDate: Thu Mar 12 23:31:14 2026 +0530

    Fix AwaitMessageSensor not honoring timeout (#62104)
---
 .../providers/apache/kafka/sensors/kafka.py        |  6 ++++
 .../apache/kafka/sensors/test_await_message.py     | 38 ++++++++++++++++++++++
 2 files changed, 44 insertions(+)

diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py b/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
index 74bb12804f8..35b35454a67 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 from collections.abc import Callable, Sequence
+from datetime import timedelta
 from typing import Any
 
 from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
@@ -103,6 +104,10 @@ class AwaitMessageSensor(BaseSensorOperator):
         self.commit_offset = commit_offset
 
     def execute(self, context) -> Any:
+        if isinstance(self.timeout, (int, float)):
+            timeout = timedelta(seconds=self.timeout)
+        else:
+            timeout = self.timeout
         self.defer(
             trigger=AwaitMessageTrigger(
                 topics=self.topics,
@@ -115,6 +120,7 @@ class AwaitMessageSensor(BaseSensorOperator):
                 commit_offset=self.commit_offset,
             ),
             method_name="execute_complete",
+            timeout=timeout,
         )
 
     def execute_complete(self, context, event=None):
diff --git a/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_await_message.py b/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_await_message.py
new file mode 100644
index 00000000000..00412d1ce12
--- /dev/null
+++ b/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_await_message.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import timedelta
+
+from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
+
+
+def test_await_message_sensor_passes_timeout(mocker):
+    """Regression test for #62097: user-provided timeout must be passed to defer()."""
+    sensor = AwaitMessageSensor(
+        task_id="test",
+        topics=["test"],
+        apply_function="builtins.print",
+        timeout=timedelta(seconds=30),
+    )
+
+    defer_mock = mocker.patch.object(sensor, "defer")
+
+    sensor.execute({})
+
+    defer_mock.assert_called_once()
+    assert defer_mock.call_args.kwargs["timeout"] == timedelta(seconds=30)

Reply via email to