This is an automated email from the ASF dual-hosted git repository.

vikramkoka pushed a commit to branch common-msgQ
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8f57de92f27c1626ecf4ffb37c5aee4ccd882179
Author: Vikram Koka <[email protected]>
AuthorDate: Wed Feb 12 10:16:15 2025 -0800

    Draft Common Message Queue
    
    Here is a very early draft PR to introduce and socialize the concept of a 
"common message queue" abstraction similar to the "Common SQL" and "Common IO" 
abstractions in Airflow.
    
    This will be a provider package similar to those and is intended to be an
abstraction over Apache Kafka, Amazon SQS, and Google PubSub to begin with. It
can then be expanded to other messaging systems based on community adoption.
    
    The initial goal with this is to provide a simple abstraction for
integrating Event Driven Scheduling, coming with Airflow 3, with message
notification systems such as Apache Kafka, which are currently being used to
publish data availability.
    
    At this stage, this is very much a WIP draft intended to solicit input from 
the community.
---
 providers/common/msgq/README.rst                   |  75 ++++++++++++++
 .../providers/common/msgq/hooks/msg_queue.py       |  81 +++++++++++++++
 .../providers/common/msgq/operators/msg_queue.py   | 109 +++++++++++++++++++++
 .../providers/common/msgq/sensors/msg_queue.py     |   0
 4 files changed, 265 insertions(+)

diff --git a/providers/common/msgq/README.rst b/providers/common/msgq/README.rst
new file mode 100644
index 00000000000..02e764c2bef
--- /dev/null
+++ b/providers/common/msgq/README.rst
@@ -0,0 +1,75 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+ .. NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN!
+
+ .. IF YOU WANT TO MODIFY TEMPLATE FOR THIS FILE, YOU SHOULD MODIFY THE 
TEMPLATE
+    `PROVIDER_README_TEMPLATE.rst.jinja2` IN the 
`dev/breeze/src/airflow_breeze/templates` DIRECTORY
+
+
+Package ``apache-airflow-providers-common-msgq``
+
+Release: ``0.1.0``
+
+
+
+Provider package
+----------------
+
+This is a provider package for the ``common.msgq`` provider. All classes for
this provider package
+are in the ``airflow.providers.common.msgq`` Python package.
+
+This provider package is intended to serve as a common abstraction on top of 
popular message queue 
+providers such as Apache Kafka, Amazon SQS, and Google PubSub. 
+
+The expectation is that a common provider abstraction for message queues is 
especially useful for 
+Event Driven Scheduling, which is being introduced as part of Airflow 3.0. 
Based on conversations 
+with users of Apache Airflow, these Events for publishing of Data Assets are 
very often broadcast 
+over a publish and subscribe mechanism. The underlying technology used for 
this publish and subscribe
+mechanism varies by environment, but Apache Kafka, Amazon SQS, and Google 
PubSub are commonly used. 
+
+
+.. You can find package information and changelog for the provider
+.. in the `documentation 
<https://airflow.apache.org/docs/apache-airflow-providers-common-msgq/0.1.0/>`_.
+
+Installation
+------------
+
+You can install this package on top of an existing Airflow 3 installation (see 
``Requirements`` below
+for the minimum Airflow version supported) via
+``pip install apache-airflow-providers-common-msgq``
+
+The package supports the following python versions: 3.10,3.11,3.12
+
+Requirements
+------------
+
+==================  ==================
+PIP package         Version required
+==================  ==================
+``apache-airflow``  ``>=3.0.0``
+==================  ==================
+
+Cross provider package dependencies
+-----------------------------------
+
+None at this time
+
+
+The changelog for the provider package can be found in the
+`changelog 
<https://airflow.apache.org/docs/apache-airflow-providers-common-msgq/0.1.0/changelog.html>`_.
diff --git 
a/providers/common/msgq/src/airflow/providers/common/msgq/hooks/msg_queue.py 
b/providers/common/msgq/src/airflow/providers/common/msgq/hooks/msg_queue.py
new file mode 100644
index 00000000000..76a57e3aaee
--- /dev/null
+++ b/providers/common/msgq/src/airflow/providers/common/msgq/hooks/msg_queue.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#
+# Centralized handling of the connection mechanism for all Message Queues 
+# 
+
from collections.abc import Sequence
from typing import Any

