This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new e226db4ccc6 [v3-0-test] Move SQS message queue to Amazon provider (#50057) (#50096)
e226db4ccc6 is described below
commit e226db4ccc65cb7e4a76690a09d3b960b2c45445
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri May 2 15:22:06 2025 +0200
[v3-0-test] Move SQS message queue to Amazon provider (#50057) (#50096)
The common.messaging abstraction should discover message queue
providers using the same mechanism we have for other core
extensions. Previously common.messaging declared optional (but
effectively required) dependencies on other providers, which was
not needed and introduced unnecessary coupling.
By switching to our built-in discovery mechanism we immediately get
all the niceties of provider discovery:
* a queue is provided by the provider where the service or
  integration is already implemented (sqs -> amazon provider; in
  the future kafka -> kafka provider)
* queues are discovered from installed providers
* there is no coupling or imports between common.messaging and the
  providers that implement messaging; the dependency points the other
  way - providers that implement messaging depend on common.messaging
* the ``airflow providers queues`` CLI command and the providers'
  core-extensions documentation are generated automatically
(cherry picked from commit 4f962f40274a4b2567fd48eeb27b584befbc7779)
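For context, a minimal sketch of how the new discovery surface is consumed
(the printed class list is illustrative and depends on which providers are
installed):

    from airflow.providers_manager import ProvidersManager

    pm = ProvidersManager()
    # lazy discovery: collects the "queues" entries from installed providers
    pm.initialize_providers_queues()
    print(pm.queue_class_names)
    # e.g. ['airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider']

The same list backs the new ``airflow providers queues`` command added below.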
---
.../docs/authoring-and-scheduling/connections.rst | 2 +-
airflow-core/docs/core-concepts/index.rst | 1 +
airflow-core/docs/core-concepts/message-queues.rst | 41 ++++++++++++++++
airflow-core/src/airflow/cli/cli_config.py | 6 +++
.../src/airflow/cli/commands/provider_command.py | 13 +++++
airflow-core/src/airflow/provider.yaml.schema.json | 12 +++++
airflow-core/src/airflow/provider_info.schema.json | 12 +++++
airflow-core/src/airflow/providers_manager.py | 22 +++++++++
.../auth/managers/test_base_auth_manager.py | 20 --------
.../src/sphinx_exts/operators_and_hooks_ref.py | 12 +++++
.../src/sphinx_exts/templates/queues.rst.jinja2 | 27 ++++++++++
.../core-extensions/message-queues.rst | 52 +++++---------------
providers/amazon/docs/index.rst | 3 +-
providers/amazon/docs/message-queues/index.rst | 43 ++++++++++++++++
providers/amazon/provider.yaml | 8 +++
providers/amazon/pyproject.toml | 10 ++--
.../src/airflow/providers/amazon/__init__.py | 2 +-
.../providers/amazon/aws/queues}/__init__.py | 5 --
.../airflow/providers/amazon/aws/queues}/sqs.py | 15 +++++-
.../airflow/providers/amazon/get_provider_info.py | 2 +
.../tests/unit/amazon/aws/queues}/__init__.py | 5 --
.../tests/unit/amazon/aws/queues/test_sqs.py | 57 ++++++++++++++++++++++
.../tests/unit/amazon/aws/triggers/test_sqs.py | 19 +++++++-
providers/cncf/kubernetes/provider.yaml | 2 +-
.../providers/cncf/kubernetes/get_provider_info.py | 2 +-
providers/common/messaging/docs/index.rst | 1 -
providers/common/messaging/docs/providers.rst | 22 +++++----
providers/common/messaging/provider.yaml | 1 +
providers/common/messaging/pyproject.toml | 15 +++---
.../airflow/providers/common/messaging/__init__.py | 6 +--
.../common/messaging/providers/__init__.py | 25 +++++++++-
.../common/messaging/triggers/msg_queue.py | 20 ++++++--
.../common/messaging/triggers/test_msg_queue.py | 19 +++-----
pyproject.toml | 4 +-
.../ci/pre_commit/update_airflow_pyproject_toml.py | 5 +-
.../in_container/run_provider_yaml_files_check.py | 34 +++++++------
36 files changed, 405 insertions(+), 140 deletions(-)
diff --git a/airflow-core/docs/authoring-and-scheduling/connections.rst
b/airflow-core/docs/authoring-and-scheduling/connections.rst
index ba74826519e..7f9cbaa443e 100644
--- a/airflow-core/docs/authoring-and-scheduling/connections.rst
+++ b/airflow-core/docs/authoring-and-scheduling/connections.rst
@@ -47,5 +47,5 @@ Airflow allows to define custom connection types. This is
what is described in d
:doc:`apache-airflow-providers:index` - providers give you the capability of
defining your own connections.
The connection customization can be done by any provider, but also
many of the providers managed by the community define custom connection types.
-The full list of all providers delivered by ``Apache Airflow community managed providers`` can be found in
+The full list of all connections delivered by ``Apache Airflow community managed providers`` can be found in
:doc:`apache-airflow-providers:core-extensions/connections`.
diff --git a/airflow-core/docs/core-concepts/index.rst
b/airflow-core/docs/core-concepts/index.rst
index fdb9c2d146a..8ba314cd367 100644
--- a/airflow-core/docs/core-concepts/index.rst
+++ b/airflow-core/docs/core-concepts/index.rst
@@ -43,6 +43,7 @@ Here you can find detailed documentation about each one of
the core concepts of
auth-manager/index
objectstorage
backfill
+ message-queues
**Communication**
diff --git a/airflow-core/docs/core-concepts/message-queues.rst
b/airflow-core/docs/core-concepts/message-queues.rst
new file mode 100644
index 00000000000..573189a24f8
--- /dev/null
+++ b/airflow-core/docs/core-concepts/message-queues.rst
@@ -0,0 +1,41 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. _concepts:message-queues:
+
+Message Queues
+==============
+
+Message Queues are a way to expose the capability of external, event-driven scheduling of Dags.
+
+Apache Airflow is primarily designed for time-based and dependency-based scheduling of workflows.
+However, modern data architectures often require near real-time processing and the ability to
+react to events from various sources, such as message queues.
+
+Airflow has native event-driven capability, allowing users to create workflows that can be
+triggered by external events, thus enabling more responsive data pipelines.
+
+Airflow supports poll-based event-driven scheduling, where the Triggerer can poll
+external message queues using built-in :class:`airflow.triggers.base.BaseTrigger` classes.
+This allows users to efficiently create workflows that are triggered by external events,
+such as messages arriving in a queue or changes in a database.
+
+Airflow constantly monitors the state of an external resource and updates the asset whenever
+the external resource reaches a given state (if it reaches it at all). To achieve this, we
+leverage Airflow Triggers - small, asynchronous pieces of Python code whose job is to poll
+an external resource's state.
+
+The list of supported message queues is available in :doc:`apache-airflow-providers:core-extensions/message-queues`.
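A minimal sketch of wiring a queue into event-driven scheduling, assuming the
Airflow 3 ``Asset``/``AssetWatcher`` API and an illustrative SQS queue URL:

    from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
    from airflow.sdk import Asset, AssetWatcher, dag, task

    # illustrative queue URL; the matching provider resolves it to a concrete trigger
    trigger = MessageQueueTrigger(queue="https://sqs.us-east-1.amazonaws.com/123456789012/my-queue")
    asset = Asset("sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)])

    @dag(schedule=[asset])
    def event_driven_dag():
        @task
        def process_message():
            ...

        process_message()

    event_driven_dag()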
diff --git a/airflow-core/src/airflow/cli/cli_config.py
b/airflow-core/src/airflow/cli/cli_config.py
index 1122033dc94..4300d8c4468 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -1590,6 +1590,12 @@ PROVIDERS_COMMANDS = (
func=lazy_load_command("airflow.cli.commands.provider_command.executors_list"),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
+    ActionCommand(
+        name="queues",
+        help="Get information about queues provided",
+        func=lazy_load_command("airflow.cli.commands.provider_command.queues_list"),
+        args=(ARG_OUTPUT, ARG_VERBOSE),
+    ),
ActionCommand(
name="notifications",
help="Get information about notifications provided",
diff --git a/airflow-core/src/airflow/cli/commands/provider_command.py
b/airflow-core/src/airflow/cli/commands/provider_command.py
index bd03d07ee45..81a96b2a9bc 100644
--- a/airflow-core/src/airflow/cli/commands/provider_command.py
+++ b/airflow-core/src/airflow/cli/commands/provider_command.py
@@ -220,6 +220,19 @@ def executors_list(args):
)
+@suppress_logs_and_warning
+@providers_configuration_loaded
+def queues_list(args):
+    """List all queues at the command line."""
+    AirflowConsole().print_as(
+        data=list(ProvidersManager().queue_class_names),
+        output=args.output,
+        mapper=lambda x: {
+            "queue_class_names": x,
+        },
+    )
+
+
@suppress_logs_and_warning
@providers_configuration_loaded
def config_list(args):
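A quick sketch of exercising the new command programmatically through
Airflow's CLI parser (the ``--output`` value is one of the standard
AirflowConsole formats):

    from airflow.cli import cli_parser
    from airflow.cli.commands import provider_command

    parser = cli_parser.get_parser()
    args = parser.parse_args(["providers", "queues", "--output", "table"])
    provider_command.queues_list(args)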
diff --git a/airflow-core/src/airflow/provider.yaml.schema.json
b/airflow-core/src/airflow/provider.yaml.schema.json
index 75ba892569b..c35e0d9de25 100644
--- a/airflow-core/src/airflow/provider.yaml.schema.json
+++ b/airflow-core/src/airflow/provider.yaml.schema.json
@@ -467,6 +467,18 @@
}
}
},
+ "queues": {
+ "type": "array",
+ "description": "Message Queues exposed by the provider",
+ "items": {
+ "name": {
+ "type": "string"
+ },
+ "message-queue-class": {
+ "type": "string"
+ }
+ }
+ },
"source-date-epoch": {
"type": "integer",
"description": "Source date epoch - seconds since epoch (gmtime)
when the release documentation was prepared. Used to generate reproducible
package builds with flint.",
diff --git a/airflow-core/src/airflow/provider_info.schema.json
b/airflow-core/src/airflow/provider_info.schema.json
index 1785ba02ed6..3ca9756dfb2 100644
--- a/airflow-core/src/airflow/provider_info.schema.json
+++ b/airflow-core/src/airflow/provider_info.schema.json
@@ -416,6 +416,18 @@
"description": "Class to instantiate the plugin"
}
}
+ },
+ "queues": {
+ "type": "array",
+ "description": "Message Queues exposed by the provider",
+ "items": {
+ "name": {
+ "type": "string"
+ },
+ "message-queue-class": {
+ "type": "string"
+ }
+ }
}
},
"definitions": {
diff --git a/airflow-core/src/airflow/providers_manager.py
b/airflow-core/src/airflow/providers_manager.py
index 85062e9f75e..81611c8205d 100644
--- a/airflow-core/src/airflow/providers_manager.py
+++ b/airflow-core/src/airflow/providers_manager.py
@@ -416,6 +416,7 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
self._auth_manager_class_name_set: set[str] = set()
self._secrets_backend_class_name_set: set[str] = set()
self._executor_class_name_set: set[str] = set()
+ self._queue_class_name_set: set[str] = set()
self._provider_configs: dict[str, dict[str, Any]] = {}
self._trigger_info_set: set[TriggerInfo] = set()
self._notification_info_set: set[NotificationInfo] = set()
@@ -533,6 +534,12 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
self.initialize_providers_list()
self._discover_executors()
+    @provider_info_cache("queues")
+    def initialize_providers_queues(self):
+        """Lazy initialization of providers queue information."""
+        self.initialize_providers_list()
+        self._discover_queues()
+
@provider_info_cache("notifications")
def initialize_providers_notifications(self):
"""Lazy initialization of providers notifications information."""
@@ -1091,6 +1098,14 @@ class ProvidersManager(LoggingMixin,
metaclass=Singleton):
if _correctness_check(provider_package,
executors_class_name, provider):
self._executor_class_name_set.add(executors_class_name)
+    def _discover_queues(self) -> None:
+        """Retrieve all queues defined in the providers."""
+        for provider_package, provider in self._provider_dict.items():
+            if provider.data.get("queues"):
+                for queue_class_name in provider.data["queues"]:
+                    if _correctness_check(provider_package, queue_class_name, provider):
+                        self._queue_class_name_set.add(queue_class_name)
+
def _discover_config(self) -> None:
"""Retrieve all configs defined in the providers."""
for provider_package, provider in self._provider_dict.items():
@@ -1221,6 +1236,11 @@ class ProvidersManager(LoggingMixin,
metaclass=Singleton):
self.initialize_providers_executors()
return sorted(self._executor_class_name_set)
+    @property
+    def queue_class_names(self) -> list[str]:
+        self.initialize_providers_queues()
+        return sorted(self._queue_class_name_set)
+
@property
def filesystem_module_names(self) -> list[str]:
self.initialize_providers_filesystems()
@@ -1268,9 +1288,11 @@ class ProvidersManager(LoggingMixin,
metaclass=Singleton):
self._auth_manager_class_name_set.clear()
self._secrets_backend_class_name_set.clear()
self._executor_class_name_set.clear()
+ self._queue_class_name_set.clear()
self._provider_configs.clear()
self._trigger_info_set.clear()
self._notification_info_set.clear()
self._plugins_set.clear()
+
self._initialized = False
self._initialization_stack_trace = None
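A hedged sketch of the provider-info shape that ``_discover_queues()``
consumes, mirroring the ``queues`` entry a provider ships in its
``provider.yaml`` / ``get_provider_info()``:

    provider_data = {
        "queues": ["airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider"],
    }
    # _discover_queues() iterates provider_data["queues"]; each entry must be a
    # fully-qualified class path inside the provider's package, and only names
    # passing _correctness_check() are registered in _queue_class_name_set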
diff --git
a/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
b/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
index a5658623a70..aad69253ef2 100644
---
a/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
+++
b/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
@@ -33,8 +33,6 @@ from
airflow.api_fastapi.auth.managers.models.resource_details import (
)
from airflow.api_fastapi.auth.tokens import JWTGenerator, JWTValidator
from airflow.api_fastapi.common.types import MenuItem
-from airflow.providers_manager import ProvidersManager
-from airflow.utils.module_loading import import_string
if TYPE_CHECKING:
from airflow.api_fastapi.auth.managers.base_auth_manager import
ResourceMethod
@@ -367,21 +365,3 @@ class TestBaseAuthManager:
session.execute.return_value = dags
result = auth_manager.get_authorized_dag_ids(user=user,
session=session)
assert result == expected
-
-
-def test_auth_managers_have_create_token_endpoint(test_client):
- auth_managers = ProvidersManager().auth_managers
-
auth_managers.append("airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager")
-
- for auth_manager_module in auth_managers:
- auth_manager_cls = import_string(auth_manager_module)
- am = auth_manager_cls()
- am.init()
-
- response = test_client.post(
- "/auth/token",
- json={"username": "", "password": ""},
- )
- assert response.status_code not in [404, 405], (
- f"The auth manager {auth_manager_module} does not provide an
endpoint to create a JWT token. This endpoint should be POST /auth/token"
- )
diff --git a/devel-common/src/sphinx_exts/operators_and_hooks_ref.py
b/devel-common/src/sphinx_exts/operators_and_hooks_ref.py
index 2730df37bae..eab292d668b 100644
--- a/devel-common/src/sphinx_exts/operators_and_hooks_ref.py
+++ b/devel-common/src/sphinx_exts/operators_and_hooks_ref.py
@@ -481,6 +481,17 @@ class ExecutorsDirective(BaseJinjaReferenceDirective):
)
+class QueuesDirective(BaseJinjaReferenceDirective):
+    """Generate list of queues"""
+
+    def render_content(
+        self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR
+    ) -> str:
+        return _common_render_list_content(
+            header_separator=header_separator, resource_type="queues", template="queues.rst.jinja2"
+        )
+
+
class DeferrableOperatorDirective(BaseJinjaReferenceDirective):
"""Generate list of deferrable operators"""
@@ -521,6 +532,7 @@ def setup(app):
app.add_directive("airflow-extra-links", ExtraLinksDirective)
app.add_directive("airflow-notifications", NotificationsDirective)
app.add_directive("airflow-executors", ExecutorsDirective)
+ app.add_directive("airflow-queues", QueuesDirective)
app.add_directive("airflow-deferrable-operators",
DeferrableOperatorDirective)
app.add_directive("airflow-deprecations", DeprecationsDirective)
app.add_directive("airflow-dataset-schemes", AssetSchemeDirective)
diff --git a/devel-common/src/sphinx_exts/templates/queues.rst.jinja2
b/devel-common/src/sphinx_exts/templates/queues.rst.jinja2
new file mode 100644
index 00000000000..914d3476906
--- /dev/null
+++ b/devel-common/src/sphinx_exts/templates/queues.rst.jinja2
@@ -0,0 +1,27 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+#}
+{%for provider, provider_dict in items.items() %}
+{{ provider_dict['name'] }}
+{{ header_separator * (provider_dict['name']|length) }}
+
+{% for queue in provider_dict['queues'] -%}
+- :class:`~{{ queue }}`
+{% endfor -%}
+
+{% endfor %}
diff --git a/airflow-core/docs/core-concepts/index.rst
b/providers-summary-docs/core-extensions/message-queues.rst
similarity index 56%
copy from airflow-core/docs/core-concepts/index.rst
copy to providers-summary-docs/core-extensions/message-queues.rst
index fdb9c2d146a..46f3f40d2a3 100644
--- a/airflow-core/docs/core-concepts/index.rst
+++ b/providers-summary-docs/core-extensions/message-queues.rst
@@ -15,47 +15,19 @@
specific language governing permissions and limitations
under the License.
-Core Concepts
-=============================
+Message Queues
+--------------
-Here you can find detailed documentation about each one of the core concepts of Apache Airflow® and how to use them, as well as a high-level :doc:`architectural overview <overview>`.
+This is a summary of all message queue implementations exposed by the Apache Airflow
+community-managed providers.
-**Architecture**
+Airflow can be extended by providers with queues. Each provider can define its own queues,
+which can be used to trigger Dags from external messaging systems.
-.. toctree::
-   :maxdepth: 2
+The queues are explained in :doc:`apache-airflow:core-concepts/message-queues` and you can
+also see those provided by the community-managed providers:
- overview
-
-
-**Workloads**
-
-.. toctree::
- :maxdepth: 2
-
- dags
- dag-run
- tasks
- operators
- sensors
- taskflow
- executor/index
- auth-manager/index
- objectstorage
- backfill
-
-**Communication**
-
-.. toctree::
- :maxdepth: 2
-
- xcoms
- variables
- params
-
-**Debugging**
-
-.. toctree::
- :maxdepth: 1
-
- debug
+.. airflow-queues::
+ :tags: None
+ :header-separator: "
diff --git a/providers/amazon/docs/index.rst b/providers/amazon/docs/index.rst
index 013d11831c7..99c5cc218ae 100644
--- a/providers/amazon/docs/index.rst
+++ b/providers/amazon/docs/index.rst
@@ -42,7 +42,8 @@
Logging for Tasks <logging/index>
Configuration <configurations-ref>
Executors <executors/index>
- Auth manager <auth-manager/index>
+ Message Queues <message-queues/index>
+ AWS Auth manager <auth-manager/index>
CLI <cli-ref>
.. toctree::
diff --git a/providers/amazon/docs/message-queues/index.rst
b/providers/amazon/docs/message-queues/index.rst
new file mode 100644
index 00000000000..388be374c66
--- /dev/null
+++ b/providers/amazon/docs/message-queues/index.rst
@@ -0,0 +1,43 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Amazon Messaging Queues
+=======================
+
+Amazon SQS Queue Provider
+-------------------------
+
+Implemented by :class:`~airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider`
+
+The Amazon SQS Queue Provider is a message queue provider that uses
+Amazon Simple Queue Service (SQS) as the underlying message queue system.
+It allows you to send and receive messages using SQS queues in your Airflow workflows.
+The provider supports both standard and FIFO queues, and it provides features
+such as message visibility timeout, message retention period, and dead-letter queues.
+
+The queue URL must match this regex:
+
+.. exampleinclude:: /../src/airflow/providers/amazon/aws/queues/sqs.py
+ :language: python
+ :dedent: 0
+ :start-after: [START queue_regexp]
+ :end-before: [END queue_regexp]
+
+
+The queue parameter is passed directly to the ``sqs_queue`` parameter of the underlying
+:class:`~airflow.providers.amazon.aws.triggers.sqs.SqsSensorTrigger` class, and any
+additional kwargs are passed directly to the trigger constructor.
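A short usage sketch (the queue URL is illustrative; the extra keyword
argument is assumed to be a valid ``SqsSensorTrigger`` parameter and is
forwarded verbatim to its constructor):

    from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger

    trigger = MessageQueueTrigger(
        queue="https://sqs.us-east-1.amazonaws.com/123456789012/my-queue",
        aws_conn_id="aws_default",
    )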
diff --git a/providers/amazon/provider.yaml b/providers/amazon/provider.yaml
index 432ed11047e..ebc4df27a35 100644
--- a/providers/amazon/provider.yaml
+++ b/providers/amazon/provider.yaml
@@ -25,6 +25,8 @@ state: ready
source-date-epoch: 1744280881
# note that those versions are maintained by release manager - do not update
them manually
versions:
+ - 9.7.0
+ - 9.6.1
- 9.6.0
- 9.5.0
- 9.4.0
@@ -1182,3 +1184,9 @@ config:
executors:
- airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor
+
+auth-managers:
+ - airflow.providers.amazon.aws.auth_manager.aws_auth_manager.AwsAuthManager
+
+queues:
+ - airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider
diff --git a/providers/amazon/pyproject.toml b/providers/amazon/pyproject.toml
index 8d4eb4622a3..b890b6feea0 100644
--- a/providers/amazon/pyproject.toml
+++ b/providers/amazon/pyproject.toml
@@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"
[project]
name = "apache-airflow-providers-amazon"
-version = "9.6.0"
+version = "9.7.0"
description = "Provider package apache-airflow-providers-amazon for Apache
Airflow"
readme = "README.rst"
authors = [
@@ -142,6 +142,9 @@ dependencies = [
"standard" = [
"apache-airflow-providers-standard"
]
+"common.messaging" = [
+ "apache-airflow-providers-common-messaging>=1.0.1"
+]
[dependency-groups]
dev = [
@@ -151,6 +154,7 @@ dev = [
"apache-airflow-providers-apache-hive",
"apache-airflow-providers-cncf-kubernetes",
"apache-airflow-providers-common-compat",
+ "apache-airflow-providers-common-messaging",
"apache-airflow-providers-common-sql",
"apache-airflow-providers-exasol",
"apache-airflow-providers-ftp",
@@ -204,8 +208,8 @@ apache-airflow-providers-common-sql = {workspace = true}
apache-airflow-providers-standard = {workspace = true}
[project.urls]
-"Documentation" =
"https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.6.0"
-"Changelog" =
"https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.6.0/changelog.html"
+"Documentation" =
"https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.7.0"
+"Changelog" =
"https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.7.0/changelog.html"
"Bug Tracker" = "https://github.com/apache/airflow/issues"
"Source Code" = "https://github.com/apache/airflow"
"Slack Chat" = "https://s.apache.org/airflow-slack"
diff --git a/providers/amazon/src/airflow/providers/amazon/__init__.py
b/providers/amazon/src/airflow/providers/amazon/__init__.py
index a67f8844e6a..44e1ea686f8 100644
--- a/providers/amazon/src/airflow/providers/amazon/__init__.py
+++ b/providers/amazon/src/airflow/providers/amazon/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
__all__ = ["__version__"]
-__version__ = "9.6.0"
+__version__ = "9.7.0"
if
packaging.version.parse(packaging.version.parse(airflow_version).base_version)
< packaging.version.parse(
"2.9.0"
diff --git
a/providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py
b/providers/amazon/src/airflow/providers/amazon/aws/queues/__init__.py
similarity index 81%
copy from
providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py
copy to providers/amazon/src/airflow/providers/amazon/aws/queues/__init__.py
index b889022858b..13a83393a91 100644
---
a/providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/queues/__init__.py
@@ -14,8 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from __future__ import annotations
-
-from airflow.providers.common.messaging.providers.sqs import SqsMessageQueueProvider
-
-MESSAGE_QUEUE_PROVIDERS = [SqsMessageQueueProvider()]
diff --git
a/providers/common/messaging/src/airflow/providers/common/messaging/providers/sqs.py
b/providers/amazon/src/airflow/providers/amazon/aws/queues/sqs.py
similarity index 73%
rename from
providers/common/messaging/src/airflow/providers/common/messaging/providers/sqs.py
rename to providers/amazon/src/airflow/providers/amazon/aws/queues/sqs.py
index 97ca2dd8905..aff5b462b7f 100644
---
a/providers/common/messaging/src/airflow/providers/common/messaging/providers/sqs.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/queues/sqs.py
@@ -19,18 +19,29 @@ from __future__ import annotations
import re
from typing import TYPE_CHECKING
+from airflow.exceptions import AirflowOptionalProviderFeatureException
from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger
-from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider
+
+try:
+    from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider
+except ImportError:
+    raise AirflowOptionalProviderFeatureException(
+        "This feature requires the 'common.messaging' provider to be installed in version >= 1.0.1."
+    )
if TYPE_CHECKING:
from airflow.triggers.base import BaseEventTrigger
+# [START queue_regexp]
+QUEUE_REGEXP = r"^https://sqs\.[^.]+\.amazonaws\.com/[0-9]+/.+"
+# [END queue_regexp]
+
class SqsMessageQueueProvider(BaseMessageQueueProvider):
    """Configuration for SQS integration with common-messaging."""

    def queue_matches(self, queue: str) -> bool:
-        return bool(re.match(r"^https://sqs\.[^.]+\.amazonaws\.com/[0-9]+/.+", queue))
+        return bool(re.match(QUEUE_REGEXP, queue))

    def trigger_class(self) -> type[BaseEventTrigger]:
        return SqsSensorTrigger
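For comparison, a hedged sketch of what a queue provider for another service
could look like (``ExampleQueueTrigger``, ``my_provider`` and the
``example://`` scheme are hypothetical):

    from __future__ import annotations

    import re
    from typing import TYPE_CHECKING

    from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider

    if TYPE_CHECKING:
        from airflow.triggers.base import BaseEventTrigger


    class ExampleMessageQueueProvider(BaseMessageQueueProvider):
        """Hypothetical provider following the SqsMessageQueueProvider pattern above."""

        def queue_matches(self, queue: str) -> bool:
            # claim queue URIs of the form example://<queue-name>
            return bool(re.match(r"^example://.+", queue))

        def trigger_class(self) -> type[BaseEventTrigger]:
            from my_provider.triggers import ExampleQueueTrigger  # hypothetical trigger

            return ExampleQueueTrigger

        def trigger_kwargs(self, queue: str, **kwargs) -> dict:
            # map the generic queue URI to the trigger's constructor arguments
            return {"queue_name": queue.removeprefix("example://")}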
diff --git a/providers/amazon/src/airflow/providers/amazon/get_provider_info.py
b/providers/amazon/src/airflow/providers/amazon/get_provider_info.py
index 979e8682e98..31d1c5e8fd1 100644
--- a/providers/amazon/src/airflow/providers/amazon/get_provider_info.py
+++ b/providers/amazon/src/airflow/providers/amazon/get_provider_info.py
@@ -1297,4 +1297,6 @@ def get_provider_info():
},
},
"executors":
["airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"],
+ "auth-managers":
["airflow.providers.amazon.aws.auth_manager.aws_auth_manager.AwsAuthManager"],
+ "queues":
["airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider"],
}
diff --git
a/providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py
b/providers/amazon/tests/unit/amazon/aws/queues/__init__.py
similarity index 81%
copy from
providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py
copy to providers/amazon/tests/unit/amazon/aws/queues/__init__.py
index b889022858b..13a83393a91 100644
---
a/providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py
+++ b/providers/amazon/tests/unit/amazon/aws/queues/__init__.py
@@ -14,8 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from __future__ import annotations
-
-from airflow.providers.common.messaging.providers.sqs import SqsMessageQueueProvider
-
-MESSAGE_QUEUE_PROVIDERS = [SqsMessageQueueProvider()]
diff --git a/providers/amazon/tests/unit/amazon/aws/queues/test_sqs.py
b/providers/amazon/tests/unit/amazon/aws/queues/test_sqs.py
new file mode 100644
index 00000000000..95c28392473
--- /dev/null
+++ b/providers/amazon/tests/unit/amazon/aws/queues/test_sqs.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger
+
+pytest.importorskip("airflow.providers.common.messaging.providers.base_provider.BaseMessageQueueProvider")
+
+
+def test_message_sqs_queue_create():
+    from airflow.providers.amazon.aws.queues.sqs import SqsMessageQueueProvider
+    from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider
+
+    provider = SqsMessageQueueProvider()
+    assert isinstance(provider, BaseMessageQueueProvider)
+
+
+def test_message_sqs_queue_matches():
+    from airflow.providers.amazon.aws.queues.sqs import SqsMessageQueueProvider
+
+    provider = SqsMessageQueueProvider()
+    assert provider.queue_matches("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue")
+    assert not provider.queue_matches("https://sqs.us-east-1.amazonaws.com/123456789012")
+    assert not provider.queue_matches("https://sqs.us-east-1.amazonaws.com/123456789012/")
+    assert not provider.queue_matches("https://sqs.us-east-1.amazonaws.com/")
+
+
+def test_message_sqs_queue_trigger_class():
+    from airflow.providers.amazon.aws.queues.sqs import SqsMessageQueueProvider
+
+    provider = SqsMessageQueueProvider()
+    assert provider.trigger_class() == SqsSensorTrigger
+
+
+def test_message_sqs_queue_trigger_kwargs():
+    from airflow.providers.amazon.aws.queues.sqs import SqsMessageQueueProvider
+
+    provider = SqsMessageQueueProvider()
+    assert provider.trigger_kwargs("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue") == {
+        "sqs_queue": "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue",
+    }
diff --git a/providers/amazon/tests/unit/amazon/aws/triggers/test_sqs.py
b/providers/amazon/tests/unit/amazon/aws/triggers/test_sqs.py
index 97c6cde1750..11ea1c9ca0c 100644
--- a/providers/amazon/tests/unit/amazon/aws/triggers/test_sqs.py
+++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_sqs.py
@@ -20,7 +20,7 @@ from unittest.mock import AsyncMock
import pytest
-from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, get_base_airflow_version_tuple
TEST_SQS_QUEUE = "test-sqs-queue"
TEST_AWS_CONN_ID = "test-aws-conn-id"
@@ -39,7 +39,9 @@ TEST_BOTOCORE_CONFIG = {"region_name": "us-east-1"}
class TestSqsTriggers:
    @pytest.fixture(autouse=True)
-    def _setup_test_cases(self):
+    def _setup_test_cases(self, cleanup_providers_manager):
+        from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger
+
        self.sqs_trigger = SqsSensorTrigger(
            sqs_queue=TEST_SQS_QUEUE,
            aws_conn_id=TEST_AWS_CONN_ID,
@@ -92,3 +94,16 @@ class TestSqsTriggers:
mock_client.receive_message.return_value = mock_response
messages = await self.sqs_trigger.poke(client=mock_client)
assert len(messages) == 0
+
+
+@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Requires Airflow 3.0+")
+class TestMessageQueueTrigger:
+    def test_provider_integrations(self, cleanup_providers_manager):
+        if get_base_airflow_version_tuple() < (3, 0, 1):
+            pytest.skip("This test is only for Airflow 3.0.1+")
+        queue = "https://sqs.us-east-1.amazonaws.com/0123456789/Test"
+        from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger
+        from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
+
+        trigger = MessageQueueTrigger(queue=queue)
+        assert isinstance(trigger.trigger, SqsSensorTrigger)
diff --git a/providers/cncf/kubernetes/provider.yaml
b/providers/cncf/kubernetes/provider.yaml
index dbdb7dee67e..53ce67ddd9a 100644
--- a/providers/cncf/kubernetes/provider.yaml
+++ b/providers/cncf/kubernetes/provider.yaml
@@ -371,4 +371,4 @@ config:
default: "0"
executors:
- - airflow.providers.cncf.kubernetes.kubernetes_executor.KubernetesExecutor
+ -
airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py
index 821f95cf614..b6f2da2ee7f 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py
@@ -279,5 +279,5 @@ def get_provider_info():
},
},
},
- "executors":
["airflow.providers.cncf.kubernetes.kubernetes_executor.KubernetesExecutor"],
+ "executors":
["airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor"],
}
diff --git a/providers/common/messaging/docs/index.rst
b/providers/common/messaging/docs/index.rst
index 0f50a2c2eb3..d2217175a89 100644
--- a/providers/common/messaging/docs/index.rst
+++ b/providers/common/messaging/docs/index.rst
@@ -44,7 +44,6 @@
Python API <_api/airflow/providers/common/messaging/index>
Base Provider
<_api/airflow/providers/common/messaging/providers/base_provider/index>
- SQS provider <_api/airflow/providers/common/messaging/providers/sqs/index>
.. toctree::
:hidden:
diff --git a/providers/common/messaging/docs/providers.rst
b/providers/common/messaging/docs/providers.rst
index c0b6b013f11..0559f9c6ece 100644
--- a/providers/common/messaging/docs/providers.rst
+++ b/providers/common/messaging/docs/providers.rst
@@ -19,20 +19,22 @@
Messaging Triggers
==================
-Operators, sensors, and triggers in the Common Messaging provider serve as wrappers around those from other providers,
-specifically for message queues.
-They offer an abstraction layer to simplify usage and make it easier to switch between different queue providers.
-
-Supported queue providers
-~~~~~~~~~~~~~~~~~~~~~~~~~
-
-* Amazon SQS: :class:`~airflow.providers.common.messaging.providers.sqs.SqsMessageQueueProvider`
+Operators, sensors, and triggers in the Common Messaging provider serve as wrappers around those
+from other providers, specifically for message queues.
+They offer an abstraction layer to simplify usage and make it easier to switch between
+different queue providers.
Add support for a provider
~~~~~~~~~~~~~~~~~~~~~~~~~~
To support a new provider please follow the steps below:
-1. Create a new class extending :class:`~airflow.providers.common.messaging.providers.base_provider.BaseMessageQueueProvider`.
+1. Create a new class in the provider extending
+:class:`~airflow.providers.common.messaging.providers.base_provider.BaseMessageQueueProvider`.
Make sure it implements all abstract methods
-2. Add this class to the list ``MESSAGE_QUEUE_PROVIDERS`` in ``airflow/providers/common/messaging/providers/__init__.py``
+
+2. Expose it via the ``queues`` property in the ``provider.yaml`` file of the provider where you added the new class.
+The ``queues`` property should be a list of fully-qualified class names of the queues.
+
+
+The list of supported message queues is available in :doc:`apache-airflow-providers:core-extensions/message-queues`.
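A hedged sketch of step 2 for a hypothetical provider, shown as the generated
``get_provider_info()`` equivalent of the ``provider.yaml`` entry (package and
class names are illustrative):

    def get_provider_info():
        return {
            "package-name": "apache-airflow-providers-example",  # hypothetical
            "queues": ["airflow.providers.example.queues.ExampleMessageQueueProvider"],
        }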
diff --git a/providers/common/messaging/provider.yaml
b/providers/common/messaging/provider.yaml
index b6fd64d5ffb..71716328866 100644
--- a/providers/common/messaging/provider.yaml
+++ b/providers/common/messaging/provider.yaml
@@ -25,6 +25,7 @@ state: ready
source-date-epoch: 1741121853
# note that those versions are maintained by release manager - do not update
them manually
versions:
+ - 1.1.0
- 1.0.0
triggers:
diff --git a/providers/common/messaging/pyproject.toml
b/providers/common/messaging/pyproject.toml
index 7447ce1d3cf..8ab8b41b5fe 100644
--- a/providers/common/messaging/pyproject.toml
+++ b/providers/common/messaging/pyproject.toml
@@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"
[project]
name = "apache-airflow-providers-common-messaging"
-version = "1.0.0"
+version = "1.1.0"
description = "Provider package apache-airflow-providers-common-messaging for
Apache Airflow"
readme = "README.rst"
authors = [
@@ -57,14 +57,17 @@ requires-python = "~=3.9"
# Make sure to run ``breeze static-checks --type update-providers-dependencies
--all-files``
# After you modify the dependencies, and rebuild your Breeze CI image with
``breeze ci-image build``
dependencies = [
- "apache-airflow>=3.0.0",
+ # Provider's manager had a missing support for common.messaging provider's
discovery before Airflow 3.0.1
+ # The new common.messaging provider is not compatible with Airflow 3.0.0,
and Airflow has to be
+ # Upgraded to 3.0.1+ to use the new provider.
+ "apache-airflow>=3.0.1",
]
# The optional dependencies should be modified in place in the generated file
# Any change in the dependencies is preserved when the file is regenerated
[project.optional-dependencies]
"amazon" = [
- "apache-airflow-providers-amazon"
+ "apache-airflow-providers-amazon>=9.7.0"
]
[dependency-groups]
@@ -72,8 +75,8 @@ dev = [
"apache-airflow",
"apache-airflow-task-sdk",
"apache-airflow-devel-common",
- "apache-airflow-providers-amazon",
# Additional devel dependencies (do not remove this line and add extra
development dependencies)
+ "apache-airflow-providers-amazon",
]
# To build docs:
@@ -102,8 +105,8 @@ apache-airflow-providers-common-sql = {workspace = true}
apache-airflow-providers-standard = {workspace = true}
[project.urls]
-"Documentation" =
"https://airflow.apache.org/docs/apache-airflow-providers-common-messaging/1.0.0"
-"Changelog" =
"https://airflow.apache.org/docs/apache-airflow-providers-common-messaging/1.0.0/changelog.html"
+"Documentation" =
"https://airflow.apache.org/docs/apache-airflow-providers-common-messaging/1.1.0"
+"Changelog" =
"https://airflow.apache.org/docs/apache-airflow-providers-common-messaging/1.1.0/changelog.html"
"Bug Tracker" = "https://github.com/apache/airflow/issues"
"Source Code" = "https://github.com/apache/airflow"
"Slack Chat" = "https://s.apache.org/airflow-slack"
diff --git
a/providers/common/messaging/src/airflow/providers/common/messaging/__init__.py
b/providers/common/messaging/src/airflow/providers/common/messaging/__init__.py
index 563a134cb38..b5a8fc53a40 100644
---
a/providers/common/messaging/src/airflow/providers/common/messaging/__init__.py
+++
b/providers/common/messaging/src/airflow/providers/common/messaging/__init__.py
@@ -29,11 +29,11 @@ from airflow import __version__ as airflow_version
__all__ = ["__version__"]
-__version__ = "1.0.0"
+__version__ = "1.1.0"
if
packaging.version.parse(packaging.version.parse(airflow_version).base_version)
< packaging.version.parse(
- "3.0.0"
+ "3.0.1"
):
raise RuntimeError(
- f"The package
`apache-airflow-providers-common-messaging:{__version__}` needs Apache Airflow
3.0.0+"
+ f"The package
`apache-airflow-providers-common-messaging:{__version__}` needs Apache Airflow
3.0.1+"
)
diff --git
a/providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py
b/providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py
index b889022858b..91a3d7ca845 100644
---
a/providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py
+++
b/providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py
@@ -16,6 +16,27 @@
# under the License.
from __future__ import annotations
-from airflow.providers.common.messaging.providers.sqs import SqsMessageQueueProvider
+import importlib
-MESSAGE_QUEUE_PROVIDERS = [SqsMessageQueueProvider()]
+from airflow.providers_manager import ProvidersManager
+from airflow.utils.deprecation_tools import add_deprecated_classes
+
+providers_manager = ProvidersManager()
+providers_manager.initialize_providers_queues()
+
+
+def create_class_by_name(name: str):
+    module_name, class_name = name.rsplit(".", 1)
+    module = importlib.import_module(module_name)
+    return getattr(module, class_name)
+
+
+MESSAGE_QUEUE_PROVIDERS = [create_class_by_name(name)() for name in providers_manager.queue_class_names]
+
+__deprecated_classes = {
+    "sqs": {
+        "SqsMessageQueueProvider": "airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider",
+    },
+}
+
+add_deprecated_classes(__deprecated_classes, __name__)
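A sketch of what this dynamic registry yields at import time (output depends
on which providers are installed):

    from airflow.providers.common.messaging.providers import MESSAGE_QUEUE_PROVIDERS

    for provider in MESSAGE_QUEUE_PROVIDERS:
        print(type(provider).__name__)  # e.g. SqsMessageQueueProvider

    # the old import path keeps working but emits a deprecation warning:
    from airflow.providers.common.messaging.providers.sqs import SqsMessageQueueProvider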
diff --git
a/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py
b/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py
index 35f1cfbfb39..689bbf6d7d4 100644
---
a/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py
+++
b/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py
@@ -21,7 +21,6 @@ from collections.abc import AsyncIterator
from functools import cached_property
from typing import Any
-from airflow.exceptions import AirflowException
from airflow.providers.common.messaging.providers import MESSAGE_QUEUE_PROVIDERS
from airflow.triggers.base import BaseEventTrigger, TriggerEvent
@@ -48,17 +47,30 @@ class MessageQueueTrigger(BaseEventTrigger):
@cached_property
def trigger(self) -> BaseEventTrigger:
+        if len(MESSAGE_QUEUE_PROVIDERS) == 0:
+            self.log.error(
+                "No message queue providers are available. "
+                "Please ensure that you have the necessary providers installed."
+            )
+            raise ValueError("No message queue providers are available.")
        providers = [provider for provider in MESSAGE_QUEUE_PROVIDERS if provider.queue_matches(self.queue)]
        if len(providers) == 0:
-            raise ValueError(f"The queue '{self.queue}' is not recognized by ``MessageQueueTrigger``.")
+            self.log.error(
+                "The queue '%s' is not recognized by any of the registered providers. "
+                "The available providers are: '%s'.",
+                self.queue,
+                ", ".join([provider for provider in MESSAGE_QUEUE_PROVIDERS]),
+            )
+            raise ValueError("The queue is not recognized by any of the registered providers.")
        if len(providers) > 1:
            self.log.error(
                "The queue '%s' is recognized by more than one provider. "
                "At least two providers in ``MESSAGE_QUEUE_PROVIDERS`` are colliding with each "
-                "other.",
+                "other: '%s'",
                self.queue,
+                ", ".join([provider for provider in providers]),
            )
-            raise AirflowException(f"The queue '{self.queue}' is recognized by more than one provider.")
+            raise ValueError(f"The queue '{self.queue}' is recognized by more than one provider.")
        return providers[0].trigger_class()(
            **providers[0].trigger_kwargs(self.queue, **self.kwargs), **self.kwargs
        )
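A short sketch of the resolution and the new error behavior (the queue URL is
illustrative; the first part requires the amazon provider to be installed):

    from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger

    # resolves to SqsSensorTrigger when the amazon provider is installed
    trigger = MessageQueueTrigger(queue="https://sqs.us-east-1.amazonaws.com/123456789012/my-queue")
    print(type(trigger.trigger).__name__)  # SqsSensorTrigger

    # an unrecognized queue now raises ValueError (previously AirflowException)
    try:
        MessageQueueTrigger(queue="not-a-known-queue").trigger
    except ValueError as err:
        print(err)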
diff --git
a/providers/common/messaging/tests/unit/common/messaging/triggers/test_msg_queue.py
b/providers/common/messaging/tests/unit/common/messaging/triggers/test_msg_queue.py
index af998ebb074..89f3fedf641 100644
---
a/providers/common/messaging/tests/unit/common/messaging/triggers/test_msg_queue.py
+++
b/providers/common/messaging/tests/unit/common/messaging/triggers/test_msg_queue.py
@@ -16,19 +16,14 @@
# under the License.
from __future__ import annotations
-import pytest
+from unittest import mock
-from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
-class TestMessageQueueTrigger:
- @pytest.mark.parametrize(
- ("queue", "expected_trigger_class"),
- [
- ("https://sqs.us-east-1.amazonaws.com/0123456789/Test",
SqsSensorTrigger),
- ],
- )
- def test_provider_integrations(self, queue, expected_trigger_class):
- trigger = MessageQueueTrigger(queue=queue)
- assert isinstance(trigger.trigger, expected_trigger_class)
+@mock.patch(
+    "airflow.providers.common.messaging.providers.MESSAGE_QUEUE_PROVIDERS", new_callable=mock.PropertyMock
+)
+def test_provider_integrations(_):
+    trigger = MessageQueueTrigger(queue="any queue")
+    assert trigger is not None
diff --git a/pyproject.toml b/pyproject.toml
index c6c56fbeb00..2105ae08f3c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -176,7 +176,7 @@ packages = []
"apache-airflow-providers-common-io>=1.4.2"
]
"common.messaging" = [
- "apache-airflow-providers-common-messaging>=1.0.0"
+ "apache-airflow-providers-common-messaging>=1.0.1" # Set from
MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
]
"common.sql" = [
"apache-airflow-providers-common-sql>=1.18.0"
@@ -407,7 +407,7 @@ packages = []
"apache-airflow-providers-cohere>=1.4.0",
"apache-airflow-providers-common-compat>=1.2.1",
"apache-airflow-providers-common-io>=1.4.2",
- "apache-airflow-providers-common-messaging>=1.0.0",
+ "apache-airflow-providers-common-messaging>=1.0.1", # Set from
MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
"apache-airflow-providers-common-sql>=1.18.0",
"apache-airflow-providers-databricks>=6.11.0",
"apache-airflow-providers-datadog>=3.8.0",
diff --git a/scripts/ci/pre_commit/update_airflow_pyproject_toml.py
b/scripts/ci/pre_commit/update_airflow_pyproject_toml.py
index 64828883b94..8bf1fb2ca2f 100755
--- a/scripts/ci/pre_commit/update_airflow_pyproject_toml.py
+++ b/scripts/ci/pre_commit/update_airflow_pyproject_toml.py
@@ -30,9 +30,8 @@ from pathlib import Path
from packaging.version import Version, parse as parse_version
sys.path.insert(0, str(Path(__file__).parent.resolve())) # make sure
common_precommit_utils is imported
-from common_precommit_utils import console, get_all_provider_ids, insert_documentation
+from common_precommit_utils import AIRFLOW_ROOT_PATH, console, get_all_provider_ids, insert_documentation
-AIRFLOW_ROOT_PATH = Path(__file__).parents[3].resolve()
AIRFLOW_PYPROJECT_TOML_FILE = AIRFLOW_ROOT_PATH / "pyproject.toml"
AIRFLOW_CORE_ROOT_PATH = AIRFLOW_ROOT_PATH / "airflow-core"
AIRFLOW_CORE_PYPROJECT_TOML_FILE = AIRFLOW_CORE_ROOT_PATH / "pyproject.toml"
@@ -60,7 +59,7 @@ MIN_VERSION_OVERRIDE: dict[str, Version] = {
"fab": parse_version("2.0.2"),
"openlineage": parse_version("2.1.3"),
"git": parse_version("0.0.2"),
- "common.messaging": parse_version("1.0.0"),
+ "common.messaging": parse_version("1.0.1"),
}
diff --git a/scripts/in_container/run_provider_yaml_files_check.py
b/scripts/in_container/run_provider_yaml_files_check.py
index 348997b4888..d1d11cb4d9c 100755
--- a/scripts/in_container/run_provider_yaml_files_check.py
+++ b/scripts/in_container/run_provider_yaml_files_check.py
@@ -467,9 +467,7 @@ def check_plugin_classes(yaml_files: dict[str, dict]) ->
tuple[int, int]:
return num_plugins, num_errors
-@run_check("Checking extra-links belong to package, exist and are classes")
-def check_extra_link_classes(yaml_files: dict[str, dict]) -> tuple[int, int]:
- resource_type = "extra-links"
+def _check_simple_class_list(resource_type, yaml_files):
num_errors = 0
num_extra_links = 0
for yaml_file_path, provider_data in yaml_files.items():
@@ -483,20 +481,24 @@ def check_extra_link_classes(yaml_files: dict[str, dict])
-> tuple[int, int]:
return num_extra_links, num_errors
+@run_check("Checking extra-links belong to package, exist and are classes")
+def check_extra_link_classes(yaml_files: dict[str, dict]) -> tuple[int, int]:
+ return _check_simple_class_list("extra-links", yaml_files)
+
+
@run_check("Checking notifications belong to package, exist and are classes")
def check_notification_classes(yaml_files: dict[str, dict]) -> tuple[int, int]:
- resource_type = "notifications"
- num_errors = 0
- num_notifications = 0
- for yaml_file_path, provider_data in yaml_files.items():
- provider_package = _filepath_to_module(yaml_file_path)
- notifications = provider_data.get(resource_type)
- if notifications:
- num_notifications += len(notifications)
- num_errors += check_if_objects_exist_and_belong_to_package(
- notifications, provider_package, yaml_file_path,
resource_type, ObjectType.CLASS
- )
- return num_notifications, num_errors
+ return _check_simple_class_list("notifications", yaml_files)
+
+
+@run_check("Checking executors belong to package, exist and are classes")
+def check_executor_classes(yaml_files: dict[str, dict]) -> tuple[int, int]:
+ return _check_simple_class_list("executors", yaml_files)
+
+
+@run_check("Checking queues belong to package, exist and are classes")
+def check_queue_classes(yaml_files: dict[str, dict]) -> tuple[int, int]:
+ return _check_simple_class_list("queues", yaml_files)
@run_check("Checking for duplicates in list of transfers")
@@ -731,6 +733,8 @@ if __name__ == "__main__":
check_completeness_of_list_of_transfers(all_parsed_yaml_files)
check_hook_class_name_entries_in_connection_types(all_parsed_yaml_files)
+    check_executor_classes(all_parsed_yaml_files)
+    check_queue_classes(all_parsed_yaml_files)
check_plugin_classes(all_parsed_yaml_files)
check_extra_link_classes(all_parsed_yaml_files)
check_correctness_of_list_of_sensors_operators_hook_trigger_modules(all_parsed_yaml_files)
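With the shared helper in place, adding a check for a future resource type is
a one-liner; a hedged sketch with a hypothetical "widgets" resource:

    @run_check("Checking widgets belong to package, exist and are classes")
    def check_widget_classes(yaml_files: dict[str, dict]) -> tuple[int, int]:
        return _check_simple_class_list("widgets", yaml_files)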