This is an automated email from the ASF dual-hosted git repository.
jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a9d9736863b AIP-82: implement Google Pub/Sub message queue provider
(#54494)
a9d9736863b is described below
commit a9d9736863b45f6d37398605e23ea712bef2628a
Author: Deji Ibrahim <[email protected]>
AuthorDate: Mon Oct 6 10:43:35 2025 +0100
AIP-82: implement Google Pub/Sub message queue provider (#54494)
* AIP-82: implement Google Pub/Sub message queue provider
* fix: mypy checks
* fix: spell checks
* fix: tests
* switch to scheme based configuration
* add system tests, update docs
---
dev/breeze/tests/test_selective_checks.py | 8 +-
providers/google/docs/index.rst | 1 +
providers/google/docs/message-queues/index.rst | 70 ++++++++++++++++
providers/google/provider.yaml | 3 +
providers/google/pyproject.toml | 4 +
.../providers/google/cloud/queues/__init__.py | 16 ++++
.../providers/google/cloud/queues/pubsub.py | 68 ++++++++++++++++
.../airflow/providers/google/get_provider_info.py | 1 +
.../pubsub/example_pubsub_message_queue_trigger.py | 93 ++++++++++++++++++++++
.../tests/unit/google/cloud/queues/__init__.py | 16 ++++
.../tests/unit/google/cloud/queues/test_pubsub.py | 45 +++++++++++
11 files changed, 321 insertions(+), 4 deletions(-)
diff --git a/dev/breeze/tests/test_selective_checks.py
b/dev/breeze/tests/test_selective_checks.py
index 5c00a026921..f0253231279 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -1847,7 +1847,7 @@ def test_expected_output_push(
),
{
"selected-providers-list-as-string": "amazon apache.beam
apache.cassandra apache.kafka "
- "cncf.kubernetes common.compat common.sql "
+ "cncf.kubernetes common.compat common.messaging common.sql "
"facebook google hashicorp http microsoft.azure
microsoft.mssql mysql "
"openlineage oracle postgres presto salesforce samba sftp ssh
trino",
"all-python-versions":
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
@@ -1859,7 +1859,7 @@ def test_expected_output_push(
"skip-providers-tests": "false",
"docs-build": "true",
"docs-list-as-string": "apache-airflow helm-chart amazon
apache.beam apache.cassandra "
- "apache.kafka cncf.kubernetes common.compat common.sql
facebook google hashicorp http microsoft.azure "
+ "apache.kafka cncf.kubernetes common.compat common.messaging
common.sql facebook google hashicorp http microsoft.azure "
"microsoft.mssql mysql openlineage oracle postgres "
"presto salesforce samba sftp ssh trino",
"skip-prek-hooks": ALL_SKIPPED_COMMITS_IF_NO_UI,
@@ -1873,7 +1873,7 @@ def test_expected_output_push(
{
"description": "amazon...google",
"test_types": "Providers[amazon]
Providers[apache.beam,apache.cassandra,"
-
"apache.kafka,cncf.kubernetes,common.compat,common.sql,facebook,"
+
"apache.kafka,cncf.kubernetes,common.compat,common.messaging,common.sql,facebook,"
"hashicorp,http,microsoft.azure,microsoft.mssql,mysql,"
"openlineage,oracle,postgres,presto,salesforce,samba,sftp,ssh,trino] "
"Providers[google]",
@@ -2117,7 +2117,7 @@ def test_upgrade_to_newer_dependencies(
("providers/google/docs/some_file.rst",),
{
"docs-list-as-string": "amazon apache.beam apache.cassandra
apache.kafka "
- "cncf.kubernetes common.compat common.sql facebook google
hashicorp http "
+ "cncf.kubernetes common.compat common.messaging common.sql
facebook google hashicorp http "
"microsoft.azure microsoft.mssql mysql openlineage oracle "
"postgres presto salesforce samba sftp ssh trino",
},
diff --git a/providers/google/docs/index.rst b/providers/google/docs/index.rst
index 3ca89dba953..24db6cefe81 100644
--- a/providers/google/docs/index.rst
+++ b/providers/google/docs/index.rst
@@ -36,6 +36,7 @@
Connection types <connections/index>
Logging handlers <logging/index>
+ Message queues <message-queues/index>
Secrets backends <secrets-backends/google-cloud-secret-manager-backend>
API Authentication backend <api-auth-backend/google-openid>
Operators <operators/index>
diff --git a/providers/google/docs/message-queues/index.rst
b/providers/google/docs/message-queues/index.rst
new file mode 100644
index 00000000000..dcc30c55713
--- /dev/null
+++ b/providers/google/docs/message-queues/index.rst
@@ -0,0 +1,70 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Google Cloud Message Queues
+===========================
+
+.. contents::
+ :local:
+ :depth: 2
+
+Google Cloud Pub/Sub Queue Provider
+------------------------------------
+
+Implemented by
:class:`~airflow.providers.google.cloud.queues.pubsub.PubsubMessageQueueProvider`
+
+The Google Cloud Pub/Sub Queue Provider is a message queue provider that uses
Google Cloud Pub/Sub as the underlying message queue system.
+
+It allows you to receive messages from Cloud Pub/Sub in your Airflow workflows
+through the :class:`~airflow.providers.common.messaging.triggers.msg_queue.MessageQueueTrigger` common message queue interface.
+
+.. include:: /../src/airflow/providers/google/cloud/queues/pubsub.py
+ :start-after: [START pubsub_message_queue_provider_description]
+ :end-before: [END pubsub_message_queue_provider_description]
+
+Pub/Sub Message Queue Trigger
+-----------------------------
+
+Implemented by
:class:`~airflow.providers.google.cloud.triggers.pubsub.PubsubPullTrigger`
+
+Inherited from
:class:`~airflow.providers.common.messaging.triggers.msg_queue.MessageQueueTrigger`
+
+
+Wait for a message in a queue
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Below is an example of how you can configure an Airflow DAG to be triggered by
a message in Pub/Sub.
+
+.. exampleinclude::
/../tests/system/google/cloud/pubsub/example_pubsub_message_queue_trigger.py
+ :language: python
+ :start-after: [START howto_trigger_pubsub_message_queue]
+ :end-before: [END howto_trigger_pubsub_message_queue]
+
+How it works
+------------
+
+1. **Pub/Sub Message Queue Trigger**: The ``PubsubPullTrigger`` listens for
messages from a Google Cloud Pub/Sub subscription.
+
+2. **Asset and Watcher**: The ``Asset`` abstracts the external entity, the Pub/Sub subscription in this example.
+   The ``AssetWatcher`` associates a trigger with a name. This name helps you identify which trigger is associated with which
+   asset.
+
+3. **Event-Driven DAG**: Instead of running on a fixed schedule, the DAG
executes when the asset receives an update
+ (e.g., a new message in the queue).
+
+For more details on using the trigger, refer to the
+:ref:`Messaging Trigger <howto/trigger:MessageQueueTrigger>` documentation.
diff --git a/providers/google/provider.yaml b/providers/google/provider.yaml
index fc04622e4a4..6005f302c17 100644
--- a/providers/google/provider.yaml
+++ b/providers/google/provider.yaml
@@ -1257,3 +1257,6 @@ auth-backends:
logging:
- airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler
-
airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler
+
+queues:
+ - airflow.providers.google.cloud.queues.pubsub.PubsubMessageQueueProvider
diff --git a/providers/google/pyproject.toml b/providers/google/pyproject.toml
index 0bb82701d12..e5f0e86acef 100644
--- a/providers/google/pyproject.toml
+++ b/providers/google/pyproject.toml
@@ -203,6 +203,9 @@ dependencies = [
"http" = [
"apache-airflow-providers-http"
]
+"common.messaging" = [
+ "apache-airflow-providers-common-messaging"
+]
[dependency-groups]
dev = [
@@ -214,6 +217,7 @@ dev = [
"apache-airflow-providers-apache-cassandra",
"apache-airflow-providers-cncf-kubernetes",
"apache-airflow-providers-common-compat",
+ "apache-airflow-providers-common-messaging",
"apache-airflow-providers-common-sql",
"apache-airflow-providers-facebook",
"apache-airflow-providers-http",
diff --git
a/providers/google/src/airflow/providers/google/cloud/queues/__init__.py
b/providers/google/src/airflow/providers/google/cloud/queues/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/google/src/airflow/providers/google/cloud/queues/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/providers/google/src/airflow/providers/google/cloud/queues/pubsub.py
b/providers/google/src/airflow/providers/google/cloud/queues/pubsub.py
new file mode 100644
index 00000000000..884cc4d21c1
--- /dev/null
+++ b/providers/google/src/airflow/providers/google/cloud/queues/pubsub.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.exceptions import AirflowOptionalProviderFeatureException
+from airflow.providers.google.cloud.triggers.pubsub import PubsubPullTrigger
+
+try:
+ from airflow.providers.common.messaging.providers.base_provider import
BaseMessageQueueProvider
+except ImportError:
+ raise AirflowOptionalProviderFeatureException(
+ "This feature requires the 'common.messaging' provider to be installed
in version >= 1.0.1."
+ )
+
+if TYPE_CHECKING:
+ from airflow.triggers.base import BaseEventTrigger
+
+
+class PubsubMessageQueueProvider(BaseMessageQueueProvider):
+    """
+    Google Cloud Pub/Sub message queue provider for the common-messaging ``MessageQueueTrigger``.
+
+    [START pubsub_message_queue_provider_description]
+    * It uses ``google+pubsub`` as the scheme for identifying the provider.
+    * For parameter definitions, take a look at :class:`~airflow.providers.google.cloud.triggers.pubsub.PubsubPullTrigger`.
+
+    .. code-block:: python
+
+        from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
+        from airflow.sdk import Asset, AssetWatcher
+
+        trigger = MessageQueueTrigger(
+            scheme="google+pubsub",
+            # Additional PubsubPullTrigger parameters as needed
+            project_id="my_project",
+            subscription="my_subscription",
+            ack_messages=True,
+            max_messages=1,
+            gcp_conn_id="google_cloud_default",
+            poke_interval=60.0,
+        )
+
+        asset = Asset("pubsub_queue_asset", watchers=[AssetWatcher(name="pubsub_watcher", trigger=trigger)])
+
+    [END pubsub_message_queue_provider_description]
+
+    """
+
+    # Scheme string that identifies this provider; it is the value users pass as
+    # ``MessageQueueTrigger(scheme=...)`` in the example above.
+    scheme = "google+pubsub"
+
+    def trigger_class(self) -> type[BaseEventTrigger]:
+        """
+        Return the trigger class that handles ``google+pubsub`` message queues.
+
+        :return: :class:`~airflow.providers.google.cloud.triggers.pubsub.PubsubPullTrigger`.
+        """
+        # NOTE(review): the ignore suggests PubsubPullTrigger is not declared as a
+        # BaseEventTrigger subclass; confirm against the trigger's definition.
+        return PubsubPullTrigger  # type: ignore[return-value]
diff --git a/providers/google/src/airflow/providers/google/get_provider_info.py
b/providers/google/src/airflow/providers/google/get_provider_info.py
index f79dcd19d70..72f747c3627 100644
--- a/providers/google/src/airflow/providers/google/get_provider_info.py
+++ b/providers/google/src/airflow/providers/google/get_provider_info.py
@@ -1518,4 +1518,5 @@ def get_provider_info():
"airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
"airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
],
+ "queues":
["airflow.providers.google.cloud.queues.pubsub.PubsubMessageQueueProvider"],
}
diff --git
a/providers/google/tests/system/google/cloud/pubsub/example_pubsub_message_queue_trigger.py
b/providers/google/tests/system/google/cloud/pubsub/example_pubsub_message_queue_trigger.py
new file mode 100644
index 00000000000..e4d92db83ad
--- /dev/null
+++
b/providers/google/tests/system/google/cloud/pubsub/example_pubsub_message_queue_trigger.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates using Google Cloud Pub/Sub with
MessageQueueTrigger
+and Asset Watchers for event-driven workflows.
+
+This example shows how to create a DAG that triggers when messages arrive in a
+Google Cloud Pub/Sub subscription using Asset Watchers.
+
+Prerequisites
+-------------
+
+Before running this example, ensure you have:
+
+1. A GCP project with the Pub/Sub API enabled
+2. The following Pub/Sub resources created in your project:
+
+ - Topic: ``test-topic``
+ - Subscription: ``test-subscription``
+
+You can create these resources using:
+
+.. code-block:: bash
+
+ # Create topic
+ gcloud pubsub topics create test-topic --project={PROJECT_ID}
+
+ # Create subscription
+ gcloud pubsub subscriptions create test-subscription \\
+ --topic=test-topic --project={PROJECT_ID}
+
+How to test
+-----------
+
+1. Ensure the Pub/Sub resources exist (see Prerequisites above)
+2. Publish a message to trigger the DAG:
+
+ .. code-block:: bash
+
+ gcloud pubsub topics publish test-topic \\
+ --message="Test message" --project={PROJECT_ID}
+
+3. The DAG is triggered automatically when the message arrives.
+"""
+
+from __future__ import annotations
+
+# [START howto_trigger_pubsub_message_queue]
+from airflow.providers.common.messaging.triggers.msg_queue import
MessageQueueTrigger
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG, Asset, AssetWatcher
+
+# Define a trigger that listens to a Google Cloud Pub/Sub subscription.
+# ``scheme`` selects the Pub/Sub message queue provider; the remaining keyword
+# arguments are PubsubPullTrigger parameters. Replace ``my-project`` with the
+# project that owns ``test-subscription``.
+trigger = MessageQueueTrigger(
+    scheme="google+pubsub",
+    project_id="my-project",
+    subscription="test-subscription",
+    ack_messages=True,
+    max_messages=1,
+    gcp_conn_id="google_cloud_default",
+    poke_interval=60.0,
+)
+
+# Define an asset that watches for messages on the Pub/Sub subscription
+asset = Asset("pubsub_queue_asset_1", watchers=[AssetWatcher(name="pubsub_watcher_1", trigger=trigger)])
+
+# Scheduling on the asset (instead of a time-based schedule) makes the DAG run
+# when the watched asset is updated, i.e. when a new message arrives.
+with DAG(
+    dag_id="example_pubsub_message_queue_trigger",
+    schedule=[asset],
+) as dag:
+    # Placeholder task: replace it with the task(s) that process the message.
+    process_message_task = EmptyOperator(task_id="process_pubsub_message")
+# [END howto_trigger_pubsub_message_queue]
+
+
+from tests_common.test_utils.system_tests import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/providers/google/tests/unit/google/cloud/queues/__init__.py
b/providers/google/tests/unit/google/cloud/queues/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/google/tests/unit/google/cloud/queues/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/google/tests/unit/google/cloud/queues/test_pubsub.py
b/providers/google/tests/unit/google/cloud/queues/test_pubsub.py
new file mode 100644
index 00000000000..4b80bae531b
--- /dev/null
+++ b/providers/google/tests/unit/google/cloud/queues/test_pubsub.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.google.cloud.triggers.pubsub import PubsubPullTrigger
+
+pytest.importorskip("airflow.providers.common.messaging.providers.base_provider")
+
+
+def test_message_pubsub_queue_create():
+    """The provider can be instantiated and is a ``BaseMessageQueueProvider``."""
+    from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider
+    from airflow.providers.google.cloud.queues.pubsub import PubsubMessageQueueProvider
+
+    provider = PubsubMessageQueueProvider()
+    assert isinstance(provider, BaseMessageQueueProvider)
+
+
+def test_message_pubsub_queue_trigger_class():
+    """``trigger_class`` resolves to the Pub/Sub pull trigger."""
+    from airflow.providers.google.cloud.queues.pubsub import PubsubMessageQueueProvider
+
+    provider = PubsubMessageQueueProvider()
+    assert provider.trigger_class() == PubsubPullTrigger
+
+
+def test_scheme_matches():
+    """The provider recognises its ``google+pubsub`` scheme."""
+    from airflow.providers.google.cloud.queues.pubsub import PubsubMessageQueueProvider
+
+    provider = PubsubMessageQueueProvider()
+    assert provider.scheme_matches("google+pubsub")