This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1240dcc167 D205 Support - Providers: GRPC to Oracle (inclusive) (#32357)
1240dcc167 is described below
commit 1240dcc167c4b47331db81deff61fc688df118c2
Author: D. Ferruzzi <[email protected]>
AuthorDate: Tue Jul 4 23:10:26 2023 -0700
D205 Support - Providers: GRPC to Oracle (inclusive) (#32357)
---
.../providers/hashicorp/_internal_client/vault_client.py | 15 +++++++--------
airflow/providers/hashicorp/secrets/vault.py | 5 +++--
airflow/providers/http/sensors/http.py | 3 +--
airflow/providers/imap/hooks/imap.py | 4 ++--
airflow/providers/jenkins/hooks/jenkins.py | 6 ++++--
.../providers/jenkins/operators/jenkins_job_trigger.py | 4 ++--
airflow/providers/microsoft/azure/hooks/adx.py | 5 +++--
airflow/providers/microsoft/azure/hooks/base_azure.py | 6 ++++--
airflow/providers/microsoft/azure/hooks/cosmos.py | 5 +----
airflow/providers/microsoft/azure/hooks/fileshare.py | 2 ++
airflow/providers/microsoft/azure/hooks/wasb.py | 2 ++
.../providers/microsoft/azure/log/wasb_task_handler.py | 13 ++++++-------
airflow/providers/microsoft/azure/operators/asb.py | 10 ++--------
.../providers/microsoft/azure/operators/data_factory.py | 4 ++--
airflow/providers/microsoft/azure/secrets/key_vault.py | 1 +
.../providers/microsoft/azure/sensors/data_factory.py | 4 ++--
airflow/providers/microsoft/azure/sensors/wasb.py | 8 ++++----
.../providers/microsoft/azure/transfers/local_to_adls.py | 1 +
.../azure/transfers/oracle_to_azure_data_lake.py | 4 +---
airflow/providers/microsoft/azure/triggers/wasb.py | 8 +++++---
airflow/providers/microsoft/azure/utils.py | 2 ++
airflow/providers/microsoft/mssql/hooks/mssql.py | 16 +++++++++-------
airflow/providers/microsoft/psrp/hooks/psrp.py | 5 +++--
airflow/providers/mongo/hooks/mongo.py | 1 +
airflow/providers/mysql/transfers/presto_to_mysql.py | 7 ++++---
airflow/providers/mysql/transfers/trino_to_mysql.py | 7 ++++---
airflow/providers/neo4j/hooks/neo4j.py | 8 ++------
airflow/providers/openlineage/extractors/bash.py | 6 ++++--
airflow/providers/openlineage/extractors/python.py | 2 ++
airflow/providers/openlineage/plugins/adapter.py | 5 +----
airflow/providers/openlineage/plugins/listener.py | 5 +----
airflow/providers/openlineage/plugins/macros.py | 11 ++++++-----
airflow/providers/openlineage/plugins/openlineage.py | 7 +++++--
airflow/providers/openlineage/sqlparser.py | 3 ++-
airflow/providers/openlineage/utils/utils.py | 5 ++++-
airflow/providers/opsgenie/hooks/opsgenie.py | 1 +
airflow/providers/opsgenie/operators/opsgenie.py | 13 +++++++++----
37 files changed, 115 insertions(+), 99 deletions(-)
diff --git a/airflow/providers/hashicorp/_internal_client/vault_client.py b/airflow/providers/hashicorp/_internal_client/vault_client.py
index 3ec4384829..aea8bfb01d 100644
--- a/airflow/providers/hashicorp/_internal_client/vault_client.py
+++ b/airflow/providers/hashicorp/_internal_client/vault_client.py
@@ -48,10 +48,12 @@ VALID_AUTH_TYPES: list[str] = [
class _VaultClient(LoggingMixin):
"""
- Retrieves Authenticated client from Hashicorp Vault. This is purely
internal class promoting
- authentication code reuse between the Hook and the SecretBackend, it
should not be used directly in
- Airflow DAGs. Use VaultBackend for backend integration and Hook in case
you want to communicate
- with VaultHook using standard Airflow Connection definition.
+ Retrieves Authenticated client from Hashicorp Vault.
+
+ This is purely internal class promoting authentication code reuse between
the Hook and the
+ SecretBackend, it should not be used directly in Airflow DAGs. Use
VaultBackend for backend
+ integration and Hook in case you want to communicate with VaultHook using
standard Airflow
+ Connection definition.
:param url: Base URL for the Vault instance being addressed.
:param auth_type: Authentication Type for Vault. Default is ``token``.
Available values are in
@@ -172,12 +174,9 @@ class _VaultClient(LoggingMixin):
@property
def client(self):
"""
- Authentication to Vault can expire. This wrapper function checks that
- it is still authenticated to Vault, and invalidates the cache if this
- is not the case.
+ Checks that it is still authenticated to Vault and invalidates the
cache if this is not the case.
:return: Vault Client
-
"""
if not self._client.is_authenticated():
# Invalidate the cache:
diff --git a/airflow/providers/hashicorp/secrets/vault.py b/airflow/providers/hashicorp/secrets/vault.py
index 661707d9b8..dd93aeb6c8 100644
--- a/airflow/providers/hashicorp/secrets/vault.py
+++ b/airflow/providers/hashicorp/secrets/vault.py
@@ -209,8 +209,9 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
def get_connection(self, conn_id: str) -> Connection | None:
"""
- Get connection from Vault as secret. Prioritize conn_uri if exists,
- if not fall back to normal Connection creation.
+ Get connection from Vault as secret.
+
+ Prioritize conn_uri if exists, if not fall back to normal Connection
creation.
:return: A Connection object constructed from Vault data
"""
diff --git a/airflow/providers/http/sensors/http.py b/airflow/providers/http/sensors/http.py
index f691fd0a69..302ea98ef2 100644
--- a/airflow/providers/http/sensors/http.py
+++ b/airflow/providers/http/sensors/http.py
@@ -29,8 +29,7 @@ if TYPE_CHECKING:
class HttpSensor(BaseSensorOperator):
"""
- Executes a HTTP GET statement and returns False on failure caused by
- 404 Not Found or `response_check` returning False.
+ Execute HTTP GET statement; return False on failure 404 Not Found or
`response_check` returning False.
HTTP Error codes other than 404 (like 403) or Connection Refused Error
would raise an exception and fail the sensor itself directly (no more
poking).
diff --git a/airflow/providers/imap/hooks/imap.py b/airflow/providers/imap/hooks/imap.py
index 7368eedfdb..4a00e6965a 100644
--- a/airflow/providers/imap/hooks/imap.py
+++ b/airflow/providers/imap/hooks/imap.py
@@ -16,8 +16,8 @@
# specific language governing permissions and limitations
# under the License.
"""
-This module provides everything to be able to search in mails for a specific
attachment
-and also to download it.
+This module provides everything to search mail for a specific attachment and
download it.
+
It uses the imaplib library that is already integrated in python 3.
"""
from __future__ import annotations
diff --git a/airflow/providers/jenkins/hooks/jenkins.py b/airflow/providers/jenkins/hooks/jenkins.py
index e9024b6a32..3a91fd151b 100644
--- a/airflow/providers/jenkins/hooks/jenkins.py
+++ b/airflow/providers/jenkins/hooks/jenkins.py
@@ -27,8 +27,10 @@ from airflow.hooks.base import BaseHook
def _ensure_prefixes(conn_type):
"""
- Remove when provider min airflow version >= 2.5.0 since this is handled by
- provider manager from that version.
+ Deprecated.
+
+ Remove when provider min airflow version >= 2.5.0 since
+ this is handled by provider manager from that version.
"""
def dec(func):
diff --git a/airflow/providers/jenkins/operators/jenkins_job_trigger.py b/airflow/providers/jenkins/operators/jenkins_job_trigger.py
index 5b00007748..006c199f59 100644
--- a/airflow/providers/jenkins/operators/jenkins_job_trigger.py
+++ b/airflow/providers/jenkins/operators/jenkins_job_trigger.py
@@ -21,7 +21,7 @@ import ast
import json
import socket
import time
-from typing import Any, Iterable, List, Mapping, Sequence, Union
+from typing import Any, Iterable, Mapping, Sequence, Union
from urllib.error import HTTPError, URLError
import jenkins
@@ -33,7 +33,7 @@ from airflow.models import BaseOperator
from airflow.providers.jenkins.hooks.jenkins import JenkinsHook
JenkinsRequest = Mapping[str, Any]
-ParamType = Union[str, dict, List, None]
+ParamType = Union[str, dict, list, None]
def jenkins_request_with_headers(jenkins_server: Jenkins, req: Request) ->
JenkinsRequest | None:
diff --git a/airflow/providers/microsoft/azure/hooks/adx.py b/airflow/providers/microsoft/azure/hooks/adx.py
index 96518b7e0a..e3cda1d08f 100644
--- a/airflow/providers/microsoft/azure/hooks/adx.py
+++ b/airflow/providers/microsoft/azure/hooks/adx.py
@@ -186,8 +186,9 @@ class AzureDataExplorerHook(BaseHook):
def run_query(self, query: str, database: str, options: dict | None =
None) -> KustoResponseDataSetV2:
"""
- Run KQL query using provided configuration, and return
- `azure.kusto.data.response.KustoResponseDataSet` instance.
+ Run KQL query using provided configuration, and return
KustoResponseDataSet instance.
+
+ See: `azure.kusto.data.response.KustoResponseDataSet`
If query is unsuccessful AirflowException is raised.
:param query: KQL query to run
diff --git a/airflow/providers/microsoft/azure/hooks/base_azure.py b/airflow/providers/microsoft/azure/hooks/base_azure.py
index 4190a2bf03..2a4d250e1c 100644
--- a/airflow/providers/microsoft/azure/hooks/base_azure.py
+++ b/airflow/providers/microsoft/azure/hooks/base_azure.py
@@ -27,8 +27,10 @@ from airflow.hooks.base import BaseHook
class AzureBaseHook(BaseHook):
"""
- This hook acts as a base hook for azure services. It offers several
authentication mechanisms to
- authenticate the client library used for upstream azure hooks.
+ This hook acts as a base hook for azure services.
+
+ It offers several authentication mechanisms to authenticate
+ the client library used for upstream azure hooks.
:param sdk_client: The SDKClient to use.
:param conn_id: The :ref:`Azure connection id<howto/connection:azure>`
diff --git a/airflow/providers/microsoft/azure/hooks/cosmos.py b/airflow/providers/microsoft/azure/hooks/cosmos.py
index 238abc4c70..45b1b0dab2 100644
--- a/airflow/providers/microsoft/azure/hooks/cosmos.py
+++ b/airflow/providers/microsoft/azure/hooks/cosmos.py
@@ -237,10 +237,7 @@ class AzureCosmosDBHook(BaseHook):
)
def upsert_document(self, document, database_name=None,
collection_name=None, document_id=None):
- """
- Inserts a new document (or updates an existing one) into an existing
- collection in the CosmosDB database.
- """
+ """Insert or update a document into an existing collection in the
CosmosDB database."""
# Assign unique ID if one isn't provided
if document_id is None:
document_id = str(uuid.uuid4())
diff --git a/airflow/providers/microsoft/azure/hooks/fileshare.py b/airflow/providers/microsoft/azure/hooks/fileshare.py
index 8067907706..0244b7ce72 100644
--- a/airflow/providers/microsoft/azure/hooks/fileshare.py
+++ b/airflow/providers/microsoft/azure/hooks/fileshare.py
@@ -28,6 +28,8 @@ from airflow.hooks.base import BaseHook
def _ensure_prefixes(conn_type):
"""
+ Deprecated.
+
Remove when provider min airflow version >= 2.5.0 since this is handled by
provider manager from that version.
"""
diff --git a/airflow/providers/microsoft/azure/hooks/wasb.py b/airflow/providers/microsoft/azure/hooks/wasb.py
index ad9c4754c5..794da8dd85 100644
--- a/airflow/providers/microsoft/azure/hooks/wasb.py
+++ b/airflow/providers/microsoft/azure/hooks/wasb.py
@@ -53,6 +53,8 @@ AsyncCredentials = Union[AsyncClientSecretCredential, AsyncDefaultAzureCredentia
def _ensure_prefixes(conn_type):
"""
+ Deprecated.
+
Remove when provider min airflow version >= 2.5.0 since this is handled by
provider manager from that version.
"""
diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
index 51c0c4dc18..96c87219ca 100644
--- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py
+++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
@@ -45,9 +45,9 @@ def get_default_delete_local_copy():
class WasbTaskHandler(FileTaskHandler, LoggingMixin):
"""
- WasbTaskHandler is a python log handler that handles and reads
- task instance logs. It extends airflow FileTaskHandler and
- uploads to and reads from Wasb remote storage.
+ WasbTaskHandler is a python log handler that handles and reads task
instance logs.
+
+ It extends airflow FileTaskHandler and uploads to and reads from Wasb
remote storage.
"""
trigger_should_wrap = True
@@ -171,6 +171,7 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
) -> tuple[str, dict[str, bool]]:
"""
Read logs of given task instance and try_number from Wasb remote
storage.
+
If failed, read the log from task instance host machine.
todo: when min airflow version >= 2.6, remove this method
@@ -207,8 +208,7 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
def wasb_read(self, remote_log_location: str, return_error: bool = False):
"""
- Returns the log found at the remote_log_location. Returns '' if no
- logs are found or there is an error.
+ Return the log found at the remote_log_location. Returns '' if no logs
are found or there is an error.
:param remote_log_location: the log's location in remote storage
:param return_error: if True, returns a string error message if an
@@ -226,8 +226,7 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
def wasb_write(self, log: str, remote_log_location: str, append: bool =
True) -> bool:
"""
- Writes the log to the remote_log_location. Fails silently if no hook
- was created.
+ Writes the log to the remote_log_location. Fails silently if no hook
was created.
:param log: the log to write to the remote_log_location
:param remote_log_location: the log's location in remote storage
diff --git a/airflow/providers/microsoft/azure/operators/asb.py b/airflow/providers/microsoft/azure/operators/asb.py
index 333fc82d2b..d9a460b77d 100644
--- a/airflow/providers/microsoft/azure/operators/asb.py
+++ b/airflow/providers/microsoft/azure/operators/asb.py
@@ -155,10 +155,7 @@ class AzureServiceBusReceiveMessageOperator(BaseOperator):
self.max_wait_time = max_wait_time
def execute(self, context: Context) -> None:
- """
- Receive Message in specific queue in Service Bus namespace,
- by connecting to Service Bus client.
- """
+ """Receive Message in specific queue in Service Bus namespace by
connecting to Service Bus client."""
# Create the hook
hook =
MessageHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
@@ -527,10 +524,7 @@ class ASBReceiveSubscriptionMessageOperator(BaseOperator):
self.azure_service_bus_conn_id = azure_service_bus_conn_id
def execute(self, context: Context) -> None:
- """
- Receive Message in specific queue in Service Bus namespace,
- by connecting to Service Bus client.
- """
+ """Receive Message in specific queue in Service Bus namespace by
connecting to Service Bus client."""
# Create the hook
hook =
MessageHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
diff --git a/airflow/providers/microsoft/azure/operators/data_factory.py b/airflow/providers/microsoft/azure/operators/data_factory.py
index a2b2c528bf..1823212473 100644
--- a/airflow/providers/microsoft/azure/operators/data_factory.py
+++ b/airflow/providers/microsoft/azure/operators/data_factory.py
@@ -233,8 +233,8 @@ class AzureDataFactoryRunPipelineOperator(BaseOperator):
def execute_complete(self, context: Context, event: dict[str, str]) ->
None:
"""
Callback for when the trigger fires - returns immediately.
- Relies on trigger to throw an exception, otherwise it assumes
execution was
- successful.
+
+ Relies on trigger to throw an exception, otherwise it assumes
execution was successful.
"""
if event:
if event["status"] == "error":
diff --git a/airflow/providers/microsoft/azure/secrets/key_vault.py b/airflow/providers/microsoft/azure/secrets/key_vault.py
index df696b0582..177da5d235 100644
--- a/airflow/providers/microsoft/azure/secrets/key_vault.py
+++ b/airflow/providers/microsoft/azure/secrets/key_vault.py
@@ -162,6 +162,7 @@ class AzureKeyVaultBackend(BaseSecretsBackend, LoggingMixin):
def build_path(path_prefix: str, secret_id: str, sep: str = "-") -> str:
"""
Given a path_prefix and secret_id, build a valid secret name for the
Azure Key Vault Backend.
+
Also replaces underscore in the path with dashes to support easy
switching between
environment variables, so ``connection_default`` becomes
``connection-default``.
diff --git a/airflow/providers/microsoft/azure/sensors/data_factory.py b/airflow/providers/microsoft/azure/sensors/data_factory.py
index b4ebedce69..f0651494bf 100644
--- a/airflow/providers/microsoft/azure/sensors/data_factory.py
+++ b/airflow/providers/microsoft/azure/sensors/data_factory.py
@@ -113,8 +113,8 @@ class AzureDataFactoryPipelineRunStatusSensor(BaseSensorOperator):
def execute_complete(self, context: Context, event: dict[str, str]) ->
None:
"""
Callback for when the trigger fires - returns immediately.
- Relies on trigger to throw an exception, otherwise it assumes
execution was
- successful.
+
+ Relies on trigger to throw an exception, otherwise it assumes
execution was successful.
"""
if event:
if event["status"] == "error":
diff --git a/airflow/providers/microsoft/azure/sensors/wasb.py b/airflow/providers/microsoft/azure/sensors/wasb.py
index 0c227f2ea3..18ac2331e0 100644
--- a/airflow/providers/microsoft/azure/sensors/wasb.py
+++ b/airflow/providers/microsoft/azure/sensors/wasb.py
@@ -97,8 +97,8 @@ class WasbBlobSensor(BaseSensorOperator):
def execute_complete(self, context: Context, event: dict[str, str]) ->
None:
"""
Callback for when the trigger fires - returns immediately.
- Relies on trigger to throw an exception, otherwise it assumes
execution was
- successful.
+
+ Relies on trigger to throw an exception, otherwise it assumes
execution was successful.
"""
if event:
if event["status"] == "error":
@@ -193,8 +193,8 @@ class WasbPrefixSensor(BaseSensorOperator):
def execute_complete(self, context: Context, event: dict[str, str]) ->
None:
"""
Callback for when the trigger fires - returns immediately.
- Relies on trigger to throw an exception, otherwise it assumes
execution was
- successful.
+
+ Relies on trigger to throw an exception, otherwise it assumes
execution was successful.
"""
if event:
if event["status"] == "error":
diff --git a/airflow/providers/microsoft/azure/transfers/local_to_adls.py b/airflow/providers/microsoft/azure/transfers/local_to_adls.py
index f39f837c98..7eee5009af 100644
--- a/airflow/providers/microsoft/azure/transfers/local_to_adls.py
+++ b/airflow/providers/microsoft/azure/transfers/local_to_adls.py
@@ -102,6 +102,7 @@ class LocalFilesystemToADLSOperator(BaseOperator):
class LocalToAzureDataLakeStorageOperator(LocalFilesystemToADLSOperator):
"""
This class is deprecated.
+
Please use
`airflow.providers.microsoft.azure.transfers.local_to_adls.LocalFilesystemToADLSOperator`.
"""
diff --git a/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py b/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py
index 93e8a7b14d..f891e080a4 100644
--- a/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py
+++ b/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py
@@ -32,9 +32,7 @@ if TYPE_CHECKING:
class OracleToAzureDataLakeOperator(BaseOperator):
"""
- Moves data from Oracle to Azure Data Lake. The operator runs the query
against
- Oracle and stores the file locally before loading it into Azure Data Lake.
-
+ Runs the query against Oracle and stores the file locally before loading
it into Azure Data Lake.
:param filename: file name to be used by the csv file.
:param azure_data_lake_conn_id: destination azure data lake connection.
diff --git a/airflow/providers/microsoft/azure/triggers/wasb.py b/airflow/providers/microsoft/azure/triggers/wasb.py
index 0bc75f065f..944c7ddae1 100644
--- a/airflow/providers/microsoft/azure/triggers/wasb.py
+++ b/airflow/providers/microsoft/azure/triggers/wasb.py
@@ -25,8 +25,9 @@ from airflow.triggers.base import BaseTrigger, TriggerEvent
class WasbBlobSensorTrigger(BaseTrigger):
"""
- WasbBlobSensorTrigger is fired as deferred class with params to run the
task in
- trigger worker to check for existence of the given blob in the provided
container.
+ Checks for existence of the given blob in the provided container.
+
+ WasbBlobSensorTrigger is fired as deferred class with params to run the
task in trigger worker.
:param container_name: name of the container in which the blob should be
searched for
:param blob_name: name of the blob to check existence for
@@ -90,8 +91,9 @@ class WasbBlobSensorTrigger(BaseTrigger):
class WasbPrefixSensorTrigger(BaseTrigger):
"""
+ Checks for the existence of a blob with the given prefix in the provided
container.
+
WasbPrefixSensorTrigger is fired as a deferred class with params to run
the task in trigger.
- It checks for the existence of a blob with the given prefix in the
provided container.
:param container_name: name of the container in which the blob should be
searched for
:param prefix: prefix of the blob to check existence for
diff --git a/airflow/providers/microsoft/azure/utils.py b/airflow/providers/microsoft/azure/utils.py
index 8c01100469..e4161a1b12 100644
--- a/airflow/providers/microsoft/azure/utils.py
+++ b/airflow/providers/microsoft/azure/utils.py
@@ -23,6 +23,8 @@ from functools import wraps
def _ensure_prefixes(conn_type):
"""
+ Deprecated.
+
Remove when provider min airflow version >= 2.5.0 since this is handled by
provider manager from that version.
"""
diff --git a/airflow/providers/microsoft/mssql/hooks/mssql.py b/airflow/providers/microsoft/mssql/hooks/mssql.py
index 01e1c9e47b..7b21e77c7f 100644
--- a/airflow/providers/microsoft/mssql/hooks/mssql.py
+++ b/airflow/providers/microsoft/mssql/hooks/mssql.py
@@ -26,7 +26,14 @@ from airflow.providers.common.sql.hooks.sql import DbApiHook
class MsSqlHook(DbApiHook):
- """Interact with Microsoft SQL Server."""
+ """
+ Interact with Microsoft SQL Server.
+
+ :param args: passed to DBApiHook
+ :param sqlalchemy_scheme: Scheme sqlalchemy connection. Default is
``mssql+pymssql`` Only used for
+ ``get_sqlalchemy_engine`` and ``get_sqlalchemy_connection`` methods.
+ :param kwargs: passed to DbApiHook
+ """
conn_name_attr = "mssql_conn_id"
default_conn_name = "mssql_default"
@@ -41,12 +48,6 @@ class MsSqlHook(DbApiHook):
sqlalchemy_scheme: str | None = None,
**kwargs,
) -> None:
- """
- :param args: passed to DBApiHook
- :param sqlalchemy_scheme: Scheme sqlalchemy connection. Default is
``mssql+pymssql`` Only used for
- ``get_sqlalchemy_engine`` and ``get_sqlalchemy_connection`` methods.
- :param kwargs: passed to DbApiHook
- """
super().__init__(*args, **kwargs)
self.schema = kwargs.pop("schema", None)
self._sqlalchemy_scheme = sqlalchemy_scheme
@@ -55,6 +56,7 @@ class MsSqlHook(DbApiHook):
def connection_extra_lower(self) -> dict:
"""
``connection.extra_dejson`` but where keys are converted to lower case.
+
This is used internally for case-insensitive access of mssql params.
"""
conn = self.get_connection(self.mssql_conn_id) # type:
ignore[attr-defined]
diff --git a/airflow/providers/microsoft/psrp/hooks/psrp.py b/airflow/providers/microsoft/psrp/hooks/psrp.py
index fe7f4c271f..06e3eb93a7 100644
--- a/airflow/providers/microsoft/psrp/hooks/psrp.py
+++ b/airflow/providers/microsoft/psrp/hooks/psrp.py
@@ -158,8 +158,9 @@ class PsrpHook(BaseHook):
@contextmanager
def invoke(self) -> Generator[PowerShell, None, None]:
"""
- Context manager that yields a PowerShell object to which commands can
be
- added. Upon exit, the commands will be invoked.
+ Yields a PowerShell object to which commands can be added.
+
+ Upon exit, the commands will be invoked.
"""
logger = copy(self.log)
logger.setLevel(self._logging_level)
diff --git a/airflow/providers/mongo/hooks/mongo.py b/airflow/providers/mongo/hooks/mongo.py
index b49a9365af..cfa34e0e75 100644
--- a/airflow/providers/mongo/hooks/mongo.py
+++ b/airflow/providers/mongo/hooks/mongo.py
@@ -98,6 +98,7 @@ class MongoHook(BaseHook):
def _create_uri(self) -> str:
"""
Create URI string from the given credentials.
+
:return: URI string.
"""
srv = self.extras.pop("srv", False)
diff --git a/airflow/providers/mysql/transfers/presto_to_mysql.py b/airflow/providers/mysql/transfers/presto_to_mysql.py
index b38e6b8654..0d80b58bff 100644
--- a/airflow/providers/mysql/transfers/presto_to_mysql.py
+++ b/airflow/providers/mysql/transfers/presto_to_mysql.py
@@ -29,9 +29,10 @@ if TYPE_CHECKING:
class PrestoToMySqlOperator(BaseOperator):
"""
- Moves data from Presto to MySQL, note that for now the data is loaded
- into memory before being pushed to MySQL, so this operator should
- be used for smallish amount of data.
+ Moves data from Presto to MySQL.
+
+ Note that for now the data is loaded into memory before being pushed
+ to MySQL, so this operator should be used for smallish amount of data.
:param sql: SQL query to execute against Presto. (templated)
:param mysql_table: target MySQL table, use dot notation to target a
diff --git a/airflow/providers/mysql/transfers/trino_to_mysql.py b/airflow/providers/mysql/transfers/trino_to_mysql.py
index 8ff5ed0446..e96aaafd5b 100644
--- a/airflow/providers/mysql/transfers/trino_to_mysql.py
+++ b/airflow/providers/mysql/transfers/trino_to_mysql.py
@@ -29,9 +29,10 @@ if TYPE_CHECKING:
class TrinoToMySqlOperator(BaseOperator):
"""
- Moves data from Trino to MySQL, note that for now the data is loaded
- into memory before being pushed to MySQL, so this operator should
- be used for smallish amount of data.
+ Moves data from Trino to MySQL.
+
+ Note that for now the data is loaded into memory before being pushed
+ to MySQL, so this operator should be used for smallish amount of data.
:param sql: SQL query to execute against Trino. (templated)
:param mysql_table: target MySQL table, use dot notation to target a
diff --git a/airflow/providers/neo4j/hooks/neo4j.py b/airflow/providers/neo4j/hooks/neo4j.py
index ed1df254a8..137b1e00c0 100644
--- a/airflow/providers/neo4j/hooks/neo4j.py
+++ b/airflow/providers/neo4j/hooks/neo4j.py
@@ -48,10 +48,7 @@ class Neo4jHook(BaseHook):
self.client: Driver | None = None
def get_conn(self) -> Driver:
- """
- Function that initiates a new Neo4j connection
- with username, password and database schema.
- """
+ """Function that initiates a new Neo4j connection with username,
password and database schema."""
if self.client is not None:
return self.client
@@ -112,8 +109,7 @@ class Neo4jHook(BaseHook):
def run(self, query) -> list[Any]:
"""
- Function to create a neo4j session
- and execute the query in the session.
+ Function to create a neo4j session and execute the query in the
session.
:param query: Neo4j query
:return: Result
diff --git a/airflow/providers/openlineage/extractors/bash.py b/airflow/providers/openlineage/extractors/bash.py
index 5a2bc92f65..9d7c40b114 100644
--- a/airflow/providers/openlineage/extractors/bash.py
+++ b/airflow/providers/openlineage/extractors/bash.py
@@ -32,9 +32,11 @@ from openlineage.client.facet import SourceCodeJobFacet
class BashExtractor(BaseExtractor):
"""
+ Extract executed bash command and put it into SourceCodeJobFacet.
+
This extractor provides visibility on what bash task does by extracting
- executed bash command and putting it into SourceCodeJobFacet. It does not
extract
- datasets.
+ executed bash command and putting it into SourceCodeJobFacet. It does
+ not extract datasets.
:meta private:
"""
diff --git a/airflow/providers/openlineage/extractors/python.py b/airflow/providers/openlineage/extractors/python.py
index 0c5720ac86..50d84014fd 100644
--- a/airflow/providers/openlineage/extractors/python.py
+++ b/airflow/providers/openlineage/extractors/python.py
@@ -35,6 +35,8 @@ from openlineage.client.facet import SourceCodeJobFacet
class PythonExtractor(BaseExtractor):
"""
+ Extract executed source code and put it into SourceCodeJobFacet.
+
This extractor provides visibility on what particular task does by
extracting
executed source code and putting it into SourceCodeJobFacet. It does not
extract
datasets yet.
diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py
index aecd0a6531..0e530e5c53 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -58,10 +58,7 @@ set_producer(_PRODUCER)
class OpenLineageAdapter(LoggingMixin):
- """
- Adapter for translating Airflow metadata to OpenLineage events,
- instead of directly creating them from Airflow code.
- """
+ """Translate Airflow metadata to OpenLineage events instead of creating
them from Airflow code."""
def __init__(self, client: OpenLineageClient | None = None,
secrets_masker: SecretsMasker | None = None):
super().__init__()
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 9b57bf919f..99394863f5 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -38,10 +38,7 @@ if TYPE_CHECKING:
class OpenLineageListener:
- """
- OpenLineage listener
- Sends events on task instance and dag run starts, completes and failures.
- """
+ """OpenLineage listener sends events on task instance and dag run starts,
completes and failures."""
def __init__(self):
self.log = logging.getLogger(__name__)
diff --git a/airflow/providers/openlineage/plugins/macros.py b/airflow/providers/openlineage/plugins/macros.py
index 19cbb59f29..61af81a1eb 100644
--- a/airflow/providers/openlineage/plugins/macros.py
+++ b/airflow/providers/openlineage/plugins/macros.py
@@ -30,9 +30,9 @@ _JOB_NAMESPACE = conf.get("openlineage", "namespace", fallback=os.getenv("OPENLI
def lineage_run_id(task_instance: TaskInstance):
"""
- Macro function which returns the generated run id for a given task. This
- can be used to forward the run id from a task to a child run so the job
- hierarchy is preserved.
+ Macro function which returns the generated run id for a given task.
+
+ This can be used to forward the run id from a task to a child run so the
job hierarchy is preserved.
.. seealso::
For more information on how to use this operator, take a look at the
guide:
@@ -45,8 +45,9 @@ def lineage_run_id(task_instance: TaskInstance):
def lineage_parent_id(run_id: str, task_instance: TaskInstance):
"""
- Macro function which returns the generated job and run id for a given
task. This
- can be used to forward the ids from a task to a child run so the job
+ Macro function which returns the generated job and run id for a given task.
+
+ This can be used to forward the ids from a task to a child run so the job
hierarchy is preserved. Child run can create ParentRunFacet from those ids.
.. seealso::
diff --git a/airflow/providers/openlineage/plugins/openlineage.py b/airflow/providers/openlineage/plugins/openlineage.py
index 9aade083b4..2ec0801147 100644
--- a/airflow/providers/openlineage/plugins/openlineage.py
+++ b/airflow/providers/openlineage/plugins/openlineage.py
@@ -31,8 +31,11 @@ def _is_disabled() -> bool:
class OpenLineageProviderPlugin(AirflowPlugin):
- """OpenLineage Plugin provides listener that emits OL events on DAG start,
complete and failure
- and TaskInstances start, complete and failure.
+ """
+ Listener that emits numerous Events.
+
+ OpenLineage Plugin provides listener that emits OL events on DAG start,
+ complete and failure and TaskInstances start, complete and failure.
"""
name = "OpenLineageProviderPlugin"
diff --git a/airflow/providers/openlineage/sqlparser.py b/airflow/providers/openlineage/sqlparser.py
index ed3e92e58b..8e85d3706c 100644
--- a/airflow/providers/openlineage/sqlparser.py
+++ b/airflow/providers/openlineage/sqlparser.py
@@ -207,7 +207,8 @@ class SQLParser:
@classmethod
def split_sql_string(cls, sql: list[str] | str) -> list[str]:
"""
- Split SQL string into list of statements
+ Split SQL string into list of statements.
+
Tries to use `DbApiHook.split_sql_string` if available.
Otherwise, uses the same logic.
"""
diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py
index 84ad41e237..9d1cab8eca 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -101,6 +101,7 @@ def url_to_https(url) -> str | None:
def redacted_connection_uri(conn: Connection, filtered_params=None,
filtered_prefixes=None):
"""
Return the connection URI for the given Connection.
+
This method additionally filters URI by removing query parameters that are
known to carry sensitive data
like username, password, access key.
"""
@@ -324,7 +325,9 @@ def get_airflow_run_facet(
class OpenLineageRedactor(SecretsMasker):
- """This class redacts sensitive data similar to SecretsMasker in Airflow
logs.
+ """
+ This class redacts sensitive data similar to SecretsMasker in Airflow logs.
+
The difference is that our default max recursion depth is way higher - due
to
the structure of OL events we need more depth.
Additionally, we allow data structures to specify data that needs not to be
diff --git a/airflow/providers/opsgenie/hooks/opsgenie.py b/airflow/providers/opsgenie/hooks/opsgenie.py
index 7c28b2a76e..239f67a7e2 100644
--- a/airflow/providers/opsgenie/hooks/opsgenie.py
+++ b/airflow/providers/opsgenie/hooks/opsgenie.py
@@ -33,6 +33,7 @@ from airflow.hooks.base import BaseHook
class OpsgenieAlertHook(BaseHook):
"""
This hook allows you to post alerts to Opsgenie.
+
Accepts a connection that has an Opsgenie API key as the connection's
password.
This hook sets the domain to conn_id.host, and if not set will default
to ``https://api.opsgenie.com``.
diff --git a/airflow/providers/opsgenie/operators/opsgenie.py b/airflow/providers/opsgenie/operators/opsgenie.py
index a4777a8eb7..d12db20873 100644
--- a/airflow/providers/opsgenie/operators/opsgenie.py
+++ b/airflow/providers/opsgenie/operators/opsgenie.py
@@ -29,6 +29,7 @@ if TYPE_CHECKING:
class OpsgenieCreateAlertOperator(BaseOperator):
"""
This operator allows you to post alerts to Opsgenie.
+
Accepts a connection that has an Opsgenie API key as the connection's
password.
This operator sets the domain to conn_id.host, and if not set will default
to ``https://api.opsgenie.com``.
@@ -101,8 +102,9 @@ class OpsgenieCreateAlertOperator(BaseOperator):
def _build_opsgenie_payload(self) -> dict[str, Any]:
"""
- Construct the Opsgenie JSON payload. All relevant parameters are
combined here
- to a valid Opsgenie JSON payload.
+ Construct the Opsgenie JSON payload.
+
+ All relevant parameters are combined here to a valid Opsgenie JSON
payload.
:return: Opsgenie payload (dict) to send
"""
@@ -137,6 +139,7 @@ class OpsgenieCreateAlertOperator(BaseOperator):
class OpsgenieCloseAlertOperator(BaseOperator):
"""
This operator allows you to close alerts to Opsgenie.
+
Accepts a connection that has an Opsgenie API key as the connection's
password.
This operator sets the domain to conn_id.host, and if not set will default
to ``https://api.opsgenie.com``.
@@ -183,8 +186,9 @@ class OpsgenieCloseAlertOperator(BaseOperator):
def _build_opsgenie_close_alert_payload(self) -> dict[str, Any]:
"""
- Construct the Opsgenie JSON payload. All relevant parameters are
combined here
- to a valid Opsgenie JSON payload.
+ Construct the Opsgenie JSON payload.
+
+ All relevant parameters are combined here to a valid Opsgenie JSON
payload.
:return: Opsgenie close alert payload (dict) to send
"""
@@ -214,6 +218,7 @@ class OpsgenieCloseAlertOperator(BaseOperator):
class OpsgenieDeleteAlertOperator(BaseOperator):
"""
This operator allows you to delete alerts in Opsgenie.
+
Accepts a connection that has an Opsgenie API key as the connection's
password.
This operator sets the domain to conn_id.host, and if not set will default
to ``https://api.opsgenie.com``.