kaxil commented on code in PR #62790:
URL: https://github.com/apache/airflow/pull/62790#discussion_r3307496547


##########
providers/ibm/mq/src/airflow/providers/ibm/mq/hooks/mq.py:
##########
@@ -0,0 +1,569 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import json
+import threading
+from contextlib import contextmanager, suppress
+from typing import TYPE_CHECKING, Any
+
+from asgiref.sync import sync_to_async
+
+from airflow.providers.common.compat.connection import get_async_connection
+from airflow.providers.common.compat.sdk import BaseHook
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if TYPE_CHECKING:
+    from airflow.providers.common.compat.sdk import Connection
+
+# Backoff parameters for transient consume failures
+_BACKOFF_BASE: float = 1.0
+_BACKOFF_MAX: float = 60.0
+_BACKOFF_FACTOR: float = 2.0
+_TRANSIENT_REASON_NAMES = frozenset(
+    {
+        "MQRC_CONNECTION_BROKEN",
+        "MQRC_Q_MGR_QUIESCING",
+        "MQRC_Q_MGR_NOT_AVAILABLE",
+        "MQRC_HOST_NOT_AVAILABLE",
+        "MQRC_CONNECTION_QUIESCING",
+    }
+)
+
+
+class IBMMQError(Exception):
+    """
+    Lightweight wrapper for IBM MQ errors raised by the consumer thread.
+
+    This allows the async event-loop code in :meth:`IBMMQHook.aconsume` to
+    handle MQ errors **without importing the heavy ibmmq C extension** on the
+    event-loop thread.
+
+    :param reason: The integer MQ reason code (e.g. ``MQRC_CONNECTION_BROKEN``).
+    :param comp: The integer MQ completion code.
+    :param transient: Whether this error is considered transient (eligible for retry).
+    :param message: Human-readable description of the error.
+    """
+
+    def __init__(self, reason: int, comp: int, transient: bool, message: str):
+        super().__init__(message)
+        self.reason = reason
+        self.comp = comp
+        self.transient = transient
+
+
+class IBMMQConsumer(threading.Thread, LoggingMixin):
+    """
+    Thread worker that consumes one message from an IBM MQ queue.
+
+    The consumer is used by :meth:`IBMMQHook.aconsume` to execute blocking MQ
+    calls in a dedicated thread because the IBM MQ C client requires handles to
+    be used from the thread that created them. The result (or exception) is
+    forwarded to the asyncio event loop through ``future``.
+
+    :param hook: Hook instance that provides connection and queue helpers.
+    :param connection: Airflow connection object resolved from ``hook.conn_id``.
+    :param queue_name: Queue to consume from.
+    :param poll_interval: Maximum wait time (seconds) for each ``q.get`` call.
+    :param loop: Event loop that owns ``future``.
+    :param future: Future completed with the decoded message or an exception.
+    :param stop_event: Signal used to stop polling after cancellation.
+    """
+
+    def __init__(
+        self,
+        hook: IBMMQHook,
+        connection: Connection,
+        queue_name: str,
+        poll_interval: float,
+        loop: asyncio.AbstractEventLoop,
+        future: asyncio.Future,
+        stop_event: threading.Event,
+    ):
+        super().__init__(daemon=True)
+        self.hook = hook
+        self.connection = connection
+        self.queue_name = queue_name
+        self.poll_interval = poll_interval
+        self.loop = loop
+        self.future = future
+        self.stop_event = stop_event
+
+    def _process_message(self, message: bytes) -> str:
+        """
+        Process a raw MQ message.
+
+        If the message contains an RFH2 header, the header is unpacked and the
+        payload following the header is returned. If unpacking fails, the raw
+        message is returned decoded as UTF-8.
+
+        :param message: Raw message received from IBM MQ.
+        :return: Decoded message payload.
+        """
+        import ibmmq
+
+        try:
+            rfh2 = ibmmq.RFH2()
+            rfh2.unpack(message)
+
+            payload_offset = rfh2.get_length()
+            payload = message[payload_offset:]
+
+            decoded = payload.decode("utf-8", errors="ignore")
+            self.log.info("Message received from MQ (RFH2 decoded): %s", decoded)

Review Comment:
   This was flagged in the previous review and still logs the full decoded MQ 
payload at INFO. IBM MQ in regulated industries (banking, insurance) routinely 
carries PII / PCI / PHI, so INFO-level payload logging ships sensitive content 
to every logging sink with no opt-out. The fallback path at lines 133-136 also 
logs the raw bytes at WARNING. Drop both to DEBUG and truncate (e.g., first 200 
chars), or log only metadata (queue name, message id, size).



##########
providers/ibm/mq/src/airflow/providers/ibm/mq/hooks/mq.py:
##########
@@ -0,0 +1,569 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import json
+import threading
+from contextlib import contextmanager, suppress
+from typing import TYPE_CHECKING, Any
+
+from asgiref.sync import sync_to_async
+
+from airflow.providers.common.compat.connection import get_async_connection
+from airflow.providers.common.compat.sdk import BaseHook
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if TYPE_CHECKING:
+    from airflow.providers.common.compat.sdk import Connection
+
+# Backoff parameters for transient consume failures
+_BACKOFF_BASE: float = 1.0
+_BACKOFF_MAX: float = 60.0
+_BACKOFF_FACTOR: float = 2.0
+_TRANSIENT_REASON_NAMES = frozenset(
+    {
+        "MQRC_CONNECTION_BROKEN",
+        "MQRC_Q_MGR_QUIESCING",
+        "MQRC_Q_MGR_NOT_AVAILABLE",
+        "MQRC_HOST_NOT_AVAILABLE",
+        "MQRC_CONNECTION_QUIESCING",
+    }
+)
+
+
+class IBMMQError(Exception):
+    """
+    Lightweight wrapper for IBM MQ errors raised by the consumer thread.
+
+    This allows the async event-loop code in :meth:`IBMMQHook.aconsume` to
+    handle MQ errors **without importing the heavy ibmmq C extension** on the
+    event-loop thread.
+
+    :param reason: The integer MQ reason code (e.g. ``MQRC_CONNECTION_BROKEN``).
+    :param comp: The integer MQ completion code.
+    :param transient: Whether this error is considered transient (eligible for retry).
+    :param message: Human-readable description of the error.
+    """
+
+    def __init__(self, reason: int, comp: int, transient: bool, message: str):
+        super().__init__(message)
+        self.reason = reason
+        self.comp = comp
+        self.transient = transient
+
+
+class IBMMQConsumer(threading.Thread, LoggingMixin):
+    """
+    Thread worker that consumes one message from an IBM MQ queue.
+
+    The consumer is used by :meth:`IBMMQHook.aconsume` to execute blocking MQ
+    calls in a dedicated thread because the IBM MQ C client requires handles to
+    be used from the thread that created them. The result (or exception) is
+    forwarded to the asyncio event loop through ``future``.
+
+    :param hook: Hook instance that provides connection and queue helpers.
+    :param connection: Airflow connection object resolved from ``hook.conn_id``.
+    :param queue_name: Queue to consume from.
+    :param poll_interval: Maximum wait time (seconds) for each ``q.get`` call.
+    :param loop: Event loop that owns ``future``.
+    :param future: Future completed with the decoded message or an exception.
+    :param stop_event: Signal used to stop polling after cancellation.
+    """
+
+    def __init__(
+        self,
+        hook: IBMMQHook,
+        connection: Connection,
+        queue_name: str,
+        poll_interval: float,
+        loop: asyncio.AbstractEventLoop,
+        future: asyncio.Future,
+        stop_event: threading.Event,
+    ):
+        super().__init__(daemon=True)
+        self.hook = hook
+        self.connection = connection
+        self.queue_name = queue_name
+        self.poll_interval = poll_interval
+        self.loop = loop
+        self.future = future
+        self.stop_event = stop_event
+
+    def _process_message(self, message: bytes) -> str:
+        """
+        Process a raw MQ message.
+
+        If the message contains an RFH2 header, the header is unpacked and the
+        payload following the header is returned. If unpacking fails, the raw
+        message is returned decoded as UTF-8.
+
+        :param message: Raw message received from IBM MQ.
+        :return: Decoded message payload.
+        """
+        import ibmmq
+
+        try:
+            rfh2 = ibmmq.RFH2()
+            rfh2.unpack(message)
+
+            payload_offset = rfh2.get_length()
+            payload = message[payload_offset:]
+
+            decoded = payload.decode("utf-8", errors="ignore")
+            self.log.info("Message received from MQ (RFH2 decoded): %s", decoded)
+            return decoded
+        except ibmmq.PYIFError as error:  # RFH2 header not present or unpack failed
+            self.log.warning(
+                "Failed to unpack RFH2 header (%s). Returning raw message payload: %s",
+                error,
+                message,
+            )
+            return message.decode("utf-8", errors="ignore")
+
+    def consume(
+        self,
+        queue_name: str,
+        poll_interval: float,
+        stop_event: threading.Event,
+    ) -> str | None:
+        """
+        Blocking implementation that consumes a single message from the given IBM MQ queue.
+
+        All IBM MQ handles (queue manager connection, queue) are created **and used** within
+        this method, satisfying the thread-affinity requirement of the IBM MQ C client library.
+        The 'stop_event' is checked between 'q.get' calls so the thread terminates promptly
+        after the coroutine side is canceled.  Reads are performed with
+        ``MQGMO_NO_SYNCPOINT``, so this method provides at-most-once delivery:
+        once ``q.get`` returns successfully, IBM MQ has already committed the
+        message removal from the queue.
+
+        MQ-specific exceptions are caught and re-raised as :class:`IBMMQError` so that
+        the async caller never needs to import the heavy ``ibmmq`` C extension.
+
+        For an asynchronous interface see :meth:`IBMMQHook.aconsume`.
+        """
+        import ibmmq
+
+        transient_reasons = frozenset(
+            getattr(ibmmq.CMQC, name) for name in _TRANSIENT_REASON_NAMES if hasattr(ibmmq.CMQC, name)
+        )
+
+        od = ibmmq.OD()
+        od.ObjectName = queue_name
+
+        md = ibmmq.MD()
+        md.Format = ibmmq.CMQC.MQFMT_STRING
+        md.CodedCharSetId = 1208
+        md.Encoding = ibmmq.CMQC.MQENC_NATIVE
+
+        gmo = ibmmq.GMO()
+        gmo.Options = ibmmq.CMQC.MQGMO_WAIT | ibmmq.CMQC.MQGMO_NO_SYNCPOINT | ibmmq.CMQC.MQGMO_CONVERT
+        gmo.WaitInterval = int(poll_interval * 1000)
+
+        try:
+            with self.hook.get_conn(connection=self.connection) as conn:
+                q = ibmmq.Queue(conn, od, self.hook.get_open_options(queue_name=queue_name))
+                try:
+                    # WaitInterval already blocks for poll_interval seconds when no message is
+                    # available, so no additional sleep is needed between iterations.
+                    while not stop_event.is_set():
+                        try:
+                            message = q.get(None, md, gmo)
+                            if message:
+                                return self._process_message(message)
+                        except ibmmq.MQMIError as e:
+                            if e.reason == ibmmq.CMQC.MQRC_NO_MSG_AVAILABLE:
+                                self.log.info(
+                                    "No message available on queue '%s' (reason=%s)",
+                                    queue_name,
+                                    e.reason,
+                                )
+                                continue
+                            self.log.error(
+                                "IBM MQ error on queue '%s': completion_code=%s reason_code=%s (%s)",
+                                queue_name,
+                                e.comp,
+                                e.reason,
+                                e,
+                            )
+                            raise
+                finally:
+                    with suppress(Exception):
+                        q.close()
+        except ibmmq.MQMIError as e:
+            raise IBMMQError(
+                reason=e.reason,
+                comp=e.comp,
+                transient=e.reason in transient_reasons,
+                message=str(e),
+            ) from e
+        except ibmmq.PYIFError as e:
+            raise IBMMQError(
+                reason=0,
+                comp=0,
+                transient=False,
+                message=str(e),
+            ) from e
+        except ConnectionError as e:
+            # _connect() wraps ibmmq.MQMIError as ConnectionError; treat as transient
+            # so aconsume retries with backoff instead of killing the trigger.
+            raise IBMMQError(
+                reason=0,
+                comp=0,
+                transient=True,
+                message=str(e),
+            ) from e
+        return None
+
+    def run(self):
+        try:
+            result = self.consume(
+                queue_name=self.queue_name,
+                poll_interval=self.poll_interval,
+                stop_event=self.stop_event,
+            )
+
+            if not self.future.cancelled():
+                self.loop.call_soon_threadsafe(self.future.set_result, result)
+        except Exception as e:
+            if not self.future.cancelled():
+                self.loop.call_soon_threadsafe(self.future.set_exception, e)
+
+
+class IBMMQHook(BaseHook):
+    """
+    Interact with IBM MQ queue managers to consume and produce messages.
+
+    This hook wraps the ``ibmmq`` C client and manages connection
+    lifecycle, queue open/close, and message serialization.  Both synchronous
+    (context-manager) and asynchronous (``consume`` / ``produce``) interfaces
+    are provided.
+
+    The asynchronous consume path intentionally uses ``MQGMO_NO_SYNCPOINT``.
+    That keeps reads non-transactional and therefore provides **at-most-once**
+    delivery semantics for trigger-based consumption: once ``q.get`` returns,
+    IBM MQ has already removed the message from the queue.  If the coroutine is
+    canceled after that point but before Airflow yields a ``TriggerEvent``, the
+    message can be lost.  The stop-event machinery only prevents additional
+    ``q.get`` calls after cancellation; it cannot make the non-transactional
+    get atomic with TriggerEvent emission.
+
+    Connection parameters (host, port, login, password) are read from the
+    Airflow connection identified by *conn_id*.  ``queue_manager``, ``channel``,
+    and ``open_options`` can be supplied either as constructor arguments or via
+    the connection's *extra* JSON — constructor arguments take precedence.
+
+    :param conn_id: Airflow connection ID for the IBM MQ instance.
+        Defaults to ``"mq_default"``.
+    :param queue_manager: Name of the IBM MQ queue manager to connect to.
+        If not provided, the value is read from the ``queue_manager`` key in
+        the connection's *extra* JSON.
+    :param channel: MQ channel name used for the connection.
+        If not provided, the value is read from the ``channel`` key in the
+        connection's *extra* JSON.
+    :param open_options: Integer bitmask of ``MQOO_*`` open options passed
+        when opening a queue (e.g.,
+        ``ibmmq.CMQC.MQOO_INPUT_SHARED | ibmmq.CMQC.MQOO_FAIL_IF_QUIESCING``).
+        If not provided, the value is resolved from the ``open_options`` key
+        in the connection's *extra* JSON, falling back to
+        ``MQOO_INPUT_SHARED``.
+    """
+
+    conn_name_attr = "conn_id"
+    default_conn_name = "mq_default"
+    conn_type = "ibmmq"
+    hook_name = "IBM MQ"
+    default_open_options = "MQOO_INPUT_SHARED"
+
+    def __init__(
+        self,
+        conn_id: str = default_conn_name,
+        queue_manager: str | None = None,
+        channel: str | None = None,
+        open_options: int | None = None,
+    ):
+        super().__init__()
+        self.conn_id = conn_id
+        self.queue_manager = queue_manager
+        self.channel = channel
+        self.open_options = open_options
+
+    @classmethod
+    def get_ui_field_behaviour(cls) -> dict[str, Any]:
+        """Return custom UI field behaviour for IBM MQ Connection."""
+        return {
+            "hidden_fields": ["schema"],
+            "placeholders": {
+                "host": "mq.example.com",
+                "port": "1414",
+                "login": "app_user",
+                "extra": json.dumps(
+                    {
+                        "queue_manager": "QM1",
+                        "channel": "DEV.APP.SVRCONN",
+                        "open_options": cls.default_open_options,
+                    },
+                    indent=2,
+                ),
+            },
+        }
+
+    @classmethod
+    def get_open_options_flags(cls, open_options: int) -> list[str]:
+        """
+        Return the symbolic MQ open option flags set in a given bitmask.
+
+        Each flag corresponds to a constant in ``ibmmq.CMQC`` that starts with ``MQOO_``.
+
+        :param open_options: The integer bitmask used when opening an MQ queue
+                             (e.g., ``MQOO_INPUT_EXCLUSIVE | MQOO_FAIL_IF_QUIESCING``).
+
+        :return: A list of the names of the MQ open flags that are set in the bitmask.
+                 For example, ``['MQOO_INPUT_EXCLUSIVE', 'MQOO_FAIL_IF_QUIESCING']``.
+
+        Example:
+            >>> open_options = ibmmq.CMQC.MQOO_INPUT_SHARED | ibmmq.CMQC.MQOO_FAIL_IF_QUIESCING
+            >>> cls.get_open_options_flags(open_options)
+            ['MQOO_INPUT_SHARED', 'MQOO_FAIL_IF_QUIESCING']
+        """
+        import ibmmq
+
+        return [
+            name
+            for name, value in vars(ibmmq.CMQC).items()
+            if name.startswith("MQOO_") and (open_options & value)
+        ]
+
+    def get_open_options(self, queue_name: str) -> int | None:
+        if self.open_options is not None:
+            flag_names = self.get_open_options_flags(self.open_options)
+            self.log.info(
+                "Opening MQ queue '%s' with open_options=%s (%s)",
+                queue_name,
+                self.open_options,
+                ", ".join(flag_names),
+            )
+        return self.open_options
+
+    @staticmethod
+    def _connect(queue_manager: str, channel: str, conn_info: str, csp):
+        """
+        Connect to the IBM MQ queue manager.
+
+        Connection errors from the C client are caught and re-raised as a
+        :class:`ConnectionError` with a human-readable message.
+
+        :return: IBM MQ connection object
+        """
+        import ibmmq
+
+        try:
+            return ibmmq.connect(queue_manager, channel, conn_info, csp=csp)
+        except (ibmmq.MQMIError, ibmmq.PYIFError) as e:
+            raise ConnectionError(
+                f"Failed to connect to IBM MQ queue manager '{queue_manager}' "
+                f"at {conn_info} on channel '{channel}': {e}"
+            ) from e
+
+    @contextmanager
+    def get_conn(self, connection: Connection | None = None):
+        """
+        Sync context manager for IBM MQ connection lifecycle.
+
+        Must be called from the executor thread (not the event loop thread).
+        Retrieves the Airflow connection (or uses the explicitly supplied one),
+        extracts MQ parameters, and manages the IBM MQ connection lifecycle.
+
+        :param connection: Optional Airflow connection object. When omitted,
+            the connection is resolved from ``self.conn_id``.
+        :yield: IBM MQ connection object
+        """
+        import ibmmq
+
+        connection = connection or BaseHook.get_connection(self.conn_id)
+        config = connection.extra_dejson
+        queue_manager = self.queue_manager or config.get("queue_manager")
+        channel = self.channel or config.get("channel")
+
+        if not queue_manager:
+            raise ValueError("queue_manager must be set in Connection extra config or hook init")
+        if not channel:
+            raise ValueError("channel must be set in Connection extra config or hook init")
+
+        if self.open_options is None:
+            self.open_options = getattr(

Review Comment:
   The `get_conn` open_options resolution still has the side-effect mutation 
flagged in the previous review (writing to `self.open_options` makes the hook 
non-idempotent across `get_conn` calls), and the `getattr(ibmmq.CMQC, 
config.get("open_options", self.default_open_options), 
ibmmq.CMQC.MQOO_INPUT_SHARED)` call mishandles three input shapes, two of them 
silently:
   
   1. **Integer in JSON extra** (`"open_options": 2`) -> `TypeError: attribute 
name must be string, not 'int'`. The `get_open_options_flags` docstring uses 
ints, so users will reasonably try this.
   2. **Pipe-delimited symbol** (`"MQOO_INPUT_SHARED | 
MQOO_FAIL_IF_QUIESCING"`) -> silently falls back to `MQOO_INPUT_SHARED`, 
dropping `FAIL_IF_QUIESCING`. The URI parser in `queues/mq.py` accepts this 
format, so users will expect the connection-extra path to match.
   3. **Typo'd symbol** -> silent fallback to `MQOO_INPUT_SHARED`. No log, no 
error.
   
   The most dangerous case: a user who configures `MQOO_INPUT_EXCLUSIVE` with a 
typo silently gets `MQOO_INPUT_SHARED`, allowing multiple consumers to race. 
Suggest extracting the parser from `queues/mq.py:trigger_kwargs` into a shared 
helper that handles int, single symbol, and pipe/comma combinations, and raises 
on unknown tokens. The same fix also removes the side-effect mutation by 
resolving into a local variable.
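
   A rough sketch of what such a shared helper could look like. This is not the 
parser from `queues/mq.py`; `resolve_open_options` and `FAKE_CMQC` are 
hypothetical names, and `FAKE_CMQC` is a stand-in for `ibmmq.CMQC` so the 
snippet runs without the C client (its values are illustrative and should be 
checked against the real constants).

```python
import re
from types import SimpleNamespace

# Stand-in for ibmmq.CMQC; the numeric values are illustrative assumptions.
FAKE_CMQC = SimpleNamespace(
    MQOO_INPUT_SHARED=0x2,
    MQOO_INPUT_EXCLUSIVE=0x4,
    MQOO_FAIL_IF_QUIESCING=0x2000,
)


def resolve_open_options(raw, constants, default="MQOO_INPUT_SHARED") -> int:
    """Resolve an int, a symbol, or a pipe/comma list of symbols into a bitmask.

    Unknown tokens raise ValueError instead of falling back to a default.
    """
    if raw is None:
        raw = default
    if isinstance(raw, bool):
        # bool is a subclass of int; reject it rather than treat True as 1.
        raise ValueError(f"open_options must be an int or a string, got {raw!r}")
    if isinstance(raw, int):
        return raw
    if not isinstance(raw, str):
        raise ValueError(f"open_options must be an int or a string, got {raw!r}")

    tokens = [t.strip() for t in re.split(r"[|,]", raw) if t.strip()]
    if not tokens:
        raise ValueError("open_options is empty")

    mask = 0
    for token in tokens:
        value = getattr(constants, token, None) if token.startswith("MQOO_") else None
        if not isinstance(value, int):
            raise ValueError(f"Unknown MQ open option {token!r}")
        mask |= value
    return mask
```

   Called from `get_conn` as something like `open_options = 
resolve_open_options(self.open_options if self.open_options is not None else 
config.get("open_options"), ibmmq.CMQC)`, the result stays in a local variable 
and the hook instance is never mutated.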



##########
providers/ibm/mq/pyproject.toml:
##########
@@ -0,0 +1,139 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN!
+
+# IF YOU WANT TO MODIFY THIS FILE EXCEPT DEPENDENCIES, YOU SHOULD MODIFY THE TEMPLATE
+# `pyproject_TEMPLATE.toml.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY
+[build-system]
+requires = ["flit_core==3.12.0"]
+build-backend = "flit_core.buildapi"
+
+[project]
+name = "apache-airflow-providers-ibm-mq"
+version = "0.1.0"
+description = "Provider package apache-airflow-providers-ibm-mq for Apache Airflow"
+readme = "README.rst"
+license = "Apache-2.0"
+license-files = ['LICENSE', 'NOTICE']
+authors = [
+    {name="Apache Software Foundation", email="[email protected]"},
+]
+maintainers = [
+    {name="Apache Software Foundation", email="[email protected]"},
+]
+keywords = [ "airflow-provider", "ibm.mq", "airflow", "integration" ]
+classifiers = [
+    "Development Status :: 5 - Production/Stable",
+    "Environment :: Console",
+    "Environment :: Web Environment",
+    "Intended Audience :: Developers",
+    "Intended Audience :: System Administrators",
+    "Framework :: Apache Airflow",
+    "Framework :: Apache Airflow :: Provider",
+    "Programming Language :: Python :: 3.10",
+    "Programming Language :: Python :: 3.11",
+    "Programming Language :: Python :: 3.12",
+    "Programming Language :: Python :: 3.13",
+    "Programming Language :: Python :: 3.14",
+    "Topic :: System :: Monitoring",
+]
+requires-python = ">=3.10"
+
+# The dependencies should be modified in place in the generated file.
+# Any change in the dependencies is preserved when the file is regenerated
+# Make sure to run ``prek update-providers-dependencies --all-files``
+# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
+dependencies = [

Review Comment:
   The base `dependencies` list doesn't include 
`apache-airflow-providers-common-messaging`, but `queues/mq.py:24` imports 
`BaseMessageQueueProvider` unconditionally at module top. `providers_manager` 
swallows the resulting `ImportError` in `_correctness_check`, so users who 
install `apache-airflow-providers-ibm-mq` without the `[common.messaging]` 
extra get a silently disabled queue provider with no error. The README (lines 
69-72) also lists `common-messaging>=2.0.0` as required. Either add it to base 
`dependencies` (and regenerate the README) or make the import lazy in 
`queues/mq.py` and raise `AirflowOptionalProviderFeatureException` when the 
package is missing.
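
   For the lazy-import option, a sketch of the pattern, kept runnable without 
Airflow installed. `OptionalFeatureUnavailable` is a stand-in for 
`AirflowOptionalProviderFeatureException`, and `load_attr` is a hypothetical 
helper; the real module path for `BaseMessageQueueProvider` is not shown in 
this diff, so it is passed in as an argument rather than hard-coded.

```python
import importlib


class OptionalFeatureUnavailable(Exception):
    """Stand-in for AirflowOptionalProviderFeatureException in this sketch."""


def load_attr(module_name: str, attr: str, install_hint: str):
    """Import ``attr`` from ``module_name`` on demand.

    A missing package becomes an explicit, descriptive error at the point of
    use instead of an ImportError at module import time.
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError as e:
        raise OptionalFeatureUnavailable(
            f"{module_name} is not installed; {install_hint}"
        ) from e
    return getattr(module, attr)
```

   Whether this is preferable to simply adding the package to base 
`dependencies` depends on whether the queue provider is meant to be optional 
at all; the README suggests it is not.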



##########
providers/ibm/mq/src/airflow/providers/ibm/mq/hooks/mq.py:
##########
@@ -0,0 +1,569 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import json
+import threading
+from contextlib import contextmanager, suppress
+from typing import TYPE_CHECKING, Any
+
+from asgiref.sync import sync_to_async
+
+from airflow.providers.common.compat.connection import get_async_connection
+from airflow.providers.common.compat.sdk import BaseHook
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if TYPE_CHECKING:
+    from airflow.providers.common.compat.sdk import Connection
+
+# Backoff parameters for transient consume failures
+_BACKOFF_BASE: float = 1.0
+_BACKOFF_MAX: float = 60.0
+_BACKOFF_FACTOR: float = 2.0
+_TRANSIENT_REASON_NAMES = frozenset(
+    {
+        "MQRC_CONNECTION_BROKEN",
+        "MQRC_Q_MGR_QUIESCING",
+        "MQRC_Q_MGR_NOT_AVAILABLE",
+        "MQRC_HOST_NOT_AVAILABLE",
+        "MQRC_CONNECTION_QUIESCING",
+    }
+)
+
+
+class IBMMQError(Exception):
+    """
+    Lightweight wrapper for IBM MQ errors raised by the consumer thread.
+
+    This allows the async event-loop code in :meth:`IBMMQHook.aconsume` to
+    handle MQ errors **without importing the heavy ibmmq C extension** on the
+    event-loop thread.
+
+    :param reason: The integer MQ reason code (e.g. ``MQRC_CONNECTION_BROKEN``).
+    :param comp: The integer MQ completion code.
+    :param transient: Whether this error is considered transient (eligible for retry).
+    :param message: Human-readable description of the error.
+    """
+
+    def __init__(self, reason: int, comp: int, transient: bool, message: str):
+        super().__init__(message)
+        self.reason = reason
+        self.comp = comp
+        self.transient = transient
+
+
+class IBMMQConsumer(threading.Thread, LoggingMixin):
+    """
+    Thread worker that consumes one message from an IBM MQ queue.
+
+    The consumer is used by :meth:`IBMMQHook.aconsume` to execute blocking MQ
+    calls in a dedicated thread because the IBM MQ C client requires handles to
+    be used from the thread that created them. The result (or exception) is
+    forwarded to the asyncio event loop through ``future``.
+
+    :param hook: Hook instance that provides connection and queue helpers.
+    :param connection: Airflow connection object resolved from ``hook.conn_id``.
+    :param queue_name: Queue to consume from.
+    :param poll_interval: Maximum wait time (seconds) for each ``q.get`` call.
+    :param loop: Event loop that owns ``future``.
+    :param future: Future completed with the decoded message or an exception.
+    :param stop_event: Signal used to stop polling after cancellation.
+    """
+
+    def __init__(
+        self,
+        hook: IBMMQHook,
+        connection: Connection,
+        queue_name: str,
+        poll_interval: float,
+        loop: asyncio.AbstractEventLoop,
+        future: asyncio.Future,
+        stop_event: threading.Event,
+    ):
+        super().__init__(daemon=True)
+        self.hook = hook
+        self.connection = connection
+        self.queue_name = queue_name
+        self.poll_interval = poll_interval
+        self.loop = loop
+        self.future = future
+        self.stop_event = stop_event
+
+    def _process_message(self, message: bytes) -> str:
+        """
+        Process a raw MQ message.
+
+        If the message contains an RFH2 header, the header is unpacked and the
+        payload following the header is returned. If unpacking fails, the raw
+        message is returned decoded as UTF-8.
+
+        :param message: Raw message received from IBM MQ.
+        :return: Decoded message payload.
+        """
+        import ibmmq
+
+        try:
+            rfh2 = ibmmq.RFH2()
+            rfh2.unpack(message)
+
+            payload_offset = rfh2.get_length()
+            payload = message[payload_offset:]
+
+            decoded = payload.decode("utf-8", errors="ignore")
+            self.log.info("Message received from MQ (RFH2 decoded): %s", decoded)
+            return decoded
+        except ibmmq.PYIFError as error:  # RFH2 header not present or unpack failed

Review Comment:
   The `except ibmmq.PYIFError` clause is narrow, but the rest of
`_process_message` does real work that can fail in other ways:
`rfh2.get_length()` returning a value larger than `len(message)` silently
produces an empty slice, and any other exception from `unpack` (struct error,
encoding error) propagates and aborts `consume()`. Because the message was
already removed from the queue (`MQGMO_NO_SYNCPOINT`), one malformed message
kills the watcher AND loses the message. Either widen the `except` or add a
defensive guard around `payload_offset` and the slice.
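
   A minimal sketch of the defensive guard, to make the suggestion concrete.
   It does not use `ibmmq`: `get_header_length` is a hypothetical callable
   standing in for the `RFH2.unpack` + `get_length()` step, and
   `decode_payload` is an illustrative name, not part of the PR.

```python
from __future__ import annotations

from typing import Callable


def decode_payload(message: bytes, get_header_length: Callable[[bytes], int]) -> str:
    """Return the payload after the header, falling back to the raw message.

    ``get_header_length`` is a hypothetical stand-in for unpacking the RFH2
    header and asking for its length; it may raise on malformed input.
    """
    try:
        offset = get_header_length(message)
    except Exception:
        # Broad on purpose: the message is already off the queue, so any
        # parsing failure should degrade to the raw payload, not abort.
        return message.decode("utf-8", errors="ignore")

    # Guard the slice: an offset outside the message would otherwise produce
    # an empty payload without any error.
    if not 0 <= offset <= len(message):
        return message.decode("utf-8", errors="ignore")

    return message[offset:].decode("utf-8", errors="ignore")
```

   With this shape, a bogus header length or an unexpected exception type
   both fall back to the raw message instead of dropping it.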



##########
providers/ibm/mq/src/airflow/providers/ibm/mq/hooks/mq.py:
##########
@@ -0,0 +1,569 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import json
+import threading
+from contextlib import contextmanager, suppress
+from typing import TYPE_CHECKING, Any
+
+from asgiref.sync import sync_to_async
+
+from airflow.providers.common.compat.connection import get_async_connection
+from airflow.providers.common.compat.sdk import BaseHook
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if TYPE_CHECKING:
+    from airflow.providers.common.compat.sdk import Connection
+
+# Backoff parameters for transient consume failures
+_BACKOFF_BASE: float = 1.0
+_BACKOFF_MAX: float = 60.0
+_BACKOFF_FACTOR: float = 2.0
+_TRANSIENT_REASON_NAMES = frozenset(
+    {
+        "MQRC_CONNECTION_BROKEN",
+        "MQRC_Q_MGR_QUIESCING",
+        "MQRC_Q_MGR_NOT_AVAILABLE",
+        "MQRC_HOST_NOT_AVAILABLE",
+        "MQRC_CONNECTION_QUIESCING",
+    }
+)
+
+
+class IBMMQError(Exception):
+    """
+    Lightweight wrapper for IBM MQ errors raised by the consumer thread.
+
+    This allows the async event-loop code in :meth:`IBMMQHook.aconsume` to
+    handle MQ errors **without importing the heavy ibmmq C extension** on the
+    event-loop thread.
+
+    :param reason: The integer MQ reason code (e.g. ``MQRC_CONNECTION_BROKEN``).
+    :param comp: The integer MQ completion code.
+    :param transient: Whether this error is considered transient (eligible for retry).
+    :param message: Human-readable description of the error.
+    """
+
+    def __init__(self, reason: int, comp: int, transient: bool, message: str):
+        super().__init__(message)
+        self.reason = reason
+        self.comp = comp
+        self.transient = transient
+
+
+class IBMMQConsumer(threading.Thread, LoggingMixin):
+    """
+    Thread worker that consumes one message from an IBM MQ queue.
+
+    The consumer is used by :meth:`IBMMQHook.aconsume` to execute blocking MQ
+    calls in a dedicated thread because the IBM MQ C client requires handles to
+    be used from the thread that created them. The result (or exception) is
+    forwarded to the asyncio event loop through ``future``.
+
+    :param hook: Hook instance that provides connection and queue helpers.
+    :param connection: Airflow connection object resolved from ``hook.conn_id``.
+    :param queue_name: Queue to consume from.
+    :param poll_interval: Maximum wait time (seconds) for each ``q.get`` call.
+    :param loop: Event loop that owns ``future``.
+    :param future: Future completed with the decoded message or an exception.
+    :param stop_event: Signal used to stop polling after cancellation.
+    """
+
+    def __init__(
+        self,
+        hook: IBMMQHook,
+        connection: Connection,
+        queue_name: str,
+        poll_interval: float,
+        loop: asyncio.AbstractEventLoop,
+        future: asyncio.Future,
+        stop_event: threading.Event,
+    ):
+        super().__init__(daemon=True)
+        self.hook = hook
+        self.connection = connection
+        self.queue_name = queue_name
+        self.poll_interval = poll_interval
+        self.loop = loop
+        self.future = future
+        self.stop_event = stop_event
+
+    def _process_message(self, message: bytes) -> str:
+        """
+        Process a raw MQ message.
+
+        If the message contains an RFH2 header, the header is unpacked and the
+        payload following the header is returned. If unpacking fails, the raw
+        message is returned decoded as UTF-8.
+
+        :param message: Raw message received from IBM MQ.
+        :return: Decoded message payload.
+        """
+        import ibmmq
+
+        try:
+            rfh2 = ibmmq.RFH2()
+            rfh2.unpack(message)
+
+            payload_offset = rfh2.get_length()
+            payload = message[payload_offset:]
+
+            decoded = payload.decode("utf-8", errors="ignore")
+            self.log.info("Message received from MQ (RFH2 decoded): %s", decoded)
+            return decoded
+        except ibmmq.PYIFError as error:  # RFH2 header not present or unpack failed
+            self.log.warning(
+                "Failed to unpack RFH2 header (%s). Returning raw message payload: %s",
+                error,
+                message,
+            )
+            return message.decode("utf-8", errors="ignore")
+
+    def consume(
+        self,
+        queue_name: str,
+        poll_interval: float,
+        stop_event: threading.Event,
+    ) -> str | None:
+        """
+        Blocking implementation that consumes a single message from the given IBM MQ queue.
+
+        All IBM MQ handles (queue manager connection, queue) are created **and used** within
+        this method, satisfying the thread-affinity requirement of the IBM MQ C client library.
+        The 'stop_event' is checked between 'q.get' calls so the thread terminates promptly
+        after the coroutine side is canceled.  Reads are performed with
+        ``MQGMO_NO_SYNCPOINT``, so this method provides at-most-once delivery:
+        once ``q.get`` returns successfully, IBM MQ has already committed the
+        message removal from the queue.
+
+        MQ-specific exceptions are caught and re-raised as :class:`IBMMQError` so that
+        the async caller never needs to import the heavy ``ibmmq`` C extension.
+
+        For an asynchronous interface see :meth:`IBMMQHook.aconsume`.
+        """
+        import ibmmq
+
+        transient_reasons = frozenset(
+            getattr(ibmmq.CMQC, name) for name in _TRANSIENT_REASON_NAMES if hasattr(ibmmq.CMQC, name)
+        )
+
+        od = ibmmq.OD()
+        od.ObjectName = queue_name
+
+        md = ibmmq.MD()
+        md.Format = ibmmq.CMQC.MQFMT_STRING
+        md.CodedCharSetId = 1208
+        md.Encoding = ibmmq.CMQC.MQENC_NATIVE
+
+        gmo = ibmmq.GMO()
+        gmo.Options = ibmmq.CMQC.MQGMO_WAIT | ibmmq.CMQC.MQGMO_NO_SYNCPOINT | ibmmq.CMQC.MQGMO_CONVERT
+        gmo.WaitInterval = int(poll_interval * 1000)
+
+        try:
+            with self.hook.get_conn(connection=self.connection) as conn:
+                q = ibmmq.Queue(conn, od, self.hook.get_open_options(queue_name=queue_name))
+                try:
+                    # WaitInterval already blocks for poll_interval seconds when no message is
+                    # available, so no additional sleep is needed between iterations.
+                    while not stop_event.is_set():
+                        try:
+                            message = q.get(None, md, gmo)
+                            if message:
+                                return self._process_message(message)
+                        except ibmmq.MQMIError as e:
+                            if e.reason == ibmmq.CMQC.MQRC_NO_MSG_AVAILABLE:
+                                self.log.info(
+                                    "No message available on queue '%s' (reason=%s)",
+                                    queue_name,
+                                    e.reason,
+                                )
+                                continue
+                            self.log.error(
+                                "IBM MQ error on queue '%s': completion_code=%s reason_code=%s (%s)",
+                                queue_name,
+                                e.comp,
+                                e.reason,
+                                e,
+                            )
+                            raise
+                finally:
+                    with suppress(Exception):
+                        q.close()
+        except ibmmq.MQMIError as e:
+            raise IBMMQError(
+                reason=e.reason,
+                comp=e.comp,
+                transient=e.reason in transient_reasons,
+                message=str(e),
+            ) from e
+        except ibmmq.PYIFError as e:
+            raise IBMMQError(
+                reason=0,
+                comp=0,
+                transient=False,
+                message=str(e),
+            ) from e
+        except ConnectionError as e:
+            # _connect() wraps ibmmq.MQMIError as ConnectionError; treat as transient
+            # so aconsume retries with backoff instead of killing the trigger.
+            raise IBMMQError(
+                reason=0,
+                comp=0,
+                transient=True,
+                message=str(e),
+            ) from e
+        return None
+
+    def run(self):
+        try:
+            result = self.consume(
+                queue_name=self.queue_name,
+                poll_interval=self.poll_interval,
+                stop_event=self.stop_event,
+            )
+
+            if not self.future.cancelled():
+                self.loop.call_soon_threadsafe(self.future.set_result, result)
+        except Exception as e:
+            if not self.future.cancelled():
+                self.loop.call_soon_threadsafe(self.future.set_exception, e)
+
+
+class IBMMQHook(BaseHook):
+    """
+    Interact with IBM MQ queue managers to consume and produce messages.
+
+    This hook wraps the ``ibmmq`` C client and manages connection
+    lifecycle, queue open/close, and message serialization.  Both synchronous
+    (context-manager) and asynchronous (``consume`` / ``produce``) interfaces
+    are provided.
+
+    The asynchronous consume path intentionally uses ``MQGMO_NO_SYNCPOINT``.
+    That keeps reads non-transactional and therefore provides **at-most-once**
+    delivery semantics for trigger-based consumption: once ``q.get`` returns,
+    IBM MQ has already removed the message from the queue.  If the coroutine is
+    canceled after that point but before Airflow yields a ``TriggerEvent``, the
+    message can be lost.  The stop-event machinery only prevents additional
+    ``q.get`` calls after cancellation; it cannot make the non-transactional
+    get atomic with TriggerEvent emission.
+
+    Connection parameters (host, port, login, password) are read from the
+    Airflow connection identified by *conn_id*.  ``queue_manager``, ``channel``,
+    and ``open_options`` can be supplied either as constructor arguments or via
+    the connection's *extra* JSON — constructor arguments take precedence.
+
+    :param conn_id: Airflow connection ID for the IBM MQ instance.
+        Defaults to ``"mq_default"``.
+    :param queue_manager: Name of the IBM MQ queue manager to connect to.
+        If not provided, the value is read from the ``queue_manager`` key in
+        the connection's *extra* JSON.
+    :param channel: MQ channel name used for the connection.
+        If not provided, the value is read from the ``channel`` key in the
+        connection's *extra* JSON.
+    :param open_options: Integer bitmask of ``MQOO_*`` open options passed
+        when opening a queue (e.g.,
+        ``ibmmq.CMQC.MQOO_INPUT_SHARED | ibmmq.CMQC.MQOO_FAIL_IF_QUIESCING``).
+        If not provided, the value is resolved from the ``open_options`` key
+        in the connection's *extra* JSON, falling back to
+        ``MQOO_INPUT_SHARED``.
+    """
+
+    conn_name_attr = "conn_id"
+    default_conn_name = "mq_default"
+    conn_type = "ibmmq"
+    hook_name = "IBM MQ"
+    default_open_options = "MQOO_INPUT_SHARED"
+
+    def __init__(
+        self,
+        conn_id: str = default_conn_name,
+        queue_manager: str | None = None,
+        channel: str | None = None,
+        open_options: int | None = None,
+    ):
+        super().__init__()
+        self.conn_id = conn_id
+        self.queue_manager = queue_manager
+        self.channel = channel
+        self.open_options = open_options
+
+    @classmethod
+    def get_ui_field_behaviour(cls) -> dict[str, Any]:
+        """Return custom UI field behaviour for IBM MQ Connection."""
+        return {
+            "hidden_fields": ["schema"],
+            "placeholders": {
+                "host": "mq.example.com",
+                "port": "1414",
+                "login": "app_user",
+                "extra": json.dumps(
+                    {
+                        "queue_manager": "QM1",
+                        "channel": "DEV.APP.SVRCONN",
+                        "open_options": cls.default_open_options,
+                    },
+                    indent=2,
+                ),
+            },
+        }
+
+    @classmethod
+    def get_open_options_flags(cls, open_options: int) -> list[str]:
+        """
+        Return the symbolic MQ open option flags set in a given bitmask.
+
+        Each flag corresponds to a constant in ``ibmmq.CMQC`` that starts with ``MQOO_``.
+
+        :param open_options: The integer bitmask used when opening an MQ queue
+                             (e.g., ``MQOO_INPUT_EXCLUSIVE | MQOO_FAIL_IF_QUIESCING``).
+
+        :return: A list of the names of the MQ open flags that are set in the bitmask.
+                 For example, ``['MQOO_INPUT_EXCLUSIVE', 'MQOO_FAIL_IF_QUIESCING']``.
+
+        Example:
+            >>> open_options = ibmmq.CMQC.MQOO_INPUT_SHARED | ibmmq.CMQC.MQOO_FAIL_IF_QUIESCING
+            >>> cls.get_open_options_flags(open_options)
+            ['MQOO_INPUT_SHARED', 'MQOO_FAIL_IF_QUIESCING']
+        """
+        import ibmmq
+
+        return [
+            name
+            for name, value in vars(ibmmq.CMQC).items()
+            if name.startswith("MQOO_") and (open_options & value)
+        ]
+
+    def get_open_options(self, queue_name: str) -> int | None:
+        if self.open_options is not None:
+            flag_names = self.get_open_options_flags(self.open_options)
+            self.log.info(
+                "Opening MQ queue '%s' with open_options=%s (%s)",
+                queue_name,
+                self.open_options,
+                ", ".join(flag_names),
+            )
+        return self.open_options
+
+    @staticmethod
+    def _connect(queue_manager: str, channel: str, conn_info: str, csp):
+        """
+        Connect to the IBM MQ queue manager.
+
+        Connection errors from the C client are caught and re-raised as a
+        :class:`ConnectionError` with a human-readable message.
+
+        :return: IBM MQ connection object
+        """
+        import ibmmq
+
+        try:
+            return ibmmq.connect(queue_manager, channel, conn_info, csp=csp)
+        except (ibmmq.MQMIError, ibmmq.PYIFError) as e:
+            raise ConnectionError(
+                f"Failed to connect to IBM MQ queue manager '{queue_manager}' "
+                f"at {conn_info} on channel '{channel}': {e}"
+            ) from e
+
+    @contextmanager
+    def get_conn(self, connection: Connection | None = None):
+        """
+        Sync context manager for IBM MQ connection lifecycle.
+
+        Must be called from the executor thread (not the event loop thread).
+        Retrieves the Airflow connection (or uses the explicitly supplied one),
+        extracts MQ parameters, and manages the IBM MQ connection lifecycle.
+
+        :param connection: Optional Airflow connection object. When omitted,
+            the connection is resolved from ``self.conn_id``.
+        :yield: IBM MQ connection object
+        """
+        import ibmmq
+
+        connection = connection or BaseHook.get_connection(self.conn_id)
+        config = connection.extra_dejson
+        queue_manager = self.queue_manager or config.get("queue_manager")
+        channel = self.channel or config.get("channel")
+
+        if not queue_manager:
+            raise ValueError("queue_manager must be set in Connection extra config or hook init")
+        if not channel:
+            raise ValueError("channel must be set in Connection extra config or hook init")
+
+        if self.open_options is None:
+            self.open_options = getattr(
+                ibmmq.CMQC,
+                config.get("open_options", self.default_open_options),
+                ibmmq.CMQC.MQOO_INPUT_SHARED,
+            )
+
+        csp = ibmmq.CSP()
+        csp.CSPUserId = connection.login
+        csp.CSPPassword = connection.password
+
+        conn_info = f"{connection.host}({connection.port})"
+        conn = self._connect(queue_manager, channel, conn_info, csp)
+        try:
+            yield conn
+        finally:
+            with suppress(Exception):
+                conn.disconnect()
+
+    async def aconsume(self, queue_name: str, poll_interval: float = 5) -> str | None:
+        """
+        Asynchronous version of :meth:`consume`.
+
+        Wait for a single message from the specified IBM MQ queue and return its decoded payload.
+
+        The method retries with exponential back-off whenever the underlying
+        :meth:`consume` returns ``None`` (connection broken, timeout) or raises
+        an unexpected exception, so that an AssetWatcher is never silently killed
+        by a transient failure.
+
+        All blocking IBM MQ operations ('connect', 'open', 'get', 'close', 'disconnect') run in a
+        separate thread via 'sync_to_async' to satisfy the IBM MQ C client's thread-affinity
+        requirement — every operation on a connection must happen from the thread that created it.
+
+        A :class:`threading.Event` stop signal is passed to the worker so that, when this
+        coroutine is canceled (e.g. because the Airflow triggerer reassigns the watcher to
+        another pod), the background thread exits cleanly after the current 'q.get' call
+        times out (at most 'poll_interval' seconds).  This prevents orphaned
+        threads from continuing to poll after cancellation, but it does **not**
+        change the delivery guarantee: :meth:`consume` uses
+        ``MQGMO_NO_SYNCPOINT``, so cancellation after ``q.get`` returns but
+        before the trigger yields an event can still lose that message.
+
+        :param queue_name: Name of the IBM MQ queue to consume messages from.
+        :param poll_interval: Interval in seconds used to wait for messages and to control
+            how long the underlying MQ 'get' operation blocks before checking again.
+        :return: The decoded message payload.
+        """
+        connection = await get_async_connection(self.conn_id)
+        backoff = _BACKOFF_BASE
+        while True:
+            loop = asyncio.get_running_loop()
+            future = loop.create_future()
+            stop_event = threading.Event()
+            thread = IBMMQConsumer(
+                hook=self,
+                connection=connection,
+                queue_name=queue_name,
+                poll_interval=poll_interval,
+                loop=loop,
+                future=future,
+                stop_event=stop_event,
+            )
+            thread.start()
+
+            try:
+                result = await future
+
+                if result is not None:
+                    return result
+            except asyncio.CancelledError:
+                stop_event.set()
+                raise
+            except IBMMQError as e:
+                if e.transient:
+                    self.log.warning(
+                        "Transient MQ error on queue '%s': completion_code=%s reason_code=%s (%s); retrying in %.1fs",
+                        queue_name,
+                        e.comp,
+                        e.reason,
+                        e,
+                        backoff,
+                    )
+                    await asyncio.sleep(backoff)
+                    backoff = min(backoff * _BACKOFF_FACTOR, _BACKOFF_MAX)
+                    continue
+                self.log.error(
+                    "Permanent MQ error on queue '%s': completion_code=%s reason_code=%s (%s) -- not retrying",
+                    queue_name,
+                    e.comp,
+                    e.reason,
+                    e,
+                )
+                raise
+            except Exception:
+                # Programming errors should not be retried
+                self.log.exception(
+                    "Unexpected error in IBM MQ consume for queue '%s' -- not retrying",
+                    queue_name,
+                )
+                raise
+            finally:
+                stop_event.set()
+                thread.join(timeout=0)

Review Comment:
   `thread.join(timeout=0)` returns immediately without waiting, so a worker 
thread blocked in `q.get(WaitInterval=poll_interval*1000)` keeps the MQ 
connection open for up to `poll_interval` seconds after `aconsume` returns. On 
`CancelledError` (triggerer reassigning a watcher), the next watcher instance 
creates a fresh connection while the old one is still consuming a server-side 
handle. In churn-heavy fleets (rebalance, scale-down) this piles up handles on 
the queue manager. Suggest `thread.join(timeout=poll_interval + 1)` so the 
thread gets a chance to observe `stop_event` and exit cleanly.
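
   To illustrate the difference between the two `join` calls, here is a small
   standalone sketch. It uses only the standard library; `time.sleep` stands
   in for the blocking `q.get`, and `worker`, `entered`, and `POLL_INTERVAL`
   are illustrative names rather than anything from the PR.

```python
import threading
import time

POLL_INTERVAL = 0.5  # seconds; stands in for the hook's poll_interval


def worker(stop_event: threading.Event, entered: threading.Event) -> None:
    # Stand-in for the consumer loop: the stop flag is only checked between
    # blocking waits, just like the check between q.get() calls.
    while not stop_event.is_set():
        entered.set()
        time.sleep(POLL_INTERVAL)  # simulates q.get() blocking for WaitInterval


stop_event = threading.Event()
entered = threading.Event()
thread = threading.Thread(target=worker, args=(stop_event, entered), daemon=True)
thread.start()
entered.wait()  # the worker has reached its blocking wait

stop_event.set()
thread.join(timeout=0)  # returns immediately; the worker is still blocked
alive_after_zero_join = thread.is_alive()

thread.join(timeout=POLL_INTERVAL + 1)  # bounded wait, as suggested above
alive_after_bounded_join = thread.is_alive()

print(alive_after_zero_join, alive_after_bounded_join)
```

   One caveat: a blocking `join` inside a coroutine stalls the event loop for
   its duration, so the author may prefer to run it off-loop (for example via
   `asyncio.to_thread`); that is a design choice for the PR, not something
   this sketch settles.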



##########
providers/ibm/mq/src/airflow/providers/ibm/mq/hooks/mq.py:
##########
@@ -0,0 +1,569 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import json
+import threading
+from contextlib import contextmanager, suppress
+from typing import TYPE_CHECKING, Any
+
+from asgiref.sync import sync_to_async
+
+from airflow.providers.common.compat.connection import get_async_connection
+from airflow.providers.common.compat.sdk import BaseHook
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if TYPE_CHECKING:
+    from airflow.providers.common.compat.sdk import Connection
+
+# Backoff parameters for transient consume failures
+_BACKOFF_BASE: float = 1.0
+_BACKOFF_MAX: float = 60.0
+_BACKOFF_FACTOR: float = 2.0
+_TRANSIENT_REASON_NAMES = frozenset(
+    {
+        "MQRC_CONNECTION_BROKEN",
+        "MQRC_Q_MGR_QUIESCING",
+        "MQRC_Q_MGR_NOT_AVAILABLE",
+        "MQRC_HOST_NOT_AVAILABLE",
+        "MQRC_CONNECTION_QUIESCING",
+    }
+)
+
+
+class IBMMQError(Exception):
+    """
+    Lightweight wrapper for IBM MQ errors raised by the consumer thread.
+
+    This allows the async event-loop code in :meth:`IBMMQHook.aconsume` to
+    handle MQ errors **without importing the heavy ibmmq C extension** on the
+    event-loop thread.
+
+    :param reason: The integer MQ reason code (e.g. ``MQRC_CONNECTION_BROKEN``).
+    :param comp: The integer MQ completion code.
+    :param transient: Whether this error is considered transient (eligible for retry).
+    :param message: Human-readable description of the error.
+    """
+
+    def __init__(self, reason: int, comp: int, transient: bool, message: str):
+        super().__init__(message)
+        self.reason = reason
+        self.comp = comp
+        self.transient = transient
+
+
+class IBMMQConsumer(threading.Thread, LoggingMixin):
+    """
+    Thread worker that consumes one message from an IBM MQ queue.
+
+    The consumer is used by :meth:`IBMMQHook.aconsume` to execute blocking MQ
+    calls in a dedicated thread because the IBM MQ C client requires handles to
+    be used from the thread that created them. The result (or exception) is
+    forwarded to the asyncio event loop through ``future``.
+
+    :param hook: Hook instance that provides connection and queue helpers.
+    :param connection: Airflow connection object resolved from ``hook.conn_id``.
+    :param queue_name: Queue to consume from.
+    :param poll_interval: Maximum wait time (seconds) for each ``q.get`` call.
+    :param loop: Event loop that owns ``future``.
+    :param future: Future completed with the decoded message or an exception.
+    :param stop_event: Signal used to stop polling after cancellation.
+    """
+
+    def __init__(
+        self,
+        hook: IBMMQHook,
+        connection: Connection,
+        queue_name: str,
+        poll_interval: float,
+        loop: asyncio.AbstractEventLoop,
+        future: asyncio.Future,
+        stop_event: threading.Event,
+    ):
+        super().__init__(daemon=True)
+        self.hook = hook
+        self.connection = connection
+        self.queue_name = queue_name
+        self.poll_interval = poll_interval
+        self.loop = loop
+        self.future = future
+        self.stop_event = stop_event
+
+    def _process_message(self, message: bytes) -> str:
+        """
+        Process a raw MQ message.
+
+        If the message contains an RFH2 header, the header is unpacked and the
+        payload following the header is returned. If unpacking fails, the raw
+        message is returned decoded as UTF-8.
+
+        :param message: Raw message received from IBM MQ.
+        :return: Decoded message payload.
+        """
+        import ibmmq
+
+        try:
+            rfh2 = ibmmq.RFH2()
+            rfh2.unpack(message)
+
+            payload_offset = rfh2.get_length()
+            payload = message[payload_offset:]
+
+            decoded = payload.decode("utf-8", errors="ignore")
+            self.log.info("Message received from MQ (RFH2 decoded): %s", decoded)
+            return decoded
+        except ibmmq.PYIFError as error:  # RFH2 header not present or unpack failed
+            self.log.warning(
+                "Failed to unpack RFH2 header (%s). Returning raw message payload: %s",
+                error,
+                message,
+            )
+            return message.decode("utf-8", errors="ignore")
+
+    def consume(
+        self,
+        queue_name: str,
+        poll_interval: float,
+        stop_event: threading.Event,
+    ) -> str | None:
+        """
+        Blocking implementation that consumes a single message from the given IBM MQ queue.
+
+        All IBM MQ handles (queue manager connection, queue) are created **and used** within
+        this method, satisfying the thread-affinity requirement of the IBM MQ C client library.
+        The 'stop_event' is checked between 'q.get' calls so the thread terminates promptly
+        after the coroutine side is canceled.  Reads are performed with
+        ``MQGMO_NO_SYNCPOINT``, so this method provides at-most-once delivery:
+        once ``q.get`` returns successfully, IBM MQ has already committed the
+        message removal from the queue.
+
+        MQ-specific exceptions are caught and re-raised as :class:`IBMMQError` so that
+        the async caller never needs to import the heavy ``ibmmq`` C extension.
+
+        For an asynchronous interface see :meth:`IBMMQHook.aconsume`.
+        """
+        import ibmmq
+
+        transient_reasons = frozenset(
+            getattr(ibmmq.CMQC, name) for name in _TRANSIENT_REASON_NAMES if hasattr(ibmmq.CMQC, name)
+        )
+
+        od = ibmmq.OD()
+        od.ObjectName = queue_name
+
+        md = ibmmq.MD()
+        md.Format = ibmmq.CMQC.MQFMT_STRING
+        md.CodedCharSetId = 1208
+        md.Encoding = ibmmq.CMQC.MQENC_NATIVE
+
+        gmo = ibmmq.GMO()
+        gmo.Options = ibmmq.CMQC.MQGMO_WAIT | ibmmq.CMQC.MQGMO_NO_SYNCPOINT | ibmmq.CMQC.MQGMO_CONVERT
+        gmo.WaitInterval = int(poll_interval * 1000)
+
+        try:
+            with self.hook.get_conn(connection=self.connection) as conn:
+                q = ibmmq.Queue(conn, od, self.hook.get_open_options(queue_name=queue_name))
+                try:
+                    # WaitInterval already blocks for poll_interval seconds when no message is
+                    # available, so no additional sleep is needed between iterations.
+                    while not stop_event.is_set():
+                        try:
+                            message = q.get(None, md, gmo)
+                            if message:
+                                return self._process_message(message)
+                        except ibmmq.MQMIError as e:
+                            if e.reason == ibmmq.CMQC.MQRC_NO_MSG_AVAILABLE:
+                                self.log.info(
+                                    "No message available on queue '%s' (reason=%s)",
+                                    queue_name,
+                                    e.reason,
+                                )
+                                continue
+                            self.log.error(
+                                "IBM MQ error on queue '%s': completion_code=%s reason_code=%s (%s)",
+                                queue_name,
+                                e.comp,
+                                e.reason,
+                                e,
+                            )
+                            raise
+                finally:
+                    with suppress(Exception):
+                        q.close()
+        except ibmmq.MQMIError as e:
+            raise IBMMQError(
+                reason=e.reason,
+                comp=e.comp,
+                transient=e.reason in transient_reasons,
+                message=str(e),
+            ) from e
+        except ibmmq.PYIFError as e:
+            raise IBMMQError(
+                reason=0,

Review Comment:
   The `PYIFError` and `ConnectionError` branches wrap into
`IBMMQError(reason=0, comp=0, ...)`. Downstream consumers that classify on
`.reason` / `.comp` (logging dashboards, Sentry tags) see 0/0 and cannot tell
a genuine reason code 0 from a wrapped non-`MQMIError` exception. Either
extract reason/comp from the underlying exception where available, or use a
sentinel like `-1` and document it.
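
   One possible shape for this, as a hedged sketch: `UNKNOWN_CODE`,
   `_extract_code`, and `wrap_error` are hypothetical names, and the
   `IBMMQError` class is re-declared here only so the snippet runs without
   `ibmmq`. It reads `reason` / `comp` from the exception or from its
   `__cause__` (which is set when `_connect()` raises `ConnectionError ...
   from e`) and falls back to the sentinel otherwise.

```python
from __future__ import annotations

# Hypothetical sentinel meaning "no MQ reason/completion code was available".
UNKNOWN_CODE = -1


class IBMMQError(Exception):
    """Same constructor as in the PR, repeated so the sketch is self-contained."""

    def __init__(self, reason: int, comp: int, transient: bool, message: str):
        super().__init__(message)
        self.reason = reason
        self.comp = comp
        self.transient = transient


def _extract_code(exc: BaseException, name: str) -> int:
    # Look on the exception itself, then on the exception it was raised from.
    for candidate in (exc, exc.__cause__):
        value = getattr(candidate, name, None)
        if isinstance(value, int):
            return value
    return UNKNOWN_CODE


def wrap_error(exc: BaseException, transient: bool) -> IBMMQError:
    """Wrap any exception, keeping reason/comp when the source exposes them."""
    return IBMMQError(
        reason=_extract_code(exc, "reason"),
        comp=_extract_code(exc, "comp"),
        transient=transient,
        message=str(exc),
    )
```

   Dashboards can then filter on `reason == -1` to find wrapped non-MQ
   failures, provided the sentinel is documented in the `IBMMQError`
   docstring.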



##########
providers/ibm/mq/src/airflow/providers/ibm/mq/hooks/mq.py:
##########
@@ -0,0 +1,569 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import json
+import threading
+from contextlib import contextmanager, suppress
+from typing import TYPE_CHECKING, Any
+
+from asgiref.sync import sync_to_async
+
+from airflow.providers.common.compat.connection import get_async_connection
+from airflow.providers.common.compat.sdk import BaseHook
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if TYPE_CHECKING:
+    from airflow.providers.common.compat.sdk import Connection
+
+# Backoff parameters for transient consume failures
+_BACKOFF_BASE: float = 1.0
+_BACKOFF_MAX: float = 60.0
+_BACKOFF_FACTOR: float = 2.0
+_TRANSIENT_REASON_NAMES = frozenset(
+    {
+        "MQRC_CONNECTION_BROKEN",
+        "MQRC_Q_MGR_QUIESCING",
+        "MQRC_Q_MGR_NOT_AVAILABLE",
+        "MQRC_HOST_NOT_AVAILABLE",
+        "MQRC_CONNECTION_QUIESCING",
+    }
+)
+
+
+class IBMMQError(Exception):
+    """
+    Lightweight wrapper for IBM MQ errors raised by the consumer thread.
+
+    This allows the async event-loop code in :meth:`IBMMQHook.aconsume` to
+    handle MQ errors **without importing the heavy ibmmq C extension** on the
+    event-loop thread.
+
+    :param reason: The integer MQ reason code (e.g. ``MQRC_CONNECTION_BROKEN``).
+    :param comp: The integer MQ completion code.
+    :param transient: Whether this error is considered transient (eligible for retry).
+    :param message: Human-readable description of the error.
+    """
+
+    def __init__(self, reason: int, comp: int, transient: bool, message: str):
+        super().__init__(message)
+        self.reason = reason
+        self.comp = comp
+        self.transient = transient
+
+
+class IBMMQConsumer(threading.Thread, LoggingMixin):
+    """
+    Thread worker that consumes one message from an IBM MQ queue.
+
+    The consumer is used by :meth:`IBMMQHook.aconsume` to execute blocking MQ
+    calls in a dedicated thread because the IBM MQ C client requires handles to
+    be used from the thread that created them. The result (or exception) is
+    forwarded to the asyncio event loop through ``future``.
+
+    :param hook: Hook instance that provides connection and queue helpers.
+    :param connection: Airflow connection object resolved from ``hook.conn_id``.
+    :param queue_name: Queue to consume from.
+    :param poll_interval: Maximum wait time (seconds) for each ``q.get`` call.
+    :param loop: Event loop that owns ``future``.
+    :param future: Future completed with the decoded message or an exception.
+    :param stop_event: Signal used to stop polling after cancellation.
+    """
+
+    def __init__(
+        self,
+        hook: IBMMQHook,
+        connection: Connection,
+        queue_name: str,
+        poll_interval: float,
+        loop: asyncio.AbstractEventLoop,
+        future: asyncio.Future,
+        stop_event: threading.Event,
+    ):
+        super().__init__(daemon=True)
+        self.hook = hook
+        self.connection = connection
+        self.queue_name = queue_name
+        self.poll_interval = poll_interval
+        self.loop = loop
+        self.future = future
+        self.stop_event = stop_event
+
+    def _process_message(self, message: bytes) -> str:
+        """
+        Process a raw MQ message.
+
+        If the message contains an RFH2 header, the header is unpacked and the
+        payload following the header is returned. If unpacking fails, the raw
+        message is returned decoded as UTF-8.
+
+        :param message: Raw message received from IBM MQ.
+        :return: Decoded message payload.
+        """
+        import ibmmq
+
+        try:
+            rfh2 = ibmmq.RFH2()
+            rfh2.unpack(message)
+
+            payload_offset = rfh2.get_length()
+            payload = message[payload_offset:]
+
+            decoded = payload.decode("utf-8", errors="ignore")
+            self.log.info("Message received from MQ (RFH2 decoded): %s", decoded)
+            return decoded
+        except ibmmq.PYIFError as error:  # RFH2 header not present or unpack failed
+            self.log.warning(
+                "Failed to unpack RFH2 header (%s). Returning raw message payload: %s",
+                error,
+                message,
+            )
+            return message.decode("utf-8", errors="ignore")
+
+    def consume(
+        self,
+        queue_name: str,
+        poll_interval: float,
+        stop_event: threading.Event,
+    ) -> str | None:
+        """
+        Blocking implementation that consumes a single message from the given IBM MQ queue.
+
+        All IBM MQ handles (queue manager connection, queue) are created **and used** within
+        this method, satisfying the thread-affinity requirement of the IBM MQ C client library.
+        The 'stop_event' is checked between 'q.get' calls so the thread terminates promptly
+        after the coroutine side is canceled.  Reads are performed with
+        ``MQGMO_NO_SYNCPOINT``, so this method provides at-most-once delivery:
+        once ``q.get`` returns successfully, IBM MQ has already committed the
+        message removal from the queue.
+
+        MQ-specific exceptions are caught and re-raised as :class:`IBMMQError` so that
+        the async caller never needs to import the heavy ``ibmmq`` C extension.
+
+        For an asynchronous interface see :meth:`IBMMQHook.aconsume`.
+        """
+        import ibmmq
+
+        transient_reasons = frozenset(
+            getattr(ibmmq.CMQC, name) for name in _TRANSIENT_REASON_NAMES if hasattr(ibmmq.CMQC, name)
+        )
+
+        od = ibmmq.OD()
+        od.ObjectName = queue_name
+
+        md = ibmmq.MD()
+        md.Format = ibmmq.CMQC.MQFMT_STRING
+        md.CodedCharSetId = 1208
+        md.Encoding = ibmmq.CMQC.MQENC_NATIVE
+
+        gmo = ibmmq.GMO()
+        gmo.Options = ibmmq.CMQC.MQGMO_WAIT | ibmmq.CMQC.MQGMO_NO_SYNCPOINT | ibmmq.CMQC.MQGMO_CONVERT
+        gmo.WaitInterval = int(poll_interval * 1000)
+
+        try:
+            with self.hook.get_conn(connection=self.connection) as conn:
+                q = ibmmq.Queue(conn, od, self.hook.get_open_options(queue_name=queue_name))
+                try:
+                    # WaitInterval already blocks for poll_interval seconds when no message is
+                    # available, so no additional sleep is needed between iterations.
+                    while not stop_event.is_set():
+                        try:
+                            message = q.get(None, md, gmo)
+                            if message:
+                                return self._process_message(message)
+                        except ibmmq.MQMIError as e:
+                            if e.reason == ibmmq.CMQC.MQRC_NO_MSG_AVAILABLE:
+                                self.log.info(
+                                    "No message available on queue '%s' (reason=%s)",
+                                    queue_name,
+                                    e.reason,
+                                )
+                                continue
+                            self.log.error(
+                                "IBM MQ error on queue '%s': completion_code=%s reason_code=%s (%s)",
+                                queue_name,
+                                e.comp,
+                                e.reason,
+                                e,
+                            )
+                            raise
+                finally:
+                    with suppress(Exception):
+                        q.close()
+        except ibmmq.MQMIError as e:
+            raise IBMMQError(
+                reason=e.reason,
+                comp=e.comp,
+                transient=e.reason in transient_reasons,
+                message=str(e),
+            ) from e
+        except ibmmq.PYIFError as e:
+            raise IBMMQError(
+                reason=0,
+                comp=0,
+                transient=False,
+                message=str(e),
+            ) from e
+        except ConnectionError as e:
+            # _connect() wraps ibmmq.MQMIError as ConnectionError; treat as transient
+            # so aconsume retries with backoff instead of killing the trigger.
+            raise IBMMQError(
+                reason=0,
+                comp=0,
+                transient=True,
+                message=str(e),
+            ) from e
+        return None
+
+    def run(self):
+        try:
+            result = self.consume(
+                queue_name=self.queue_name,
+                poll_interval=self.poll_interval,
+                stop_event=self.stop_event,
+            )
+
+            if not self.future.cancelled():
+                self.loop.call_soon_threadsafe(self.future.set_result, result)
+        except Exception as e:
+            if not self.future.cancelled():
+                self.loop.call_soon_threadsafe(self.future.set_exception, e)
+
+
+class IBMMQHook(BaseHook):
+    """
+    Interact with IBM MQ queue managers to consume and produce messages.
+
+    This hook wraps the ``ibmmq`` C client and manages connection
+    lifecycle, queue open/close, and message serialization.  Both synchronous
+    (context-manager) and asynchronous (``consume`` / ``produce``) interfaces
+    are provided.
+
+    The asynchronous consume path intentionally uses ``MQGMO_NO_SYNCPOINT``.
+    That keeps reads non-transactional and therefore provides **at-most-once**
+    delivery semantics for trigger-based consumption: once ``q.get`` returns,
+    IBM MQ has already removed the message from the queue.  If the coroutine is
+    canceled after that point but before Airflow yields a ``TriggerEvent``, the
+    message can be lost.  The stop-event machinery only prevents additional
+    ``q.get`` calls after cancellation; it cannot make the non-transactional
+    get atomic with TriggerEvent emission.
+
+    Connection parameters (host, port, login, password) are read from the
+    Airflow connection identified by *conn_id*.  ``queue_manager``, ``channel``,
+    and ``open_options`` can be supplied either as constructor arguments or via
+    the connection's *extra* JSON — constructor arguments take precedence.
+
+    :param conn_id: Airflow connection ID for the IBM MQ instance.
+        Defaults to ``"mq_default"``.
+    :param queue_manager: Name of the IBM MQ queue manager to connect to.
+        If not provided, the value is read from the ``queue_manager`` key in
+        the connection's *extra* JSON.
+    :param channel: MQ channel name used for the connection.
+        If not provided, the value is read from the ``channel`` key in the
+        connection's *extra* JSON.
+    :param open_options: Integer bitmask of ``MQOO_*`` open options passed
+        when opening a queue (e.g.,
+        ``ibmmq.CMQC.MQOO_INPUT_SHARED | ibmmq.CMQC.MQOO_FAIL_IF_QUIESCING``).
+        If not provided, the value is resolved from the ``open_options`` key
+        in the connection's *extra* JSON, falling back to
+        ``MQOO_INPUT_SHARED``.
+    """
+
+    conn_name_attr = "conn_id"
+    default_conn_name = "mq_default"
+    conn_type = "ibmmq"
+    hook_name = "IBM MQ"
+    default_open_options = "MQOO_INPUT_SHARED"
+
+    def __init__(
+        self,
+        conn_id: str = default_conn_name,
+        queue_manager: str | None = None,
+        channel: str | None = None,
+        open_options: int | None = None,
+    ):
+        super().__init__()
+        self.conn_id = conn_id
+        self.queue_manager = queue_manager
+        self.channel = channel
+        self.open_options = open_options
+
+    @classmethod
+    def get_ui_field_behaviour(cls) -> dict[str, Any]:
+        """Return custom UI field behaviour for IBM MQ Connection."""
+        return {
+            "hidden_fields": ["schema"],
+            "placeholders": {
+                "host": "mq.example.com",
+                "port": "1414",
+                "login": "app_user",
+                "extra": json.dumps(
+                    {
+                        "queue_manager": "QM1",
+                        "channel": "DEV.APP.SVRCONN",
+                        "open_options": cls.default_open_options,
+                    },
+                    indent=2,
+                ),
+            },
+        }
+
+    @classmethod
+    def get_open_options_flags(cls, open_options: int) -> list[str]:
+        """
+        Return the symbolic MQ open option flags set in a given bitmask.
+
+        Each flag corresponds to a constant in ``ibmmq.CMQC`` that starts with ``MQOO_``.
+
+        :param open_options: The integer bitmask used when opening an MQ queue
+                             (e.g., ``MQOO_INPUT_EXCLUSIVE | MQOO_FAIL_IF_QUIESCING``).
+
+        :return: A list of the names of the MQ open flags that are set in the bitmask.
+                 For example, ``['MQOO_INPUT_EXCLUSIVE', 'MQOO_FAIL_IF_QUIESCING']``.
+
+        Example:
+            >>> open_options = ibmmq.CMQC.MQOO_INPUT_SHARED | ibmmq.CMQC.MQOO_FAIL_IF_QUIESCING
+            >>> cls.get_open_options_flags(open_options)
+            ['MQOO_INPUT_SHARED', 'MQOO_FAIL_IF_QUIESCING']
+        """
+        import ibmmq
+
+        return [
+            name
+            for name, value in vars(ibmmq.CMQC).items()
+            if name.startswith("MQOO_") and (open_options & value)
+        ]
+
+    def get_open_options(self, queue_name: str) -> int | None:
+        if self.open_options is not None:
+            flag_names = self.get_open_options_flags(self.open_options)
+            self.log.info(
+                "Opening MQ queue '%s' with open_options=%s (%s)",
+                queue_name,
+                self.open_options,
+                ", ".join(flag_names),
+            )
+        return self.open_options
+
+    @staticmethod
+    def _connect(queue_manager: str, channel: str, conn_info: str, csp):
+        """
+        Connect to the IBM MQ queue manager.
+
+        Connection errors from the C client are caught and re-raised as a
+        :class:`ConnectionError` with a human-readable message.
+
+        :return: IBM MQ connection object
+        """
+        import ibmmq
+
+        try:
+            return ibmmq.connect(queue_manager, channel, conn_info, csp=csp)
+        except (ibmmq.MQMIError, ibmmq.PYIFError) as e:
+            raise ConnectionError(
+                f"Failed to connect to IBM MQ queue manager '{queue_manager}' "
+                f"at {conn_info} on channel '{channel}': {e}"
+            ) from e
+
+    @contextmanager
+    def get_conn(self, connection: Connection | None = None):
+        """
+        Sync context manager for IBM MQ connection lifecycle.
+
+        Must be called from the executor thread (not the event loop thread).
+        Retrieves the Airflow connection (or uses the explicitly supplied one),
+        extracts MQ parameters, and manages the IBM MQ connection lifecycle.
+
+        :param connection: Optional Airflow connection object. When omitted,
+            the connection is resolved from ``self.conn_id``.
+        :yield: IBM MQ connection object
+        """
+        import ibmmq
+
+        connection = connection or BaseHook.get_connection(self.conn_id)
+        config = connection.extra_dejson
+        queue_manager = self.queue_manager or config.get("queue_manager")
+        channel = self.channel or config.get("channel")
+
+        if not queue_manager:
+            raise ValueError("queue_manager must be set in Connection extra config or hook init")
+        if not channel:
+            raise ValueError("channel must be set in Connection extra config or hook init")
+
+        if self.open_options is None:
+            self.open_options = getattr(
+                ibmmq.CMQC,
+                config.get("open_options", self.default_open_options),
+                ibmmq.CMQC.MQOO_INPUT_SHARED,
+            )
+
+        csp = ibmmq.CSP()
+        csp.CSPUserId = connection.login
+        csp.CSPPassword = connection.password
+
+        conn_info = f"{connection.host}({connection.port})"
+        conn = self._connect(queue_manager, channel, conn_info, csp)
+        try:
+            yield conn
+        finally:
+            with suppress(Exception):
+                conn.disconnect()
+
+    async def aconsume(self, queue_name: str, poll_interval: float = 5) -> str | None:
+        """
+        Asynchronous version of :meth:`consume`.
+
+        Wait for a single message from the specified IBM MQ queue and return its decoded payload.
+
+        The method retries with exponential back-off whenever the underlying
+        :meth:`consume` returns ``None`` (connection broken, timeout) or raises
+        an unexpected exception, so that an AssetWatcher is never silently killed
+        by a transient failure.
+
+        All blocking IBM MQ operations ('connect', 'open', 'get', 'close', 'disconnect') run in a
+        separate thread via 'sync_to_async' to satisfy the IBM MQ C client's thread-affinity
+        requirement — every operation on a connection must happen from the 
thread that created it.
+
+        A :class:`threading.Event` stop signal is passed to the worker so that, when this
+        coroutine is canceled (e.g. because the Airflow triggerer reassigns the watcher to
+        another pod), the background thread exits cleanly after the current 'q.get' call
+        times out (at most 'poll_interval' seconds).  This prevents orphaned
+        threads from continuing to poll after cancellation, but it does **not**
+        change the delivery guarantee: :meth:`consume` uses
+        ``MQGMO_NO_SYNCPOINT``, so cancellation after ``q.get`` returns but
+        before the trigger yields an event can still lose that message.
+
+        :param queue_name: Name of the IBM MQ queue to consume messages from.
+        :param poll_interval: Interval in seconds used to wait for messages and to control
+            how long the underlying MQ 'get' operation blocks before checking again.
+        :return: The decoded message payload.
+        """
+        connection = await get_async_connection(self.conn_id)
+        backoff = _BACKOFF_BASE
+        while True:
+            loop = asyncio.get_running_loop()
+            future = loop.create_future()
+            stop_event = threading.Event()
+            thread = IBMMQConsumer(
+                hook=self,
+                connection=connection,
+                queue_name=queue_name,
+                poll_interval=poll_interval,
+                loop=loop,
+                future=future,
+                stop_event=stop_event,
+            )
+            thread.start()
+
+            try:
+                result = await future
+
+                if result is not None:
+                    return result
+            except asyncio.CancelledError:
+                stop_event.set()
+                raise
+            except IBMMQError as e:
+                if e.transient:
+                    self.log.warning(
+                        "Transient MQ error on queue '%s': completion_code=%s reason_code=%s (%s); retrying in %.1fs",
+                        queue_name,
+                        e.comp,
+                        e.reason,
+                        e,
+                        backoff,
+                    )
+                    await asyncio.sleep(backoff)
+                    backoff = min(backoff * _BACKOFF_FACTOR, _BACKOFF_MAX)
+                    continue
+                self.log.error(
+                    "Permanent MQ error on queue '%s': completion_code=%s reason_code=%s (%s) -- not retrying",
+                    queue_name,
+                    e.comp,
+                    e.reason,
+                    e,
+                )
+                raise
+            except Exception:
+                # Programming errors should not be retried
+                self.log.exception(
+                    "Unexpected error in IBM MQ consume for queue '%s' -- not retrying",
+                    queue_name,
+                )
+                raise
+            finally:
+                stop_event.set()
+                thread.join(timeout=0)
+
+            self.log.info(
+                "IBM MQ consume returned no event for queue '%s'; queue may be quiet. Retrying in %.1fs",
+                queue_name,
+                backoff,
+            )
+            await asyncio.sleep(backoff)
+            backoff = min(backoff * _BACKOFF_FACTOR, _BACKOFF_MAX)
+
+    async def aproduce(self, queue_name: str, payload: str) -> None:
+        """
+        Asynchronous version of :meth:`produce`.
+
+        Put a message onto the specified IBM MQ queue.
+
+        All blocking IBM MQ operations run in a separate thread via 'sync_to_async' for the same
+        thread-safety reasons as :meth:`aconsume`.
+
+        :param queue_name: Name of the IBM MQ queue to which the message should be sent.
+        :param payload: Message payload to send. The payload will be encoded as UTF-8
+            before being placed on the queue.
+        :return: None
+        """
+        connection = await get_async_connection(self.conn_id)
+        await sync_to_async(self.produce, thread_sensitive=False)(connection, queue_name, payload)
+
+    def produce(
+        self,
+        connection: Connection,
+        queue_name: str,
+        payload: str,
+    ) -> None:
+        """Blocking implementation of :meth:`aproduce`."""
+        import ibmmq
+
+        od = ibmmq.OD()
+        od.ObjectName = queue_name
+
+        md = ibmmq.MD()
+        md.Format = ibmmq.CMQC.MQFMT_STRING
+        md.CodedCharSetId = 1208
+        md.Encoding = ibmmq.CMQC.MQENC_NATIVE
+
+        try:
+            with self.get_conn(connection=connection) as conn:
+                q = ibmmq.Queue(conn, od, ibmmq.CMQC.MQOO_OUTPUT)

Review Comment:
   `produce` hard-codes `ibmmq.CMQC.MQOO_OUTPUT`, ignoring `self.open_options`. 
A producer that wants `MQOO_OUTPUT | MQOO_FAIL_IF_QUIESCING` (sensible default 
for production) has no way to express it. Suggest using the same 
`self.get_open_options(queue_name)` path that `consume` uses, or accept a 
per-call `open_options: int | None = None`.
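
A rough sketch of the per-call variant, under stated assumptions: `resolve_produce_open_options` is a hypothetical helper, and the two constants are local stand-ins assumed to mirror `ibmmq.CMQC.MQOO_OUTPUT` and `ibmmq.CMQC.MQOO_FAIL_IF_QUIESCING`, so the snippet runs without the C client installed.

```python
from __future__ import annotations

# Stand-ins for ibmmq.CMQC constants (values assumed to match the MQ headers).
MQOO_OUTPUT = 0x00000010
MQOO_FAIL_IF_QUIESCING = 0x00002000


def resolve_produce_open_options(open_options: int | None = None) -> int:
    """Return the per-call mask if given, otherwise today's MQOO_OUTPUT default."""
    return MQOO_OUTPUT if open_options is None else open_options


# A producer opting in to fail-fast behaviour while the queue manager quiesces.
mask = resolve_produce_open_options(MQOO_OUTPUT | MQOO_FAIL_IF_QUIESCING)
```

One reason to consider a separate per-call argument rather than reusing the hook-level value: `get_conn` fills `self.open_options` with `MQOO_INPUT_SHARED` when nothing is configured, which is an input option and would not be a suitable mask for a put.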



##########
providers/ibm/mq/src/airflow/providers/ibm/mq/hooks/mq.py:
##########
@@ -0,0 +1,569 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import json
+import threading
+from contextlib import contextmanager, suppress
+from typing import TYPE_CHECKING, Any
+
+from asgiref.sync import sync_to_async
+
+from airflow.providers.common.compat.connection import get_async_connection
+from airflow.providers.common.compat.sdk import BaseHook
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if TYPE_CHECKING:
+    from airflow.providers.common.compat.sdk import Connection
+
+# Backoff parameters for transient consume failures
+_BACKOFF_BASE: float = 1.0
+_BACKOFF_MAX: float = 60.0
+_BACKOFF_FACTOR: float = 2.0
+_TRANSIENT_REASON_NAMES = frozenset(
+    {
+        "MQRC_CONNECTION_BROKEN",
+        "MQRC_Q_MGR_QUIESCING",
+        "MQRC_Q_MGR_NOT_AVAILABLE",
+        "MQRC_HOST_NOT_AVAILABLE",
+        "MQRC_CONNECTION_QUIESCING",
+    }
+)
+
+
+class IBMMQError(Exception):
+    """
+    Lightweight wrapper for IBM MQ errors raised by the consumer thread.
+
+    This allows the async event-loop code in :meth:`IBMMQHook.aconsume` to
+    handle MQ errors **without importing the heavy ibmmq C extension** on the
+    event-loop thread.
+
+    :param reason: The integer MQ reason code (e.g. ``MQRC_CONNECTION_BROKEN``).
+    :param comp: The integer MQ completion code.
+    :param transient: Whether this error is considered transient (eligible for retry).
+    :param message: Human-readable description of the error.
+    """
+
+    def __init__(self, reason: int, comp: int, transient: bool, message: str):
+        super().__init__(message)
+        self.reason = reason
+        self.comp = comp
+        self.transient = transient
+
+
+class IBMMQConsumer(threading.Thread, LoggingMixin):
+    """
+    Thread worker that consumes one message from an IBM MQ queue.
+
+    The consumer is used by :meth:`IBMMQHook.aconsume` to execute blocking MQ
+    calls in a dedicated thread because the IBM MQ C client requires handles to
+    be used from the thread that created them. The result (or exception) is
+    forwarded to the asyncio event loop through ``future``.
+
+    :param hook: Hook instance that provides connection and queue helpers.
+    :param connection: Airflow connection object resolved from ``hook.conn_id``.
+    :param queue_name: Queue to consume from.
+    :param poll_interval: Maximum wait time (seconds) for each ``q.get`` call.
+    :param loop: Event loop that owns ``future``.
+    :param future: Future completed with the decoded message or an exception.
+    :param stop_event: Signal used to stop polling after cancellation.
+    """
+
+    def __init__(
+        self,
+        hook: IBMMQHook,
+        connection: Connection,
+        queue_name: str,
+        poll_interval: float,
+        loop: asyncio.AbstractEventLoop,
+        future: asyncio.Future,
+        stop_event: threading.Event,
+    ):
+        super().__init__(daemon=True)
+        self.hook = hook
+        self.connection = connection
+        self.queue_name = queue_name
+        self.poll_interval = poll_interval
+        self.loop = loop
+        self.future = future
+        self.stop_event = stop_event
+
+    def _process_message(self, message: bytes) -> str:
+        """
+        Process a raw MQ message.
+
+        If the message contains an RFH2 header, the header is unpacked and the
+        payload following the header is returned. If unpacking fails, the raw
+        message is returned decoded as UTF-8.
+
+        :param message: Raw message received from IBM MQ.
+        :return: Decoded message payload.
+        """
+        import ibmmq

Review Comment:
   `import ibmmq` appears inline in `_process_message`, `consume`,
   `get_open_options_flags`, `_connect`, `get_conn`, and `produce`. The "lazy
   import of an optional dependency" reason is real, but the import only needs
   to happen once per module. Suggest collapsing to a single guarded
   module-level import (`try: import ibmmq; except ImportError: ibmmq = None`)
   with a check at first use that raises
   `AirflowOptionalProviderFeatureException` if the package is missing.
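
The suggested pattern could look roughly like the following. This is a sketch, not the provider's code: `require_ibmmq` is a hypothetical helper name, and `MissingIBMMQError` is a local stand-in for `AirflowOptionalProviderFeatureException` so that the snippet runs without Airflow installed.

```python
from __future__ import annotations

try:
    import ibmmq  # optional dependency, may be absent
except ImportError:
    ibmmq = None


class MissingIBMMQError(RuntimeError):
    """Stand-in for AirflowOptionalProviderFeatureException in this sketch."""


def require_ibmmq():
    """Return the ibmmq module, or raise a clear error if it is not installed."""
    if ibmmq is None:
        raise MissingIBMMQError(
            "The 'ibmmq' package is required for IBM MQ operations but is not installed."
        )
    return ibmmq
```

Each method would then start with `mq = require_ibmmq()` instead of its own `import ibmmq`. A module-level import does load the C extension wherever the module is first imported, so it may be worth checking this against the `IBMMQError` docstring's stated goal of keeping that import off the event-loop thread.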



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