from airflow.configuration import conf
from airflow.exceptions import (
    AirflowException,
    AirflowOptionalProviderFeatureException,
)
from airflow.hooks.base import BaseHook
+
+
class MsgQueueHook(BaseHook):
    """
    Abstract base class for all Message Queue hooks.

    Subclasses are expected to set ``conn_name_attr`` (the name of the
    attribute that stores their connection id) and may override
    ``default_conn_name``, ``conn_type``, ``hook_name`` and ``connector``.
    """

    # Name of the attribute holding the connection id; must be set by subclasses.
    conn_name_attr: str
    # Default connection id used when none is supplied by the caller.
    default_conn_name = "default_conn_id"
    # Connection type for this kind of message queue (subclasses override).
    conn_type = "kafka"
    # Human-readable hook name (subclasses override).
    hook_name = "Apache Kafka"
    # Optional connector module providing .connect(); set by subclasses, used by get_conn().
    connector = None

    def __init__(self, *args, **kwargs):
        super().__init__()
        # conn_name_attr is only *annotated* above; a subclass that forgets to
        # assign it would otherwise raise AttributeError here instead of a
        # clear AirflowException.
        if not getattr(self, "conn_name_attr", None):
            raise AirflowException("conn_name_attr is not defined")
        if len(args) == 1:
            # A single positional argument is interpreted as the connection id.
            setattr(self, self.conn_name_attr, args[0])
        elif self.conn_name_attr not in kwargs:
            setattr(self, self.conn_name_attr, self.default_conn_name)
        else:
            setattr(self, self.conn_name_attr, kwargs[self.conn_name_attr])

    def get_conn_id(self) -> str:
        """Return the connection id stored under ``conn_name_attr``."""
        return getattr(self, self.conn_name_attr)

    def get_conn(self) -> Any:
        """
        Return a connection object for the configured message queue.

        :raises RuntimeError: if the subclass did not set ``self.connector``.
        """
        if self.connector is None:
            raise RuntimeError(f"{type(self).__name__} didn't have `self.connector` set!")
        # Resolve the Airflow Connection for this hook's connection id.
        # (The original referenced the undefined ``self.connection``.)
        queue = self.get_connection(self.get_conn_id())
        return self.connector.connect(host=queue.host, port=queue.port, username=queue.login)
+
+
class MsgQueueConsumerHook(MsgQueueHook):
    """
    Abstract base class hook for creating a message queue consumer.

    :param topics: A list of topics to subscribe to on the message queue
    :param config_id: connection id to use; defaults to
        ``MsgQueueHook.default_conn_name``
    """

    def __init__(self, topics: Sequence[str], config_id=MsgQueueHook.default_conn_name) -> None:
        # NOTE(review): the base __init__ stores the id under the subclass's
        # ``conn_name_attr``; confirm subclasses set conn_name_attr = "config_id".
        super().__init__(config_id=config_id)
        self.topics = topics
+
+
diff --git 
a/providers/common/msgq/src/airflow/providers/common/msgq/operators/msg_queue.py
 
b/providers/common/msgq/src/airflow/providers/common/msgq/operators/msg_queue.py
new file mode 100644
index 00000000000..a5481c215c3
--- /dev/null
+++ 
b/providers/common/msgq/src/airflow/providers/common/msgq/operators/msg_queue.py
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
from __future__ import annotations

import re
from collections.abc import Sequence
from functools import cached_property
from typing import NoReturn

from airflow.exceptions import AirflowException, AirflowFailException
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
from airflow.providers.common.msgq.hooks.msg_queue import MsgQueueHook
+
# Extracts the provider id (e.g. "amazon", "apache.kafka") from a hook's module path.
_PROVIDERS_MATCHER = re.compile(r"airflow\.providers\.(.*?)\.hooks.*")

