This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7e79997594 D205 Support - Providers - Final Pass (#33303)
7e79997594 is described below
commit 7e799975948573ca2a1c4b2051d3eadc32bb8ba7
Author: D. Ferruzzi <[email protected]>
AuthorDate: Thu Aug 10 23:34:23 2023 -0700
D205 Support - Providers - Final Pass (#33303)
---
airflow/providers/apache/beam/hooks/beam.py | 2 ++
airflow/providers/apache/beam/operators/beam.py | 1 +
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py | 1 +
airflow/providers/cncf/kubernetes/utils/k8s_hashlib_wrapper.py | 3 +--
airflow/providers/elasticsearch/log/es_response.py | 1 +
airflow/providers/elasticsearch/log/es_task_handler.py | 5 +++--
airflow/providers/ftp/operators/ftp.py | 4 +++-
airflow/providers/google/cloud/transfers/gcs_to_gcs.py | 1 +
airflow/providers/google/cloud/transfers/s3_to_gcs.py | 1 +
airflow/providers/redis/log/redis_task_handler.py | 1 +
airflow/providers/sftp/operators/sftp.py | 4 +++-
11 files changed, 18 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/apache/beam/hooks/beam.py
b/airflow/providers/apache/beam/hooks/beam.py
index d92ade2050..f538152330 100644
--- a/airflow/providers/apache/beam/hooks/beam.py
+++ b/airflow/providers/apache/beam/hooks/beam.py
@@ -388,6 +388,7 @@ class BeamHook(BaseHook):
class BeamAsyncHook(BeamHook):
"""
Asynchronous hook for Apache Beam.
+
:param runner: Runner type.
"""
@@ -411,6 +412,7 @@ class BeamAsyncHook(BeamHook):
async def _cleanup_tmp_dir(tmp_dir: str) -> None:
"""
Helper method to delete temporary directory after finishing work with
it.
+
It uses the `rmtree` method to recursively remove the temporary directory.
"""
shutil.rmtree(tmp_dir)
diff --git a/airflow/providers/apache/beam/operators/beam.py
b/airflow/providers/apache/beam/operators/beam.py
index c39ab916f7..4cec4ba31a 100644
--- a/airflow/providers/apache/beam/operators/beam.py
+++ b/airflow/providers/apache/beam/operators/beam.py
@@ -402,6 +402,7 @@ class
BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
def execute_complete(self, context: Context, event: dict[str, Any]):
"""
Callback for when the trigger fires - returns immediately.
+
Relies on trigger to throw an exception, otherwise it assumes
execution was
successful.
"""
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index a364bf677c..f7f2bd68d7 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -182,6 +182,7 @@ class KubernetesExecutor(BaseExecutor):
def _make_safe_label_value(self, input_value: str | datetime) -> str:
"""
Normalize a provided label to be of valid length and characters.
+
See
airflow.providers.cncf.kubernetes.pod_generator.make_safe_label_value for more
details.
"""
# airflow.providers.cncf.kubernetes is an expensive import, locally
import it here to
diff --git a/airflow/providers/cncf/kubernetes/utils/k8s_hashlib_wrapper.py
b/airflow/providers/cncf/kubernetes/utils/k8s_hashlib_wrapper.py
index 72a1f8dac6..3cb52b0814 100644
--- a/airflow/providers/cncf/kubernetes/utils/k8s_hashlib_wrapper.py
+++ b/airflow/providers/cncf/kubernetes/utils/k8s_hashlib_wrapper.py
@@ -31,8 +31,7 @@ from airflow import PY39
def md5(__string: ReadableBuffer = b"") -> hashlib._Hash:
"""
- Safely allows calling the ``hashlib.md5`` function when
``usedforsecurity`` is disabled in
- the configuration.
+ Safely allows calling the ``hashlib.md5`` function when
``usedforsecurity`` is disabled in configuration.
:param __string: The data to hash. Default to empty str byte.
:return: The hashed value.
diff --git a/airflow/providers/elasticsearch/log/es_response.py
b/airflow/providers/elasticsearch/log/es_response.py
index 9a13847a82..e39496deff 100644
--- a/airflow/providers/elasticsearch/log/es_response.py
+++ b/airflow/providers/elasticsearch/log/es_response.py
@@ -66,6 +66,7 @@ class AttributeDict:
class Hit(AttributeDict):
"""
The Hit class is used to manage and access elements in a document.
+
It inherits from the AttributeDict class and provides
attribute-like access to its elements, similar to a dictionary.
"""
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py
b/airflow/providers/elasticsearch/log/es_task_handler.py
index 97d46be8b6..7f6c5c8873 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -136,6 +136,7 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMix
def format_url(host: str) -> str:
"""
Formats the given host string to ensure it starts with 'http'.
+
Checks if the host string represents a valid URL.
:param host: The host string to format and check.
@@ -444,6 +445,7 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMix
def _resolve_nested(self, hit: dict[Any, Any], parent_class=None) ->
type[Hit]:
"""
Resolves nested hits from Elasticsearch by iteratively navigating the
`_nested` field.
+
The result is used to fetch the appropriate document class to handle
the hit.
This method can be used with nested Elasticsearch fields which are
structured
@@ -468,8 +470,7 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMix
def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
"""
- This method processes a hit (i.e., a result) from an Elasticsearch
response and transforms it into an
- appropriate class instance.
+ Process a hit (i.e., a result) from an Elasticsearch response and
transform it into a class instance.
The transformation depends on the contents of the hit. If the document
in hit contains a nested field,
the '_resolve_nested' method is used to determine the appropriate
class (based on the nested path).
diff --git a/airflow/providers/ftp/operators/ftp.py
b/airflow/providers/ftp/operators/ftp.py
index 45bccbea4c..4489839d8f 100644
--- a/airflow/providers/ftp/operators/ftp.py
+++ b/airflow/providers/ftp/operators/ftp.py
@@ -139,7 +139,9 @@ class FTPFileTransmitOperator(BaseOperator):
def get_openlineage_facets_on_start(self):
"""
- Returns OpenLineage datasets with following naming structure:
+ Returns OpenLineage datasets.
+
+ Dataset will have the following structure:
input: file://hostname/path
output file://<conn.host>:<conn.port>/path.
"""
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index d4c3bd679f..55849f8db2 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -556,6 +556,7 @@ class GCSToGCSOperator(BaseOperator):
def get_openlineage_facets_on_complete(self, task_instance):
"""
Implementing _on_complete because execute method does preprocessing on
internals.
+
This means we won't have to normalize self.source_object and
self.source_objects,
destination bucket and so on.
"""
diff --git a/airflow/providers/google/cloud/transfers/s3_to_gcs.py
b/airflow/providers/google/cloud/transfers/s3_to_gcs.py
index 7484379043..2561e32e66 100644
--- a/airflow/providers/google/cloud/transfers/s3_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/s3_to_gcs.py
@@ -334,6 +334,7 @@ class S3ToGCSOperator(S3ListOperator):
def execute_complete(self, context: Context, event: dict[str, Any]) ->
None:
"""
Callback for when the trigger fires - returns immediately.
+
Relies on trigger to throw an exception, otherwise it assumes
execution was
successful.
"""
diff --git a/airflow/providers/redis/log/redis_task_handler.py
b/airflow/providers/redis/log/redis_task_handler.py
index b2e4a8fc16..03e2767418 100644
--- a/airflow/providers/redis/log/redis_task_handler.py
+++ b/airflow/providers/redis/log/redis_task_handler.py
@@ -33,6 +33,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin
class RedisTaskHandler(FileTaskHandler, LoggingMixin):
"""
RedisTaskHandler is a Python log handler that handles and reads task
instance logs.
+
It extends airflow FileTaskHandler and uploads to and reads from Redis.
:param base_log_folder:
diff --git a/airflow/providers/sftp/operators/sftp.py
b/airflow/providers/sftp/operators/sftp.py
index 8da4b3f332..95b3a8eeb9 100644
--- a/airflow/providers/sftp/operators/sftp.py
+++ b/airflow/providers/sftp/operators/sftp.py
@@ -194,7 +194,9 @@ class SFTPOperator(BaseOperator):
def get_openlineage_facets_on_start(self):
"""
- This returns OpenLineage datasets in format:
+ Returns OpenLineage datasets.
+
+ Dataset will have the following structure:
input: file://<local_host>/path
output: file://<remote_host>:<remote_port>/path.
"""