# Minimum provider versions whose hooks are expected to work with this abstraction.
# NOTE(review): carried over from the common-sql template; not yet referenced
# anywhere in this module — confirm intended use or remove.
_MIN_SUPPORTED_PROVIDERS_VERSION = {
    "amazon": "4.1.0",
    "apache.kafka": "2.1.0",
    "google": "8.2.0",
}
+
class BaseMsgQueueOperator(BaseOperator):
    """
    Base class for the generic Message Queue Operator to get a Queue hook.

    The provided method is .get_queue_hook(). The default behavior will try to
    retrieve the queue hook based on the connection type.
    You can customize the behavior by overriding the .get_queue_hook() method.

    :param conn_id: reference to a specific message queue provider connection
    :param message_queue: name of the queue/topic to address, if any
    :param hook_params: extra parameters forwarded to the hook constructor
    :param retry_on_failure: if False, failures raise AirflowFailException
        (task fails without retries) instead of AirflowException
    """

    conn_id_field = "conn_id"

    template_fields: Sequence[str] = ("conn_id", "message_queue", "hook_params")

    def __init__(
        self,
        *,
        conn_id: str | None = None,
        message_queue: str | None = None,
        hook_params: dict | None = None,
        retry_on_failure: bool = True,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.message_queue = message_queue
        self.hook_params = hook_params or {}
        self.retry_on_failure = retry_on_failure

    @classmethod
    # TODO: can be removed once Airflow min version for this provider is 3.0.0 or higher
    def get_hook(cls, conn_id: str, hook_params: dict | None = None) -> BaseHook:
        """
        Return default hook for this connection id.

        :param conn_id: connection id
        :param hook_params: hook parameters
        :return: default hook for this connection
        """
        connection = BaseHook.get_connection(conn_id)
        return connection.get_hook(hook_params=hook_params)

    @cached_property
    def _hook(self):
        """Get the MsgQueue hook based on connection type (cached per operator instance)."""
        conn_id = getattr(self, self.conn_id_field)
        self.log.debug("Get connection for %s", conn_id)
        hook = self.get_hook(conn_id=conn_id, hook_params=self.hook_params)
        # The original checked against common-sql's undefined DbApiHook; a
        # message queue hook must derive from MsgQueueHook instead.
        if not isinstance(hook, MsgQueueHook):
            raise AirflowException(
                f"You are trying to use `common-msgq` with {hook.__class__.__name__},"
                " but its provider does not support it. Please upgrade the provider to a version that"
                " supports `common-msgq`. The hook class should be a subclass of"
                " `airflow.providers.common.msgq.hooks.msg_queue.MsgQueueHook`."
                f" Got {hook.__class__.__name__} Hook with class hierarchy: {hook.__class__.mro()}"
            )

        if self.message_queue:
            if hook.conn_type == "kafka":
                hook.message_queue = self.message_queue
            else:
                # Non-Kafka backends carry the queue name in the connection schema.
                # NOTE(review): confirm this mapping for SQS / PubSub hooks.
                hook.schema = self.message_queue

        return hook

    def get_queue_hook(self) -> MsgQueueHook:
        """
        Get the message queue hook for the connection.

        :return: the message queue hook object.
        """
        return self._hook

    def get_db_hook(self) -> MsgQueueHook:
        """Backwards-compatible alias (from the common-sql template); prefer get_queue_hook()."""
        return self._hook

    def _raise_exception(self, exception_string: str) -> NoReturn:
        # AirflowFailException marks the task failed without further retries.
        if self.retry_on_failure:
            raise AirflowException(exception_string)
        raise AirflowFailException(exception_string)
+
diff --git 
a/providers/common/msgq/src/airflow/providers/common/msgq/sensors/msg_queue.py 
b/providers/common/msgq/src/airflow/providers/common/msgq/sensors/msg_queue.py
new file mode 100644
index 00000000000..e69de29bb2d

Reply via email to